1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.giraph.block_app.migration;
19
20 import java.io.DataInput;
21 import java.io.DataOutput;
22 import java.io.IOException;
23 import java.util.List;
24
25 import org.apache.giraph.block_app.framework.api.BlockWorkerContextApi;
26 import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
27 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
28 import org.apache.hadoop.io.Writable;
29 import org.apache.hadoop.io.WritableComparable;
30
31
32
33
34
35 @SuppressWarnings({"rawtypes", "unchecked"})
36 public class MigrationWorkerContext
37 extends DefaultImmutableClassesGiraphConfigurable
38 implements Writable {
39 private BlockWorkerContextApi api;
40 private List<Writable> receivedMessages;
41
42 public void setApi(BlockWorkerContextApi api) {
43 this.api = api;
44 this.setConf(api.getConf());
45 }
46
47 public void setReceivedMessages(List<Writable> receivedMessages) {
48 this.receivedMessages = receivedMessages;
49 }
50
51 public void preSuperstep() { }
52
53 public void postSuperstep() { }
54
55 @SuppressWarnings("deprecation")
56 public long getTotalNumVertices() {
57 return api.getTotalNumVertices();
58 }
59
60 @SuppressWarnings("deprecation")
61 public long getTotalNumEdges() {
62 return api.getTotalNumEdges();
63 }
64
65 @Override
66 public void readFields(DataInput in) throws IOException {
67 }
68
69 @Override
70 public void write(DataOutput out) throws IOException {
71 }
72
73 public final int getWorkerCount() {
74 return api.getWorkerCount();
75 }
76
77 public final int getMyWorkerIndex() {
78 return api.getMyWorkerIndex();
79 }
80
81 public final List<Writable> getAndClearMessagesFromOtherWorkers() {
82 List<Writable> ret = receivedMessages;
83 receivedMessages = null;
84 return ret;
85 }
86
87 public final void sendMessageToWorker(Writable message, int workerIndex) {
88 ((BlockWorkerContextSendApi<WritableComparable, Writable>) api)
89 .sendMessageToWorker(message, workerIndex);
90 }
91
92
93
94
95
96 public static class MigrationFullWorkerContext
97 extends MigrationWorkerContext {
98 public void preApplication()
99 throws InstantiationException, IllegalAccessException {
100 }
101
102 public void postApplication() { }
103 }
104 }