1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.reactor;
29
30 import java.io.IOException;
31 import java.net.InetSocketAddress;
32 import java.net.Socket;
33 import java.net.SocketAddress;
34 import java.net.UnknownHostException;
35 import java.nio.channels.CancelledKeyException;
36 import java.nio.channels.ClosedChannelException;
37 import java.nio.channels.SelectionKey;
38 import java.nio.channels.SocketChannel;
39 import java.security.AccessController;
40 import java.security.PrivilegedActionException;
41 import java.security.PrivilegedExceptionAction;
42 import java.util.Queue;
43 import java.util.Set;
44 import java.util.concurrent.ConcurrentLinkedQueue;
45 import java.util.concurrent.Future;
46 import java.util.concurrent.atomic.AtomicBoolean;
47
48 import org.apache.hc.core5.concurrent.FutureCallback;
49 import org.apache.hc.core5.function.Callback;
50 import org.apache.hc.core5.function.Decorator;
51 import org.apache.hc.core5.io.CloseMode;
52 import org.apache.hc.core5.io.Closer;
53 import org.apache.hc.core5.net.NamedEndpoint;
54 import org.apache.hc.core5.util.Args;
55 import org.apache.hc.core5.util.Asserts;
56 import org.apache.hc.core5.util.Timeout;
57
58 class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements ConnectionInitiator {
59
60 private static final int MAX_CHANNEL_REQUESTS = 10000;
61
62 private final IOEventHandlerFactory eventHandlerFactory;
63 private final IOReactorConfig reactorConfig;
64 private final Decorator<IOSession> ioSessionDecorator;
65 private final IOSessionListener sessionListener;
66 private final Callback<IOSession> sessionShutdownCallback;
67 private final Queue<InternalDataChannel> closedSessions;
68 private final Queue<SocketChannel> channelQueue;
69 private final Queue<IOSessionRequest> requestQueue;
70 private final AtomicBoolean shutdownInitiated;
71 private final long selectTimeoutMillis;
72 private volatile long lastTimeoutCheckMillis;
73
74 SingleCoreIOReactor(
75 final Callback<Exception> exceptionCallback,
76 final IOEventHandlerFactory eventHandlerFactory,
77 final IOReactorConfig reactorConfig,
78 final Decorator<IOSession> ioSessionDecorator,
79 final IOSessionListener sessionListener,
80 final Callback<IOSession> sessionShutdownCallback) {
81 super(exceptionCallback);
82 this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory");
83 this.reactorConfig = Args.notNull(reactorConfig, "I/O reactor config");
84 this.ioSessionDecorator = ioSessionDecorator;
85 this.sessionListener = sessionListener;
86 this.sessionShutdownCallback = sessionShutdownCallback;
87 this.shutdownInitiated = new AtomicBoolean(false);
88 this.closedSessions = new ConcurrentLinkedQueue<>();
89 this.channelQueue = new ConcurrentLinkedQueue<>();
90 this.requestQueue = new ConcurrentLinkedQueue<>();
91 this.selectTimeoutMillis = this.reactorConfig.getSelectInterval().toMilliseconds();
92 }
93
94 void enqueueChannel(final SocketChannel socketChannel) throws IOReactorShutdownException {
95 Args.notNull(socketChannel, "SocketChannel");
96 if (getStatus().compareTo(IOReactorStatus.ACTIVE) > 0) {
97 throw new IOReactorShutdownException("I/O reactor has been shut down");
98 }
99 this.channelQueue.add(socketChannel);
100 this.selector.wakeup();
101 }
102
103 @Override
104 void doTerminate() {
105 closePendingChannels();
106 closePendingConnectionRequests();
107 processClosedSessions();
108 }
109
110 @Override
111 void doExecute() throws IOException {
112 while (!Thread.currentThread().isInterrupted()) {
113
114 final int readyCount = this.selector.select(this.selectTimeoutMillis);
115
116 if (getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
117 if (this.shutdownInitiated.compareAndSet(false, true)) {
118 initiateSessionShutdown();
119 }
120 closePendingChannels();
121 }
122 if (getStatus() == IOReactorStatus.SHUT_DOWN) {
123 break;
124 }
125
126
127 if (readyCount > 0) {
128 processEvents(this.selector.selectedKeys());
129 }
130
131 validateActiveChannels();
132
133
134 processClosedSessions();
135
136
137 if (getStatus() == IOReactorStatus.ACTIVE) {
138 processPendingChannels();
139 processPendingConnectionRequests();
140 }
141
142
143 if (getStatus() == IOReactorStatus.SHUTTING_DOWN && this.selector.keys().isEmpty()) {
144 break;
145 }
146 if (getStatus() == IOReactorStatus.SHUT_DOWN) {
147 break;
148 }
149 }
150 }
151
152 private void initiateSessionShutdown() {
153 if (this.sessionShutdownCallback != null) {
154 final Set<SelectionKey> keys = this.selector.keys();
155 for (final SelectionKey key : keys) {
156 final InternalChannel./../org/apache/hc/core5/reactor/InternalChannel.html#InternalChannel">InternalChannel channel = (InternalChannel) key.attachment();
157 if (channel instanceof InternalDataChannel) {
158 this.sessionShutdownCallback.execute((InternalDataChannel) channel);
159 }
160 }
161 }
162 }
163
164 private void validateActiveChannels() {
165 final long currentTimeMillis = System.currentTimeMillis();
166 if ((currentTimeMillis - this.lastTimeoutCheckMillis) >= this.selectTimeoutMillis) {
167 this.lastTimeoutCheckMillis = currentTimeMillis;
168 for (final SelectionKey key : this.selector.keys()) {
169 checkTimeout(key, currentTimeMillis);
170 }
171 }
172 }
173
174 private void processEvents(final Set<SelectionKey> selectedKeys) {
175 for (final SelectionKey key : selectedKeys) {
176 final InternalChannel./../org/apache/hc/core5/reactor/InternalChannel.html#InternalChannel">InternalChannel channel = (InternalChannel) key.attachment();
177 if (channel != null) {
178 try {
179 channel.handleIOEvent(key.readyOps());
180 } catch (final CancelledKeyException ex) {
181 channel.close(CloseMode.GRACEFUL);
182 }
183 }
184 }
185 selectedKeys.clear();
186 }
187
188 private void processPendingChannels() throws IOException {
189 SocketChannel socketChannel;
190 for (int i = 0; i < MAX_CHANNEL_REQUESTS && (socketChannel = this.channelQueue.poll()) != null; i++) {
191 try {
192 prepareSocket(socketChannel.socket());
193 socketChannel.configureBlocking(false);
194 } catch (final IOException ex) {
195 logException(ex);
196 try {
197 socketChannel.close();
198 } catch (final IOException ex2) {
199 logException(ex2);
200 }
201 throw ex;
202 }
203 final SelectionKey key;
204 try {
205 key = socketChannel.register(this.selector, SelectionKey.OP_READ);
206 } catch (final ClosedChannelException ex) {
207 return;
208 }
209 final IOSession ioSession = new IOSessionImpl("a", key, socketChannel);
210 final InternalDataChannelnnel.html#InternalDataChannel">InternalDataChannel dataChannel = new InternalDataChannel(
211 ioSession,
212 null,
213 ioSessionDecorator,
214 sessionListener,
215 closedSessions);
216 dataChannel.upgrade(this.eventHandlerFactory.createHandler(dataChannel, null));
217 dataChannel.setSocketTimeout(this.reactorConfig.getSoTimeout());
218 key.attach(dataChannel);
219 dataChannel.handleIOEvent(SelectionKey.OP_CONNECT);
220 }
221 }
222
223 private void processClosedSessions() {
224 for (;;) {
225 final InternalDataChannel dataChannel = this.closedSessions.poll();
226 if (dataChannel == null) {
227 break;
228 }
229 try {
230 dataChannel.disconnected();
231 } catch (final CancelledKeyException ex) {
232
233 }
234 }
235 }
236
237 private void checkTimeout(final SelectionKey key, final long nowMillis) {
238 final InternalChannel./../org/apache/hc/core5/reactor/InternalChannel.html#InternalChannel">InternalChannel channel = (InternalChannel) key.attachment();
239 if (channel != null) {
240 channel.checkTimeout(nowMillis);
241 }
242 }
243
244 @Override
245 public Future<IOSession> connect(
246 final NamedEndpoint remoteEndpoint,
247 final SocketAddress remoteAddress,
248 final SocketAddress localAddress,
249 final Timeout timeout,
250 final Object attachment,
251 final FutureCallback<IOSession> callback) throws IOReactorShutdownException {
252 Args.notNull(remoteEndpoint, "Remote endpoint");
253 final IOSessionRequestt.html#IOSessionRequest">IOSessionRequest sessionRequest = new IOSessionRequest(
254 remoteEndpoint,
255 remoteAddress != null ? remoteAddress : new InetSocketAddress(remoteEndpoint.getHostName(), remoteEndpoint.getPort()),
256 localAddress,
257 timeout,
258 attachment,
259 callback);
260
261 this.requestQueue.add(sessionRequest);
262 this.selector.wakeup();
263
264 return sessionRequest;
265 }
266
267 private void prepareSocket(final Socket socket) throws IOException {
268 socket.setTcpNoDelay(this.reactorConfig.isTcpNoDelay());
269 socket.setKeepAlive(this.reactorConfig.isSoKeepalive());
270 if (this.reactorConfig.getSndBufSize() > 0) {
271 socket.setSendBufferSize(this.reactorConfig.getSndBufSize());
272 }
273 if (this.reactorConfig.getRcvBufSize() > 0) {
274 socket.setReceiveBufferSize(this.reactorConfig.getRcvBufSize());
275 }
276 final int linger = this.reactorConfig.getSoLinger().toSecondsIntBound();
277 if (linger >= 0) {
278 socket.setSoLinger(true, linger);
279 }
280 }
281
282 private void validateAddress(final SocketAddress address) throws UnknownHostException {
283 if (address instanceof InetSocketAddress) {
284 final InetSocketAddress endpoint = (InetSocketAddress) address;
285 if (endpoint.isUnresolved()) {
286 throw new UnknownHostException(endpoint.getHostName());
287 }
288 }
289 }
290
291 private void processPendingConnectionRequests() {
292 IOSessionRequest sessionRequest;
293 for (int i = 0; i < MAX_CHANNEL_REQUESTS && (sessionRequest = this.requestQueue.poll()) != null; i++) {
294 if (!sessionRequest.isCancelled()) {
295 final SocketChannel socketChannel;
296 try {
297 socketChannel = SocketChannel.open();
298 } catch (final IOException ex) {
299 sessionRequest.failed(ex);
300 return;
301 }
302 try {
303 processConnectionRequest(socketChannel, sessionRequest);
304 } catch (final IOException | SecurityException ex) {
305 Closer.closeQuietly(socketChannel);
306 sessionRequest.failed(ex);
307 }
308 }
309 }
310 }
311
312 private void processConnectionRequest(final SocketChannel socketChannel, final IOSessionRequest sessionRequest) throws IOException {
313 validateAddress(sessionRequest.localAddress);
314 validateAddress(sessionRequest.remoteAddress);
315
316 socketChannel.configureBlocking(false);
317 prepareSocket(socketChannel.socket());
318
319 if (sessionRequest.localAddress != null) {
320 final Socket sock = socketChannel.socket();
321 sock.setReuseAddress(this.reactorConfig.isSoReuseAddress());
322 sock.bind(sessionRequest.localAddress);
323 }
324
325 final SocketAddress targetAddress;
326 final IOEventHandlerFactory eventHandlerFactory;
327 if (this.reactorConfig.getSocksProxyAddress() != null) {
328 targetAddress = this.reactorConfig.getSocksProxyAddress();
329 eventHandlerFactory = new SocksProxyProtocolHandlerFactory(
330 sessionRequest.remoteAddress,
331 this.reactorConfig.getSocksProxyUsername(),
332 this.reactorConfig.getSocksProxyPassword(),
333 this.eventHandlerFactory);
334 } else {
335 targetAddress = sessionRequest.remoteAddress;
336 eventHandlerFactory = this.eventHandlerFactory;
337 }
338
339
340
341 final boolean connected;
342 try {
343 connected = AccessController.doPrivileged(
344 new PrivilegedExceptionAction<Boolean>() {
345 @Override
346 public Boolean run() throws IOException {
347 return socketChannel.connect(targetAddress);
348 }
349 });
350 } catch (final PrivilegedActionException e) {
351 Asserts.check(e.getCause() instanceof IOException,
352 "method contract violation only checked exceptions are wrapped: " + e.getCause());
353
354 throw (IOException) e.getCause();
355 }
356
357
358 final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
359 final InternalChannel channel = new InternalConnectChannel(key, socketChannel, sessionRequest, new InternalDataChannelFactory() {
360
361 @Override
362 public InternalDataChannel create(
363 final SelectionKey key,
364 final SocketChannel socketChannel,
365 final NamedEndpoint namedEndpoint,
366 final Object attachment) {
367 final IOSession ioSession = new IOSessionImpl("c", key, socketChannel);
368 final InternalDataChannelnnel.html#InternalDataChannel">InternalDataChannel dataChannel = new InternalDataChannel(
369 ioSession,
370 namedEndpoint,
371 ioSessionDecorator,
372 sessionListener,
373 closedSessions);
374 dataChannel.upgrade(eventHandlerFactory.createHandler(dataChannel, attachment));
375 dataChannel.setSocketTimeout(reactorConfig.getSoTimeout());
376 return dataChannel;
377 }
378
379 });
380 if (connected) {
381 channel.handleIOEvent(SelectionKey.OP_CONNECT);
382 } else {
383 key.attach(channel);
384 sessionRequest.assign(channel);
385 }
386 }
387
388 private void closePendingChannels() {
389 SocketChannel socketChannel;
390 while ((socketChannel = this.channelQueue.poll()) != null) {
391 try {
392 socketChannel.close();
393 } catch (final IOException ex) {
394 logException(ex);
395 }
396 }
397 }
398
399 private void closePendingConnectionRequests() {
400 IOSessionRequest sessionRequest;
401 while ((sessionRequest = this.requestQueue.poll()) != null) {
402 sessionRequest.cancel();
403 }
404 }
405
406 }