1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
package org.apache.giraph.writable.kryo; |
19 | |
|
20 | |
import java.io.DataInput; |
21 | |
import java.io.DataOutput; |
22 | |
import java.util.Arrays; |
23 | |
import java.util.Collections; |
24 | |
import java.util.LinkedHashMap; |
25 | |
import java.util.Map; |
26 | |
import java.util.Map.Entry; |
27 | |
import java.util.Random; |
28 | |
|
29 | |
import com.esotericsoftware.kryo.util.DefaultClassResolver; |
30 | |
import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer; |
31 | |
import org.apache.giraph.conf.GiraphConfigurationSettable; |
32 | |
import com.esotericsoftware.kryo.ClassResolver; |
33 | |
import com.esotericsoftware.kryo.ReferenceResolver; |
34 | |
import com.esotericsoftware.kryo.util.MapReferenceResolver; |
35 | |
import org.apache.giraph.types.ops.collections.Basic2ObjectMap; |
36 | |
import org.apache.giraph.types.ops.collections.BasicSet; |
37 | |
import org.apache.giraph.writable.kryo.markers.NonKryoWritable; |
38 | |
import org.apache.giraph.writable.kryo.markers.KryoIgnoreWritable; |
39 | |
import org.apache.giraph.writable.kryo.serializers.ArraysAsListSerializer; |
40 | |
import org.apache.giraph.writable.kryo.serializers.CollectionsNCopiesSerializer; |
41 | |
import org.apache.giraph.writable.kryo.serializers.DirectWritableSerializer; |
42 | |
import org.apache.giraph.writable.kryo.serializers.FastUtilSerializer; |
43 | |
import org.apache.giraph.writable.kryo.serializers.ImmutableBiMapSerializerUtils; |
44 | |
import org.apache.giraph.writable.kryo.serializers.ReusableFieldSerializer; |
45 | |
import org.apache.hadoop.conf.Configurable; |
46 | |
import org.apache.hadoop.conf.Configuration; |
47 | |
import org.apache.hadoop.io.Writable; |
48 | |
import org.apache.log4j.Logger; |
49 | |
import org.objenesis.strategy.StdInstantiatorStrategy; |
50 | |
|
51 | |
import com.esotericsoftware.kryo.Kryo; |
52 | |
import com.esotericsoftware.kryo.Serializer; |
53 | |
import com.esotericsoftware.kryo.factories.SerializerFactory; |
54 | |
import com.esotericsoftware.kryo.io.Input; |
55 | |
import com.esotericsoftware.kryo.io.InputChunked; |
56 | |
import com.esotericsoftware.kryo.io.Output; |
57 | |
import com.esotericsoftware.kryo.io.OutputChunked; |
58 | |
import com.esotericsoftware.kryo.pool.KryoCallback; |
59 | |
import com.esotericsoftware.kryo.pool.KryoFactory; |
60 | |
import com.esotericsoftware.kryo.pool.KryoPool; |
61 | |
import com.esotericsoftware.kryo.serializers.ClosureSerializer; |
62 | |
import com.esotericsoftware.kryo.serializers.FieldSerializer; |
63 | |
import com.esotericsoftware.kryo.util.ObjectMap; |
64 | |
import com.google.common.base.Preconditions; |
65 | |
|
66 | |
import de.javakaffee.kryoserializers.guava.ImmutableListSerializer; |
67 | |
|
68 | |
|
69 | |
|
70 | |
|
71 | |
|
72 | |
|
73 | |
|
74 | |
|
75 | |
|
76 | |
|
77 | |
|
78 | |
|
79 | |
|
80 | |
|
81 | 0 | public class HadoopKryo extends Kryo { |
82 | |
|
/**
 * Pool of Kryo instances used by the static read/write/copy helpers.
 * Every pooled instance is produced by {@code createKryo(true, true)},
 * i.e. created through the no-arg constructor and equipped with buffers.
 */
private static final KryoPool KRYO_POOL;

static {
  KryoFactory pooledKryoFactory = new KryoFactory() {
    @Override
    public Kryo create() {
      return createKryo(true, true);
    }
  };
  KRYO_POOL = new KryoPool.Builder(pooledKryoFactory).build();
}
90 | |
|
91 | 0 | private static final ThreadLocal<HadoopKryo> KRYO = |
92 | 0 | new ThreadLocal<HadoopKryo>() { |
93 | |
@Override protected HadoopKryo initialValue() { |
94 | 0 | return createKryo(false, false); |
95 | |
} |
96 | |
}; |
97 | |
|
98 | |
|
99 | |
|
100 | |
|
101 | |
|
102 | |
|
/**
 * Types that must not be serialized, mapped to the human-readable reason
 * that is embedded in the error raised when such a type is encountered.
 * Iteration follows insertion order, and the map is read-only once the
 * class has been initialized.
 */
private static final Map<Class<?>, String> NON_SERIALIZABLE;

static {
  // Populate a private map first and publish only an unmodifiable view,
  // so the constant cannot be altered after class initialization.
  Map<Class<?>, String> reasons = new LinkedHashMap<>();
  reasons.put(
      NonKryoWritable.class,
      "it is marked to not allow serialization, " +
      "look at the class for more details");
  reasons.put(
      KryoWritableWrapper.class, "recursion is disallowed");
  reasons.put(
      Configuration.class,
      "it cannot be supported since it contains ClassLoader");
  reasons.put(
      GiraphConfigurationSettable.class, "configuration cannot be set");
  reasons.put(
      Configurable.class, "configuration cannot be set");
  reasons.put(
      Random.class,
      "it should be rarely serialized, since it would create same stream " +
      "of numbers everywhere, use TransientRandom instead");
  reasons.put(
      Logger.class,
      "Logger must be a static field");
  NON_SERIALIZABLE = Collections.unmodifiableMap(reasons);
}
128 | |
|
129 | |
|
130 | |
private InputChunked input; |
131 | |
|
132 | |
private OutputChunked output; |
133 | |
|
134 | |
|
135 | |
private DataInputWrapperStream dataInputWrapperStream; |
136 | |
|
137 | |
private DataOutputWrapperStream dataOutputWrapperStream; |
138 | |
|
139 | |
|
140 | |
|
141 | |
|
142 | |
|
143 | 0 | private final ObjectMap<Class<?>, ReusableFieldSerializer<Object>> |
144 | |
classToIntoSerializer = new ObjectMap<>(); |
145 | |
|
146 | |
|
/**
 * Creates an instance using the default {@link Kryo} constructor.
 * Private: instances are obtained through the static helpers.
 */
private HadoopKryo() {
  super();
}
149 | |
|
150 | |
|
151 | |
|
152 | |
|
153 | |
|
154 | |
|
/**
 * Creates an instance with explicitly supplied resolvers, which are
 * handed straight to the {@link Kryo} constructor.
 *
 * @param classResolver resolver used for class registration and lookup
 * @param referenceResolver resolver used for object references
 */
private HadoopKryo(
    ClassResolver classResolver,
    ReferenceResolver referenceResolver) {
  super(classResolver, referenceResolver);
}
159 | |
|
160 | |
|
161 | |
|
162 | |
|
163 | |
|
164 | |
|
165 | |
|
166 | |
|
167 | |
|
168 | |
|
169 | |
public static void writeClassAndObj( |
170 | |
final DataOutput out, final Object object) { |
171 | 0 | writeInternal(out, object, false); |
172 | 0 | } |
173 | |
|
174 | |
|
175 | |
|
176 | |
|
177 | |
|
178 | |
|
179 | |
|
180 | |
|
181 | |
|
182 | |
|
183 | |
public static <T> T readClassAndObj(DataInput in) { |
184 | 0 | return readInternal(in, null, false); |
185 | |
} |
186 | |
|
187 | |
|
188 | |
|
189 | |
|
190 | |
|
191 | |
|
192 | |
|
193 | |
public static void writeOutOfObject( |
194 | |
final DataOutput out, final Object object) { |
195 | 0 | writeInternal(out, object, true); |
196 | 0 | } |
197 | |
|
198 | |
|
199 | |
|
200 | |
|
201 | |
|
202 | |
|
203 | |
|
204 | |
|
205 | |
|
206 | |
public static void readIntoObject(DataInput in, Object object) { |
207 | 0 | readInternal(in, object, true); |
208 | 0 | } |
209 | |
|
210 | |
|
211 | |
|
212 | |
|
213 | |
|
214 | |
|
215 | |
|
216 | |
|
217 | |
public static void writeWithKryo( |
218 | |
final HadoopKryo kryo, final Output out, |
219 | |
final Object object) { |
220 | 0 | kryo.writeClassAndObject(out, object); |
221 | 0 | out.close(); |
222 | 0 | } |
223 | |
|
224 | |
|
225 | |
|
226 | |
|
227 | |
|
228 | |
|
229 | |
|
230 | |
public static void writeWithKryoOutOfObject( |
231 | |
final HadoopKryo kryo, final Output out, |
232 | |
final Object object) { |
233 | 0 | kryo.writeOutOfObject(out, object); |
234 | 0 | out.close(); |
235 | 0 | } |
236 | |
|
237 | |
|
238 | |
|
239 | |
|
240 | |
|
241 | |
|
242 | |
|
243 | |
|
244 | |
|
245 | |
|
246 | |
public static <T> T readWithKryo( |
247 | |
final HadoopKryo kryo, final Input in) { |
248 | |
T object; |
249 | 0 | object = (T) kryo.readClassAndObject(in); |
250 | 0 | in.close(); |
251 | 0 | return object; |
252 | |
} |
253 | |
|
254 | |
|
255 | |
|
256 | |
|
257 | |
|
258 | |
|
259 | |
|
260 | |
public static void readWithKryoIntoObject( |
261 | |
final HadoopKryo kryo, final Input in, Object object) { |
262 | 0 | kryo.readIntoObject(in, object); |
263 | 0 | in.close(); |
264 | 0 | } |
265 | |
|
266 | |
|
267 | |
|
268 | |
|
269 | |
|
270 | |
|
271 | |
|
272 | |
|
273 | |
|
274 | |
public static <T> T createCopy(final T object) { |
275 | 0 | return KRYO_POOL.run(new KryoCallback<T>() { |
276 | |
@Override |
277 | |
public T execute(Kryo kryo) { |
278 | 0 | return kryo.copy(object); |
279 | |
} |
280 | |
}); |
281 | |
} |
282 | |
|
283 | |
|
284 | |
|
285 | |
|
286 | |
|
287 | |
|
288 | |
|
289 | |
|
290 | |
|
291 | |
|
292 | |
|
293 | |
|
294 | |
public static HadoopKryo getNontrackingKryo() { |
295 | 0 | return KRYO.get(); |
296 | |
} |
297 | |
|
298 | |
|
299 | |
|
300 | |
|
301 | |
|
302 | |
|
303 | |
|
304 | |
|
305 | |
|
306 | |
private static HadoopKryo createKryo(boolean trackReferences, |
307 | |
boolean hasBuffer) { |
308 | |
HadoopKryo kryo; |
309 | 0 | if (trackReferences) { |
310 | 0 | kryo = new HadoopKryo(); |
311 | |
} else { |
312 | |
|
313 | |
|
314 | |
|
315 | 0 | kryo = new HadoopKryo( |
316 | 0 | GiraphClassResolver.isInitialized() ? new GiraphClassResolver() : |
317 | |
new DefaultClassResolver(), |
318 | |
new MapReferenceResolver()); |
319 | |
} |
320 | |
|
321 | |
try { |
322 | 0 | kryo.register(Class.forName("java.lang.invoke.SerializedLambda")); |
323 | 0 | kryo.register(Class.forName( |
324 | |
"com.esotericsoftware.kryo.serializers.ClosureSerializer$Closure"), |
325 | |
new ClosureSerializer()); |
326 | 0 | } catch (ClassNotFoundException e) { |
327 | 0 | throw new IllegalStateException( |
328 | |
"Trying to use Kryo on Java version " + |
329 | 0 | System.getProperty("java.version") + |
330 | |
", but unable to find needed classes", e); |
331 | 0 | } |
332 | |
|
333 | 0 | kryo.register(Arrays.asList().getClass(), new ArraysAsListSerializer()); |
334 | 0 | kryo.register(Collections.nCopies(1, new Object()).getClass(), |
335 | |
new CollectionsNCopiesSerializer()); |
336 | |
|
337 | 0 | ImmutableListSerializer.registerSerializers(kryo); |
338 | 0 | ImmutableMapSerializer.registerSerializers(kryo); |
339 | 0 | ImmutableBiMapSerializerUtils.registerSerializers(kryo); |
340 | |
|
341 | |
|
342 | |
|
343 | 0 | FastUtilSerializer.registerAll(kryo); |
344 | |
|
345 | 0 | kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy( |
346 | |
new StdInstantiatorStrategy())); |
347 | |
|
348 | 0 | SerializerFactory customSerializerFactory = new SerializerFactory() { |
349 | |
@SuppressWarnings("rawtypes") |
350 | |
@Override |
351 | |
public Serializer makeSerializer(Kryo kryo, final Class<?> type) { |
352 | |
for (final Entry<Class<?>, String> entry : |
353 | 0 | NON_SERIALIZABLE.entrySet()) { |
354 | 0 | if (entry.getKey().isAssignableFrom(type)) { |
355 | |
|
356 | 0 | return new Serializer() { |
357 | |
@Override |
358 | |
public Object read(Kryo kryo, Input input, Class type) { |
359 | 0 | throw new RuntimeException("Cannot serialize " + type + |
360 | |
". Objects being serialized cannot capture " + |
361 | 0 | entry.getKey() + " because " + entry.getValue() + |
362 | |
". Either remove field in question" + |
363 | |
", or make it transient (so that it isn't serialized)"); |
364 | |
} |
365 | |
|
366 | |
@Override |
367 | |
public void write(Kryo kryo, Output output, Object object) { |
368 | 0 | throw new RuntimeException("Cannot serialize " + type + |
369 | |
". Objects being serialized cannot capture " + |
370 | 0 | entry.getKey() + " because " + entry.getValue() + |
371 | |
". Either remove field in question" + |
372 | |
", or make it transient (so that it isn't serialized)"); |
373 | |
} |
374 | |
}; |
375 | |
} |
376 | 0 | } |
377 | |
|
378 | 0 | if (Writable.class.isAssignableFrom(type) && |
379 | 0 | !KryoIgnoreWritable.class.isAssignableFrom(type) && |
380 | |
|
381 | |
|
382 | 0 | !BasicSet.class.isAssignableFrom(type) && |
383 | 0 | !Basic2ObjectMap.class.isAssignableFrom(type)) { |
384 | |
|
385 | 0 | DirectWritableSerializer serializer = new DirectWritableSerializer(); |
386 | 0 | return serializer; |
387 | |
} else { |
388 | 0 | FieldSerializer serializer = new FieldSerializer<>(kryo, type); |
389 | 0 | serializer.setIgnoreSyntheticFields(false); |
390 | 0 | return serializer; |
391 | |
} |
392 | |
} |
393 | |
}; |
394 | |
|
395 | 0 | kryo.addDefaultSerializer(Writable.class, customSerializerFactory); |
396 | 0 | kryo.setDefaultSerializer(customSerializerFactory); |
397 | |
|
398 | 0 | if (hasBuffer) { |
399 | 0 | kryo.input = new InputChunked(4096); |
400 | 0 | kryo.output = new OutputChunked(4096); |
401 | 0 | kryo.dataInputWrapperStream = new DataInputWrapperStream(); |
402 | 0 | kryo.dataOutputWrapperStream = new DataOutputWrapperStream(); |
403 | |
} |
404 | |
|
405 | 0 | if (!trackReferences) { |
406 | 0 | kryo.setReferences(false); |
407 | |
|
408 | |
|
409 | |
|
410 | 0 | if (GiraphClassResolver.isInitialized()) { |
411 | 0 | kryo.setAutoReset(false); |
412 | |
} |
413 | |
} |
414 | 0 | return kryo; |
415 | |
} |
416 | |
|
417 | |
|
418 | |
|
419 | |
|
420 | |
|
421 | |
|
422 | |
|
423 | |
|
424 | |
private static void registerSerializer(HadoopKryo kryo, String className, |
425 | |
Serializer serializer) { |
426 | |
try { |
427 | 0 | kryo.register(Class.forName(className), serializer); |
428 | 0 | } catch (ClassNotFoundException e) { |
429 | 0 | throw new IllegalStateException("Class " + className + " is missing", e); |
430 | 0 | } |
431 | 0 | } |
432 | |
|
433 | |
|
434 | |
|
435 | |
|
436 | |
|
437 | |
|
438 | |
private void setDataInput(DataInput in) { |
439 | 0 | dataInputWrapperStream.setDataInput(in); |
440 | 0 | input.setInputStream(dataInputWrapperStream); |
441 | 0 | } |
442 | |
|
443 | |
|
444 | |
|
445 | |
|
446 | |
|
447 | |
|
448 | |
private void setDataOutput(DataOutput out) { |
449 | 0 | dataOutputWrapperStream.setDataOutput(out); |
450 | 0 | output.setOutputStream(dataOutputWrapperStream); |
451 | 0 | } |
452 | |
|
453 | |
|
454 | |
|
455 | |
|
456 | |
|
457 | |
|
458 | |
|
459 | |
private ReusableFieldSerializer<Object> getOrCreateReusableSerializer( |
460 | |
Class<?> type) { |
461 | 0 | ReusableFieldSerializer<Object> serializer = |
462 | 0 | classToIntoSerializer.get(type); |
463 | 0 | if (serializer == null) { |
464 | 0 | serializer = new ReusableFieldSerializer<>(this, type); |
465 | 0 | classToIntoSerializer.put(type, serializer); |
466 | |
} |
467 | 0 | return serializer; |
468 | |
} |
469 | |
|
470 | |
|
471 | |
|
472 | |
|
473 | |
|
474 | |
|
475 | |
|
476 | |
|
477 | |
|
478 | |
|
479 | |
private static void writeInternal( |
480 | |
final DataOutput out, final Object object, final boolean outOf) { |
481 | 0 | KRYO_POOL.run(new KryoCallback<Void>() { |
482 | |
@Override |
483 | |
public Void execute(Kryo kryo) { |
484 | 0 | HadoopKryo hkryo = (HadoopKryo) kryo; |
485 | 0 | hkryo.setDataOutput(out); |
486 | |
|
487 | 0 | if (outOf) { |
488 | 0 | hkryo.writeOutOfObject(hkryo.output, object); |
489 | |
} else { |
490 | 0 | hkryo.writeClassAndObject(hkryo.output, object); |
491 | |
} |
492 | |
|
493 | 0 | hkryo.output.endChunks(); |
494 | 0 | hkryo.output.close(); |
495 | |
|
496 | 0 | return null; |
497 | |
} |
498 | |
}); |
499 | 0 | } |
500 | |
|
501 | |
|
502 | |
|
503 | |
|
504 | |
|
505 | |
|
506 | |
|
507 | |
|
508 | |
|
509 | |
|
510 | |
|
511 | |
|
512 | |
@SuppressWarnings("unchecked") |
513 | |
private static <T> T readInternal( |
514 | |
final DataInput in, final T outObject, final boolean into) { |
515 | 0 | return KRYO_POOL.run(new KryoCallback<T>() { |
516 | |
@Override |
517 | |
public T execute(Kryo kryo) { |
518 | 0 | HadoopKryo hkryo = (HadoopKryo) kryo; |
519 | 0 | hkryo.setDataInput(in); |
520 | |
|
521 | |
T object; |
522 | 0 | if (into) { |
523 | 0 | hkryo.readIntoObject(hkryo.input, outObject); |
524 | 0 | object = outObject; |
525 | |
} else { |
526 | 0 | object = (T) hkryo.readClassAndObject(hkryo.input); |
527 | |
} |
528 | 0 | hkryo.input.nextChunks(); |
529 | |
|
530 | 0 | hkryo.input.close(); |
531 | 0 | return object; |
532 | |
} |
533 | |
}); |
534 | |
} |
535 | |
|
536 | |
|
537 | |
|
538 | |
|
539 | |
|
540 | |
|
541 | |
|
542 | |
|
543 | |
private void readIntoObject(Input input, Object object) { |
544 | 0 | Preconditions.checkNotNull(object); |
545 | |
|
546 | 0 | Class<?> type = object.getClass(); |
547 | 0 | ReusableFieldSerializer<Object> serializer = |
548 | 0 | getOrCreateReusableSerializer(type); |
549 | |
|
550 | 0 | serializer.setReadIntoObject(object); |
551 | 0 | Object result = readObject(input, type, serializer); |
552 | |
|
553 | 0 | Preconditions.checkState(result == object); |
554 | 0 | } |
555 | |
|
556 | |
|
557 | |
|
558 | |
|
559 | |
|
560 | |
|
561 | |
|
562 | |
private void writeOutOfObject(Output output, Object object) { |
563 | 0 | ReusableFieldSerializer<Object> serializer = |
564 | 0 | getOrCreateReusableSerializer(object.getClass()); |
565 | 0 | writeObject(output, object, serializer); |
566 | 0 | } |
567 | |
|
568 | |
} |