1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.netty.handler;
20
21 import org.apache.giraph.comm.flow_control.FlowControl;
22 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
23 import org.apache.giraph.comm.ServerData;
24 import org.apache.giraph.comm.requests.WorkerRequest;
25 import org.apache.giraph.graph.TaskInfo;
26 import org.apache.hadoop.io.Writable;
27 import org.apache.hadoop.io.WritableComparable;
28
29
30
31
32
33
34
35
36
37 public class WorkerRequestServerHandler<I extends WritableComparable,
38 V extends Writable, E extends Writable, M extends Writable> extends
39 RequestServerHandler<WorkerRequest<I, V, E>> {
40
41 private final ServerData<I, V, E> serverData;
42
43
44
45
46
47
48
49
50
51
52
53 public WorkerRequestServerHandler(ServerData<I, V, E> serverData,
54 WorkerRequestReservedMap workerRequestReservedMap,
55 ImmutableClassesGiraphConfiguration conf,
56 TaskInfo myTaskInfo,
57 Thread.UncaughtExceptionHandler exceptionHandler,
58 FlowControl flowControl) {
59 super(workerRequestReservedMap, conf, myTaskInfo, exceptionHandler);
60 this.serverData = serverData;
61 this.flowControl = flowControl;
62 }
63
64 @Override
65 public void processRequest(WorkerRequest<I, V, E> request) {
66 request.doRequest(serverData);
67 }
68
69
70 public static class Factory<I extends WritableComparable,
71 V extends Writable, E extends Writable> implements
72 RequestServerHandler.Factory {
73
74 private final ServerData<I, V, E> serverData;
75
76 private FlowControl flowControl;
77
78
79
80
81
82
83 public Factory(ServerData<I, V, E> serverData) {
84 this.serverData = serverData;
85 }
86
87 @Override
88 public RequestServerHandler newHandler(
89 WorkerRequestReservedMap workerRequestReservedMap,
90 ImmutableClassesGiraphConfiguration conf,
91 TaskInfo myTaskInfo,
92 Thread.UncaughtExceptionHandler exceptionHandler) {
93 return new WorkerRequestServerHandler<I, V, E, Writable>(serverData,
94 workerRequestReservedMap, conf, myTaskInfo, exceptionHandler,
95 flowControl);
96 }
97
98 @Override
99 public void setFlowControl(FlowControl flowControl) {
100 this.flowControl = flowControl;
101 }
102 }
103 }