1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.examples;
20
21 import org.apache.giraph.aggregators.DoubleSumAggregator;
22 import org.apache.giraph.aggregators.LongSumAggregator;
23 import org.apache.giraph.master.DefaultMasterCompute;
24 import org.apache.hadoop.io.DoubleWritable;
25 import org.apache.hadoop.io.LongWritable;
26 import org.apache.log4j.Logger;
27
28
29
30
31
32 public class RandomWalkVertexMasterCompute extends DefaultMasterCompute {
33
34
35 static final double CONVERGENCE_THRESHOLD = 0.00001;
36
37
38 private static final Logger LOG =
39 Logger.getLogger(RandomWalkVertexMasterCompute.class);
40
41 @Override
42 public void compute() {
43 double danglingContribution =
44 this.<DoubleWritable>getAggregatedValue(
45 RandomWalkComputation.CUMULATIVE_DANGLING_PROBABILITY).get();
46 double cumulativeProbability =
47 this.<DoubleWritable>getAggregatedValue(
48 RandomWalkComputation.CUMULATIVE_PROBABILITY).get();
49 double l1NormOfStateDiff =
50 this.<DoubleWritable>getAggregatedValue(
51 RandomWalkComputation.L1_NORM_OF_PROBABILITY_DIFFERENCE).get();
52 long numDanglingVertices =
53 this.<LongWritable>getAggregatedValue(
54 RandomWalkComputation.NUM_DANGLING_VERTICES).get();
55
56 LOG.info("[Superstep " + getSuperstep() + "] Dangling contribution = " +
57 danglingContribution + ", number of dangling vertices = " +
58 numDanglingVertices + ", cumulative probability = " +
59 cumulativeProbability + ", L1 Norm of state vector difference = " +
60 l1NormOfStateDiff);
61
62
63
64 if (getSuperstep() > 1 && l1NormOfStateDiff < CONVERGENCE_THRESHOLD) {
65 haltComputation();
66 }
67 }
68
69 @Override
70 public void initialize() throws InstantiationException,
71 IllegalAccessException {
72 registerAggregator(RandomWalkComputation.NUM_DANGLING_VERTICES,
73 LongSumAggregator.class);
74 registerAggregator(RandomWalkComputation.CUMULATIVE_DANGLING_PROBABILITY,
75 DoubleSumAggregator.class);
76 registerAggregator(RandomWalkComputation.CUMULATIVE_PROBABILITY,
77 DoubleSumAggregator.class);
78 registerAggregator(RandomWalkComputation.L1_NORM_OF_PROBABILITY_DIFFERENCE,
79 DoubleSumAggregator.class);
80 }
81 }