1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.comm;
19
20 import org.apache.giraph.bsp.CentralizedServiceWorker;
21 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
22 import org.apache.giraph.graph.Vertex;
23 import org.apache.giraph.partition.PartitionOwner;
24 import org.apache.giraph.utils.ExtendedDataOutput;
25 import org.apache.giraph.utils.WritableUtils;
26 import org.apache.hadoop.io.Writable;
27 import org.apache.hadoop.io.WritableComparable;
28 import org.apache.log4j.Logger;
29
30 import java.io.IOException;
31
32 import static org.apache.giraph.conf.GiraphConstants.ADDITIONAL_VERTEX_REQUEST_SIZE;
33 import static org.apache.giraph.conf.GiraphConstants.MAX_VERTEX_REQUEST_SIZE;
34
35
36
37
38
39
40
41
42
43 public class SendPartitionCache<I extends WritableComparable,
44 V extends Writable, E extends Writable> extends
45 SendDataCache<ExtendedDataOutput> {
46
47 private static final Logger LOG =
48 Logger.getLogger(SendPartitionCache.class);
49
50
51
52
53
54
55
56 public SendPartitionCache(ImmutableClassesGiraphConfiguration<I, V, E> conf,
57 CentralizedServiceWorker<?, ?, ?> serviceWorker) {
58 super(conf, serviceWorker, MAX_VERTEX_REQUEST_SIZE.get(conf),
59 ADDITIONAL_VERTEX_REQUEST_SIZE.get(conf));
60 }
61
62
63
64
65
66
67
68
69 public int addVertex(PartitionOwner partitionOwner,
70 Vertex<I, V, E> vertex) {
71
72 ExtendedDataOutput partitionData =
73 getData(partitionOwner.getPartitionId());
74 int taskId = partitionOwner.getWorkerInfo().getTaskId();
75 int originalSize = 0;
76 if (partitionData == null) {
77 partitionData = getConf().createExtendedDataOutput(
78 getInitialBufferSize(taskId));
79 setData(partitionOwner.getPartitionId(), partitionData);
80 } else {
81 originalSize = partitionData.getPos();
82 }
83 try {
84 WritableUtils.<I, V, E>writeVertexToDataOutput(
85 partitionData, vertex, getConf());
86 } catch (IOException e) {
87 throw new IllegalStateException("addVertex: Failed to serialize", e);
88 }
89
90
91 return incrDataSize(taskId, partitionData.getPos() - originalSize);
92 }
93 }
94