1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.impl.nio.conn;
28
29 import java.io.IOException;
30 import java.net.InetAddress;
31 import java.net.InetSocketAddress;
32 import java.net.SocketAddress;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.Future;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.TimeoutException;
40
41 import org.apache.commons.logging.Log;
42 import org.apache.commons.logging.LogFactory;
43 import org.apache.http.HttpHost;
44 import org.apache.http.annotation.Contract;
45 import org.apache.http.annotation.ThreadingBehavior;
46 import org.apache.http.concurrent.BasicFuture;
47 import org.apache.http.concurrent.FutureCallback;
48 import org.apache.http.config.ConnectionConfig;
49 import org.apache.http.config.Lookup;
50 import org.apache.http.config.Registry;
51 import org.apache.http.config.RegistryBuilder;
52 import org.apache.http.conn.DnsResolver;
53 import org.apache.http.conn.SchemePortResolver;
54 import org.apache.http.conn.UnsupportedSchemeException;
55 import org.apache.http.conn.routing.HttpRoute;
56 import org.apache.http.impl.conn.DefaultSchemePortResolver;
57 import org.apache.http.impl.conn.SystemDefaultDnsResolver;
58 import org.apache.http.nio.NHttpClientConnection;
59 import org.apache.http.nio.conn.ManagedNHttpClientConnection;
60 import org.apache.http.nio.conn.NHttpClientConnectionManager;
61 import org.apache.http.nio.conn.NHttpConnectionFactory;
62 import org.apache.http.nio.conn.NoopIOSessionStrategy;
63 import org.apache.http.nio.conn.SchemeIOSessionStrategy;
64 import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
65 import org.apache.http.nio.pool.NIOConnFactory;
66 import org.apache.http.nio.pool.SocketAddressResolver;
67 import org.apache.http.nio.reactor.ConnectingIOReactor;
68 import org.apache.http.nio.reactor.IOEventDispatch;
69 import org.apache.http.nio.reactor.IOSession;
70 import org.apache.http.pool.ConnPoolControl;
71 import org.apache.http.pool.PoolStats;
72 import org.apache.http.protocol.HttpContext;
73 import org.apache.http.util.Args;
74 import org.apache.http.util.Asserts;
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94 @Contract(threading = ThreadingBehavior.SAFE)
95 public class PoolingNHttpClientConnectionManager
96 implements NHttpClientConnectionManager, ConnPoolControl<HttpRoute> {
97
98 private final Log log = LogFactory.getLog(getClass());
99
100 static final String IOSESSION_FACTORY_REGISTRY = "http.iosession-factory-registry";
101
102 private final ConnectingIOReactor ioreactor;
103 private final ConfigData configData;
104 private final CPool pool;
105 private final Registry<SchemeIOSessionStrategy> iosessionFactoryRegistry;
106
107 private static Registry<SchemeIOSessionStrategy> getDefaultRegistry() {
108 return RegistryBuilder.<SchemeIOSessionStrategy>create()
109 .register("http", NoopIOSessionStrategy.INSTANCE)
110 .register("https", SSLIOSessionStrategy.getDefaultStrategy())
111 .build();
112 }
113
/**
 * Creates a manager on the given reactor with the default scheme registry.
 *
 * @param ioreactor the connecting I/O reactor; must not be {@code null}
 */
public PoolingNHttpClientConnectionManager(final ConnectingIOReactor ioreactor) {
    this(ioreactor, getDefaultRegistry());
}
117
/**
 * Creates a manager with a caller-supplied scheme registry; the connection
 * factory and DNS resolver are passed on as {@code null}.
 *
 * @param ioreactor the connecting I/O reactor; must not be {@code null}
 * @param iosessionFactoryRegistry I/O session strategies keyed by scheme name;
 *        must not be {@code null}
 */
public PoolingNHttpClientConnectionManager(
        final ConnectingIOReactor ioreactor,
        final Registry<SchemeIOSessionStrategy> iosessionFactoryRegistry) {
    this(ioreactor, null, iosessionFactoryRegistry, (DnsResolver) null);
}
123
/**
 * Creates a manager with a custom connection factory and DNS resolver,
 * using the default scheme registry.
 *
 * @param ioreactor the connecting I/O reactor; must not be {@code null}
 * @param connFactory factory for managed connections; may be {@code null}
 * @param dnsResolver resolver for host names; may be {@code null}
 */
public PoolingNHttpClientConnectionManager(
        final ConnectingIOReactor ioreactor,
        final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory,
        final DnsResolver dnsResolver) {
    this(ioreactor, connFactory, getDefaultRegistry(), dnsResolver);
}
130
/**
 * Creates a manager with a custom connection factory and socket address
 * resolver, using the default scheme registry.
 *
 * @param ioreactor the connecting I/O reactor; must not be {@code null}
 * @param connFactory factory for managed connections; may be {@code null}
 * @param socketAddressResolver resolver of local and remote socket addresses
 *        for a route; must not be {@code null}
 */
public PoolingNHttpClientConnectionManager(
        final ConnectingIOReactor ioreactor,
        final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory,
        final SocketAddressResolver<HttpRoute> socketAddressResolver) {
    this(ioreactor, connFactory, getDefaultRegistry(), socketAddressResolver);
}
137
/**
 * Creates a manager with a custom connection factory, the default scheme
 * registry and a {@code null} DNS resolver.
 *
 * @param ioreactor the connecting I/O reactor; must not be {@code null}
 * @param connFactory factory for managed connections; may be {@code null}
 */
public PoolingNHttpClientConnectionManager(
        final ConnectingIOReactor ioreactor,
        final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory) {
    this(ioreactor, connFactory, getDefaultRegistry(), (DnsResolver) null);
}
143
/**
 * Creates a manager with a custom connection factory and scheme registry and
 * a {@code null} DNS resolver.
 *
 * @param ioreactor the connecting I/O reactor; must not be {@code null}
 * @param connFactory factory for managed connections; may be {@code null}
 * @param iosessionFactoryRegistry I/O session strategies keyed by scheme name;
 *        must not be {@code null}
 */
public PoolingNHttpClientConnectionManager(
        final ConnectingIOReactor ioreactor,
        final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory,
        final Registry<SchemeIOSessionStrategy> iosessionFactoryRegistry) {
    this(ioreactor, connFactory, iosessionFactoryRegistry, (DnsResolver) null);
}
150
/**
 * Creates a manager with a custom connection factory, scheme registry and
 * DNS resolver. The scheme port resolver is passed on as {@code null} and the
 * time-to-live as {@code -1} milliseconds.
 *
 * @param ioreactor the connecting I/O reactor; must not be {@code null}
 * @param connFactory factory for managed connections; may be {@code null}
 * @param iosessionFactoryRegistry I/O session strategies keyed by scheme name;
 *        must not be {@code null}
 * @param dnsResolver resolver for host names; may be {@code null}
 */
public PoolingNHttpClientConnectionManager(
        final ConnectingIOReactor ioreactor,
        final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory,
        final Registry<SchemeIOSessionStrategy> iosessionFactoryRegistry,
        final DnsResolver dnsResolver) {
    this(ioreactor, connFactory, iosessionFactoryRegistry,
            null, dnsResolver,
            -1, TimeUnit.MILLISECONDS);
}
159
/**
 * Creates a manager with a custom connection factory, scheme registry and
 * socket address resolver. The time-to-live is passed on as {@code -1}
 * milliseconds.
 *
 * @param ioreactor the connecting I/O reactor; must not be {@code null}
 * @param connFactory factory for managed connections; may be {@code null}
 * @param iosessionFactoryRegistry I/O session strategies keyed by scheme name;
 *        must not be {@code null}
 * @param socketAddressResolver resolver of local and remote socket addresses
 *        for a route; must not be {@code null}
 */
public PoolingNHttpClientConnectionManager(
        final ConnectingIOReactor ioreactor,
        final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory,
        final Registry<SchemeIOSessionStrategy> iosessionFactoryRegistry,
        final SocketAddressResolver<HttpRoute> socketAddressResolver) {
    this(ioreactor, connFactory, iosessionFactoryRegistry,
            socketAddressResolver,
            -1, TimeUnit.MILLISECONDS);
}
168
/**
 * Creates a manager whose socket addresses are resolved by an
 * {@code InternalAddressResolver} built from the given scheme port resolver
 * and DNS resolver.
 *
 * @param ioreactor the connecting I/O reactor; must not be {@code null}
 * @param connFactory factory for managed connections; may be {@code null}
 * @param iosessionFactoryRegistry I/O session strategies keyed by scheme name;
 *        must not be {@code null}
 * @param schemePortResolver resolver of ports for a host; may be {@code null}
 * @param dnsResolver resolver for host names; may be {@code null}
 * @param timeToLive time-to-live value handed to the pool
 * @param tunit unit of {@code timeToLive}
 */
public PoolingNHttpClientConnectionManager(
        final ConnectingIOReactor ioreactor,
        final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory,
        final Registry<SchemeIOSessionStrategy> iosessionFactoryRegistry,
        final SchemePortResolver schemePortResolver,
        final DnsResolver dnsResolver,
        final long timeToLive, final TimeUnit tunit) {
    this(ioreactor,
            connFactory,
            iosessionFactoryRegistry,
            new InternalAddressResolver(schemePortResolver, dnsResolver),
            timeToLive,
            tunit);
}
179
180 public PoolingNHttpClientConnectionManager(
181 final ConnectingIOReactor ioreactor,
182 final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory,
183 final Registry<SchemeIOSessionStrategy> iosessionFactoryRegistry,
184 final SocketAddressResolver<HttpRoute> socketAddressResolver,
185 final long timeToLive, final TimeUnit tunit) {
186 super();
187 Args.notNull(ioreactor, "I/O reactor");
188 Args.notNull(iosessionFactoryRegistry, "I/O session factory registry");
189 Args.notNull(socketAddressResolver, "Socket address resolver");
190 this.ioreactor = ioreactor;
191 this.configData = new ConfigData();
192 this.pool = new CPool(ioreactor,
193 new InternalConnectionFactory(this.configData, connFactory),
194 socketAddressResolver,
195 2, 20, timeToLive, tunit != null ? tunit : TimeUnit.MILLISECONDS);
196 this.iosessionFactoryRegistry = iosessionFactoryRegistry;
197 }
198
/**
 * Package-private constructor that adopts an already built pool. No argument
 * validation is performed.
 *
 * @param ioreactor the connecting I/O reactor
 * @param pool the connection pool to use
 * @param iosessionFactoryRegistry I/O session strategies keyed by scheme name
 */
PoolingNHttpClientConnectionManager(
        final ConnectingIOReactor ioreactor,
        final CPool pool,
        final Registry<SchemeIOSessionStrategy> iosessionFactoryRegistry) {
    this.iosessionFactoryRegistry = iosessionFactoryRegistry;
    this.pool = pool;
    this.configData = new ConfigData();
    this.ioreactor = ioreactor;
}
209
210 @Override
211 protected void finalize() throws Throwable {
212 try {
213 shutdown();
214 } finally {
215 super.finalize();
216 }
217 }
218
219 @Override
220 public void execute(final IOEventDispatch eventDispatch) throws IOException {
221 this.ioreactor.execute(eventDispatch);
222 }
223
224 public void shutdown(final long waitMs) throws IOException {
225 this.log.debug("Connection manager is shutting down");
226 this.pool.shutdown(waitMs);
227 this.log.debug("Connection manager shut down");
228 }
229
230 @Override
231 public void shutdown() throws IOException {
232 this.log.debug("Connection manager is shutting down");
233 this.pool.shutdown(2000);
234 this.log.debug("Connection manager shut down");
235 }
236
237 private String format(final HttpRoute route, final Object state) {
238 final StringBuilder buf = new StringBuilder();
239 buf.append("[route: ").append(route).append("]");
240 if (state != null) {
241 buf.append("[state: ").append(state).append("]");
242 }
243 return buf.toString();
244 }
245
246 private String formatStats(final HttpRoute route) {
247 final StringBuilder buf = new StringBuilder();
248 final PoolStats totals = this.pool.getTotalStats();
249 final PoolStats stats = this.pool.getStats(route);
250 buf.append("[total kept alive: ").append(totals.getAvailable()).append("; ");
251 buf.append("route allocated: ").append(stats.getLeased() + stats.getAvailable());
252 buf.append(" of ").append(stats.getMax()).append("; ");
253 buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
254 buf.append(" of ").append(totals.getMax()).append("]");
255 return buf.toString();
256 }
257
258 private String format(final CPoolEntry entry) {
259 final StringBuilder buf = new StringBuilder();
260 buf.append("[id: ").append(entry.getId()).append("]");
261 buf.append("[route: ").append(entry.getRoute()).append("]");
262 final Object state = entry.getState();
263 if (state != null) {
264 buf.append("[state: ").append(state).append("]");
265 }
266 return buf.toString();
267 }
268
269 @Override
270 public Future<NHttpClientConnection> requestConnection(
271 final HttpRoute route,
272 final Object state,
273 final long connectTimeout,
274 final long leaseTimeout,
275 final TimeUnit tunit,
276 final FutureCallback<NHttpClientConnection> callback) {
277 Args.notNull(route, "HTTP route");
278 if (this.log.isDebugEnabled()) {
279 this.log.debug("Connection request: " + format(route, state) + formatStats(route));
280 }
281 final BasicFuture<NHttpClientConnection> resultFuture = new BasicFuture<NHttpClientConnection>(callback);
282 final HttpHost host;
283 if (route.getProxyHost() != null) {
284 host = route.getProxyHost();
285 } else {
286 host = route.getTargetHost();
287 }
288 final SchemeIOSessionStrategy sf = this.iosessionFactoryRegistry.lookup(
289 host.getSchemeName());
290 if (sf == null) {
291 resultFuture.failed(new UnsupportedSchemeException(host.getSchemeName() +
292 " protocol is not supported"));
293 return resultFuture;
294 }
295 final Future<CPoolEntry> leaseFuture = this.pool.lease(route, state,
296 connectTimeout, leaseTimeout, tunit != null ? tunit : TimeUnit.MILLISECONDS,
297 new FutureCallback<CPoolEntry>() {
298
299 @Override
300 public void completed(final CPoolEntry entry) {
301 Asserts.check(entry.getConnection() != null, "Pool entry with no connection");
302 if (log.isDebugEnabled()) {
303 log.debug("Connection leased: " + format(entry) + formatStats(entry.getRoute()));
304 }
305 final NHttpClientConnection managedConn = CPoolProxy.newProxy(entry);
306 if (!resultFuture.completed(managedConn)) {
307 pool.release(entry, true);
308 }
309 }
310
311 @Override
312 public void failed(final Exception ex) {
313 if (log.isDebugEnabled()) {
314 log.debug("Connection request failed", ex);
315 }
316 resultFuture.failed(ex);
317 }
318
319 @Override
320 public void cancelled() {
321 log.debug("Connection request cancelled");
322 resultFuture.cancel(true);
323 }
324
325 });
326 return new Future<NHttpClientConnection>() {
327
328 @Override
329 public boolean cancel(final boolean mayInterruptIfRunning) {
330 try {
331 leaseFuture.cancel(mayInterruptIfRunning);
332 } finally {
333 return resultFuture.cancel(mayInterruptIfRunning);
334 }
335 }
336
337 @Override
338 public boolean isCancelled() {
339 return resultFuture.isCancelled();
340 }
341
342 @Override
343 public boolean isDone() {
344 return resultFuture.isDone();
345 }
346
347 @Override
348 public NHttpClientConnection get() throws InterruptedException, ExecutionException {
349 return resultFuture.get();
350 }
351
352 @Override
353 public NHttpClientConnection get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
354 return resultFuture.get(timeout, unit);
355 }
356
357 };
358 }
359
360 @Override
361 public void releaseConnection(
362 final NHttpClientConnection managedConn,
363 final Object state,
364 final long keepalive,
365 final TimeUnit tunit) {
366 Args.notNull(managedConn, "Managed connection");
367 synchronized (managedConn) {
368 final CPoolEntry entry = CPoolProxy.detach(managedConn);
369 if (entry == null) {
370 return;
371 }
372 if (this.log.isDebugEnabled()) {
373 this.log.debug("Releasing connection: " + format(entry) + formatStats(entry.getRoute()));
374 }
375 final NHttpClientConnection conn = entry.getConnection();
376 try {
377 if (conn.isOpen()) {
378 entry.setState(state);
379 entry.updateExpiry(keepalive, tunit != null ? tunit : TimeUnit.MILLISECONDS);
380 if (this.log.isDebugEnabled()) {
381 final String s;
382 if (keepalive > 0) {
383 s = "for " + (double) keepalive / 1000 + " seconds";
384 } else {
385 s = "indefinitely";
386 }
387 this.log.debug("Connection " + format(entry) + " can be kept alive " + s);
388 }
389 }
390 } finally {
391 this.pool.release(entry, conn.isOpen() && entry.isRouteComplete());
392 if (this.log.isDebugEnabled()) {
393 this.log.debug("Connection released: " + format(entry) + formatStats(entry.getRoute()));
394 }
395 }
396 }
397 }
398
399 private Lookup<SchemeIOSessionStrategy> getIOSessionFactoryRegistry(final HttpContext context) {
400 @SuppressWarnings("unchecked")
401 Lookup<SchemeIOSessionStrategy> reg = (Lookup<SchemeIOSessionStrategy>) context.getAttribute(
402 IOSESSION_FACTORY_REGISTRY);
403 if (reg == null) {
404 reg = this.iosessionFactoryRegistry;
405 }
406 return reg;
407 }
408
409 @Override
410 public void startRoute(
411 final NHttpClientConnection managedConn,
412 final HttpRoute route,
413 final HttpContext context) throws IOException {
414 Args.notNull(managedConn, "Managed connection");
415 Args.notNull(route, "HTTP route");
416 final HttpHost host;
417 if (route.getProxyHost() != null) {
418 host = route.getProxyHost();
419 } else {
420 host = route.getTargetHost();
421 }
422 final Lookup<SchemeIOSessionStrategy> reg = getIOSessionFactoryRegistry(context);
423 final SchemeIOSessionStrategy sf = reg.lookup(host.getSchemeName());
424 if (sf == null) {
425 throw new UnsupportedSchemeException(host.getSchemeName() +
426 " protocol is not supported");
427 }
428 if (sf.isLayeringRequired()) {
429 synchronized (managedConn) {
430 final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
431 final ManagedNHttpClientConnection conn = entry.getConnection();
432 final IOSession ioSession = conn.getIOSession();
433 final IOSession currentSession = sf.upgrade(host, ioSession);
434 conn.bind(currentSession);
435 }
436 }
437 }
438
439 @Override
440 public void upgrade(
441 final NHttpClientConnection managedConn,
442 final HttpRoute route,
443 final HttpContext context) throws IOException {
444 Args.notNull(managedConn, "Managed connection");
445 Args.notNull(route, "HTTP route");
446 final HttpHost host = route.getTargetHost();
447 final Lookup<SchemeIOSessionStrategy> reg = getIOSessionFactoryRegistry(context);
448 final SchemeIOSessionStrategy sf = reg.lookup(host.getSchemeName());
449 if (sf == null) {
450 throw new UnsupportedSchemeException(host.getSchemeName() +
451 " protocol is not supported");
452 }
453 if (!sf.isLayeringRequired()) {
454 throw new UnsupportedSchemeException(host.getSchemeName() +
455 " protocol does not support connection upgrade");
456 }
457 synchronized (managedConn) {
458 final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
459 final ManagedNHttpClientConnection conn = entry.getConnection();
460 final IOSession currentSession = sf.upgrade(host, conn.getIOSession());
461 conn.bind(currentSession);
462 }
463 }
464
465 @Override
466 public void routeComplete(
467 final NHttpClientConnection managedConn,
468 final HttpRoute route,
469 final HttpContext context) {
470 Args.notNull(managedConn, "Managed connection");
471 Args.notNull(route, "HTTP route");
472 synchronized (managedConn) {
473 final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
474 entry.markRouteComplete();
475 }
476 }
477
478 @Override
479 public boolean isRouteComplete(
480 final NHttpClientConnection managedConn) {
481 Args.notNull(managedConn, "Managed connection");
482 synchronized (managedConn) {
483 final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
484 return entry.isRouteComplete();
485 }
486 }
487
488 @Override
489 public void closeIdleConnections(final long idleTimeout, final TimeUnit tunit) {
490 if (this.log.isDebugEnabled()) {
491 this.log.debug("Closing connections idle longer than " + idleTimeout + " " + tunit);
492 }
493 this.pool.closeIdle(idleTimeout, tunit);
494 }
495
496 @Override
497 public void closeExpiredConnections() {
498 log.debug("Closing expired connections");
499 this.pool.closeExpired();
500 }
501
502 public void validatePendingRequests() {
503 log.debug("Validating pending requests");
504 this.pool.validatePendingRequests();
505 }
506
507 @Override
508 public int getMaxTotal() {
509 return this.pool.getMaxTotal();
510 }
511
512 @Override
513 public void setMaxTotal(final int max) {
514 this.pool.setMaxTotal(max);
515 }
516
517 @Override
518 public int getDefaultMaxPerRoute() {
519 return this.pool.getDefaultMaxPerRoute();
520 }
521
522 @Override
523 public void setDefaultMaxPerRoute(final int max) {
524 this.pool.setDefaultMaxPerRoute(max);
525 }
526
527 @Override
528 public int getMaxPerRoute(final HttpRoute route) {
529 return this.pool.getMaxPerRoute(route);
530 }
531
532 @Override
533 public void setMaxPerRoute(final HttpRoute route, final int max) {
534 this.pool.setMaxPerRoute(route, max);
535 }
536
537 @Override
538 public PoolStats getTotalStats() {
539 return this.pool.getTotalStats();
540 }
541
542 @Override
543 public PoolStats getStats(final HttpRoute route) {
544 return this.pool.getStats(route);
545 }
546
547
548
549
550 public Set<HttpRoute> getRoutes() {
551 return this.pool.getRoutes();
552 }
553
554 public ConnectionConfig getDefaultConnectionConfig() {
555 return this.configData.getDefaultConnectionConfig();
556 }
557
558 public void setDefaultConnectionConfig(final ConnectionConfig defaultConnectionConfig) {
559 this.configData.setDefaultConnectionConfig(defaultConnectionConfig);
560 }
561
562 public ConnectionConfig getConnectionConfig(final HttpHost host) {
563 return this.configData.getConnectionConfig(host);
564 }
565
566 public void setConnectionConfig(final HttpHost host, final ConnectionConfig connectionConfig) {
567 this.configData.setConnectionConfig(host, connectionConfig);
568 }
569
570 static class ConfigData {
571
572 private final Map<HttpHost, ConnectionConfig> connectionConfigMap;
573 private volatile ConnectionConfig defaultConnectionConfig;
574
575 ConfigData() {
576 super();
577 this.connectionConfigMap = new ConcurrentHashMap<HttpHost, ConnectionConfig>();
578 }
579
580 public ConnectionConfig getDefaultConnectionConfig() {
581 return this.defaultConnectionConfig;
582 }
583
584 public void setDefaultConnectionConfig(final ConnectionConfig defaultConnectionConfig) {
585 this.defaultConnectionConfig = defaultConnectionConfig;
586 }
587
588 public ConnectionConfig getConnectionConfig(final HttpHost host) {
589 return this.connectionConfigMap.get(host);
590 }
591
592 public void setConnectionConfig(final HttpHost host, final ConnectionConfig connectionConfig) {
593 this.connectionConfigMap.put(host, connectionConfig);
594 }
595
596 }
597
598 static class InternalConnectionFactory implements NIOConnFactory<HttpRoute, ManagedNHttpClientConnection> {
599
600 private final ConfigData configData;
601 private final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory;
602
603 InternalConnectionFactory(
604 final ConfigData configData,
605 final NHttpConnectionFactory<ManagedNHttpClientConnection> connFactory) {
606 super();
607 this.configData = configData != null ? configData : new ConfigData();
608 this.connFactory = connFactory != null ? connFactory :
609 ManagedNHttpClientConnectionFactory.INSTANCE;
610 }
611
612 @Override
613 public ManagedNHttpClientConnection create(
614 final HttpRoute route, final IOSession iosession) throws IOException {
615 ConnectionConfig config = null;
616 if (route.getProxyHost() != null) {
617 config = this.configData.getConnectionConfig(route.getProxyHost());
618 }
619 if (config == null) {
620 config = this.configData.getConnectionConfig(route.getTargetHost());
621 }
622 if (config == null) {
623 config = this.configData.getDefaultConnectionConfig();
624 }
625 if (config == null) {
626 config = ConnectionConfig.DEFAULT;
627 }
628 final ManagedNHttpClientConnection conn = this.connFactory.create(iosession, config);
629 iosession.setAttribute(IOEventDispatch.CONNECTION_KEY, conn);
630 return conn;
631 }
632
633 }
634
635 static class InternalAddressResolver implements SocketAddressResolver<HttpRoute> {
636
637 private final SchemePortResolver schemePortResolver;
638 private final DnsResolver dnsResolver;
639
640 public InternalAddressResolver(
641 final SchemePortResolver schemePortResolver,
642 final DnsResolver dnsResolver) {
643 super();
644 this.schemePortResolver = schemePortResolver != null ? schemePortResolver :
645 DefaultSchemePortResolver.INSTANCE;
646 this.dnsResolver = dnsResolver != null ? dnsResolver :
647 SystemDefaultDnsResolver.INSTANCE;
648 }
649
650 @Override
651 public SocketAddress resolveLocalAddress(final HttpRoute route) throws IOException {
652 return route.getLocalAddress() != null ? new InetSocketAddress(route.getLocalAddress(), 0) : null;
653 }
654
655 @Override
656 public SocketAddress resolveRemoteAddress(final HttpRoute route) throws IOException {
657 final HttpHost host;
658 if (route.getProxyHost() != null) {
659 host = route.getProxyHost();
660 } else {
661 host = route.getTargetHost();
662 }
663 final int port = this.schemePortResolver.resolve(host);
664 final InetAddress[] addresses = this.dnsResolver.resolve(host.getHostName());
665 return new InetSocketAddress(addresses[0], port);
666 }
667
668 }
669 }