1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.giraph.comm.flow_control;
20
21 import com.yammer.metrics.core.Counter;
22 import org.apache.giraph.comm.netty.NettyClient;
23 import org.apache.giraph.comm.netty.handler.AckSignalFlag;
24 import org.apache.giraph.comm.requests.WritableRequest;
25 import org.apache.giraph.conf.FloatConfOption;
26 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
27 import org.apache.giraph.conf.IntConfOption;
28 import org.apache.giraph.metrics.GiraphMetrics;
29 import org.apache.giraph.metrics.MetricNames;
30 import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
31 import org.apache.giraph.metrics.SuperstepMetricsRegistry;
32 import org.apache.log4j.Logger;
33
34 import java.util.concurrent.atomic.AtomicInteger;
35
36 import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;
37
38
39
40
41
42 public class StaticFlowControl implements
43 FlowControl, ResetSuperstepMetricsObserver {
44
45 public static final IntConfOption MAX_NUMBER_OF_OPEN_REQUESTS =
46 new IntConfOption("giraph.maxNumberOfOpenRequests", 10000,
47 "Maximum number of requests without confirmation we should have");
48
49
50
51
52 public static final FloatConfOption
53 FRACTION_OF_REQUESTS_TO_CLOSE_BEFORE_PROCEEDING =
54 new FloatConfOption("giraph.fractionOfRequestsToCloseBeforeProceeding",
55 0.2f, "Fraction of requests to close before proceeding");
56
57 private static final Logger LOG = Logger.getLogger(StaticFlowControl.class);
58
59
60 private final int maxNumberOfOpenRequests;
61
62
63
64
65 private final int numberOfRequestsToProceed;
66
67 private final NettyClient nettyClient;
68
69 private final int waitingRequestMsecs;
70
71 private final Object requestSpotAvailable = new Object();
72
73 private Counter timeWaitingOnOpenRequests;
74
75 private final AtomicInteger numWaitingThreads = new AtomicInteger(0);
76
77
78
79
80
81
82
83 public StaticFlowControl(ImmutableClassesGiraphConfiguration conf,
84 NettyClient nettyClient) {
85 this.nettyClient = nettyClient;
86 maxNumberOfOpenRequests = MAX_NUMBER_OF_OPEN_REQUESTS.get(conf);
87 numberOfRequestsToProceed = (int) (maxNumberOfOpenRequests *
88 (1 - FRACTION_OF_REQUESTS_TO_CLOSE_BEFORE_PROCEEDING.get(conf)));
89 if (LOG.isInfoEnabled()) {
90 LOG.info("StaticFlowControl: Limit number of open requests to " +
91 maxNumberOfOpenRequests + " and proceed when <= " +
92 numberOfRequestsToProceed);
93 }
94 waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);
95 GiraphMetrics.get().addSuperstepResetObserver(this);
96 }
97
98 @Override
99 public void newSuperstep(SuperstepMetricsRegistry metrics) {
100 timeWaitingOnOpenRequests = metrics.getCounter(
101 MetricNames.TIME_SPENT_WAITING_ON_TOO_MANY_OPEN_REQUESTS_MS);
102 }
103
104 @Override
105 public void sendRequest(int destTaskId, WritableRequest request) {
106 nettyClient.doSend(destTaskId, request);
107 if (nettyClient.getNumberOfOpenRequests() > maxNumberOfOpenRequests) {
108 long startTime = System.currentTimeMillis();
109 waitSomeRequests();
110 timeWaitingOnOpenRequests.inc(System.currentTimeMillis() - startTime);
111 }
112 }
113
114
115
116
117
118
119 private void waitSomeRequests() {
120 numWaitingThreads.getAndIncrement();
121 while (nettyClient.getNumberOfOpenRequests() > numberOfRequestsToProceed) {
122
123 synchronized (requestSpotAvailable) {
124 if (nettyClient.getNumberOfOpenRequests() <=
125 numberOfRequestsToProceed) {
126 break;
127 }
128 try {
129 requestSpotAvailable.wait(waitingRequestMsecs);
130 } catch (InterruptedException e) {
131 throw new IllegalStateException("waitSomeRequests: Got unexpected " +
132 "InterruptedException", e);
133 }
134 }
135 nettyClient.logAndSanityCheck();
136 }
137 numWaitingThreads.getAndDecrement();
138 }
139
140 @Override
141 public void messageAckReceived(int taskId, long requestId, int response) {
142 synchronized (requestSpotAvailable) {
143 requestSpotAvailable.notifyAll();
144 }
145 }
146
147 @Override
148 public AckSignalFlag getAckSignalFlag(int response) {
149 return AckSignalFlag.values()[response];
150 }
151
152 @Override
153 public int calculateResponse(AckSignalFlag alreadyDone, int clientId) {
154 return alreadyDone.ordinal();
155 }
156
157 @Override
158 public void logInfo() {
159 if (LOG.isInfoEnabled()) {
160 LOG.info("logInfo: " + numWaitingThreads.get() + " threads waiting " +
161 "until number of open requests falls below " +
162 numberOfRequestsToProceed);
163 }
164 }
165
166 @Override
167 public void waitAllRequests() {
168
169
170
171
172 }
173
174 @Override
175 public int getNumberOfUnsentRequests() {
176 return 0;
177 }
178 }