1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.combiner;
19
20 import org.apache.giraph.types.ops.DoubleTypeOps;
21 import org.apache.giraph.types.ops.FloatTypeOps;
22 import org.apache.giraph.types.ops.IntTypeOps;
23 import org.apache.giraph.types.ops.LongTypeOps;
24 import org.apache.giraph.types.ops.NumericTypeOps;
25 import org.apache.hadoop.io.DoubleWritable;
26 import org.apache.hadoop.io.FloatWritable;
27 import org.apache.hadoop.io.IntWritable;
28 import org.apache.hadoop.io.LongWritable;
29 import org.apache.hadoop.io.Writable;
30 import org.apache.hadoop.io.WritableComparable;
31
32
33
34
35
36
37 public class SumMessageCombiner<M extends Writable>
38 implements MessageCombiner<WritableComparable, M> {
39
40 public static final SumMessageCombiner<DoubleWritable> DOUBLE =
41 new SumMessageCombiner<>(DoubleTypeOps.INSTANCE);
42
43 public static final SumMessageCombiner<FloatWritable> FLOAT =
44 new SumMessageCombiner<>(FloatTypeOps.INSTANCE);
45
46 public static final SumMessageCombiner<LongWritable> LONG =
47 new SumMessageCombiner<>(LongTypeOps.INSTANCE);
48
49 public static final SumMessageCombiner<IntWritable> INT =
50 new SumMessageCombiner<>(IntTypeOps.INSTANCE);
51
52
53 private final NumericTypeOps<M> typeOps;
54
55
56
57
58
59 public SumMessageCombiner(NumericTypeOps<M> typeOps) {
60 this.typeOps = typeOps;
61 }
62
63 @Override
64 public void combine(
65 WritableComparable vertexIndex, M originalMessage, M messageToCombine) {
66 typeOps.plusInto(originalMessage, messageToCombine);
67 }
68
69 @Override
70 public M createInitialMessage() {
71 return typeOps.createZero();
72 }
73 }