1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.worker;
19
20 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
21 import org.apache.giraph.master.AggregatorBroadcast;
22 import org.apache.hadoop.io.Writable;
23 import org.apache.hadoop.io.WritableComparable;
24
25
26
27
28
29
30
31
32
33 public abstract class WorkerAggregatorDelegator<I extends WritableComparable,
34 V extends Writable, E extends Writable>
35 extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
36 implements WorkerAggregatorUsage, WorkerGlobalCommUsage {
37
38
39 private WorkerGlobalCommUsage workerGlobalCommUsage;
40
41
42
43
44
45
46 public void setWorkerGlobalCommUsage(
47 WorkerGlobalCommUsage workerGlobalCommUsage) {
48 this.workerGlobalCommUsage = workerGlobalCommUsage;
49 }
50
51 @Override
52 public final void reduce(String name, Object value) {
53 workerGlobalCommUsage.reduce(name, value);
54 }
55
56 @Override
57 public void reduceMerge(String name, Writable value) {
58 workerGlobalCommUsage.reduceMerge(name, value);
59 }
60
61 @Override
62 public final <B extends Writable> B getBroadcast(String name) {
63 return workerGlobalCommUsage.getBroadcast(name);
64 }
65
66 @Override
67 public final <A extends Writable> void aggregate(String name, A value) {
68 reduce(name, value);
69 }
70
71 @Override
72 public <A extends Writable> A getAggregatedValue(String name) {
73 AggregatorBroadcast<A> broadcast = workerGlobalCommUsage.getBroadcast(name);
74 return broadcast.getValue();
75 }
76 }