1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.impl.nio.client;
28
29 import java.io.IOException;
30 import java.util.concurrent.CancellationException;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.atomic.AtomicReference;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.http.nio.NHttpClientEventHandler;
37 import org.apache.http.nio.conn.NHttpClientConnectionManager;
38 import org.apache.http.nio.reactor.IOEventDispatch;
39
40 abstract class CloseableHttpAsyncClientBase extends CloseableHttpPipeliningClient {
41
42 private final Log log = LogFactory.getLog(getClass());
43
44 static enum Status {INACTIVE, ACTIVE, STOPPED}
45
46 private final NHttpClientConnectionManager connmgr;
47 private final Thread reactorThread;
48
49 private final AtomicReference<Status> status;
50
51 public CloseableHttpAsyncClientBase(
52 final NHttpClientConnectionManager connmgr,
53 final ThreadFactory threadFactory,
54 final NHttpClientEventHandler handler) {
55 super();
56 this.connmgr = connmgr;
57 if (threadFactory != null && handler != null) {
58 this.reactorThread = threadFactory.newThread(new Runnable() {
59
60 @Override
61 public void run() {
62 try {
63 final IOEventDispatch ioEventDispatch = new InternalIODispatch(handler);
64 connmgr.execute(ioEventDispatch);
65 } catch (final Exception ex) {
66 log.error("I/O reactor terminated abnormally", ex);
67 } finally {
68 status.set(Status.STOPPED);
69 }
70 }
71
72 });
73 } else {
74 this.reactorThread = null;
75 }
76 this.status = new AtomicReference<Status>(Status.INACTIVE);
77 }
78
79 @Override
80 public void start() {
81 if (this.status.compareAndSet(Status.INACTIVE, Status.ACTIVE)) {
82 if (this.reactorThread != null) {
83 this.reactorThread.start();
84 }
85 }
86 }
87
88 @Override
89 public void close() {
90 if (this.status.compareAndSet(Status.ACTIVE, Status.STOPPED)) {
91 if (this.reactorThread != null) {
92 try {
93 this.connmgr.shutdown();
94 } catch (final IOException ex) {
95 this.log.error("I/O error shutting down connection manager", ex);
96 }
97 try {
98 this.reactorThread.join();
99 } catch (final InterruptedException ex) {
100 Thread.currentThread().interrupt();
101 }
102 }
103 }
104 }
105
106 @Override
107 public boolean isRunning() {
108 return this.status.get() == Status.ACTIVE;
109 }
110
111 final void execute(final AbstractClientExchangeHandler handler) {
112 try {
113 if (!isRunning()) {
114 throw new CancellationException("Request execution cancelled");
115 }
116 handler.start();
117 } catch (final Exception ex) {
118 handler.failed(ex);
119 }
120 }
121
122 }