1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.requests;
20
21 import org.apache.giraph.comm.ServerData;
22 import org.apache.giraph.edge.Edge;
23 import org.apache.giraph.utils.ByteArrayVertexIdEdges;
24 import org.apache.giraph.utils.PairList;
25 import org.apache.giraph.utils.VertexIdEdges;
26 import org.apache.hadoop.io.Writable;
27 import org.apache.hadoop.io.WritableComparable;
28
29
30
31
32
33
34
35 @SuppressWarnings("unchecked")
36 public class SendWorkerEdgesRequest<I extends WritableComparable,
37 E extends Writable>
38 extends SendWorkerDataRequest<I, Edge<I, E>,
39 VertexIdEdges<I, E>> {
40
41
42
43 public SendWorkerEdgesRequest() { }
44
45
46
47
48
49
50
51 public SendWorkerEdgesRequest(
52 PairList<Integer, VertexIdEdges<I, E>> partVertEdges) {
53 this.partitionVertexData = partVertEdges;
54 }
55
56 @Override
57 public VertexIdEdges<I, E> createVertexIdData() {
58 return new ByteArrayVertexIdEdges<>();
59 }
60
61 @Override
62 public RequestType getType() {
63 return RequestType.SEND_WORKER_EDGES_REQUEST;
64 }
65
66 @Override
67 public void doRequest(ServerData serverData) {
68 PairList<Integer, VertexIdEdges<I, E>>.Iterator
69 iterator = partitionVertexData.getIterator();
70 while (iterator.hasNext()) {
71 iterator.next();
72 serverData.getEdgeStore()
73 .addPartitionEdges(iterator.getCurrentFirst(),
74 iterator.getCurrentSecond());
75 }
76 }
77 }