1 | |
|
2 | |
|
3 | |
|
4 | |
|
5 | |
|
6 | |
|
7 | |
|
8 | |
|
9 | |
|
10 | |
|
11 | |
|
12 | |
|
13 | |
|
14 | |
|
15 | |
|
16 | |
|
17 | |
|
18 | |
|
19 | |
package org.apache.giraph.comm.flow_control; |
20 | |
|
21 | |
import com.yammer.metrics.core.Counter; |
22 | |
import org.apache.giraph.comm.netty.NettyClient; |
23 | |
import org.apache.giraph.comm.netty.handler.AckSignalFlag; |
24 | |
import org.apache.giraph.comm.requests.WritableRequest; |
25 | |
import org.apache.giraph.conf.FloatConfOption; |
26 | |
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; |
27 | |
import org.apache.giraph.conf.IntConfOption; |
28 | |
import org.apache.giraph.metrics.GiraphMetrics; |
29 | |
import org.apache.giraph.metrics.MetricNames; |
30 | |
import org.apache.giraph.metrics.ResetSuperstepMetricsObserver; |
31 | |
import org.apache.giraph.metrics.SuperstepMetricsRegistry; |
32 | |
import org.apache.log4j.Logger; |
33 | |
|
34 | |
import java.util.concurrent.atomic.AtomicInteger; |
35 | |
|
36 | |
import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS; |
37 | |
|
38 | |
|
39 | |
|
40 | |
|
41 | |
|
42 | |
public class StaticFlowControl implements |
43 | |
FlowControl, ResetSuperstepMetricsObserver { |
44 | |
|
45 | 0 | public static final IntConfOption MAX_NUMBER_OF_OPEN_REQUESTS = |
46 | |
new IntConfOption("giraph.maxNumberOfOpenRequests", 10000, |
47 | |
"Maximum number of requests without confirmation we should have"); |
48 | |
|
49 | |
|
50 | |
|
51 | |
|
52 | |
public static final FloatConfOption |
53 | 0 | FRACTION_OF_REQUESTS_TO_CLOSE_BEFORE_PROCEEDING = |
54 | |
new FloatConfOption("giraph.fractionOfRequestsToCloseBeforeProceeding", |
55 | |
0.2f, "Fraction of requests to close before proceeding"); |
56 | |
|
57 | 0 | private static final Logger LOG = Logger.getLogger(StaticFlowControl.class); |
58 | |
|
59 | |
|
60 | |
private final int maxNumberOfOpenRequests; |
61 | |
|
62 | |
|
63 | |
|
64 | |
|
65 | |
private final int numberOfRequestsToProceed; |
66 | |
|
67 | |
private final NettyClient nettyClient; |
68 | |
|
69 | |
private final int waitingRequestMsecs; |
70 | |
|
71 | 0 | private final Object requestSpotAvailable = new Object(); |
72 | |
|
73 | |
private Counter timeWaitingOnOpenRequests; |
74 | |
|
75 | 0 | private final AtomicInteger numWaitingThreads = new AtomicInteger(0); |
76 | |
|
77 | |
|
78 | |
|
79 | |
|
80 | |
|
81 | |
|
82 | |
|
83 | |
public StaticFlowControl(ImmutableClassesGiraphConfiguration conf, |
84 | 0 | NettyClient nettyClient) { |
85 | 0 | this.nettyClient = nettyClient; |
86 | 0 | maxNumberOfOpenRequests = MAX_NUMBER_OF_OPEN_REQUESTS.get(conf); |
87 | 0 | numberOfRequestsToProceed = (int) (maxNumberOfOpenRequests * |
88 | 0 | (1 - FRACTION_OF_REQUESTS_TO_CLOSE_BEFORE_PROCEEDING.get(conf))); |
89 | 0 | if (LOG.isInfoEnabled()) { |
90 | 0 | LOG.info("StaticFlowControl: Limit number of open requests to " + |
91 | |
maxNumberOfOpenRequests + " and proceed when <= " + |
92 | |
numberOfRequestsToProceed); |
93 | |
} |
94 | 0 | waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf); |
95 | 0 | GiraphMetrics.get().addSuperstepResetObserver(this); |
96 | 0 | } |
97 | |
|
98 | |
@Override |
99 | |
public void newSuperstep(SuperstepMetricsRegistry metrics) { |
100 | 0 | timeWaitingOnOpenRequests = metrics.getCounter( |
101 | |
MetricNames.TIME_SPENT_WAITING_ON_TOO_MANY_OPEN_REQUESTS_MS); |
102 | 0 | } |
103 | |
|
104 | |
@Override |
105 | |
public void sendRequest(int destTaskId, WritableRequest request) { |
106 | 0 | nettyClient.doSend(destTaskId, request); |
107 | 0 | if (nettyClient.getNumberOfOpenRequests() > maxNumberOfOpenRequests) { |
108 | 0 | long startTime = System.currentTimeMillis(); |
109 | 0 | waitSomeRequests(); |
110 | 0 | timeWaitingOnOpenRequests.inc(System.currentTimeMillis() - startTime); |
111 | |
} |
112 | 0 | } |
113 | |
|
114 | |
|
115 | |
|
116 | |
|
117 | |
|
118 | |
|
119 | |
private void waitSomeRequests() { |
120 | 0 | numWaitingThreads.getAndIncrement(); |
121 | 0 | while (nettyClient.getNumberOfOpenRequests() > numberOfRequestsToProceed) { |
122 | |
|
123 | 0 | synchronized (requestSpotAvailable) { |
124 | 0 | if (nettyClient.getNumberOfOpenRequests() <= |
125 | |
numberOfRequestsToProceed) { |
126 | 0 | break; |
127 | |
} |
128 | |
try { |
129 | 0 | requestSpotAvailable.wait(waitingRequestMsecs); |
130 | 0 | } catch (InterruptedException e) { |
131 | 0 | throw new IllegalStateException("waitSomeRequests: Got unexpected " + |
132 | |
"InterruptedException", e); |
133 | 0 | } |
134 | 0 | } |
135 | 0 | nettyClient.logAndSanityCheck(); |
136 | |
} |
137 | 0 | numWaitingThreads.getAndDecrement(); |
138 | 0 | } |
139 | |
|
140 | |
@Override |
141 | |
public void messageAckReceived(int taskId, long requestId, int response) { |
142 | 0 | synchronized (requestSpotAvailable) { |
143 | 0 | requestSpotAvailable.notifyAll(); |
144 | 0 | } |
145 | 0 | } |
146 | |
|
147 | |
@Override |
148 | |
public AckSignalFlag getAckSignalFlag(int response) { |
149 | 0 | return AckSignalFlag.values()[response]; |
150 | |
} |
151 | |
|
152 | |
@Override |
153 | |
public int calculateResponse(AckSignalFlag alreadyDone, int clientId) { |
154 | 0 | return alreadyDone.ordinal(); |
155 | |
} |
156 | |
|
157 | |
@Override |
158 | |
public void logInfo() { |
159 | 0 | if (LOG.isInfoEnabled()) { |
160 | 0 | LOG.info("logInfo: " + numWaitingThreads.get() + " threads waiting " + |
161 | |
"until number of open requests falls below " + |
162 | |
numberOfRequestsToProceed); |
163 | |
} |
164 | 0 | } |
165 | |
|
166 | |
@Override |
167 | |
public void waitAllRequests() { |
168 | |
|
169 | |
|
170 | |
|
171 | |
|
172 | 0 | } |
173 | |
|
174 | |
@Override |
175 | |
public int getNumberOfUnsentRequests() { |
176 | 0 | return 0; |
177 | |
} |
178 | |
} |