1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.requests;
20
21 import org.apache.giraph.comm.ServerData;
22 import org.apache.giraph.io.InputType;
23
24 import java.io.DataInput;
25 import java.io.DataOutput;
26 import java.io.IOException;
27
28
29
30
31 public class ReplyWithInputSplitRequest extends WritableRequest
32 implements WorkerRequest {
33
34 private InputType splitType;
35
36 private byte[] serializedInputSplit;
37
38
39
40
41
42
43
44 public ReplyWithInputSplitRequest(InputType splitType,
45 byte[] serializedInputSplit) {
46 this.splitType = splitType;
47 this.serializedInputSplit = serializedInputSplit;
48 }
49
50
51
52
53 public ReplyWithInputSplitRequest() {
54 }
55
56 @Override
57 void readFieldsRequest(DataInput in) throws IOException {
58 splitType = InputType.values()[in.readInt()];
59 int size = in.readInt();
60 serializedInputSplit = new byte[size];
61 in.readFully(serializedInputSplit);
62 }
63
64 @Override
65 void writeRequest(DataOutput out) throws IOException {
66 out.writeInt(splitType.ordinal());
67 out.writeInt(serializedInputSplit.length);
68 out.write(serializedInputSplit);
69 }
70
71 @Override
72 public void doRequest(ServerData serverData) {
73 serverData.getServiceWorker().getInputSplitsHandler().receivedInputSplit(
74 splitType, serializedInputSplit);
75 }
76
77 @Override
78 public RequestType getType() {
79 return RequestType.REPLY_WITH_INPUT_SPLIT_REQUEST;
80 }
81 }