1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.requests;
20
21 import java.io.IOException;
22
23 import org.apache.giraph.comm.GlobalCommType;
24 import org.apache.giraph.comm.ServerData;
25 import org.apache.giraph.comm.aggregators.AllAggregatorServerData;
26 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
27 import org.apache.giraph.utils.WritableUtils;
28 import org.apache.hadoop.io.LongWritable;
29 import org.apache.hadoop.io.Writable;
30
31
32
33
34
35 public class SendAggregatorsToWorkerRequest extends
36 ByteArrayWithSenderTaskIdRequest implements WorkerRequest {
37
38
39
40
41
42
43
44 public SendAggregatorsToWorkerRequest(byte[] data, int senderTaskId) {
45 super(data, senderTaskId);
46 }
47
48
49
50
51 public SendAggregatorsToWorkerRequest() {
52 }
53
54 @Override
55 public void doRequest(ServerData serverData) {
56 UnsafeByteArrayInputStream input = getUnsafeByteArrayInput();
57 AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();
58 try {
59 int num = input.readInt();
60 for (int i = 0; i < num; i++) {
61 String name = input.readUTF();
62 GlobalCommType type = GlobalCommType.values()[input.readByte()];
63 Writable value = WritableUtils.readWritableObject(input, conf);
64 if (type == GlobalCommType.SPECIAL_COUNT) {
65 aggregatorData.receivedRequestCountFromWorker(
66 ((LongWritable) value).get(),
67 getSenderTaskId());
68 } else {
69 aggregatorData.receiveValueFromMaster(name, type, value);
70 }
71 }
72 } catch (IOException e) {
73 throw new IllegalStateException("doRequest: " +
74 "IOException occurred while processing request", e);
75 }
76 aggregatorData.receivedRequestFromWorker();
77 }
78
79 @Override
80 public RequestType getType() {
81 return RequestType.SEND_AGGREGATORS_TO_WORKER_REQUEST;
82 }
83 }