1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.impl.nio.client;
28
29 import java.io.IOException;
30 import java.util.concurrent.ThreadFactory;
31 import java.util.concurrent.atomic.AtomicReference;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.http.nio.NHttpClientEventHandler;
36 import org.apache.http.nio.conn.NHttpClientConnectionManager;
37 import org.apache.http.nio.reactor.IOEventDispatch;
38 import org.apache.http.util.Asserts;
39
40 abstract class CloseableHttpAsyncClientBase extends CloseableHttpPipeliningClient {
41
42 private final Log log = LogFactory.getLog(getClass());
43
44 static enum Status {INACTIVE, ACTIVE, STOPPED}
45
46 private final NHttpClientConnectionManager connmgr;
47 private final Thread reactorThread;
48
49 private final AtomicReference<Status> status;
50
51 public CloseableHttpAsyncClientBase(
52 final NHttpClientConnectionManager connmgr,
53 final ThreadFactory threadFactory,
54 final NHttpClientEventHandler handler) {
55 super();
56 this.connmgr = connmgr;
57 if (threadFactory != null && handler != null) {
58 this.reactorThread = threadFactory.newThread(new Runnable() {
59
60 @Override
61 public void run() {
62 try {
63 final IOEventDispatch ioEventDispatch = new InternalIODispatch(handler);
64 connmgr.execute(ioEventDispatch);
65 } catch (final Exception ex) {
66 log.error("I/O reactor terminated abnormally", ex);
67 } finally {
68 status.set(Status.STOPPED);
69 }
70 }
71
72 });
73 } else {
74 this.reactorThread = null;
75 }
76 this.status = new AtomicReference<Status>(Status.INACTIVE);
77 }
78
79 @Override
80 public void start() {
81 if (this.status.compareAndSet(Status.INACTIVE, Status.ACTIVE)) {
82 if (this.reactorThread != null) {
83 this.reactorThread.start();
84 }
85 }
86 }
87
88 protected void ensureRunning() {
89 final Status currentStatus = this.status.get();
90 Asserts.check(currentStatus == Status.ACTIVE, "Request cannot be executed; " +
91 "I/O reactor status: %s", currentStatus);
92 }
93
94 @Override
95 public void close() {
96 if (this.status.compareAndSet(Status.ACTIVE, Status.STOPPED)) {
97 if (this.reactorThread != null) {
98 try {
99 this.connmgr.shutdown();
100 } catch (final IOException ex) {
101 this.log.error("I/O error shutting down connection manager", ex);
102 }
103 try {
104 this.reactorThread.join();
105 } catch (final InterruptedException ex) {
106 Thread.currentThread().interrupt();
107 }
108 }
109 }
110 }
111
112 @Override
113 public boolean isRunning() {
114 return this.status.get() == Status.ACTIVE;
115 }
116
117 }