1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.impl.nio.reactor;
29
30 import java.io.IOException;
31 import java.io.InterruptedIOException;
32 import java.nio.channels.CancelledKeyException;
33 import java.nio.channels.ClosedChannelException;
34 import java.nio.channels.ClosedSelectorException;
35 import java.nio.channels.SelectionKey;
36 import java.nio.channels.Selector;
37 import java.nio.channels.SocketChannel;
38 import java.util.Collections;
39 import java.util.HashSet;
40 import java.util.Queue;
41 import java.util.Set;
42 import java.util.concurrent.ConcurrentLinkedQueue;
43
44 import org.apache.http.nio.reactor.IOReactor;
45 import org.apache.http.nio.reactor.IOReactorException;
46 import org.apache.http.nio.reactor.IOReactorStatus;
47 import org.apache.http.nio.reactor.IOSession;
48 import org.apache.http.util.Args;
49 import org.apache.http.util.Asserts;
50
51
52
53
54
55
56
57
58 public abstract class AbstractIOReactor implements IOReactor {
59
60 private volatile IOReactorStatus status;
61
62 private final Object statusMutex;
63 private final long selectTimeout;
64 private final boolean interestOpsQueueing;
65 private final Selector selector;
66 private final Set<IOSession> sessions;
67 private final Queue<InterestOpEntry> interestOpsQueue;
68 private final Queue<IOSession> closedSessions;
69 private final Queue<ChannelEntry> newChannels;
70
71
72
73
74
75
76
77 public AbstractIOReactor(final long selectTimeout) throws IOReactorException {
78 this(selectTimeout, false);
79 }
80
81
82
83
84
85
86
87
88
89
90
91 public AbstractIOReactor(final long selectTimeout, final boolean interestOpsQueueing) throws IOReactorException {
92 super();
93 Args.positive(selectTimeout, "Select timeout");
94 this.selectTimeout = selectTimeout;
95 this.interestOpsQueueing = interestOpsQueueing;
96 this.sessions = Collections.synchronizedSet(new HashSet<IOSession>());
97 this.interestOpsQueue = new ConcurrentLinkedQueue<InterestOpEntry>();
98 this.closedSessions = new ConcurrentLinkedQueue<IOSession>();
99 this.newChannels = new ConcurrentLinkedQueue<ChannelEntry>();
100 try {
101 this.selector = Selector.open();
102 } catch (final IOException ex) {
103 throw new IOReactorException("Failure opening selector", ex);
104 }
105 this.statusMutex = new Object();
106 this.status = IOReactorStatus.INACTIVE;
107 }
108
109
110
111
112
113
114
115
116 protected abstract void acceptable(SelectionKey key);
117
118
119
120
121
122
123
124
125 protected abstract void connectable(SelectionKey key);
126
127
128
129
130
131
132
133
134 protected abstract void readable(SelectionKey key);
135
136
137
138
139
140
141
142
143 protected abstract void writable(SelectionKey key);
144
145
146
147
148
149
150
151
152
153
154
155 protected abstract void validate(Set<SelectionKey> keys);
156
157
158
159
160
161
162
163
164
165 protected void sessionCreated(final SelectionKey key, final IOSession session) {
166 }
167
168
169
170
171
172
173
174
175 protected void sessionClosed(final IOSession session) {
176 }
177
178
179
180
181
182
183
184
185 protected void sessionTimedOut(final IOSession session) {
186 }
187
188
189
190
191
192
193
194
195 protected IOSession getSession(final SelectionKey key) {
196 return (IOSession) key.attachment();
197 }
198
199 @Override
200 public IOReactorStatus getStatus() {
201 return this.status;
202 }
203
204
205
206
207
208
209 public boolean getInterestOpsQueueing() {
210 return this.interestOpsQueueing;
211 }
212
213
214
215
216
217
218
219 public void addChannel(final ChannelEntry channelEntry) {
220 Args.notNull(channelEntry, "Channel entry");
221 this.newChannels.add(channelEntry);
222 this.selector.wakeup();
223 }
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247 protected void execute() throws InterruptedIOException, IOReactorException {
248 this.status = IOReactorStatus.ACTIVE;
249
250 try {
251 for (;;) {
252
253 final int readyCount;
254 try {
255 readyCount = this.selector.select(this.selectTimeout);
256 } catch (final InterruptedIOException ex) {
257 throw ex;
258 } catch (final IOException ex) {
259 throw new IOReactorException("Unexpected selector failure", ex);
260 }
261
262 if (this.status == IOReactorStatus.SHUT_DOWN) {
263
264 break;
265 }
266
267 if (this.status == IOReactorStatus.SHUTTING_DOWN) {
268
269
270 closeSessions();
271 closeNewChannels();
272 }
273
274
275 if (readyCount > 0) {
276 processEvents(this.selector.selectedKeys());
277 }
278
279
280 validate(this.selector.keys());
281
282
283 processClosedSessions();
284
285
286 if (this.status == IOReactorStatus.ACTIVE) {
287 processNewChannels();
288 }
289
290
291 if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0
292 && this.sessions.isEmpty()) {
293 break;
294 }
295
296 if (this.interestOpsQueueing) {
297
298 processPendingInterestOps();
299 }
300
301 }
302
303 } catch (final ClosedSelectorException ignore) {
304 } finally {
305 hardShutdown();
306 synchronized (this.statusMutex) {
307 this.statusMutex.notifyAll();
308 }
309 }
310 }
311
312 private void processEvents(final Set<SelectionKey> selectedKeys) {
313 for (final SelectionKey key : selectedKeys) {
314
315 processEvent(key);
316
317 }
318 selectedKeys.clear();
319 }
320
321
322
323
324
325
326 protected void processEvent(final SelectionKey key) {
327 final IOSessionImpl/../../../org/apache/http/impl/nio/reactor/IOSessionImpl.html#IOSessionImpl">IOSessionImpl session = (IOSessionImpl) key.attachment();
328 try {
329 if (key.isAcceptable()) {
330 acceptable(key);
331 }
332 if (key.isConnectable()) {
333 connectable(key);
334 }
335 if (key.isReadable()) {
336 session.resetLastRead();
337 readable(key);
338 }
339 if (key.isWritable()) {
340 session.resetLastWrite();
341 writable(key);
342 }
343 } catch (final CancelledKeyException ex) {
344 queueClosedSession(session);
345 key.attach(null);
346 }
347 }
348
349
350
351
352
353
354 protected void queueClosedSession(final IOSession session) {
355 if (session != null) {
356 this.closedSessions.add(session);
357 }
358 }
359
360 private void processNewChannels() throws IOReactorException {
361 ChannelEntry entry;
362 while ((entry = this.newChannels.poll()) != null) {
363
364 final SocketChannel channel;
365 final SelectionKey key;
366 try {
367 channel = entry.getChannel();
368 channel.configureBlocking(false);
369 key = channel.register(this.selector, SelectionKey.OP_READ);
370 } catch (final ClosedChannelException ex) {
371 final SessionRequestImpl sessionRequest = entry.getSessionRequest();
372 if (sessionRequest != null) {
373 sessionRequest.failed(ex);
374 }
375 return;
376
377 } catch (final IOException ex) {
378 throw new IOReactorException("Failure registering channel " +
379 "with the selector", ex);
380 }
381
382 final SessionClosedCallbackck.html#SessionClosedCallback">SessionClosedCallback sessionClosedCallback = new SessionClosedCallback() {
383
384 @Override
385 public void sessionClosed(final IOSession session) {
386 queueClosedSession(session);
387 }
388
389 };
390
391 InterestOpsCallback interestOpsCallback = null;
392 if (this.interestOpsQueueing) {
393 interestOpsCallback = new InterestOpsCallback() {
394
395 @Override
396 public void addInterestOps(final InterestOpEntry entry) {
397 queueInterestOps(entry);
398 }
399
400 };
401 }
402
403 final IOSession session;
404 try {
405 session = new IOSessionImpl(key, interestOpsCallback, sessionClosedCallback);
406 int timeout = 0;
407 try {
408 timeout = channel.socket().getSoTimeout();
409 } catch (final IOException ex) {
410
411
412
413 }
414
415 session.setAttribute(IOSession.ATTACHMENT_KEY, entry.getAttachment());
416 session.setSocketTimeout(timeout);
417 } catch (final CancelledKeyException ex) {
418 continue;
419 }
420 try {
421 this.sessions.add(session);
422 key.attach(session);
423 final SessionRequestImpl sessionRequest = entry.getSessionRequest();
424 if (sessionRequest != null) {
425 if (!sessionRequest.isTerminated()) {
426 sessionRequest.completed(session);
427 }
428 if (!sessionRequest.isTerminated() && !session.isClosed()) {
429 sessionCreated(key, session);
430 }
431 if (sessionRequest.isTerminated()) {
432 throw new CancelledKeyException();
433 }
434 } else {
435 sessionCreated(key, session);
436 }
437 } catch (final CancelledKeyException ex) {
438 session.close();
439 key.attach(null);
440 }
441 }
442 }
443
444 private void processClosedSessions() {
445 IOSession session;
446 while ((session = this.closedSessions.poll()) != null) {
447 if (this.sessions.remove(session)) {
448 try {
449 sessionClosed(session);
450 } catch (final CancelledKeyException ex) {
451
452 }
453 }
454 }
455 }
456
457 private void processPendingInterestOps() {
458
459 if (!this.interestOpsQueueing) {
460 return;
461 }
462 InterestOpEntry entry;
463 while ((entry = this.interestOpsQueue.poll()) != null) {
464
465 final SelectionKey key = entry.getSelectionKey();
466 final int eventMask = entry.getEventMask();
467 if (key.isValid()) {
468 key.interestOps(eventMask);
469 }
470 }
471 }
472
473 private boolean queueInterestOps(final InterestOpEntry entry) {
474
475 Asserts.check(this.interestOpsQueueing, "Interest ops queueing not enabled");
476 if (entry == null) {
477 return false;
478 }
479
480
481 this.interestOpsQueue.add(entry);
482
483 return true;
484 }
485
486
487
488
489
490
491
492
493
494
495 protected void timeoutCheck(final SelectionKey key, final long now) {
496 final IOSessionImpl/../../../org/apache/http/impl/nio/reactor/IOSessionImpl.html#IOSessionImpl">IOSessionImpl session = (IOSessionImpl) key.attachment();
497 if (session != null) {
498 final int timeout = session.getSocketTimeout();
499 if (timeout > 0) {
500 if (session.getLastAccessTime() + timeout < now) {
501 try {
502 sessionTimedOut(session);
503 } catch (final CancelledKeyException ex) {
504 session.close();
505 key.attach(null);
506 }
507 }
508 }
509 }
510 }
511
512
513
514
515 protected void closeSessions() {
516 synchronized (this.sessions) {
517 for (final IOSession session : this.sessions) {
518 session.close();
519 }
520 }
521 }
522
523
524
525
526
527
528 protected void closeNewChannels() throws IOReactorException {
529 ChannelEntry entry;
530 while ((entry = this.newChannels.poll()) != null) {
531 final SessionRequestImpl sessionRequest = entry.getSessionRequest();
532 if (sessionRequest != null) {
533 sessionRequest.cancel();
534 }
535 final SocketChannel channel = entry.getChannel();
536 try {
537 channel.close();
538 } catch (final IOException ignore) {
539 }
540 }
541 }
542
543
544
545
546
547
548 protected void closeActiveChannels() throws IOReactorException {
549 try {
550 final Set<SelectionKey> keys = this.selector.keys();
551 for (final SelectionKey key : keys) {
552 final IOSession session = getSession(key);
553 if (session != null) {
554 session.close();
555 }
556 }
557 this.selector.close();
558 } catch (final IOException ignore) {
559 }
560 }
561
562
563
564
565 public void gracefulShutdown() {
566 synchronized (this.statusMutex) {
567 if (this.status != IOReactorStatus.ACTIVE) {
568
569 return;
570 }
571 this.status = IOReactorStatus.SHUTTING_DOWN;
572 }
573 this.selector.wakeup();
574 }
575
576
577
578
579 public void hardShutdown() throws IOReactorException {
580 synchronized (this.statusMutex) {
581 if (this.status == IOReactorStatus.SHUT_DOWN) {
582
583 return;
584 }
585 this.status = IOReactorStatus.SHUT_DOWN;
586 }
587
588 closeNewChannels();
589 closeActiveChannels();
590 processClosedSessions();
591 }
592
593
594
595
596
597
598
599
600 public void awaitShutdown(final long timeout) throws InterruptedException {
601 synchronized (this.statusMutex) {
602 final long deadline = System.currentTimeMillis() + timeout;
603 long remaining = timeout;
604 while (this.status != IOReactorStatus.SHUT_DOWN) {
605 this.statusMutex.wait(remaining);
606 if (timeout > 0) {
607 remaining = deadline - System.currentTimeMillis();
608 if (remaining <= 0) {
609 break;
610 }
611 }
612 }
613 }
614 }
615
616 @Override
617 public void shutdown(final long gracePeriod) throws IOReactorException {
618 if (this.status != IOReactorStatus.INACTIVE) {
619 gracefulShutdown();
620 try {
621 awaitShutdown(gracePeriod);
622 } catch (final InterruptedException ignore) {
623 }
624 }
625 if (this.status != IOReactorStatus.SHUT_DOWN) {
626 hardShutdown();
627 }
628 }
629
630 @Override
631 public void shutdown() throws IOReactorException {
632 shutdown(1000);
633 }
634
635 }