1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.reactor;
29
30 import java.io.IOException;
31 import java.net.InetSocketAddress;
32 import java.net.Socket;
33 import java.net.SocketAddress;
34 import java.net.UnknownHostException;
35 import java.nio.channels.CancelledKeyException;
36 import java.nio.channels.ClosedChannelException;
37 import java.nio.channels.SelectionKey;
38 import java.nio.channels.SocketChannel;
39 import java.security.AccessController;
40 import java.security.PrivilegedActionException;
41 import java.security.PrivilegedExceptionAction;
42 import java.util.Queue;
43 import java.util.Set;
44 import java.util.concurrent.ConcurrentLinkedQueue;
45 import java.util.concurrent.Future;
46 import java.util.concurrent.atomic.AtomicBoolean;
47
48 import org.apache.hc.core5.concurrent.FutureCallback;
49 import org.apache.hc.core5.function.Callback;
50 import org.apache.hc.core5.function.Decorator;
51 import org.apache.hc.core5.io.CloseMode;
52 import org.apache.hc.core5.io.Closer;
53 import org.apache.hc.core5.net.NamedEndpoint;
54 import org.apache.hc.core5.util.Args;
55 import org.apache.hc.core5.util.Asserts;
56 import org.apache.hc.core5.util.Timeout;
57
58 class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements ConnectionInitiator {
59
60 private static final int MAX_CHANNEL_REQUESTS = 10000;
61
62 private final IOEventHandlerFactory eventHandlerFactory;
63 private final IOReactorConfig reactorConfig;
64 private final Decorator<IOSession> ioSessionDecorator;
65 private final IOSessionListener sessionListener;
66 private final Callback<IOSession> sessionShutdownCallback;
67 private final Queue<InternalDataChannel> closedSessions;
68 private final Queue<ChannelEntry> channelQueue;
69 private final Queue<IOSessionRequest> requestQueue;
70 private final AtomicBoolean shutdownInitiated;
71 private final long selectTimeoutMillis;
72 private volatile long lastTimeoutCheckMillis;
73
74 SingleCoreIOReactor(
75 final Callback<Exception> exceptionCallback,
76 final IOEventHandlerFactory eventHandlerFactory,
77 final IOReactorConfig reactorConfig,
78 final Decorator<IOSession> ioSessionDecorator,
79 final IOSessionListener sessionListener,
80 final Callback<IOSession> sessionShutdownCallback) {
81 super(exceptionCallback);
82 this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory");
83 this.reactorConfig = Args.notNull(reactorConfig, "I/O reactor config");
84 this.ioSessionDecorator = ioSessionDecorator;
85 this.sessionListener = sessionListener;
86 this.sessionShutdownCallback = sessionShutdownCallback;
87 this.shutdownInitiated = new AtomicBoolean(false);
88 this.closedSessions = new ConcurrentLinkedQueue<>();
89 this.channelQueue = new ConcurrentLinkedQueue<>();
90 this.requestQueue = new ConcurrentLinkedQueue<>();
91 this.selectTimeoutMillis = this.reactorConfig.getSelectInterval().toMilliseconds();
92 }
93
94 void enqueueChannel(final ChannelEntry entry) throws IOReactorShutdownException {
95 if (getStatus().compareTo(IOReactorStatus.ACTIVE) > 0) {
96 throw new IOReactorShutdownException("I/O reactor has been shut down");
97 }
98 this.channelQueue.add(entry);
99 this.selector.wakeup();
100 }
101
102 @Override
103 void doTerminate() {
104 closePendingChannels();
105 closePendingConnectionRequests();
106 processClosedSessions();
107 }
108
109 @Override
110 void doExecute() throws IOException {
111 while (!Thread.currentThread().isInterrupted()) {
112
113 final int readyCount = this.selector.select(this.selectTimeoutMillis);
114
115 if (getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
116 if (this.shutdownInitiated.compareAndSet(false, true)) {
117 initiateSessionShutdown();
118 }
119 closePendingChannels();
120 }
121 if (getStatus() == IOReactorStatus.SHUT_DOWN) {
122 break;
123 }
124
125
126 if (readyCount > 0) {
127 processEvents(this.selector.selectedKeys());
128 }
129
130 validateActiveChannels();
131
132
133 processClosedSessions();
134
135
136 if (getStatus() == IOReactorStatus.ACTIVE) {
137 processPendingChannels();
138 processPendingConnectionRequests();
139 }
140
141
142 if (getStatus() == IOReactorStatus.SHUTTING_DOWN && this.selector.keys().isEmpty()) {
143 break;
144 }
145 if (getStatus() == IOReactorStatus.SHUT_DOWN) {
146 break;
147 }
148 }
149 }
150
151 private void initiateSessionShutdown() {
152 if (this.sessionShutdownCallback != null) {
153 final Set<SelectionKey> keys = this.selector.keys();
154 for (final SelectionKey key : keys) {
155 final InternalChannel./../org/apache/hc/core5/reactor/InternalChannel.html#InternalChannel">InternalChannel channel = (InternalChannel) key.attachment();
156 if (channel instanceof InternalDataChannel) {
157 this.sessionShutdownCallback.execute((InternalDataChannel) channel);
158 }
159 }
160 }
161 }
162
163 private void validateActiveChannels() {
164 final long currentTimeMillis = System.currentTimeMillis();
165 if ((currentTimeMillis - this.lastTimeoutCheckMillis) >= this.selectTimeoutMillis) {
166 this.lastTimeoutCheckMillis = currentTimeMillis;
167 for (final SelectionKey key : this.selector.keys()) {
168 checkTimeout(key, currentTimeMillis);
169 }
170 }
171 }
172
173 private void processEvents(final Set<SelectionKey> selectedKeys) {
174 for (final SelectionKey key : selectedKeys) {
175 final InternalChannel./../org/apache/hc/core5/reactor/InternalChannel.html#InternalChannel">InternalChannel channel = (InternalChannel) key.attachment();
176 if (channel != null) {
177 try {
178 channel.handleIOEvent(key.readyOps());
179 } catch (final CancelledKeyException ex) {
180 channel.close(CloseMode.GRACEFUL);
181 }
182 }
183 }
184 selectedKeys.clear();
185 }
186
187 private void processPendingChannels() throws IOException {
188 ChannelEntry entry;
189 for (int i = 0; i < MAX_CHANNEL_REQUESTS && (entry = this.channelQueue.poll()) != null; i++) {
190 final SocketChannel socketChannel = entry.channel;
191 final Object attachment = entry.attachment;
192 try {
193 prepareSocket(socketChannel.socket());
194 socketChannel.configureBlocking(false);
195 } catch (final IOException ex) {
196 logException(ex);
197 try {
198 socketChannel.close();
199 } catch (final IOException ex2) {
200 logException(ex2);
201 }
202 throw ex;
203 }
204 final SelectionKey key;
205 try {
206 key = socketChannel.register(this.selector, SelectionKey.OP_READ);
207 } catch (final ClosedChannelException ex) {
208 return;
209 }
210 final IOSession ioSession = new IOSessionImpl("a", key, socketChannel);
211 final InternalDataChannelnnel.html#InternalDataChannel">InternalDataChannel dataChannel = new InternalDataChannel(
212 ioSession,
213 null,
214 ioSessionDecorator,
215 sessionListener,
216 closedSessions);
217 dataChannel.upgrade(this.eventHandlerFactory.createHandler(dataChannel, attachment));
218 dataChannel.setSocketTimeout(this.reactorConfig.getSoTimeout());
219 key.attach(dataChannel);
220 dataChannel.handleIOEvent(SelectionKey.OP_CONNECT);
221 }
222 }
223
224 private void processClosedSessions() {
225 for (;;) {
226 final InternalDataChannel dataChannel = this.closedSessions.poll();
227 if (dataChannel == null) {
228 break;
229 }
230 try {
231 dataChannel.disconnected();
232 } catch (final CancelledKeyException ex) {
233
234 }
235 }
236 }
237
238 private void checkTimeout(final SelectionKey key, final long nowMillis) {
239 final InternalChannel./../org/apache/hc/core5/reactor/InternalChannel.html#InternalChannel">InternalChannel channel = (InternalChannel) key.attachment();
240 if (channel != null) {
241 channel.checkTimeout(nowMillis);
242 }
243 }
244
245 @Override
246 public Future<IOSession> connect(
247 final NamedEndpoint remoteEndpoint,
248 final SocketAddress remoteAddress,
249 final SocketAddress localAddress,
250 final Timeout timeout,
251 final Object attachment,
252 final FutureCallback<IOSession> callback) throws IOReactorShutdownException {
253 Args.notNull(remoteEndpoint, "Remote endpoint");
254 final IOSessionRequestt.html#IOSessionRequest">IOSessionRequest sessionRequest = new IOSessionRequest(
255 remoteEndpoint,
256 remoteAddress != null ? remoteAddress : new InetSocketAddress(remoteEndpoint.getHostName(), remoteEndpoint.getPort()),
257 localAddress,
258 timeout,
259 attachment,
260 callback);
261
262 this.requestQueue.add(sessionRequest);
263 this.selector.wakeup();
264
265 return sessionRequest;
266 }
267
268 private void prepareSocket(final Socket socket) throws IOException {
269 socket.setTcpNoDelay(this.reactorConfig.isTcpNoDelay());
270 socket.setKeepAlive(this.reactorConfig.isSoKeepalive());
271 if (this.reactorConfig.getSndBufSize() > 0) {
272 socket.setSendBufferSize(this.reactorConfig.getSndBufSize());
273 }
274 if (this.reactorConfig.getRcvBufSize() > 0) {
275 socket.setReceiveBufferSize(this.reactorConfig.getRcvBufSize());
276 }
277 if (this.reactorConfig.getTrafficClass() > 0) {
278 socket.setTrafficClass(this.reactorConfig.getTrafficClass());
279 }
280 final int linger = this.reactorConfig.getSoLinger().toSecondsIntBound();
281 if (linger >= 0) {
282 socket.setSoLinger(true, linger);
283 }
284 }
285
286 private void validateAddress(final SocketAddress address) throws UnknownHostException {
287 if (address instanceof InetSocketAddress) {
288 final InetSocketAddress endpoint = (InetSocketAddress) address;
289 if (endpoint.isUnresolved()) {
290 throw new UnknownHostException(endpoint.getHostName());
291 }
292 }
293 }
294
295 private void processPendingConnectionRequests() {
296 IOSessionRequest sessionRequest;
297 for (int i = 0; i < MAX_CHANNEL_REQUESTS && (sessionRequest = this.requestQueue.poll()) != null; i++) {
298 if (!sessionRequest.isCancelled()) {
299 final SocketChannel socketChannel;
300 try {
301 socketChannel = SocketChannel.open();
302 } catch (final IOException ex) {
303 sessionRequest.failed(ex);
304 return;
305 }
306 try {
307 processConnectionRequest(socketChannel, sessionRequest);
308 } catch (final IOException | SecurityException ex) {
309 Closer.closeQuietly(socketChannel);
310 sessionRequest.failed(ex);
311 }
312 }
313 }
314 }
315
316 private void processConnectionRequest(final SocketChannel socketChannel, final IOSessionRequest sessionRequest) throws IOException {
317 validateAddress(sessionRequest.localAddress);
318 validateAddress(sessionRequest.remoteAddress);
319
320 socketChannel.configureBlocking(false);
321 prepareSocket(socketChannel.socket());
322
323 if (sessionRequest.localAddress != null) {
324 final Socket sock = socketChannel.socket();
325 sock.setReuseAddress(this.reactorConfig.isSoReuseAddress());
326 sock.bind(sessionRequest.localAddress);
327 }
328
329 final SocketAddress targetAddress;
330 final IOEventHandlerFactory eventHandlerFactory;
331 if (this.reactorConfig.getSocksProxyAddress() != null) {
332 targetAddress = this.reactorConfig.getSocksProxyAddress();
333 eventHandlerFactory = new SocksProxyProtocolHandlerFactory(
334 sessionRequest.remoteAddress,
335 this.reactorConfig.getSocksProxyUsername(),
336 this.reactorConfig.getSocksProxyPassword(),
337 this.eventHandlerFactory);
338 } else {
339 targetAddress = sessionRequest.remoteAddress;
340 eventHandlerFactory = this.eventHandlerFactory;
341 }
342
343
344
345 final boolean connected;
346 try {
347 connected = AccessController.doPrivileged(
348 new PrivilegedExceptionAction<Boolean>() {
349 @Override
350 public Boolean run() throws IOException {
351 return socketChannel.connect(targetAddress);
352 }
353 });
354 } catch (final PrivilegedActionException e) {
355 Asserts.check(e.getCause() instanceof IOException,
356 "method contract violation only checked exceptions are wrapped: " + e.getCause());
357
358 throw (IOException) e.getCause();
359 }
360
361
362 final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
363 final InternalChannel channel = new InternalConnectChannel(key, socketChannel, sessionRequest, new InternalDataChannelFactory() {
364
365 @Override
366 public InternalDataChannel create(
367 final SelectionKey key,
368 final SocketChannel socketChannel,
369 final NamedEndpoint namedEndpoint,
370 final Object attachment) {
371 final IOSession ioSession = new IOSessionImpl("c", key, socketChannel);
372 final InternalDataChannelnnel.html#InternalDataChannel">InternalDataChannel dataChannel = new InternalDataChannel(
373 ioSession,
374 namedEndpoint,
375 ioSessionDecorator,
376 sessionListener,
377 closedSessions);
378 dataChannel.upgrade(eventHandlerFactory.createHandler(dataChannel, attachment));
379 dataChannel.setSocketTimeout(reactorConfig.getSoTimeout());
380 return dataChannel;
381 }
382
383 });
384 if (connected) {
385 channel.handleIOEvent(SelectionKey.OP_CONNECT);
386 } else {
387 key.attach(channel);
388 sessionRequest.assign(channel);
389 }
390 }
391
392 private void closePendingChannels() {
393 ChannelEntry entry;
394 while ((entry = this.channelQueue.poll()) != null) {
395 final SocketChannel socketChannel = entry.channel;
396 try {
397 socketChannel.close();
398 } catch (final IOException ex) {
399 logException(ex);
400 }
401 }
402 }
403
404 private void closePendingConnectionRequests() {
405 IOSessionRequest sessionRequest;
406 while ((sessionRequest = this.requestQueue.poll()) != null) {
407 sessionRequest.cancel();
408 }
409 }
410
411 }