1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.reactor;
29
30 import java.io.IOException;
31 import java.net.InetSocketAddress;
32 import java.net.Socket;
33 import java.net.SocketAddress;
34 import java.net.UnknownHostException;
35 import java.nio.channels.CancelledKeyException;
36 import java.nio.channels.ClosedChannelException;
37 import java.nio.channels.SelectionKey;
38 import java.nio.channels.SocketChannel;
39 import java.util.Queue;
40 import java.util.Set;
41 import java.util.concurrent.ConcurrentLinkedQueue;
42 import java.util.concurrent.Future;
43 import java.util.concurrent.atomic.AtomicBoolean;
44
45 import org.apache.hc.core5.concurrent.FutureCallback;
46 import org.apache.hc.core5.function.Callback;
47 import org.apache.hc.core5.function.Decorator;
48 import org.apache.hc.core5.io.CloseMode;
49 import org.apache.hc.core5.io.Closer;
50 import org.apache.hc.core5.net.NamedEndpoint;
51 import org.apache.hc.core5.util.Args;
52 import org.apache.hc.core5.util.Timeout;
53
54 class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements ConnectionInitiator {
55
56 private static final int MAX_CHANNEL_REQUESTS = 10000;
57
58 private final IOEventHandlerFactory eventHandlerFactory;
59 private final IOReactorConfig reactorConfig;
60 private final Decorator<IOSession> ioSessionDecorator;
61 private final IOSessionListener sessionListener;
62 private final Callback<IOSession> sessionShutdownCallback;
63 private final Queue<InternalDataChannel> closedSessions;
64 private final Queue<ChannelEntry> channelQueue;
65 private final Queue<IOSessionRequest> requestQueue;
66 private final AtomicBoolean shutdownInitiated;
67 private final long selectTimeoutMillis;
68 private volatile long lastTimeoutCheckMillis;
69
70 SingleCoreIOReactor(
71 final Callback<Exception> exceptionCallback,
72 final IOEventHandlerFactory eventHandlerFactory,
73 final IOReactorConfig reactorConfig,
74 final Decorator<IOSession> ioSessionDecorator,
75 final IOSessionListener sessionListener,
76 final Callback<IOSession> sessionShutdownCallback) {
77 super(exceptionCallback);
78 this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory");
79 this.reactorConfig = Args.notNull(reactorConfig, "I/O reactor config");
80 this.ioSessionDecorator = ioSessionDecorator;
81 this.sessionListener = sessionListener;
82 this.sessionShutdownCallback = sessionShutdownCallback;
83 this.shutdownInitiated = new AtomicBoolean(false);
84 this.closedSessions = new ConcurrentLinkedQueue<>();
85 this.channelQueue = new ConcurrentLinkedQueue<>();
86 this.requestQueue = new ConcurrentLinkedQueue<>();
87 this.selectTimeoutMillis = this.reactorConfig.getSelectInterval().toMilliseconds();
88 }
89
90 void enqueueChannel(final ChannelEntry entry) throws IOReactorShutdownException {
91 if (getStatus().compareTo(IOReactorStatus.ACTIVE) > 0) {
92 throw new IOReactorShutdownException("I/O reactor has been shut down");
93 }
94 this.channelQueue.add(entry);
95 this.selector.wakeup();
96 }
97
98 @Override
99 void doTerminate() {
100 closePendingChannels();
101 closePendingConnectionRequests();
102 processClosedSessions();
103 }
104
105 @Override
106 void doExecute() throws IOException {
107 while (!Thread.currentThread().isInterrupted()) {
108
109 final int readyCount = this.selector.select(this.selectTimeoutMillis);
110
111 if (getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
112 if (this.shutdownInitiated.compareAndSet(false, true)) {
113 initiateSessionShutdown();
114 }
115 closePendingChannels();
116 }
117 if (getStatus() == IOReactorStatus.SHUT_DOWN) {
118 break;
119 }
120
121
122 if (readyCount > 0) {
123 processEvents(this.selector.selectedKeys());
124 }
125
126 validateActiveChannels();
127
128
129 processClosedSessions();
130
131
132 if (getStatus() == IOReactorStatus.ACTIVE) {
133 processPendingChannels();
134 processPendingConnectionRequests();
135 }
136
137
138 if (getStatus() == IOReactorStatus.SHUTTING_DOWN && this.selector.keys().isEmpty()) {
139 break;
140 }
141 if (getStatus() == IOReactorStatus.SHUT_DOWN) {
142 break;
143 }
144 }
145 }
146
147 private void initiateSessionShutdown() {
148 if (this.sessionShutdownCallback != null) {
149 final Set<SelectionKey> keys = this.selector.keys();
150 for (final SelectionKey key : keys) {
151 final InternalChannel channel = (InternalChannel) key.attachment();
152 if (channel instanceof InternalDataChannel) {
153 this.sessionShutdownCallback.execute((InternalDataChannel) channel);
154 }
155 }
156 }
157 }
158
159 private void validateActiveChannels() {
160 final long currentTimeMillis = System.currentTimeMillis();
161 if ((currentTimeMillis - this.lastTimeoutCheckMillis) >= this.selectTimeoutMillis) {
162 this.lastTimeoutCheckMillis = currentTimeMillis;
163 for (final SelectionKey key : this.selector.keys()) {
164 checkTimeout(key, currentTimeMillis);
165 }
166 }
167 }
168
169 private void processEvents(final Set<SelectionKey> selectedKeys) {
170 for (final SelectionKey key : selectedKeys) {
171 final InternalChannel channel = (InternalChannel) key.attachment();
172 if (channel != null) {
173 try {
174 channel.handleIOEvent(key.readyOps());
175 } catch (final CancelledKeyException ex) {
176 channel.close(CloseMode.GRACEFUL);
177 }
178 }
179 }
180 selectedKeys.clear();
181 }
182
183 private void processPendingChannels() throws IOException {
184 ChannelEntry entry;
185 for (int i = 0; i < MAX_CHANNEL_REQUESTS && (entry = this.channelQueue.poll()) != null; i++) {
186 final SocketChannel socketChannel = entry.channel;
187 final Object attachment = entry.attachment;
188 try {
189 prepareSocket(socketChannel.socket());
190 socketChannel.configureBlocking(false);
191 } catch (final IOException ex) {
192 logException(ex);
193 try {
194 socketChannel.close();
195 } catch (final IOException ex2) {
196 logException(ex2);
197 }
198 throw ex;
199 }
200 final SelectionKey key;
201 try {
202 key = socketChannel.register(this.selector, SelectionKey.OP_READ);
203 } catch (final ClosedChannelException ex) {
204 return;
205 }
206 final IOSession ioSession = new IOSessionImpl("a", key, socketChannel);
207 final InternalDataChannel dataChannel = new InternalDataChannel(
208 ioSession,
209 null,
210 ioSessionDecorator,
211 sessionListener,
212 closedSessions);
213 dataChannel.setSocketTimeout(this.reactorConfig.getSoTimeout());
214 dataChannel.upgrade(this.eventHandlerFactory.createHandler(dataChannel, attachment));
215 key.attach(dataChannel);
216 dataChannel.handleIOEvent(SelectionKey.OP_CONNECT);
217 }
218 }
219
220 private void processClosedSessions() {
221 for (;;) {
222 final InternalDataChannel dataChannel = this.closedSessions.poll();
223 if (dataChannel == null) {
224 break;
225 }
226 try {
227 dataChannel.disconnected();
228 } catch (final CancelledKeyException ex) {
229
230 }
231 }
232 }
233
234 private void checkTimeout(final SelectionKey key, final long nowMillis) {
235 final InternalChannel channel = (InternalChannel) key.attachment();
236 if (channel != null) {
237 channel.checkTimeout(nowMillis);
238 }
239 }
240
241 @Override
242 public Future<IOSession> connect(
243 final NamedEndpoint remoteEndpoint,
244 final SocketAddress remoteAddress,
245 final SocketAddress localAddress,
246 final Timeout timeout,
247 final Object attachment,
248 final FutureCallback<IOSession> callback) throws IOReactorShutdownException {
249 Args.notNull(remoteEndpoint, "Remote endpoint");
250 final IOSessionRequest sessionRequest = new IOSessionRequest(
251 remoteEndpoint,
252 remoteAddress != null ? remoteAddress : new InetSocketAddress(remoteEndpoint.getHostName(), remoteEndpoint.getPort()),
253 localAddress,
254 timeout,
255 attachment,
256 callback);
257
258 this.requestQueue.add(sessionRequest);
259 this.selector.wakeup();
260
261 return sessionRequest;
262 }
263
264 private void prepareSocket(final Socket socket) throws IOException {
265 socket.setTcpNoDelay(this.reactorConfig.isTcpNoDelay());
266 socket.setKeepAlive(this.reactorConfig.isSoKeepAlive());
267 if (this.reactorConfig.getSndBufSize() > 0) {
268 socket.setSendBufferSize(this.reactorConfig.getSndBufSize());
269 }
270 if (this.reactorConfig.getRcvBufSize() > 0) {
271 socket.setReceiveBufferSize(this.reactorConfig.getRcvBufSize());
272 }
273 if (this.reactorConfig.getTrafficClass() > 0) {
274 socket.setTrafficClass(this.reactorConfig.getTrafficClass());
275 }
276 final int linger = this.reactorConfig.getSoLinger().toSecondsIntBound();
277 if (linger >= 0) {
278 socket.setSoLinger(true, linger);
279 }
280 }
281
282 private void validateAddress(final SocketAddress address) throws UnknownHostException {
283 if (address instanceof InetSocketAddress) {
284 final InetSocketAddress endpoint = (InetSocketAddress) address;
285 if (endpoint.isUnresolved()) {
286 throw new UnknownHostException(endpoint.getHostName());
287 }
288 }
289 }
290
291 private void processPendingConnectionRequests() {
292 IOSessionRequest sessionRequest;
293 for (int i = 0; i < MAX_CHANNEL_REQUESTS && (sessionRequest = this.requestQueue.poll()) != null; i++) {
294 if (!sessionRequest.isCancelled()) {
295 final SocketChannel socketChannel;
296 try {
297 socketChannel = SocketChannel.open();
298 } catch (final IOException ex) {
299 sessionRequest.failed(ex);
300 return;
301 }
302 try {
303 processConnectionRequest(socketChannel, sessionRequest);
304 } catch (final IOException | RuntimeException ex) {
305 Closer.closeQuietly(socketChannel);
306 sessionRequest.failed(ex);
307 }
308 }
309 }
310 }
311
312 private void processConnectionRequest(final SocketChannel socketChannel, final IOSessionRequest sessionRequest) throws IOException {
313 socketChannel.configureBlocking(false);
314 prepareSocket(socketChannel.socket());
315
316 validateAddress(sessionRequest.localAddress);
317 if (sessionRequest.localAddress != null) {
318 final Socket sock = socketChannel.socket();
319 sock.setReuseAddress(this.reactorConfig.isSoReuseAddress());
320 sock.bind(sessionRequest.localAddress);
321 }
322
323 final SocketAddress socksProxyAddress = reactorConfig.getSocksProxyAddress();
324 final SocketAddress remoteAddress = socksProxyAddress != null ? socksProxyAddress : sessionRequest.remoteAddress;
325
326
327
328 validateAddress(remoteAddress);
329 final boolean connected = socketChannel.connect(remoteAddress);
330 final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
331 final IOSession ioSession = new IOSessionImpl("c", key, socketChannel);
332 final InternalDataChannel dataChannel = new InternalDataChannel(
333 ioSession,
334 sessionRequest.remoteEndpoint,
335 ioSessionDecorator,
336 sessionListener,
337 closedSessions);
338 dataChannel.setSocketTimeout(reactorConfig.getSoTimeout());
339 final InternalChannel connectChannel = new InternalConnectChannel(
340 key,
341 socketChannel,
342 sessionRequest,
343 dataChannel,
344 eventHandlerFactory,
345 reactorConfig);
346 if (connected) {
347 connectChannel.handleIOEvent(SelectionKey.OP_CONNECT);
348 } else {
349 key.attach(connectChannel);
350 sessionRequest.assign(connectChannel);
351 }
352 }
353
354 private void closePendingChannels() {
355 ChannelEntry entry;
356 while ((entry = this.channelQueue.poll()) != null) {
357 final SocketChannel socketChannel = entry.channel;
358 try {
359 socketChannel.close();
360 } catch (final IOException ex) {
361 logException(ex);
362 }
363 }
364 }
365
366 private void closePendingConnectionRequests() {
367 IOSessionRequest sessionRequest;
368 while ((sessionRequest = this.requestQueue.poll()) != null) {
369 sessionRequest.cancel();
370 }
371 }
372
373 }