1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.requests;
20
21 import java.io.IOException;
22
23 import org.apache.giraph.comm.GlobalCommType;
24 import org.apache.giraph.comm.ServerData;
25 import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
26 import org.apache.giraph.reducers.ReduceOperation;
27 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
28 import org.apache.giraph.utils.UnsafeByteArrayOutputStream;
29 import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
30 import org.apache.giraph.utils.WritableUtils;
31 import org.apache.hadoop.io.LongWritable;
32 import org.apache.hadoop.io.Writable;
33
34
35
36
37
38 public class SendAggregatorsToOwnerRequest
39 extends ByteArrayWithSenderTaskIdRequest implements WorkerRequest {
40
41
42
43
44
45
46
47 public SendAggregatorsToOwnerRequest(byte[] data, int senderTaskId) {
48 super(data, senderTaskId);
49 }
50
51
52
53
54 public SendAggregatorsToOwnerRequest() {
55 }
56
57 @Override
58 public void doRequest(ServerData serverData) {
59 UnsafeByteArrayOutputStream reusedOut = new UnsafeByteArrayOutputStream();
60 UnsafeReusableByteArrayInput reusedIn = new UnsafeReusableByteArrayInput();
61
62 UnsafeByteArrayInputStream input = getUnsafeByteArrayInput();
63 AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();
64 try {
65 int num = input.readInt();
66 for (int i = 0; i < num; i++) {
67 String name = input.readUTF();
68 GlobalCommType type = GlobalCommType.values()[input.readByte()];
69 Writable value = WritableUtils.readWritableObject(input, conf);
70 if (type == GlobalCommType.SPECIAL_COUNT) {
71 aggregatorData.receivedRequestCountFromMaster(
72 ((LongWritable) value).get(),
73 getSenderTaskId());
74 } else {
75 aggregatorData.receiveValueFromMaster(name, type, value);
76
77 if (type == GlobalCommType.REDUCE_OPERATIONS) {
78 ReduceOperation<Object, Writable> reduceOpCopy =
79 (ReduceOperation<Object, Writable>)
80 WritableUtils.createCopy(reusedOut, reusedIn, value, conf);
81
82 serverData.getOwnerAggregatorData().registerReducer(
83 name, reduceOpCopy);
84 }
85 }
86 }
87 } catch (IOException e) {
88 throw new IllegalStateException("doRequest: " +
89 "IOException occurred while processing request", e);
90 }
91 aggregatorData.receivedRequestFromMaster(getData());
92 }
93
94 @Override
95 public RequestType getType() {
96 return RequestType.SEND_AGGREGATORS_TO_OWNER_REQUEST;
97 }
98 }