View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.reactor;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.net.Socket;
33  import java.net.SocketAddress;
34  import java.net.UnknownHostException;
35  import java.nio.channels.CancelledKeyException;
36  import java.nio.channels.ClosedChannelException;
37  import java.nio.channels.SelectionKey;
38  import java.nio.channels.SocketChannel;
39  import java.util.Queue;
40  import java.util.Set;
41  import java.util.concurrent.ConcurrentLinkedQueue;
42  import java.util.concurrent.Future;
43  import java.util.concurrent.atomic.AtomicBoolean;
44  
45  import org.apache.hc.core5.concurrent.FutureCallback;
46  import org.apache.hc.core5.function.Callback;
47  import org.apache.hc.core5.function.Decorator;
48  import org.apache.hc.core5.io.CloseMode;
49  import org.apache.hc.core5.io.Closer;
50  import org.apache.hc.core5.net.NamedEndpoint;
51  import org.apache.hc.core5.util.Args;
52  import org.apache.hc.core5.util.Timeout;
53  
54  class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements ConnectionInitiator {
55  
56      private static final int MAX_CHANNEL_REQUESTS = 10000;
57  
58      private final IOEventHandlerFactory eventHandlerFactory;
59      private final IOReactorConfig reactorConfig;
60      private final Decorator<IOSession> ioSessionDecorator;
61      private final IOSessionListener sessionListener;
62      private final Callback<IOSession> sessionShutdownCallback;
63      private final Queue<InternalDataChannel> closedSessions;
64      private final Queue<ChannelEntry> channelQueue;
65      private final Queue<IOSessionRequest> requestQueue;
66      private final AtomicBoolean shutdownInitiated;
67      private final long selectTimeoutMillis;
68      private volatile long lastTimeoutCheckMillis;
69  
70      SingleCoreIOReactor(
71              final Callback<Exception> exceptionCallback,
72              final IOEventHandlerFactory eventHandlerFactory,
73              final IOReactorConfig reactorConfig,
74              final Decorator<IOSession> ioSessionDecorator,
75              final IOSessionListener sessionListener,
76              final Callback<IOSession> sessionShutdownCallback) {
77          super(exceptionCallback);
78          this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory");
79          this.reactorConfig = Args.notNull(reactorConfig, "I/O reactor config");
80          this.ioSessionDecorator = ioSessionDecorator;
81          this.sessionListener = sessionListener;
82          this.sessionShutdownCallback = sessionShutdownCallback;
83          this.shutdownInitiated = new AtomicBoolean(false);
84          this.closedSessions = new ConcurrentLinkedQueue<>();
85          this.channelQueue = new ConcurrentLinkedQueue<>();
86          this.requestQueue = new ConcurrentLinkedQueue<>();
87          this.selectTimeoutMillis = this.reactorConfig.getSelectInterval().toMilliseconds();
88      }
89  
90      void enqueueChannel(final ChannelEntry entry) throws IOReactorShutdownException {
91          if (getStatus().compareTo(IOReactorStatus.ACTIVE) > 0) {
92              throw new IOReactorShutdownException("I/O reactor has been shut down");
93          }
94          this.channelQueue.add(entry);
95          this.selector.wakeup();
96      }
97  
98      @Override
99      void doTerminate() {
100         closePendingChannels();
101         closePendingConnectionRequests();
102         processClosedSessions();
103     }
104 
105     @Override
106     void doExecute() throws IOException {
107         while (!Thread.currentThread().isInterrupted()) {
108 
109             final int readyCount = this.selector.select(this.selectTimeoutMillis);
110 
111             if (getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
112                 if (this.shutdownInitiated.compareAndSet(false, true)) {
113                     initiateSessionShutdown();
114                 }
115                 closePendingChannels();
116             }
117             if (getStatus() == IOReactorStatus.SHUT_DOWN) {
118                 break;
119             }
120 
121             // Process selected I/O events
122             if (readyCount > 0) {
123                 processEvents(this.selector.selectedKeys());
124             }
125 
126             validateActiveChannels();
127 
128             // Process closed sessions
129             processClosedSessions();
130 
131             // If active process new channels
132             if (getStatus() == IOReactorStatus.ACTIVE) {
133                 processPendingChannels();
134                 processPendingConnectionRequests();
135             }
136 
137             // Exit select loop if graceful shutdown has been completed
138             if (getStatus() == IOReactorStatus.SHUTTING_DOWN && this.selector.keys().isEmpty()) {
139                 break;
140             }
141             if (getStatus() == IOReactorStatus.SHUT_DOWN) {
142                 break;
143             }
144         }
145     }
146 
147     private void initiateSessionShutdown() {
148         if (this.sessionShutdownCallback != null) {
149             final Set<SelectionKey> keys = this.selector.keys();
150             for (final SelectionKey key : keys) {
151                 final InternalChannel channel = (InternalChannel) key.attachment();
152                 if (channel instanceof InternalDataChannel) {
153                     this.sessionShutdownCallback.execute((InternalDataChannel) channel);
154                 }
155             }
156         }
157     }
158 
159     private void validateActiveChannels() {
160         final long currentTimeMillis = System.currentTimeMillis();
161         if ((currentTimeMillis - this.lastTimeoutCheckMillis) >= this.selectTimeoutMillis) {
162             this.lastTimeoutCheckMillis = currentTimeMillis;
163             for (final SelectionKey key : this.selector.keys()) {
164                 checkTimeout(key, currentTimeMillis);
165             }
166         }
167     }
168 
169     private void processEvents(final Set<SelectionKey> selectedKeys) {
170         for (final SelectionKey key : selectedKeys) {
171             final InternalChannel channel = (InternalChannel) key.attachment();
172             if (channel != null) {
173                 try {
174                     channel.handleIOEvent(key.readyOps());
175                 } catch (final CancelledKeyException ex) {
176                     channel.close(CloseMode.GRACEFUL);
177                 }
178             }
179         }
180         selectedKeys.clear();
181     }
182 
183     private void processPendingChannels() throws IOException {
184         ChannelEntry entry;
185         for (int i = 0; i < MAX_CHANNEL_REQUESTS && (entry = this.channelQueue.poll()) != null; i++) {
186             final SocketChannel socketChannel = entry.channel;
187             final Object attachment = entry.attachment;
188             try {
189                 prepareSocket(socketChannel.socket());
190                 socketChannel.configureBlocking(false);
191             } catch (final IOException ex) {
192                 logException(ex);
193                 try {
194                     socketChannel.close();
195                 } catch (final IOException ex2) {
196                     logException(ex2);
197                 }
198                 throw ex;
199             }
200             final SelectionKey key;
201             try {
202                 key = socketChannel.register(this.selector, SelectionKey.OP_READ);
203             } catch (final ClosedChannelException ex) {
204                 return;
205             }
206             final IOSession ioSession = new IOSessionImpl("a", key, socketChannel);
207             final InternalDataChannel dataChannel = new InternalDataChannel(
208                     ioSession,
209                     null,
210                     ioSessionDecorator,
211                     sessionListener,
212                     closedSessions);
213             dataChannel.setSocketTimeout(this.reactorConfig.getSoTimeout());
214             dataChannel.upgrade(this.eventHandlerFactory.createHandler(dataChannel, attachment));
215             key.attach(dataChannel);
216             dataChannel.handleIOEvent(SelectionKey.OP_CONNECT);
217         }
218     }
219 
220     private void processClosedSessions() {
221         for (;;) {
222             final InternalDataChannel dataChannel = this.closedSessions.poll();
223             if (dataChannel == null) {
224                 break;
225             }
226             try {
227                 dataChannel.disconnected();
228             } catch (final CancelledKeyException ex) {
229                 // ignore and move on
230             }
231         }
232     }
233 
234     private void checkTimeout(final SelectionKey key, final long nowMillis) {
235         final InternalChannel channel = (InternalChannel) key.attachment();
236         if (channel != null) {
237             channel.checkTimeout(nowMillis);
238         }
239     }
240 
241     @Override
242     public Future<IOSession> connect(
243             final NamedEndpoint remoteEndpoint,
244             final SocketAddress remoteAddress,
245             final SocketAddress localAddress,
246             final Timeout timeout,
247             final Object attachment,
248             final FutureCallback<IOSession> callback) throws IOReactorShutdownException {
249         Args.notNull(remoteEndpoint, "Remote endpoint");
250         final IOSessionRequest sessionRequest = new IOSessionRequest(
251                 remoteEndpoint,
252                 remoteAddress != null ? remoteAddress : new InetSocketAddress(remoteEndpoint.getHostName(), remoteEndpoint.getPort()),
253                 localAddress,
254                 timeout,
255                 attachment,
256                 callback);
257 
258         this.requestQueue.add(sessionRequest);
259         this.selector.wakeup();
260 
261         return sessionRequest;
262     }
263 
264     private void prepareSocket(final Socket socket) throws IOException {
265         socket.setTcpNoDelay(this.reactorConfig.isTcpNoDelay());
266         socket.setKeepAlive(this.reactorConfig.isSoKeepAlive());
267         if (this.reactorConfig.getSndBufSize() > 0) {
268             socket.setSendBufferSize(this.reactorConfig.getSndBufSize());
269         }
270         if (this.reactorConfig.getRcvBufSize() > 0) {
271             socket.setReceiveBufferSize(this.reactorConfig.getRcvBufSize());
272         }
273         if (this.reactorConfig.getTrafficClass() > 0) {
274             socket.setTrafficClass(this.reactorConfig.getTrafficClass());
275         }
276         final int linger = this.reactorConfig.getSoLinger().toSecondsIntBound();
277         if (linger >= 0) {
278             socket.setSoLinger(true, linger);
279         }
280     }
281 
282     private void validateAddress(final SocketAddress address) throws UnknownHostException {
283         if (address instanceof InetSocketAddress) {
284             final InetSocketAddress endpoint = (InetSocketAddress) address;
285             if (endpoint.isUnresolved()) {
286                 throw new UnknownHostException(endpoint.getHostName());
287             }
288         }
289     }
290 
291     private void processPendingConnectionRequests() {
292         IOSessionRequest sessionRequest;
293         for (int i = 0; i < MAX_CHANNEL_REQUESTS && (sessionRequest = this.requestQueue.poll()) != null; i++) {
294             if (!sessionRequest.isCancelled()) {
295                 final SocketChannel socketChannel;
296                 try {
297                     socketChannel = SocketChannel.open();
298                 } catch (final IOException ex) {
299                     sessionRequest.failed(ex);
300                     return;
301                 }
302                 try {
303                     processConnectionRequest(socketChannel, sessionRequest);
304                 } catch (final IOException | RuntimeException ex) {
305                     Closer.closeQuietly(socketChannel);
306                     sessionRequest.failed(ex);
307                 }
308             }
309         }
310     }
311 
312     private void processConnectionRequest(final SocketChannel socketChannel, final IOSessionRequest sessionRequest) throws IOException {
313         socketChannel.configureBlocking(false);
314         prepareSocket(socketChannel.socket());
315 
316         validateAddress(sessionRequest.localAddress);
317         if (sessionRequest.localAddress != null) {
318             final Socket sock = socketChannel.socket();
319             sock.setReuseAddress(this.reactorConfig.isSoReuseAddress());
320             sock.bind(sessionRequest.localAddress);
321         }
322 
323         final SocketAddress socksProxyAddress = reactorConfig.getSocksProxyAddress();
324         final SocketAddress remoteAddress = socksProxyAddress != null ? socksProxyAddress : sessionRequest.remoteAddress;
325 
326         // Run this under a doPrivileged to support lib users that run under a SecurityManager this allows granting connect permissions
327         // only to this library
328         validateAddress(remoteAddress);
329         final boolean connected = socketChannel.connect(remoteAddress);
330         final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
331         final IOSession ioSession = new IOSessionImpl("c", key, socketChannel);
332         final InternalDataChannel dataChannel = new InternalDataChannel(
333                 ioSession,
334                 sessionRequest.remoteEndpoint,
335                 ioSessionDecorator,
336                 sessionListener,
337                 closedSessions);
338         dataChannel.setSocketTimeout(reactorConfig.getSoTimeout());
339         final InternalChannel connectChannel = new InternalConnectChannel(
340                 key,
341                 socketChannel,
342                 sessionRequest,
343                 dataChannel,
344                 eventHandlerFactory,
345                 reactorConfig);
346         if (connected) {
347             connectChannel.handleIOEvent(SelectionKey.OP_CONNECT);
348         } else {
349             key.attach(connectChannel);
350             sessionRequest.assign(connectChannel);
351         }
352     }
353 
354     private void closePendingChannels() {
355         ChannelEntry entry;
356         while ((entry = this.channelQueue.poll()) != null) {
357             final SocketChannel socketChannel = entry.channel;
358             try {
359                 socketChannel.close();
360             } catch (final IOException ex) {
361                 logException(ex);
362             }
363         }
364     }
365 
366     private void closePendingConnectionRequests() {
367         IOSessionRequest sessionRequest;
368         while ((sessionRequest = this.requestQueue.poll()) != null) {
369             sessionRequest.cancel();
370         }
371     }
372 
373 }