1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.writable.kryo;
19
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.util.Arrays;
23 import java.util.Collections;
24 import java.util.LinkedHashMap;
25 import java.util.Map;
26 import java.util.Map.Entry;
27 import java.util.Random;
28
29 import com.esotericsoftware.kryo.util.DefaultClassResolver;
30 import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer;
31 import org.apache.giraph.conf.GiraphConfigurationSettable;
32 import com.esotericsoftware.kryo.ClassResolver;
33 import com.esotericsoftware.kryo.ReferenceResolver;
34 import com.esotericsoftware.kryo.util.MapReferenceResolver;
35 import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
36 import org.apache.giraph.types.ops.collections.BasicSet;
37 import org.apache.giraph.writable.kryo.markers.NonKryoWritable;
38 import org.apache.giraph.writable.kryo.markers.KryoIgnoreWritable;
39 import org.apache.giraph.writable.kryo.serializers.ArraysAsListSerializer;
40 import org.apache.giraph.writable.kryo.serializers.CollectionsNCopiesSerializer;
41 import org.apache.giraph.writable.kryo.serializers.DirectWritableSerializer;
42 import org.apache.giraph.writable.kryo.serializers.FastUtilSerializer;
43 import org.apache.giraph.writable.kryo.serializers.ImmutableBiMapSerializerUtils;
44 import org.apache.giraph.writable.kryo.serializers.ReusableFieldSerializer;
45 import org.apache.hadoop.conf.Configurable;
46 import org.apache.hadoop.conf.Configuration;
47 import org.apache.hadoop.io.Writable;
48 import org.apache.log4j.Logger;
49 import org.objenesis.strategy.StdInstantiatorStrategy;
50
51 import com.esotericsoftware.kryo.Kryo;
52 import com.esotericsoftware.kryo.Serializer;
53 import com.esotericsoftware.kryo.factories.SerializerFactory;
54 import com.esotericsoftware.kryo.io.Input;
55 import com.esotericsoftware.kryo.io.InputChunked;
56 import com.esotericsoftware.kryo.io.Output;
57 import com.esotericsoftware.kryo.io.OutputChunked;
58 import com.esotericsoftware.kryo.pool.KryoCallback;
59 import com.esotericsoftware.kryo.pool.KryoFactory;
60 import com.esotericsoftware.kryo.pool.KryoPool;
61 import com.esotericsoftware.kryo.serializers.ClosureSerializer;
62 import com.esotericsoftware.kryo.serializers.FieldSerializer;
63 import com.esotericsoftware.kryo.util.ObjectMap;
64 import com.google.common.base.Preconditions;
65
66 import de.javakaffee.kryoserializers.guava.ImmutableListSerializer;
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81 public class HadoopKryo extends Kryo {
82
83 private static final KryoPool KRYO_POOL = new KryoPool.Builder(
84 new KryoFactory() {
85 @Override
86 public Kryo create() {
87 return createKryo(true, true);
88 }
89 }).build();
90
91 private static final ThreadLocal<HadoopKryo> KRYO =
92 new ThreadLocal<HadoopKryo>() {
93 @Override protected HadoopKryo initialValue() {
94 return createKryo(false, false);
95 }
96 };
97
98
99
100
101
102
/**
 * Types that must never be serialized, mapped to the human readable reason
 * that is embedded in the exception thrown when serialization of such a
 * type (or any subtype) is attempted. Iteration order is insertion order.
 * The map is unmodifiable once the class is initialized.
 */
private static final Map<Class<?>, String> NON_SERIALIZABLE;

static {
  // Build in a local map and publish a read-only view, so the table cannot
  // be altered after class initialization while factories iterate over it.
  final Map<Class<?>, String> reasons = new LinkedHashMap<>();
  reasons.put(
      NonKryoWritable.class,
      "it is marked to not allow serialization, " +
      "look at the class for more details");
  reasons.put(
      KryoWritableWrapper.class, "recursion is disallowed");
  reasons.put(
      Configuration.class,
      "it cannot be supported since it contains ClassLoader");
  reasons.put(
      GiraphConfigurationSettable.class, "configuration cannot be set");
  reasons.put(
      Configurable.class, "configuration cannot be set");
  reasons.put(
      Random.class,
      "it should be rarely serialized, since it would create same stream " +
      "of numbers everywhere, use TransientRandom instead");
  reasons.put(
      Logger.class,
      "Logger must be a static field");
  NON_SERIALIZABLE = Collections.unmodifiableMap(reasons);
}
128
129
130 private InputChunked input;
131
132 private OutputChunked output;
133
134
135 private DataInputWrapperStream dataInputWrapperStream;
136
137 private DataOutputWrapperStream dataOutputWrapperStream;
138
139
140
141
142
143 private final ObjectMap<Class<?>, ReusableFieldSerializer<Object>>
144 classToIntoSerializer = new ObjectMap<>();
145
146
/**
 * Creates an instance using the superclass defaults. Private: instances
 * are obtained through the static helpers of this class.
 */
private HadoopKryo() {
  super();
}
149
150
151
152
153
154
/**
 * Creates an instance with explicitly supplied resolvers, both of which
 * are handed straight to the superclass constructor.
 *
 * @param classResolver class resolver to use
 * @param referenceResolver reference resolver to use
 */
private HadoopKryo(
    ClassResolver classResolver, ReferenceResolver referenceResolver) {
  super(classResolver, referenceResolver);
}
159
160
161
162
163
164
165
166
167
168
169 public static void writeClassAndObj(
170 final DataOutput out, final Object object) {
171 writeInternal(out, object, false);
172 }
173
174
175
176
177
178
179
180
181
182
183 public static <T> T readClassAndObj(DataInput in) {
184 return readInternal(in, null, false);
185 }
186
187
188
189
190
191
192
193 public static void writeOutOfObject(
194 final DataOutput out, final Object object) {
195 writeInternal(out, object, true);
196 }
197
198
199
200
201
202
203
204
205
206 public static void readIntoObject(DataInput in, Object object) {
207 readInternal(in, object, true);
208 }
209
210
211
212
213
214
215
216
217 public static void writeWithKryo(
218 final HadoopKryo kryo, final Output out,
219 final Object object) {
220 kryo.writeClassAndObject(out, object);
221 out.close();
222 }
223
224
225
226
227
228
229
230 public static void writeWithKryoOutOfObject(
231 final HadoopKryo kryo, final Output out,
232 final Object object) {
233 kryo.writeOutOfObject(out, object);
234 out.close();
235 }
236
237
238
239
240
241
242
243
244
245
246 public static <T> T readWithKryo(
247 final HadoopKryo kryo, final Input in) {
248 T object;
249 object = (T) kryo.readClassAndObject(in);
250 in.close();
251 return object;
252 }
253
254
255
256
257
258
259
260 public static void readWithKryoIntoObject(
261 final HadoopKryo kryo, final Input in, Object object) {
262 kryo.readIntoObject(in, object);
263 in.close();
264 }
265
266
267
268
269
270
271
272
273
274 public static <T> T createCopy(final T object) {
275 return KRYO_POOL.run(new KryoCallback<T>() {
276 @Override
277 public T execute(Kryo kryo) {
278 return kryo.copy(object);
279 }
280 });
281 }
282
283
284
285
286
287
288
289
290
291
292
293
294 public static HadoopKryo getNontrackingKryo() {
295 return KRYO.get();
296 }
297
298
299
300
301
302
303
304
305
306 private static HadoopKryo createKryo(boolean trackReferences,
307 boolean hasBuffer) {
308 HadoopKryo kryo;
309 if (trackReferences) {
310 kryo = new HadoopKryo();
311 } else {
312
313
314
315 kryo = new HadoopKryo(
316 GiraphClassResolver.isInitialized() ? new GiraphClassResolver() :
317 new DefaultClassResolver(),
318 new MapReferenceResolver());
319 }
320
321 try {
322 kryo.register(Class.forName("java.lang.invoke.SerializedLambda"));
323 kryo.register(Class.forName(
324 "com.esotericsoftware.kryo.serializers.ClosureSerializer$Closure"),
325 new ClosureSerializer());
326 } catch (ClassNotFoundException e) {
327 throw new IllegalStateException(
328 "Trying to use Kryo on Java version " +
329 System.getProperty("java.version") +
330 ", but unable to find needed classes", e);
331 }
332
333 kryo.register(Arrays.asList().getClass(), new ArraysAsListSerializer());
334 kryo.register(Collections.nCopies(1, new Object()).getClass(),
335 new CollectionsNCopiesSerializer());
336
337 ImmutableListSerializer.registerSerializers(kryo);
338 ImmutableMapSerializer.registerSerializers(kryo);
339 ImmutableBiMapSerializerUtils.registerSerializers(kryo);
340
341
342
343 FastUtilSerializer.registerAll(kryo);
344
345 kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(
346 new StdInstantiatorStrategy()));
347
348 SerializerFactory customSerializerFactory = new SerializerFactory() {
349 @SuppressWarnings("rawtypes")
350 @Override
351 public Serializer makeSerializer(Kryo kryo, final Class<?> type) {
352 for (final Entry<Class<?>, String> entry :
353 NON_SERIALIZABLE.entrySet()) {
354 if (entry.getKey().isAssignableFrom(type)) {
355
356 return new Serializer() {
357 @Override
358 public Object read(Kryo kryo, Input input, Class type) {
359 throw new RuntimeException("Cannot serialize " + type +
360 ". Objects being serialized cannot capture " +
361 entry.getKey() + " because " + entry.getValue() +
362 ". Either remove field in question" +
363 ", or make it transient (so that it isn't serialized)");
364 }
365
366 @Override
367 public void write(Kryo kryo, Output output, Object object) {
368 throw new RuntimeException("Cannot serialize " + type +
369 ". Objects being serialized cannot capture " +
370 entry.getKey() + " because " + entry.getValue() +
371 ". Either remove field in question" +
372 ", or make it transient (so that it isn't serialized)");
373 }
374 };
375 }
376 }
377
378 if (Writable.class.isAssignableFrom(type) &&
379 !KryoIgnoreWritable.class.isAssignableFrom(type) &&
380
381
382 !BasicSet.class.isAssignableFrom(type) &&
383 !Basic2ObjectMap.class.isAssignableFrom(type)) {
384
385 DirectWritableSerializer serializer = new DirectWritableSerializer();
386 return serializer;
387 } else {
388 FieldSerializer serializer = new FieldSerializer<>(kryo, type);
389 serializer.setIgnoreSyntheticFields(false);
390 return serializer;
391 }
392 }
393 };
394
395 kryo.addDefaultSerializer(Writable.class, customSerializerFactory);
396 kryo.setDefaultSerializer(customSerializerFactory);
397
398 if (hasBuffer) {
399 kryo.input = new InputChunked(4096);
400 kryo.output = new OutputChunked(4096);
401 kryo.dataInputWrapperStream = new DataInputWrapperStream();
402 kryo.dataOutputWrapperStream = new DataOutputWrapperStream();
403 }
404
405 if (!trackReferences) {
406 kryo.setReferences(false);
407
408
409
410 if (GiraphClassResolver.isInitialized()) {
411 kryo.setAutoReset(false);
412 }
413 }
414 return kryo;
415 }
416
417
418
419
420
421
422
423
424 private static void registerSerializer(HadoopKryo kryo, String className,
425 Serializer serializer) {
426 try {
427 kryo.register(Class.forName(className), serializer);
428 } catch (ClassNotFoundException e) {
429 throw new IllegalStateException("Class " + className + " is missing", e);
430 }
431 }
432
433
434
435
436
437
438 private void setDataInput(DataInput in) {
439 dataInputWrapperStream.setDataInput(in);
440 input.setInputStream(dataInputWrapperStream);
441 }
442
443
444
445
446
447
448 private void setDataOutput(DataOutput out) {
449 dataOutputWrapperStream.setDataOutput(out);
450 output.setOutputStream(dataOutputWrapperStream);
451 }
452
453
454
455
456
457
458
459 private ReusableFieldSerializer<Object> getOrCreateReusableSerializer(
460 Class<?> type) {
461 ReusableFieldSerializer<Object> serializer =
462 classToIntoSerializer.get(type);
463 if (serializer == null) {
464 serializer = new ReusableFieldSerializer<>(this, type);
465 classToIntoSerializer.put(type, serializer);
466 }
467 return serializer;
468 }
469
470
471
472
473
474
475
476
477
478
479 private static void writeInternal(
480 final DataOutput out, final Object object, final boolean outOf) {
481 KRYO_POOL.run(new KryoCallback<Void>() {
482 @Override
483 public Void execute(Kryo kryo) {
484 HadoopKryo hkryo = (HadoopKryo) kryo;
485 hkryo.setDataOutput(out);
486
487 if (outOf) {
488 hkryo.writeOutOfObject(hkryo.output, object);
489 } else {
490 hkryo.writeClassAndObject(hkryo.output, object);
491 }
492
493 hkryo.output.endChunks();
494 hkryo.output.close();
495
496 return null;
497 }
498 });
499 }
500
501
502
503
504
505
506
507
508
509
510
511
512 @SuppressWarnings("unchecked")
513 private static <T> T readInternal(
514 final DataInput in, final T outObject, final boolean into) {
515 return KRYO_POOL.run(new KryoCallback<T>() {
516 @Override
517 public T execute(Kryo kryo) {
518 HadoopKryo hkryo = (HadoopKryo) kryo;
519 hkryo.setDataInput(in);
520
521 T object;
522 if (into) {
523 hkryo.readIntoObject(hkryo.input, outObject);
524 object = outObject;
525 } else {
526 object = (T) hkryo.readClassAndObject(hkryo.input);
527 }
528 hkryo.input.nextChunks();
529
530 hkryo.input.close();
531 return object;
532 }
533 });
534 }
535
536
537
538
539
540
541
542
543 private void readIntoObject(Input input, Object object) {
544 Preconditions.checkNotNull(object);
545
546 Class<?> type = object.getClass();
547 ReusableFieldSerializer<Object> serializer =
548 getOrCreateReusableSerializer(type);
549
550 serializer.setReadIntoObject(object);
551 Object result = readObject(input, type, serializer);
552
553 Preconditions.checkState(result == object);
554 }
555
556
557
558
559
560
561
562 private void writeOutOfObject(Output output, Object object) {
563 ReusableFieldSerializer<Object> serializer =
564 getOrCreateReusableSerializer(object.getClass());
565 writeObject(output, object, serializer);
566 }
567
568 }