1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.combiner;
19
20 import org.apache.giraph.types.ops.DoubleTypeOps;
21 import org.apache.giraph.types.ops.FloatTypeOps;
22 import org.apache.giraph.types.ops.IntTypeOps;
23 import org.apache.giraph.types.ops.LongTypeOps;
24 import org.apache.giraph.types.ops.NumericTypeOps;
25 import org.apache.hadoop.io.DoubleWritable;
26 import org.apache.hadoop.io.FloatWritable;
27 import org.apache.hadoop.io.IntWritable;
28 import org.apache.hadoop.io.LongWritable;
29 import org.apache.hadoop.io.WritableComparable;
30
31
32
33
34
35
36 public class MaxMessageCombiner<M extends WritableComparable>
37 implements MessageCombiner<WritableComparable, M> {
38
39 public static final MaxMessageCombiner<DoubleWritable> DOUBLE =
40 new MaxMessageCombiner<>(DoubleTypeOps.INSTANCE);
41
42 public static final MaxMessageCombiner<FloatWritable> FLOAT =
43 new MaxMessageCombiner<>(FloatTypeOps.INSTANCE);
44
45 public static final MaxMessageCombiner<LongWritable> LONG =
46 new MaxMessageCombiner<>(LongTypeOps.INSTANCE);
47
48 public static final MaxMessageCombiner<IntWritable> INT =
49 new MaxMessageCombiner<>(IntTypeOps.INSTANCE);
50
51
52 private final NumericTypeOps<M> typeOps;
53
54
55
56
57
58 public MaxMessageCombiner(NumericTypeOps<M> typeOps) {
59 this.typeOps = typeOps;
60 }
61
62 @Override
63 public void combine(
64 WritableComparable vertexIndex, M originalMessage, M messageToCombine) {
65 if (originalMessage.compareTo(messageToCombine) < 0) {
66 typeOps.set(originalMessage, messageToCombine);
67 }
68 }
69
70 @Override
71 public M createInitialMessage() {
72 return typeOps.createZero();
73 }
74 }