1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
package org.apache.giraph.partition; |
19 | |
|
20 | |
import com.google.common.collect.MapMaker; |
21 | |
import com.google.common.primitives.Ints; |
22 | |
import org.apache.giraph.edge.Edge; |
23 | |
import org.apache.giraph.graph.Vertex; |
24 | |
import org.apache.giraph.utils.UnsafeByteArrayInputStream; |
25 | |
import org.apache.giraph.utils.WritableUtils; |
26 | |
import org.apache.hadoop.io.Writable; |
27 | |
import org.apache.hadoop.io.WritableComparable; |
28 | |
import org.apache.hadoop.util.Progressable; |
29 | |
|
30 | |
import javax.annotation.concurrent.NotThreadSafe; |
31 | |
import java.io.DataInput; |
32 | |
import java.io.DataOutput; |
33 | |
import java.io.IOException; |
34 | |
import java.util.Iterator; |
35 | |
import java.util.Map; |
36 | |
import java.util.concurrent.ConcurrentMap; |
37 | |
|
38 | |
|
39 | |
|
40 | |
|
41 | |
|
42 | |
|
43 | |
|
44 | |
|
45 | |
|
46 | |
|
47 | |
@NotThreadSafe |
48 | 0 | public class ByteArrayPartition<I extends WritableComparable, |
49 | |
V extends Writable, E extends Writable> |
50 | |
extends BasicPartition<I, V, E> |
51 | |
implements ReusesObjectsPartition<I, V, E> { |
52 | |
|
53 | |
|
54 | |
|
55 | |
|
56 | |
|
57 | |
private ConcurrentMap<I, byte[]> vertexMap; |
58 | |
|
59 | |
private Vertex<I, V, E> representativeVertex; |
60 | |
|
61 | |
private Vertex<I, V, E> representativeCombinerVertex; |
62 | |
|
63 | |
private boolean useUnsafeSerialization; |
64 | |
|
65 | |
|
66 | |
|
67 | |
|
68 | 0 | public ByteArrayPartition() { } |
69 | |
|
70 | |
@Override |
71 | |
public void initialize(int partitionId, Progressable progressable) { |
72 | 0 | super.initialize(partitionId, progressable); |
73 | 0 | vertexMap = new MapMaker().concurrencyLevel( |
74 | 0 | getConf().getNettyServerExecutionConcurrency()).makeMap(); |
75 | 0 | representativeVertex = getConf().createVertex(); |
76 | 0 | representativeVertex.initialize( |
77 | 0 | getConf().createVertexId(), |
78 | 0 | getConf().createVertexValue(), |
79 | 0 | getConf().createOutEdges()); |
80 | 0 | representativeCombinerVertex = getConf().createVertex(); |
81 | 0 | representativeCombinerVertex.initialize( |
82 | 0 | getConf().createVertexId(), |
83 | 0 | getConf().createVertexValue(), |
84 | 0 | getConf().createOutEdges()); |
85 | 0 | useUnsafeSerialization = getConf().useUnsafeSerialization(); |
86 | 0 | } |
87 | |
|
88 | |
@Override |
89 | |
public Vertex<I, V, E> getVertex(I vertexIndex) { |
90 | 0 | byte[] vertexData = vertexMap.get(vertexIndex); |
91 | 0 | if (vertexData == null) { |
92 | 0 | return null; |
93 | |
} |
94 | 0 | WritableUtils.reinitializeVertexFromByteArray( |
95 | 0 | vertexData, representativeVertex, useUnsafeSerialization, getConf()); |
96 | 0 | return representativeVertex; |
97 | |
} |
98 | |
|
99 | |
@Override |
100 | |
public Vertex<I, V, E> putVertex(Vertex<I, V, E> vertex) { |
101 | 0 | byte[] vertexData = |
102 | 0 | WritableUtils.writeVertexToByteArray( |
103 | 0 | vertex, useUnsafeSerialization, getConf()); |
104 | 0 | byte[] oldVertexBytes = vertexMap.put(vertex.getId(), vertexData); |
105 | 0 | if (oldVertexBytes == null) { |
106 | 0 | return null; |
107 | |
} else { |
108 | 0 | WritableUtils.reinitializeVertexFromByteArray(oldVertexBytes, |
109 | 0 | representativeVertex, useUnsafeSerialization, getConf()); |
110 | 0 | return representativeVertex; |
111 | |
} |
112 | |
} |
113 | |
|
114 | |
@Override |
115 | |
public Vertex<I, V, E> removeVertex(I vertexIndex) { |
116 | 0 | byte[] vertexBytes = vertexMap.remove(vertexIndex); |
117 | 0 | if (vertexBytes == null) { |
118 | 0 | return null; |
119 | |
} |
120 | 0 | WritableUtils.reinitializeVertexFromByteArray(vertexBytes, |
121 | 0 | representativeVertex, useUnsafeSerialization, getConf()); |
122 | 0 | return representativeVertex; |
123 | |
} |
124 | |
|
125 | |
@Override |
126 | |
public void addPartition(Partition<I, V, E> partition) { |
127 | |
|
128 | 0 | if (!(partition instanceof ByteArrayPartition)) { |
129 | 0 | throw new IllegalStateException("addPartition: Cannot add partition " + |
130 | 0 | "of type " + partition.getClass()); |
131 | |
} |
132 | |
|
133 | 0 | ByteArrayPartition<I, V, E> byteArrayPartition = |
134 | |
(ByteArrayPartition<I, V, E>) partition; |
135 | |
for (Map.Entry<I, byte[]> entry : |
136 | 0 | byteArrayPartition.vertexMap.entrySet()) { |
137 | |
|
138 | 0 | byte[] oldVertexBytes = |
139 | 0 | vertexMap.putIfAbsent(entry.getKey(), entry.getValue()); |
140 | 0 | if (oldVertexBytes == null) { |
141 | 0 | continue; |
142 | |
} |
143 | |
|
144 | |
|
145 | |
|
146 | |
|
147 | |
|
148 | 0 | synchronized (this) { |
149 | |
|
150 | 0 | WritableUtils.reinitializeVertexFromByteArray(oldVertexBytes, |
151 | 0 | representativeVertex, useUnsafeSerialization, getConf()); |
152 | 0 | WritableUtils.reinitializeVertexFromByteArray(entry.getValue(), |
153 | 0 | representativeCombinerVertex, useUnsafeSerialization, getConf()); |
154 | 0 | combine(representativeVertex, representativeCombinerVertex); |
155 | 0 | } |
156 | 0 | } |
157 | 0 | } |
158 | |
|
159 | |
@Override |
160 | |
public synchronized boolean putOrCombine(Vertex<I, V, E> vertex) { |
161 | |
|
162 | 0 | byte[] vertexData = |
163 | 0 | WritableUtils.writeVertexToByteArray( |
164 | 0 | vertex, useUnsafeSerialization, getConf()); |
165 | 0 | byte[] oldVertexBytes = vertexMap.putIfAbsent(vertex.getId(), vertexData); |
166 | 0 | if (oldVertexBytes == null) { |
167 | 0 | return true; |
168 | |
} |
169 | |
|
170 | 0 | WritableUtils.reinitializeVertexFromByteArray(oldVertexBytes, |
171 | 0 | representativeVertex, useUnsafeSerialization, getConf()); |
172 | 0 | combine(representativeVertex, vertex); |
173 | 0 | return false; |
174 | |
} |
175 | |
|
176 | |
|
177 | |
|
178 | |
|
179 | |
|
180 | |
|
181 | |
|
182 | |
|
183 | |
private void combine(Vertex<I, V, E> representativeVertex, |
184 | |
Vertex<I, V, E> representativeCombinerVertex) { |
185 | 0 | getVertexValueCombiner().combine(representativeVertex.getValue(), |
186 | 0 | representativeCombinerVertex.getValue()); |
187 | |
|
188 | 0 | for (Edge<I, E> edge : representativeCombinerVertex.getEdges()) { |
189 | 0 | representativeVertex.addEdge(edge); |
190 | 0 | } |
191 | 0 | vertexMap.put(representativeCombinerVertex.getId(), |
192 | 0 | WritableUtils.writeVertexToByteArray( |
193 | 0 | representativeVertex, useUnsafeSerialization, getConf())); |
194 | 0 | } |
195 | |
|
196 | |
@Override |
197 | |
public long getVertexCount() { |
198 | 0 | return vertexMap.size(); |
199 | |
} |
200 | |
|
201 | |
@Override |
202 | |
public long getEdgeCount() { |
203 | 0 | long edges = 0; |
204 | 0 | for (byte[] vertexBytes : vertexMap.values()) { |
205 | 0 | WritableUtils.reinitializeVertexFromByteArray(vertexBytes, |
206 | 0 | representativeVertex, useUnsafeSerialization, getConf()); |
207 | 0 | edges += representativeVertex.getNumEdges(); |
208 | 0 | } |
209 | 0 | return edges; |
210 | |
} |
211 | |
|
212 | |
@Override |
213 | |
public void saveVertex(Vertex<I, V, E> vertex) { |
214 | |
|
215 | 0 | byte[] oldVertexData = vertexMap.get(vertex.getId()); |
216 | 0 | if (oldVertexData != null) { |
217 | 0 | vertexMap.put(vertex.getId(), |
218 | 0 | WritableUtils.writeVertexToByteArray( |
219 | 0 | vertex, oldVertexData, useUnsafeSerialization, getConf())); |
220 | |
} else { |
221 | 0 | vertexMap.put(vertex.getId(), |
222 | 0 | WritableUtils.writeVertexToByteArray( |
223 | 0 | vertex, useUnsafeSerialization, getConf())); |
224 | |
} |
225 | 0 | } |
226 | |
|
227 | |
@Override |
228 | |
public void write(DataOutput output) throws IOException { |
229 | 0 | super.write(output); |
230 | 0 | output.writeInt(vertexMap.size()); |
231 | 0 | for (Map.Entry<I, byte[]> entry : vertexMap.entrySet()) { |
232 | 0 | progress(); |
233 | 0 | entry.getKey().write(output); |
234 | |
|
235 | |
|
236 | |
int vertexDataSize; |
237 | 0 | if (useUnsafeSerialization) { |
238 | 0 | vertexDataSize = UnsafeByteArrayInputStream.getInt(entry.getValue(), |
239 | |
0); |
240 | |
} else { |
241 | 0 | vertexDataSize = Ints.fromByteArray(entry.getValue()); |
242 | |
} |
243 | |
|
244 | 0 | output.writeInt(vertexDataSize); |
245 | 0 | output.write(entry.getValue(), 0, vertexDataSize); |
246 | 0 | } |
247 | 0 | } |
248 | |
|
249 | |
@Override |
250 | |
public void readFields(DataInput input) throws IOException { |
251 | 0 | super.readFields(input); |
252 | 0 | int size = input.readInt(); |
253 | 0 | vertexMap = new MapMaker().concurrencyLevel( |
254 | 0 | getConf().getNettyServerExecutionConcurrency()).initialCapacity( |
255 | 0 | size).makeMap(); |
256 | 0 | representativeVertex = getConf().createVertex(); |
257 | 0 | representativeVertex.initialize( |
258 | 0 | getConf().createVertexId(), |
259 | 0 | getConf().createVertexValue(), |
260 | 0 | getConf().createOutEdges()); |
261 | 0 | useUnsafeSerialization = getConf().useUnsafeSerialization(); |
262 | 0 | for (int i = 0; i < size; ++i) { |
263 | 0 | progress(); |
264 | 0 | I vertexId = getConf().createVertexId(); |
265 | 0 | vertexId.readFields(input); |
266 | 0 | int vertexDataSize = input.readInt(); |
267 | 0 | byte[] vertexData = new byte[vertexDataSize]; |
268 | 0 | input.readFully(vertexData); |
269 | 0 | if (vertexMap.put(vertexId, vertexData) != null) { |
270 | 0 | throw new IllegalStateException("readFields: Already saw vertex " + |
271 | |
vertexId); |
272 | |
} |
273 | |
} |
274 | 0 | } |
275 | |
|
276 | |
@Override |
277 | |
public Iterator<Vertex<I, V, E>> iterator() { |
278 | 0 | return new RepresentativeVertexIterator(); |
279 | |
} |
280 | |
|
281 | |
|
282 | |
|
283 | |
|
284 | |
|
285 | 0 | private class RepresentativeVertexIterator implements |
286 | |
Iterator<Vertex<I, V, E>> { |
287 | |
|
288 | 0 | private Iterator<byte[]> vertexDataIterator = |
289 | 0 | vertexMap.values().iterator(); |
290 | |
|
291 | |
@Override |
292 | |
public boolean hasNext() { |
293 | 0 | return vertexDataIterator.hasNext(); |
294 | |
} |
295 | |
|
296 | |
@Override |
297 | |
public Vertex<I, V, E> next() { |
298 | 0 | WritableUtils.reinitializeVertexFromByteArray( |
299 | 0 | vertexDataIterator.next(), representativeVertex, |
300 | 0 | useUnsafeSerialization, getConf()); |
301 | 0 | return representativeVertex; |
302 | |
} |
303 | |
|
304 | |
@Override |
305 | |
public void remove() { |
306 | 0 | throw new IllegalAccessError("remove: This method is not supported."); |
307 | |
} |
308 | |
} |
309 | |
} |