1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.master;
19
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.IOException;
23
24 import org.apache.giraph.aggregators.Aggregator;
25 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
26 import org.apache.giraph.utils.ReflectionUtils;
27 import org.apache.giraph.utils.WritableUtils;
28 import org.apache.hadoop.io.Writable;
29
30
31
32
33
34
35 public class AggregatorBroadcast<A extends Writable>
36 extends DefaultImmutableClassesGiraphConfigurable
37 implements Writable {
38
39 private Class<? extends Aggregator<A>> aggregatorClass;
40
41 private A value;
42
43
44 public AggregatorBroadcast() {
45 }
46
47
48
49
50
51
52 public AggregatorBroadcast(
53 Class<? extends Aggregator<A>> aggregatorClass, A value) {
54 this.aggregatorClass = aggregatorClass;
55 this.value = value;
56 }
57
58 public A getValue() {
59 return value;
60 }
61
62 @Override
63 public void write(DataOutput out) throws IOException {
64 WritableUtils.writeClass(aggregatorClass, out);
65 value.write(out);
66 }
67
68 @Override
69 public void readFields(DataInput in) throws IOException {
70 aggregatorClass = WritableUtils.readClass(in);
71 value = ReflectionUtils.newInstance(aggregatorClass, getConf())
72 .createInitialValue();
73 value.readFields(in);
74 }
75 }