1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.requests;
20
21 import java.io.IOException;
22
23 import org.apache.giraph.comm.GlobalCommType;
24 import org.apache.giraph.comm.ServerData;
25 import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData;
26 import org.apache.giraph.utils.UnsafeByteArrayInputStream;
27 import org.apache.hadoop.io.LongWritable;
28 import org.apache.hadoop.io.Writable;
29
30
31
32
33
34 public class SendWorkerAggregatorsRequest extends
35 ByteArrayWithSenderTaskIdRequest implements WorkerRequest {
36
37
38
39
40
41
42
43 public SendWorkerAggregatorsRequest(byte[] data, int senderTaskId) {
44 super(data, senderTaskId);
45 }
46
47
48
49
50 public SendWorkerAggregatorsRequest() {
51 }
52
53 @Override
54 public void doRequest(ServerData serverData) {
55 UnsafeByteArrayInputStream input = getUnsafeByteArrayInput();
56 OwnerAggregatorServerData aggregatorData =
57 serverData.getOwnerAggregatorData();
58 try {
59 int num = input.readInt();
60 for (int i = 0; i < num; i++) {
61 String name = input.readUTF();
62 GlobalCommType type = GlobalCommType.values()[input.readByte()];
63 if (type == GlobalCommType.SPECIAL_COUNT) {
64 LongWritable value = new LongWritable();
65 value.readFields(input);
66 aggregatorData.receivedRequestCountFromWorker(
67 value.get(),
68 getSenderTaskId());
69 } else if (type == GlobalCommType.REDUCED_VALUE) {
70 Writable value = aggregatorData.createInitialValue(name);
71 value.readFields(input);
72 aggregatorData.reduce(name, value);
73 } else {
74 throw new IllegalStateException(
75 "SendWorkerAggregatorsRequest received " + type);
76 }
77 }
78 } catch (IOException e) {
79 throw new IllegalStateException("doRequest: " +
80 "IOException occurred while processing request", e);
81 }
82 aggregatorData.receivedRequestFromWorker();
83 }
84
85 @Override
86 public RequestType getType() {
87 return RequestType.SEND_WORKER_AGGREGATORS_REQUEST;
88 }
89 }