View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.reactor;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.net.Socket;
33  import java.net.SocketAddress;
34  import java.net.UnknownHostException;
35  import java.nio.channels.CancelledKeyException;
36  import java.nio.channels.ClosedChannelException;
37  import java.nio.channels.SelectionKey;
38  import java.nio.channels.SocketChannel;
39  import java.security.AccessController;
40  import java.security.PrivilegedActionException;
41  import java.security.PrivilegedExceptionAction;
42  import java.util.Queue;
43  import java.util.Set;
44  import java.util.concurrent.ConcurrentLinkedQueue;
45  import java.util.concurrent.Future;
46  import java.util.concurrent.atomic.AtomicBoolean;
47  
48  import org.apache.hc.core5.concurrent.FutureCallback;
49  import org.apache.hc.core5.function.Callback;
50  import org.apache.hc.core5.function.Decorator;
51  import org.apache.hc.core5.io.CloseMode;
52  import org.apache.hc.core5.io.Closer;
53  import org.apache.hc.core5.net.NamedEndpoint;
54  import org.apache.hc.core5.util.Args;
55  import org.apache.hc.core5.util.Asserts;
56  import org.apache.hc.core5.util.Timeout;
57  
58  class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements ConnectionInitiator {
59  
60      private static final int MAX_CHANNEL_REQUESTS = 10000;
61  
62      private final IOEventHandlerFactory eventHandlerFactory;
63      private final IOReactorConfig reactorConfig;
64      private final Decorator<IOSession> ioSessionDecorator;
65      private final IOSessionListener sessionListener;
66      private final Callback<IOSession> sessionShutdownCallback;
67      private final Queue<InternalDataChannel> closedSessions;
68      private final Queue<SocketChannel> channelQueue;
69      private final Queue<IOSessionRequest> requestQueue;
70      private final AtomicBoolean shutdownInitiated;
71      private final long selectTimeoutMillis;
72      private volatile long lastTimeoutCheckMillis;
73  
74      SingleCoreIOReactor(
75              final Callback<Exception> exceptionCallback,
76              final IOEventHandlerFactory eventHandlerFactory,
77              final IOReactorConfig reactorConfig,
78              final Decorator<IOSession> ioSessionDecorator,
79              final IOSessionListener sessionListener,
80              final Callback<IOSession> sessionShutdownCallback) {
81          super(exceptionCallback);
82          this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory");
83          this.reactorConfig = Args.notNull(reactorConfig, "I/O reactor config");
84          this.ioSessionDecorator = ioSessionDecorator;
85          this.sessionListener = sessionListener;
86          this.sessionShutdownCallback = sessionShutdownCallback;
87          this.shutdownInitiated = new AtomicBoolean(false);
88          this.closedSessions = new ConcurrentLinkedQueue<>();
89          this.channelQueue = new ConcurrentLinkedQueue<>();
90          this.requestQueue = new ConcurrentLinkedQueue<>();
91          this.selectTimeoutMillis = this.reactorConfig.getSelectInterval().toMilliseconds();
92      }
93  
94      void enqueueChannel(final SocketChannel socketChannel) throws IOReactorShutdownException {
95          Args.notNull(socketChannel, "SocketChannel");
96          if (getStatus().compareTo(IOReactorStatus.ACTIVE) > 0) {
97              throw new IOReactorShutdownException("I/O reactor has been shut down");
98          }
99          this.channelQueue.add(socketChannel);
100         this.selector.wakeup();
101     }
102 
103     @Override
104     void doTerminate() {
105         closePendingChannels();
106         closePendingConnectionRequests();
107         processClosedSessions();
108     }
109 
110     @Override
111     void doExecute() throws IOException {
112         while (!Thread.currentThread().isInterrupted()) {
113 
114             final int readyCount = this.selector.select(this.selectTimeoutMillis);
115 
116             if (getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
117                 if (this.shutdownInitiated.compareAndSet(false, true)) {
118                     initiateSessionShutdown();
119                 }
120                 closePendingChannels();
121             }
122             if (getStatus() == IOReactorStatus.SHUT_DOWN) {
123                 break;
124             }
125 
126             // Process selected I/O events
127             if (readyCount > 0) {
128                 processEvents(this.selector.selectedKeys());
129             }
130 
131             validateActiveChannels();
132 
133             // Process closed sessions
134             processClosedSessions();
135 
136             // If active process new channels
137             if (getStatus() == IOReactorStatus.ACTIVE) {
138                 processPendingChannels();
139                 processPendingConnectionRequests();
140             }
141 
142             // Exit select loop if graceful shutdown has been completed
143             if (getStatus() == IOReactorStatus.SHUTTING_DOWN && this.selector.keys().isEmpty()) {
144                 break;
145             }
146             if (getStatus() == IOReactorStatus.SHUT_DOWN) {
147                 break;
148             }
149         }
150     }
151 
152     private void initiateSessionShutdown() {
153         if (this.sessionShutdownCallback != null) {
154             final Set<SelectionKey> keys = this.selector.keys();
155             for (final SelectionKey key : keys) {
156                 final InternalChannel./../org/apache/hc/core5/reactor/InternalChannel.html#InternalChannel">InternalChannel channel = (InternalChannel) key.attachment();
157                 if (channel instanceof InternalDataChannel) {
158                     this.sessionShutdownCallback.execute((InternalDataChannel) channel);
159                 }
160             }
161         }
162     }
163 
164     private void validateActiveChannels() {
165         final long currentTimeMillis = System.currentTimeMillis();
166         if ((currentTimeMillis - this.lastTimeoutCheckMillis) >= this.selectTimeoutMillis) {
167             this.lastTimeoutCheckMillis = currentTimeMillis;
168             for (final SelectionKey key : this.selector.keys()) {
169                 checkTimeout(key, currentTimeMillis);
170             }
171         }
172     }
173 
174     private void processEvents(final Set<SelectionKey> selectedKeys) {
175         for (final SelectionKey key : selectedKeys) {
176             final InternalChannel./../org/apache/hc/core5/reactor/InternalChannel.html#InternalChannel">InternalChannel channel = (InternalChannel) key.attachment();
177             if (channel != null) {
178                 try {
179                     channel.handleIOEvent(key.readyOps());
180                 } catch (final CancelledKeyException ex) {
181                     channel.close(CloseMode.GRACEFUL);
182                 }
183             }
184         }
185         selectedKeys.clear();
186     }
187 
188     private void processPendingChannels() throws IOException {
189         SocketChannel socketChannel;
190         for (int i = 0; i < MAX_CHANNEL_REQUESTS && (socketChannel = this.channelQueue.poll()) != null; i++) {
191             try {
192                 prepareSocket(socketChannel.socket());
193                 socketChannel.configureBlocking(false);
194             } catch (final IOException ex) {
195                 logException(ex);
196                 try {
197                     socketChannel.close();
198                 } catch (final IOException ex2) {
199                     logException(ex2);
200                 }
201                 throw ex;
202             }
203             final SelectionKey key;
204             try {
205                 key = socketChannel.register(this.selector, SelectionKey.OP_READ);
206             } catch (final ClosedChannelException ex) {
207                 return;
208             }
209             final IOSession ioSession = new IOSessionImpl("a", key, socketChannel);
210             final InternalDataChannelnnel.html#InternalDataChannel">InternalDataChannel dataChannel = new InternalDataChannel(
211                     ioSession,
212                     null,
213                     ioSessionDecorator,
214                     sessionListener,
215                     closedSessions);
216             dataChannel.upgrade(this.eventHandlerFactory.createHandler(dataChannel, null));
217             dataChannel.setSocketTimeout(this.reactorConfig.getSoTimeout());
218             key.attach(dataChannel);
219             dataChannel.handleIOEvent(SelectionKey.OP_CONNECT);
220         }
221     }
222 
223     private void processClosedSessions() {
224         for (;;) {
225             final InternalDataChannel dataChannel = this.closedSessions.poll();
226             if (dataChannel == null) {
227                 break;
228             }
229             try {
230                 dataChannel.disconnected();
231             } catch (final CancelledKeyException ex) {
232                 // ignore and move on
233             }
234         }
235     }
236 
237     private void checkTimeout(final SelectionKey key, final long nowMillis) {
238         final InternalChannel./../org/apache/hc/core5/reactor/InternalChannel.html#InternalChannel">InternalChannel channel = (InternalChannel) key.attachment();
239         if (channel != null) {
240             channel.checkTimeout(nowMillis);
241         }
242     }
243 
244     @Override
245     public Future<IOSession> connect(
246             final NamedEndpoint remoteEndpoint,
247             final SocketAddress remoteAddress,
248             final SocketAddress localAddress,
249             final Timeout timeout,
250             final Object attachment,
251             final FutureCallback<IOSession> callback) throws IOReactorShutdownException {
252         Args.notNull(remoteEndpoint, "Remote endpoint");
253         final IOSessionRequestt.html#IOSessionRequest">IOSessionRequest sessionRequest = new IOSessionRequest(
254                 remoteEndpoint,
255                 remoteAddress != null ? remoteAddress : new InetSocketAddress(remoteEndpoint.getHostName(), remoteEndpoint.getPort()),
256                 localAddress,
257                 timeout,
258                 attachment,
259                 callback);
260 
261         this.requestQueue.add(sessionRequest);
262         this.selector.wakeup();
263 
264         return sessionRequest;
265     }
266 
267     private void prepareSocket(final Socket socket) throws IOException {
268         socket.setTcpNoDelay(this.reactorConfig.isTcpNoDelay());
269         socket.setKeepAlive(this.reactorConfig.isSoKeepalive());
270         if (this.reactorConfig.getSndBufSize() > 0) {
271             socket.setSendBufferSize(this.reactorConfig.getSndBufSize());
272         }
273         if (this.reactorConfig.getRcvBufSize() > 0) {
274             socket.setReceiveBufferSize(this.reactorConfig.getRcvBufSize());
275         }
276         final int linger = this.reactorConfig.getSoLinger().toSecondsIntBound();
277         if (linger >= 0) {
278             socket.setSoLinger(true, linger);
279         }
280     }
281 
282     private void validateAddress(final SocketAddress address) throws UnknownHostException {
283         if (address instanceof InetSocketAddress) {
284             final InetSocketAddress endpoint = (InetSocketAddress) address;
285             if (endpoint.isUnresolved()) {
286                 throw new UnknownHostException(endpoint.getHostName());
287             }
288         }
289     }
290 
291     private void processPendingConnectionRequests() {
292         IOSessionRequest sessionRequest;
293         for (int i = 0; i < MAX_CHANNEL_REQUESTS && (sessionRequest = this.requestQueue.poll()) != null; i++) {
294             if (!sessionRequest.isCancelled()) {
295                 final SocketChannel socketChannel;
296                 try {
297                     socketChannel = SocketChannel.open();
298                 } catch (final IOException ex) {
299                     sessionRequest.failed(ex);
300                     return;
301                 }
302                 try {
303                     processConnectionRequest(socketChannel, sessionRequest);
304                 } catch (final IOException | SecurityException ex) {
305                     Closer.closeQuietly(socketChannel);
306                     sessionRequest.failed(ex);
307                 }
308             }
309         }
310     }
311 
312     private void processConnectionRequest(final SocketChannel socketChannel, final IOSessionRequest sessionRequest) throws IOException {
313         validateAddress(sessionRequest.localAddress);
314         validateAddress(sessionRequest.remoteAddress);
315 
316         socketChannel.configureBlocking(false);
317         prepareSocket(socketChannel.socket());
318 
319         if (sessionRequest.localAddress != null) {
320             final Socket sock = socketChannel.socket();
321             sock.setReuseAddress(this.reactorConfig.isSoReuseAddress());
322             sock.bind(sessionRequest.localAddress);
323         }
324 
325         final SocketAddress targetAddress;
326         final IOEventHandlerFactory eventHandlerFactory;
327         if (this.reactorConfig.getSocksProxyAddress() != null) {
328             targetAddress = this.reactorConfig.getSocksProxyAddress();
329             eventHandlerFactory = new SocksProxyProtocolHandlerFactory(
330                     sessionRequest.remoteAddress,
331                     this.reactorConfig.getSocksProxyUsername(),
332                     this.reactorConfig.getSocksProxyPassword(),
333                     this.eventHandlerFactory);
334         } else {
335             targetAddress = sessionRequest.remoteAddress;
336             eventHandlerFactory = this.eventHandlerFactory;
337         }
338 
339         // Run this under a doPrivileged to support lib users that run under a SecurityManager this allows granting connect permissions
340         // only to this library
341         final boolean connected;
342         try {
343             connected = AccessController.doPrivileged(
344                         new PrivilegedExceptionAction<Boolean>() {
345                             @Override
346                             public Boolean run() throws IOException {
347                                 return socketChannel.connect(targetAddress);
348                             }
349                         });
350         } catch (final PrivilegedActionException e) {
351             Asserts.check(e.getCause() instanceof  IOException,
352                     "method contract violation only checked exceptions are wrapped: " + e.getCause());
353             // only checked exceptions are wrapped - error and RTExceptions are rethrown by doPrivileged
354             throw (IOException) e.getCause();
355         }
356 
357 
358         final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
359         final InternalChannel channel = new InternalConnectChannel(key, socketChannel, sessionRequest, new InternalDataChannelFactory() {
360 
361             @Override
362             public InternalDataChannel create(
363                     final SelectionKey key,
364                     final SocketChannel socketChannel,
365                     final NamedEndpoint namedEndpoint,
366                     final Object attachment) {
367                 final IOSession ioSession = new IOSessionImpl("c", key, socketChannel);
368                 final InternalDataChannelnnel.html#InternalDataChannel">InternalDataChannel dataChannel = new InternalDataChannel(
369                         ioSession,
370                         namedEndpoint,
371                         ioSessionDecorator,
372                         sessionListener,
373                         closedSessions);
374                 dataChannel.upgrade(eventHandlerFactory.createHandler(dataChannel, attachment));
375                 dataChannel.setSocketTimeout(reactorConfig.getSoTimeout());
376                 return dataChannel;
377             }
378 
379         });
380         if (connected) {
381             channel.handleIOEvent(SelectionKey.OP_CONNECT);
382         } else {
383             key.attach(channel);
384             sessionRequest.assign(channel);
385         }
386     }
387 
388     private void closePendingChannels() {
389         SocketChannel socketChannel;
390         while ((socketChannel = this.channelQueue.poll()) != null) {
391             try {
392                 socketChannel.close();
393             } catch (final IOException ex) {
394                 logException(ex);
395             }
396         }
397     }
398 
399     private void closePendingConnectionRequests() {
400         IOSessionRequest sessionRequest;
401         while ((sessionRequest = this.requestQueue.poll()) != null) {
402             sessionRequest.cancel();
403         }
404     }
405 
406 }