1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.examples;
20
21 import org.apache.giraph.graph.BasicComputation;
22 import org.apache.giraph.graph.Vertex;
23 import org.apache.giraph.worker.DefaultWorkerContext;
24 import org.apache.hadoop.io.DoubleWritable;
25 import org.apache.hadoop.io.FloatWritable;
26 import org.apache.hadoop.io.LongWritable;
27
28 import java.io.IOException;
29 import java.util.concurrent.atomic.AtomicLong;
30
31
32
33
34
35 public class TestComputationStateComputation extends BasicComputation<
36 LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
37
38 public static final int NUM_COMPUTE_THREADS = 10;
39
40 public static final int NUM_VERTICES = 100;
41
42 public static final int NUM_PARTITIONS = 25;
43
44
45
46
47
48 private long counter;
49
50 @Override
51 public void compute(
52 Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
53 Iterable<DoubleWritable> messages) throws IOException {
54 counter++;
55 if (getSuperstep() > 5) {
56 vertex.voteToHalt();
57 }
58 }
59
60 @Override
61 public void preSuperstep() {
62 counter =
63 ((TestComputationStateWorkerContext) getWorkerContext()).superstepCounter;
64 }
65
66 @Override
67 public void postSuperstep() {
68 ((TestComputationStateWorkerContext) getWorkerContext()).totalCounter
69 .addAndGet(counter);
70 }
71
72
73
74
75 public static class TestComputationStateWorkerContext extends
76 DefaultWorkerContext {
77
78 private long superstepCounter;
79
80
81
82 private AtomicLong totalCounter;
83
84 @Override
85 public void preSuperstep() {
86 superstepCounter = getSuperstep();
87 totalCounter = new AtomicLong(0);
88 }
89
90 @Override
91 public void postSuperstep() {
92 assertEquals(totalCounter.get(),
93 NUM_COMPUTE_THREADS * superstepCounter + getTotalNumVertices());
94 }
95 }
96
97
98
99
100
101
102
103 private static void assertEquals(long expected, long actual) {
104 if (expected != actual) {
105 throw new RuntimeException("expected: " + expected +
106 ", actual: " + actual);
107 }
108 }
109 }