1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.impl.nio.conn;
28
29 import java.io.IOException;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.TimeUnit;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.http.concurrent.BasicFuture;
36 import org.apache.http.concurrent.FutureCallback;
37 import org.apache.http.conn.routing.HttpRoute;
38 import org.apache.http.nio.conn.ClientAsyncConnectionFactory;
39 import org.apache.http.nio.conn.ClientAsyncConnectionManager;
40 import org.apache.http.nio.conn.ManagedClientAsyncConnection;
41 import org.apache.http.nio.conn.scheme.AsyncSchemeRegistry;
42 import org.apache.http.nio.reactor.ConnectingIOReactor;
43 import org.apache.http.nio.reactor.IOEventDispatch;
44 import org.apache.http.nio.reactor.IOReactorException;
45 import org.apache.http.nio.reactor.IOReactorStatus;
46 import org.apache.http.pool.ConnPoolControl;
47 import org.apache.http.pool.PoolStats;
48 import org.apache.http.util.Args;
49
50 @Deprecated
51 public class PoolingClientAsyncConnectionManager
52 implements ClientAsyncConnectionManager, ConnPoolControl<HttpRoute> {
53
54 private final Log log = LogFactory.getLog(getClass());
55
56 private final ConnectingIOReactor ioReactor;
57 private final HttpNIOConnPool pool;
58 private final AsyncSchemeRegistry schemeRegistry;
59 private final ClientAsyncConnectionFactory connFactory;
60
61 public PoolingClientAsyncConnectionManager(
62 final ConnectingIOReactor ioReactor,
63 final AsyncSchemeRegistry schemeRegistry,
64 final long timeToLive, final TimeUnit timeUnit) {
65 super();
66 Args.notNull(ioReactor, "I/O reactor");
67 Args.notNull(schemeRegistry, "Scheme registory");
68 Args.notNull(timeUnit, "Time unit");
69 this.ioReactor = ioReactor;
70 this.pool = new HttpNIOConnPool(this.log, ioReactor, schemeRegistry, timeToLive, timeUnit);
71 this.schemeRegistry = schemeRegistry;
72 this.connFactory = createClientAsyncConnectionFactory();
73 }
74
75 public PoolingClientAsyncConnectionManager(
76 final ConnectingIOReactor ioReactor,
77 final AsyncSchemeRegistry schemeRegistry) throws IOReactorException {
78 this(ioReactor, schemeRegistry, -1, TimeUnit.MILLISECONDS);
79 }
80
81 public PoolingClientAsyncConnectionManager(
82 final ConnectingIOReactor ioReactor) throws IOReactorException {
83 this(ioReactor, AsyncSchemeRegistryFactory.createDefault());
84 }
85
86 @Override
87 protected void finalize() throws Throwable {
88 try {
89 shutdown();
90 } finally {
91 super.finalize();
92 }
93 }
94
95 protected ClientAsyncConnectionFactory createClientAsyncConnectionFactory() {
96 return new DefaultClientAsyncConnectionFactory();
97 }
98
99 @Override
100 public AsyncSchemeRegistry getSchemeRegistry() {
101 return this.schemeRegistry;
102 }
103
104 @Override
105 public void execute(final IOEventDispatch eventDispatch) throws IOException {
106 this.ioReactor.execute(eventDispatch);
107 }
108
109 @Override
110 public IOReactorStatus getStatus() {
111 return this.ioReactor.getStatus();
112 }
113
114 @Override
115 public void shutdown(final long waitMs) throws IOException {
116 this.log.debug("Connection manager is shutting down");
117 this.pool.shutdown(waitMs);
118 this.log.debug("Connection manager shut down");
119 }
120
121 @Override
122 public void shutdown() throws IOException {
123 this.log.debug("Connection manager is shutting down");
124 this.pool.shutdown(2000);
125 this.log.debug("Connection manager shut down");
126 }
127
128 private String format(final HttpRoute route, final Object state) {
129 final StringBuilder buf = new StringBuilder();
130 buf.append("[route: ").append(route).append("]");
131 if (state != null) {
132 buf.append("[state: ").append(state).append("]");
133 }
134 return buf.toString();
135 }
136
137 private String formatStats(final HttpRoute route) {
138 final StringBuilder buf = new StringBuilder();
139 final PoolStats totals = this.pool.getTotalStats();
140 final PoolStats stats = this.pool.getStats(route);
141 buf.append("[total kept alive: ").append(totals.getAvailable()).append("; ");
142 buf.append("route allocated: ").append(stats.getLeased() + stats.getAvailable());
143 buf.append(" of ").append(stats.getMax()).append("; ");
144 buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
145 buf.append(" of ").append(totals.getMax()).append("]");
146 return buf.toString();
147 }
148
149 private String format(final HttpPoolEntry entry) {
150 final StringBuilder buf = new StringBuilder();
151 buf.append("[id: ").append(entry.getId()).append("]");
152 buf.append("[route: ").append(entry.getRoute()).append("]");
153 final Object state = entry.getState();
154 if (state != null) {
155 buf.append("[state: ").append(state).append("]");
156 }
157 return buf.toString();
158 }
159
160 @Override
161 public Future<ManagedClientAsyncConnection> leaseConnection(
162 final HttpRoute route,
163 final Object state,
164 final long connectTimeout,
165 final TimeUnit timeUnit,
166 final FutureCallback<ManagedClientAsyncConnection> callback) {
167 Args.notNull(route, "HTTP route");
168 Args.notNull(timeUnit, "Time unit");
169 if (this.log.isDebugEnabled()) {
170 this.log.debug("Connection request: " + format(route, state) + formatStats(route));
171 }
172 final BasicFuture<ManagedClientAsyncConnection> future = new BasicFuture<ManagedClientAsyncConnection>(
173 callback);
174 this.pool.lease(route, state, connectTimeout, timeUnit, new InternalPoolEntryCallback(future));
175 return future;
176 }
177
178 @Override
179 public void releaseConnection(
180 final ManagedClientAsyncConnection conn,
181 final long keepalive,
182 final TimeUnit timeUnit) {
183 Args.notNull(conn, "HTTP connection");
184 if (!(conn instanceof ManagedClientAsyncConnectionImpl)) {
185 throw new IllegalArgumentException("Connection class mismatch, " +
186 "connection not obtained from this manager");
187 }
188 Args.notNull(timeUnit, "Time unit");
189 final ManagedClientAsyncConnectionImpltp/impl/nio/conn/ManagedClientAsyncConnectionImpl.html#ManagedClientAsyncConnectionImpl">ManagedClientAsyncConnectionImpl managedConn = (ManagedClientAsyncConnectionImpl) conn;
190 final ClientAsyncConnectionManager manager = managedConn.getManager();
191 if (manager != null && manager != this) {
192 throw new IllegalArgumentException("Connection not obtained from this manager");
193 }
194 if (this.pool.isShutdown()) {
195 return;
196 }
197
198 synchronized (managedConn) {
199 final HttpPoolEntry entry = managedConn.getPoolEntry();
200 if (entry == null) {
201 return;
202 }
203 try {
204 if (managedConn.isOpen() && !managedConn.isMarkedReusable()) {
205 try {
206 managedConn.shutdown();
207 } catch (final IOException iox) {
208 if (this.log.isDebugEnabled()) {
209 this.log.debug("I/O exception shutting down released connection", iox);
210 }
211 }
212 }
213 if (managedConn.isOpen()) {
214 entry.updateExpiry(keepalive, timeUnit != null ? timeUnit : TimeUnit.MILLISECONDS);
215 if (this.log.isDebugEnabled()) {
216 final String s;
217 if (keepalive > 0) {
218 s = "for " + keepalive + " " + timeUnit;
219 } else {
220 s = "indefinitely";
221 }
222 this.log.debug("Connection " + format(entry) + " can be kept alive " + s);
223 }
224
225 managedConn.setSocketTimeout(0);
226 }
227 } finally {
228 this.pool.release(managedConn.detach(), managedConn.isMarkedReusable());
229 }
230 if (this.log.isDebugEnabled()) {
231 this.log.debug("Connection released: " + format(entry) + formatStats(entry.getRoute()));
232 }
233 }
234 }
235
236 @Override
237 public PoolStats getTotalStats() {
238 return this.pool.getTotalStats();
239 }
240
241 @Override
242 public PoolStats getStats(final HttpRoute route) {
243 return this.pool.getStats(route);
244 }
245
246 @Override
247 public void setMaxTotal(final int max) {
248 this.pool.setMaxTotal(max);
249 }
250
251 @Override
252 public void setDefaultMaxPerRoute(final int max) {
253 this.pool.setDefaultMaxPerRoute(max);
254 }
255
256 @Override
257 public void setMaxPerRoute(final HttpRoute route, final int max) {
258 this.pool.setMaxPerRoute(route, max);
259 }
260
261 @Override
262 public int getMaxTotal() {
263 return this.pool.getMaxTotal();
264 }
265
266 @Override
267 public int getDefaultMaxPerRoute() {
268 return this.pool.getDefaultMaxPerRoute();
269 }
270
271 @Override
272 public int getMaxPerRoute(final HttpRoute route) {
273 return this.pool.getMaxPerRoute(route);
274 }
275
276 public void closeIdleConnections(final long idleTimeout, final TimeUnit timeUnit) {
277 if (log.isDebugEnabled()) {
278 log.debug("Closing connections idle longer than " + idleTimeout + " " + timeUnit);
279 }
280 this.pool.closeIdle(idleTimeout, timeUnit);
281 }
282
283 public void closeExpiredConnections() {
284 log.debug("Closing expired connections");
285 this.pool.closeExpired();
286 }
287
288 class InternalPoolEntryCallback implements FutureCallback<HttpPoolEntry> {
289
290 private final BasicFuture<ManagedClientAsyncConnection> future;
291
292 public InternalPoolEntryCallback(
293 final BasicFuture<ManagedClientAsyncConnection> future) {
294 super();
295 this.future = future;
296 }
297
298 @Override
299 public void completed(final HttpPoolEntry entry) {
300 if (log.isDebugEnabled()) {
301 log.debug("Connection leased: " + format(entry) + formatStats(entry.getRoute()));
302 }
303 final ManagedClientAsyncConnection conn = new ManagedClientAsyncConnectionImpl(
304 PoolingClientAsyncConnectionManager.this,
305 PoolingClientAsyncConnectionManager.this.connFactory,
306 entry);
307 if (!this.future.completed(conn)) {
308 pool.release(entry, true);
309 }
310 }
311
312 @Override
313 public void failed(final Exception ex) {
314 if (log.isDebugEnabled()) {
315 log.debug("Connection request failed", ex);
316 }
317 this.future.failed(ex);
318 }
319
320 @Override
321 public void cancelled() {
322 log.debug("Connection request cancelled");
323 this.future.cancel(true);
324 }
325
326 }
327
328 }