1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.netty.handler;
20
21 import org.apache.giraph.comm.flow_control.FlowControl;
22 import org.apache.giraph.comm.requests.MasterRequest;
23 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
24 import org.apache.giraph.graph.TaskInfo;
25 import org.apache.giraph.master.MasterGlobalCommHandler;
26
27
28 public class MasterRequestServerHandler extends
29 RequestServerHandler<MasterRequest> {
30
31 private final MasterGlobalCommHandler commHandler;
32
33
34
35
36
37
38
39
40
41
42
43 public MasterRequestServerHandler(
44 WorkerRequestReservedMap workerRequestReservedMap,
45 ImmutableClassesGiraphConfiguration conf,
46 TaskInfo myTaskInfo,
47 MasterGlobalCommHandler commHandler,
48 Thread.UncaughtExceptionHandler exceptionHandler,
49 FlowControl flowControl) {
50 super(workerRequestReservedMap, conf, myTaskInfo, exceptionHandler);
51 this.commHandler = commHandler;
52 this.flowControl = flowControl;
53 }
54
55 @Override
56 public void processRequest(MasterRequest request) {
57 request.doRequest(commHandler);
58 }
59
60
61
62
63 public static class Factory implements RequestServerHandler.Factory {
64
65 private final MasterGlobalCommHandler commHandler;
66
67 private FlowControl flowControl;
68
69
70
71
72
73
74 public Factory(MasterGlobalCommHandler commHandler) {
75 this.commHandler = commHandler;
76 }
77
78 @Override
79 public RequestServerHandler newHandler(
80 WorkerRequestReservedMap workerRequestReservedMap,
81 ImmutableClassesGiraphConfiguration conf,
82 TaskInfo myTaskInfo,
83 Thread.UncaughtExceptionHandler exceptionHandler) {
84 return new MasterRequestServerHandler(workerRequestReservedMap, conf,
85 myTaskInfo, commHandler, exceptionHandler, flowControl);
86 }
87
88 @Override
89 public void setFlowControl(FlowControl flowControl) {
90 this.flowControl = flowControl;
91 }
92 }
93 }