1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.examples;
20
21 import org.apache.giraph.aggregators.DoubleOverwriteAggregator;
22 import org.apache.giraph.graph.BasicComputation;
23 import org.apache.giraph.master.DefaultMasterCompute;
24 import org.apache.giraph.graph.Vertex;
25 import org.apache.giraph.worker.WorkerContext;
26 import org.apache.hadoop.io.DoubleWritable;
27 import org.apache.hadoop.io.FloatWritable;
28 import org.apache.hadoop.io.LongWritable;
29 import org.apache.log4j.Logger;
30
31 import java.io.IOException;
32
33
34
35
36
37 public class SimpleMasterComputeComputation extends BasicComputation<
38 LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
39
40 public static final String SMC_AGG = "simplemastercompute.aggregator";
41
42 private static final Logger LOG =
43 Logger.getLogger(SimpleMasterComputeComputation.class);
44
45 @Override
46 public void compute(
47 Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
48 Iterable<DoubleWritable> messages) throws IOException {
49 double oldSum = getSuperstep() == 0 ? 0 : vertex.getValue().get();
50 double newValue = this.<DoubleWritable>getAggregatedValue(SMC_AGG).get();
51 double newSum = oldSum + newValue;
52 vertex.setValue(new DoubleWritable(newSum));
53 SimpleMasterComputeWorkerContext workerContext = getWorkerContext();
54 workerContext.setFinalSum(newSum);
55 LOG.info("Current sum: " + newSum);
56 }
57
58
59
60
61 public static class SimpleMasterComputeWorkerContext
62 extends WorkerContext {
63
64 private static double FINAL_SUM;
65
66 @Override
67 public void preApplication()
68 throws InstantiationException, IllegalAccessException {
69 }
70
71 @Override
72 public void preSuperstep() {
73 }
74
75 @Override
76 public void postSuperstep() {
77 }
78
79 @Override
80 public void postApplication() {
81 }
82
83 public static void setFinalSum(double sum) {
84 FINAL_SUM = sum;
85 }
86
87 public static double getFinalSum() {
88 return FINAL_SUM;
89 }
90 }
91
92
93
94
95 public static class SimpleMasterCompute
96 extends DefaultMasterCompute {
97 @Override
98 public void compute() {
99 setAggregatedValue(SMC_AGG,
100 new DoubleWritable(((double) getSuperstep()) / 2 + 1));
101 if (getSuperstep() == 10) {
102 haltComputation();
103 }
104 }
105
106 @Override
107 public void initialize() throws InstantiationException,
108 IllegalAccessException {
109 registerAggregator(SMC_AGG, DoubleOverwriteAggregator.class);
110 }
111 }
112 }