1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.requests;
20
21 import org.apache.giraph.comm.ServerData;
22 import org.apache.giraph.graph.VertexMutations;
23 import org.apache.giraph.metrics.GiraphMetrics;
24 import org.apache.giraph.metrics.MetricNames;
25 import org.apache.hadoop.io.Writable;
26 import org.apache.hadoop.io.WritableComparable;
27 import org.apache.log4j.Logger;
28
29 import com.google.common.collect.Maps;
30 import com.yammer.metrics.core.Histogram;
31
32 import java.io.DataInput;
33 import java.io.DataOutput;
34 import java.io.IOException;
35 import java.util.Map;
36 import java.util.Map.Entry;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.ConcurrentMap;
39
40
41
42
43
44
45
46
47
48
49
50 @SuppressWarnings("rawtypes")
51 public class SendPartitionMutationsRequest<I extends WritableComparable,
52 V extends Writable, E extends Writable> extends
53 WritableRequest<I, V, E> implements WorkerRequest<I, V, E> {
54
55 private static final Logger LOG =
56 Logger.getLogger(SendPartitionMutationsRequest.class);
57
58 private int partitionId;
59
60 private Map<I, VertexMutations<I, V, E>> vertexIdMutations;
61
62
63
64
65 public SendPartitionMutationsRequest() { }
66
67
68
69
70
71
72
73 public SendPartitionMutationsRequest(
74 int partitionId,
75 Map<I, VertexMutations<I, V, E>> vertexIdMutations) {
76 this.partitionId = partitionId;
77 this.vertexIdMutations = vertexIdMutations;
78 }
79
80 @Override
81 public void readFieldsRequest(DataInput input) throws IOException {
82 partitionId = input.readInt();
83 int vertexIdMutationsSize = input.readInt();
84
85
86
87
88
89 vertexIdMutations = Maps.newConcurrentMap();
90 for (int i = 0; i < vertexIdMutationsSize; ++i) {
91 I vertexId = getConf().createVertexId();
92 vertexId.readFields(input);
93 VertexMutations<I, V, E> vertexMutations =
94 new VertexMutations<I, V, E>();
95 vertexMutations.setConf(getConf());
96 vertexMutations.readFields(input);
97 if (vertexIdMutations.put(vertexId, vertexMutations) != null) {
98 throw new IllegalStateException(
99 "readFields: Already has vertex id " + vertexId);
100 }
101 }
102 }
103
104 @Override
105 public void writeRequest(DataOutput output) throws IOException {
106 output.writeInt(partitionId);
107 output.writeInt(vertexIdMutations.size());
108 for (Entry<I, VertexMutations<I, V, E>> entry :
109 vertexIdMutations.entrySet()) {
110 entry.getKey().write(output);
111 entry.getValue().write(output);
112 }
113 }
114
115 @Override
116 public RequestType getType() {
117 return RequestType.SEND_PARTITION_MUTATIONS_REQUEST;
118 }
119
120 @Override
121 public void doRequest(ServerData<I, V, E> serverData) {
122 ConcurrentMap<Integer, ConcurrentMap<I, VertexMutations<I, V, E>>>
123 partitionMutations = serverData.getPartitionMutations();
124 Histogram verticesInMutationHist = GiraphMetrics.get().perSuperstep()
125 .getUniformHistogram(MetricNames.VERTICES_IN_MUTATION_REQUEST);
126 int mutationSize = 0;
127 for (Map<I, VertexMutations<I, V, E>> map : partitionMutations.values()) {
128 mutationSize += map.size();
129 }
130 verticesInMutationHist.update(mutationSize);
131
132
133
134
135
136
137 if (!(vertexIdMutations instanceof ConcurrentMap)) {
138 vertexIdMutations = new ConcurrentHashMap<>(vertexIdMutations);
139 }
140
141 ConcurrentMap<I, VertexMutations<I, V, E>> currentVertexIdMutations =
142 partitionMutations.putIfAbsent(partitionId,
143 (ConcurrentMap<I, VertexMutations<I, V, E>>) vertexIdMutations);
144
145 if (currentVertexIdMutations != null) {
146 for (Entry<I, VertexMutations<I, V, E>> entry : vertexIdMutations
147 .entrySet()) {
148 VertexMutations<I, V, E> mutations = currentVertexIdMutations
149 .putIfAbsent(entry.getKey(), entry.getValue());
150 if (mutations != null) {
151 synchronized (mutations) {
152 mutations.addVertexMutations(entry.getValue());
153 }
154 }
155 }
156 }
157 }
158
159 @Override
160 public int getSerializedSize() {
161 return WritableRequest.UNKNOWN_SIZE;
162 }
163 }