1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.requests;
20
21 import org.apache.giraph.comm.ServerData;
22 import org.apache.hadoop.io.Text;
23 import org.apache.hadoop.io.Writable;
24 import org.apache.hadoop.io.WritableComparable;
25
26 import java.io.DataInput;
27 import java.io.DataOutput;
28 import java.io.IOException;
29
30
31 public class SendWorkerToWorkerMessageRequest extends WritableRequest
32 implements WorkerRequest<WritableComparable, Writable, Writable> {
33
34 private Writable message;
35
36
37
38
39 public SendWorkerToWorkerMessageRequest() {
40 }
41
42
43
44
45
46
47 public SendWorkerToWorkerMessageRequest(Writable message) {
48 this.message = message;
49 }
50
51 @Override
52 public RequestType getType() {
53 return RequestType.SEND_WORKER_TO_WORKER_MESSAGE_REQUEST;
54 }
55
56 @Override
57 void writeRequest(DataOutput output) throws IOException {
58 Text.writeString(output, message.getClass().getName());
59 message.write(output);
60 }
61
62 @Override
63 void readFieldsRequest(DataInput input) throws IOException {
64 String className = Text.readString(input);
65 try {
66 message = (Writable) Class.forName(className).newInstance();
67 message.readFields(input);
68 } catch (InstantiationException | IllegalAccessException |
69 ClassNotFoundException e) {
70 throw new IllegalStateException(
71 "readFieldsRequest: Exception occurred", e);
72 }
73 }
74
75 @Override
76 public void doRequest(
77 ServerData<WritableComparable, Writable, Writable> serverData) {
78 serverData.addIncomingWorkerToWorkerMessage(message);
79 }
80 }