1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm;
20
21 import org.apache.giraph.bsp.CentralizedServiceWorker;
22 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23 import org.apache.giraph.edge.Edge;
24 import org.apache.giraph.utils.ByteArrayVertexIdEdges;
25 import org.apache.giraph.utils.PairList;
26 import org.apache.giraph.utils.VertexIdEdges;
27 import org.apache.giraph.worker.WorkerInfo;
28 import org.apache.hadoop.io.Writable;
29 import org.apache.hadoop.io.WritableComparable;
30
31 import static org.apache.giraph.conf.GiraphConstants.ADDITIONAL_EDGE_REQUEST_SIZE;
32 import static org.apache.giraph.conf.GiraphConstants.MAX_EDGE_REQUEST_SIZE;
33
34
35
36
37
38
39
40
41 public class SendEdgeCache<I extends WritableComparable, E extends Writable>
42 extends SendVertexIdDataCache<I, Edge<I, E>, VertexIdEdges<I, E>> {
43
44
45
46
47
48
49 public SendEdgeCache(ImmutableClassesGiraphConfiguration conf,
50 CentralizedServiceWorker<?, ?, ?> serviceWorker) {
51 super(conf, serviceWorker, MAX_EDGE_REQUEST_SIZE.get(conf),
52 ADDITIONAL_EDGE_REQUEST_SIZE.get(conf));
53 }
54
55 @Override
56 public VertexIdEdges<I, E> createVertexIdData() {
57 return new ByteArrayVertexIdEdges<I, E>();
58 }
59
60
61
62
63
64
65
66
67
68
69 public int addEdge(WorkerInfo workerInfo,
70 int partitionId, I destVertexId, Edge<I, E> edge) {
71 return addData(workerInfo, partitionId, destVertexId, edge);
72 }
73
74
75
76
77
78
79
80
81
82 public PairList<Integer, VertexIdEdges<I, E>>
83 removeWorkerEdges(WorkerInfo workerInfo) {
84 return removeWorkerData(workerInfo);
85 }
86
87
88
89
90
91
92 public PairList<WorkerInfo, PairList<Integer, VertexIdEdges<I, E>>>
93 removeAllEdges() {
94 return removeAllData();
95 }
96 }