View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.nio.pool;
28  
29  import java.io.IOException;
30  import java.net.SocketAddress;
31  import java.util.HashMap;
32  import java.util.HashSet;
33  import java.util.Iterator;
34  import java.util.LinkedList;
35  import java.util.ListIterator;
36  import java.util.Map;
37  import java.util.Set;
38  import java.util.concurrent.ConcurrentLinkedQueue;
39  import java.util.concurrent.ExecutionException;
40  import java.util.concurrent.Future;
41  import java.util.concurrent.TimeUnit;
42  import java.util.concurrent.TimeoutException;
43  import java.util.concurrent.atomic.AtomicBoolean;
44  import java.util.concurrent.locks.Lock;
45  import java.util.concurrent.locks.ReentrantLock;
46  
47  import org.apache.http.annotation.Contract;
48  import org.apache.http.annotation.ThreadingBehavior;
49  import org.apache.http.concurrent.BasicFuture;
50  import org.apache.http.concurrent.FutureCallback;
51  import org.apache.http.nio.reactor.ConnectingIOReactor;
52  import org.apache.http.nio.reactor.IOReactorStatus;
53  import org.apache.http.nio.reactor.IOSession;
54  import org.apache.http.nio.reactor.SessionRequest;
55  import org.apache.http.nio.reactor.SessionRequestCallback;
56  import org.apache.http.pool.ConnPool;
57  import org.apache.http.pool.ConnPoolControl;
58  import org.apache.http.pool.PoolEntry;
59  import org.apache.http.pool.PoolEntryCallback;
60  import org.apache.http.pool.PoolStats;
61  import org.apache.http.util.Args;
62  import org.apache.http.util.Asserts;
63  import org.apache.http.util.LangUtils;
64  
65  /**
66   * Abstract non-blocking connection pool.
67   *
68   * @param <T> route
69   * @param <C> connection object
70   * @param <E> pool entry
71   *
72   * @since 4.2
73   */
74  @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
75  public abstract class AbstractNIOConnPool<T, C, E extends PoolEntry<T, C>>
76                                                    implements ConnPool<T, E>, ConnPoolControl<T> {
77  
78      private final ConnectingIOReactor ioReactor;
79      private final NIOConnFactory<T, C> connFactory;
80      private final SocketAddressResolver<T> addressResolver;
81      private final SessionRequestCallback sessionRequestCallback;
82      private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
83      private final LinkedList<LeaseRequest<T, C, E>> leasingRequests;
84      private final Set<SessionRequest> pending;
85      private final Set<E> leased;
86      private final LinkedList<E> available;
87      private final ConcurrentLinkedQueue<LeaseRequest<T, C, E>> completedRequests;
88      private final Map<T, Integer> maxPerRoute;
89      private final Lock lock;
90      private final AtomicBoolean isShutDown;
91  
92      private volatile int defaultMaxPerRoute;
93      private volatile int maxTotal;
94  
95      /**
96       * @deprecated use {@link AbstractNIOConnPool#AbstractNIOConnPool(ConnectingIOReactor,
97       *   NIOConnFactory, SocketAddressResolver, int, int)}
98       */
99      @Deprecated
100     public AbstractNIOConnPool(
101             final ConnectingIOReactor ioReactor,
102             final NIOConnFactory<T, C> connFactory,
103             final int defaultMaxPerRoute,
104             final int maxTotal) {
105         super();
106         Args.notNull(ioReactor, "I/O reactor");
107         Args.notNull(connFactory, "Connection factory");
108         Args.positive(defaultMaxPerRoute, "Max per route value");
109         Args.positive(maxTotal, "Max total value");
110         this.ioReactor = ioReactor;
111         this.connFactory = connFactory;
112         this.addressResolver = new SocketAddressResolver<T>() {
113 
114             @Override
115             public SocketAddress resolveLocalAddress(final T route) throws IOException {
116                 return AbstractNIOConnPool.this.resolveLocalAddress(route);
117             }
118 
119             @Override
120             public SocketAddress resolveRemoteAddress(final T route) throws IOException {
121                 return AbstractNIOConnPool.this.resolveRemoteAddress(route);
122             }
123 
124         };
125         this.sessionRequestCallback = new InternalSessionRequestCallback();
126         this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
127         this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
128         this.pending = new HashSet<SessionRequest>();
129         this.leased = new HashSet<E>();
130         this.available = new LinkedList<E>();
131         this.maxPerRoute = new HashMap<T, Integer>();
132         this.completedRequests = new ConcurrentLinkedQueue<LeaseRequest<T, C, E>>();
133         this.lock = new ReentrantLock();
134         this.isShutDown = new AtomicBoolean(false);
135         this.defaultMaxPerRoute = defaultMaxPerRoute;
136         this.maxTotal = maxTotal;
137     }
138 
139     /**
140      * @since 4.3
141      */
142     public AbstractNIOConnPool(
143             final ConnectingIOReactor ioReactor,
144             final NIOConnFactory<T, C> connFactory,
145             final SocketAddressResolver<T> addressResolver,
146             final int defaultMaxPerRoute,
147             final int maxTotal) {
148         super();
149         Args.notNull(ioReactor, "I/O reactor");
150         Args.notNull(connFactory, "Connection factory");
151         Args.notNull(addressResolver, "Address resolver");
152         Args.positive(defaultMaxPerRoute, "Max per route value");
153         Args.positive(maxTotal, "Max total value");
154         this.ioReactor = ioReactor;
155         this.connFactory = connFactory;
156         this.addressResolver = addressResolver;
157         this.sessionRequestCallback = new InternalSessionRequestCallback();
158         this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
159         this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
160         this.pending = new HashSet<SessionRequest>();
161         this.leased = new HashSet<E>();
162         this.available = new LinkedList<E>();
163         this.completedRequests = new ConcurrentLinkedQueue<LeaseRequest<T, C, E>>();
164         this.maxPerRoute = new HashMap<T, Integer>();
165         this.lock = new ReentrantLock();
166         this.isShutDown = new AtomicBoolean(false);
167         this.defaultMaxPerRoute = defaultMaxPerRoute;
168         this.maxTotal = maxTotal;
169     }
170 
171     /**
172      * @deprecated (4.3) use {@link SocketAddressResolver}
173      */
174     @Deprecated
175     protected SocketAddress resolveRemoteAddress(final T route) {
176         return null;
177     }
178 
179     /**
180      * @deprecated (4.3) use {@link SocketAddressResolver}
181      */
182     @Deprecated
183     protected SocketAddress resolveLocalAddress(final T route) {
184         return null;
185     }
186 
187     protected abstract E createEntry(T route, C conn);
188 
189     /**
190      * @since 4.3
191      */
192     protected void onLease(final E entry) {
193     }
194 
195     /**
196      * @since 4.3
197      */
198     protected void onRelease(final E entry) {
199     }
200 
201     /**
202      * @since 4.4
203      */
204     protected void onReuse(final E entry) {
205     }
206 
207     public boolean isShutdown() {
208         return this.isShutDown.get();
209     }
210 
211     public void shutdown(final long waitMs) throws IOException {
212         if (this.isShutDown.compareAndSet(false, true)) {
213             fireCallbacks();
214             this.lock.lock();
215             try {
216                 for (final SessionRequest sessionRequest: this.pending) {
217                     sessionRequest.cancel();
218                 }
219                 for (final E entry: this.available) {
220                     entry.close();
221                 }
222                 for (final E entry: this.leased) {
223                     entry.close();
224                 }
225                 for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
226                     pool.shutdown();
227                 }
228                 this.routeToPool.clear();
229                 this.leased.clear();
230                 this.pending.clear();
231                 this.available.clear();
232                 this.leasingRequests.clear();
233                 this.ioReactor.shutdown(waitMs);
234             } finally {
235                 this.lock.unlock();
236             }
237         }
238     }
239 
240     private RouteSpecificPool<T, C, E> getPool(final T route) {
241         RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
242         if (pool == null) {
243             pool = new RouteSpecificPool<T, C, E>(route) {
244 
245                 @Override
246                 protected E createEntry(final T route, final C conn) {
247                     return AbstractNIOConnPool.this.createEntry(route, conn);
248                 }
249 
250             };
251             this.routeToPool.put(route, pool);
252         }
253         return pool;
254     }
255 
256     public Future<E> lease(
257             final T route, final Object state,
258             final long connectTimeout, final TimeUnit timeUnit,
259             final FutureCallback<E> callback) {
260         return this.lease(route, state, connectTimeout, connectTimeout, timeUnit, callback);
261     }
262 
263     /**
264      * @since 4.3
265      */
266     public Future<E> lease(
267             final T route, final Object state,
268             final long connectTimeout, final long leaseTimeout, final TimeUnit timeUnit,
269             final FutureCallback<E> callback) {
270         Args.notNull(route, "Route");
271         Args.notNull(timeUnit, "Time unit");
272         Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
273         final BasicFuture<E> future = new BasicFuture<E>(callback);
274         final LeaseRequest<T, C, E> leaseRequest = new LeaseRequest<T, C, E>(route, state,
275                 connectTimeout >= 0 ? timeUnit.toMillis(connectTimeout) : -1,
276                 leaseTimeout > 0 ? timeUnit.toMillis(leaseTimeout) : 0,
277                 future);
278         this.lock.lock();
279         try {
280             final boolean completed = processPendingRequest(leaseRequest);
281             if (!leaseRequest.isDone() && !completed) {
282                 this.leasingRequests.add(leaseRequest);
283             }
284             if (leaseRequest.isDone()) {
285                 this.completedRequests.add(leaseRequest);
286             }
287         } finally {
288             this.lock.unlock();
289         }
290         fireCallbacks();
291         return new Future<E>() {
292 
293             @Override
294             public E get() throws InterruptedException, ExecutionException {
295                 return future.get();
296             }
297 
298             @Override
299             public E get(
300                     final long timeout,
301                     final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
302                 return future.get(timeout, unit);
303             }
304 
305             @Override
306             public boolean cancel(final boolean mayInterruptIfRunning) {
307                 try {
308                     leaseRequest.cancel();
309                 } finally {
310                     return future.cancel(mayInterruptIfRunning);
311                 }
312             }
313 
314             @Override
315             public boolean isCancelled() {
316                 return future.isCancelled();
317             }
318 
319             @Override
320             public boolean isDone() {
321                 return future.isDone();
322             }
323 
324         };
325     }
326 
327     @Override
328     public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
329         return lease(route, state, -1, TimeUnit.MICROSECONDS, callback);
330     }
331 
332     public Future<E> lease(final T route, final Object state) {
333         return lease(route, state, -1, TimeUnit.MICROSECONDS, null);
334     }
335 
336     @Override
337     public void release(final E entry, final boolean reusable) {
338         if (entry == null) {
339             return;
340         }
341         if (this.isShutDown.get()) {
342             return;
343         }
344         this.lock.lock();
345         try {
346             if (this.leased.remove(entry)) {
347                 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
348                 pool.free(entry, reusable);
349                 if (reusable) {
350                     this.available.addFirst(entry);
351                     onRelease(entry);
352                 } else {
353                     entry.close();
354                 }
355                 processNextPendingRequest();
356             }
357         } finally {
358             this.lock.unlock();
359         }
360         fireCallbacks();
361     }
362 
363     private void processPendingRequests() {
364         final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
365         while (it.hasNext()) {
366             final LeaseRequest<T, C, E> request = it.next();
367             final BasicFuture<E> future = request.getFuture();
368             if (future.isCancelled()) {
369                 it.remove();
370                 continue;
371             }
372             final boolean completed = processPendingRequest(request);
373             if (request.isDone() || completed) {
374                 it.remove();
375             }
376             if (request.isDone()) {
377                 this.completedRequests.add(request);
378             }
379         }
380     }
381 
382     private void processNextPendingRequest() {
383         final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
384         while (it.hasNext()) {
385             final LeaseRequest<T, C, E> request = it.next();
386             final BasicFuture<E> future = request.getFuture();
387             if (future.isCancelled()) {
388                 it.remove();
389                 continue;
390             }
391             final boolean completed = processPendingRequest(request);
392             if (request.isDone() || completed) {
393                 it.remove();
394             }
395             if (request.isDone()) {
396                 this.completedRequests.add(request);
397             }
398             if (completed) {
399                 return;
400             }
401         }
402     }
403 
404     private boolean processPendingRequest(final LeaseRequest<T, C, E> request) {
405         final T route = request.getRoute();
406         final Object state = request.getState();
407         final long deadline = request.getDeadline();
408 
409         final long now = System.currentTimeMillis();
410         if (now > deadline) {
411             request.failed(new TimeoutException("Connection lease request time out"));
412             return false;
413         }
414 
415         final RouteSpecificPool<T, C, E> pool = getPool(route);
416         E entry;
417         for (;;) {
418             entry = pool.getFree(state);
419             if (entry == null) {
420                 break;
421             }
422             if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) {
423                 entry.close();
424                 this.available.remove(entry);
425                 pool.free(entry, false);
426             } else {
427                 break;
428             }
429         }
430         if (entry != null) {
431             this.available.remove(entry);
432             this.leased.add(entry);
433             request.completed(entry);
434             onReuse(entry);
435             onLease(entry);
436             return true;
437         }
438 
439         // New connection is needed
440         final int maxPerRoute = getMax(route);
441         // Shrink the pool prior to allocating a new connection
442         final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
443         if (excess > 0) {
444             for (int i = 0; i < excess; i++) {
445                 final E lastUsed = pool.getLastUsed();
446                 if (lastUsed == null) {
447                     break;
448                 }
449                 lastUsed.close();
450                 this.available.remove(lastUsed);
451                 pool.remove(lastUsed);
452             }
453         }
454 
455         if (pool.getAllocatedCount() < maxPerRoute) {
456             final int totalUsed = this.pending.size() + this.leased.size();
457             final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
458             if (freeCapacity == 0) {
459                 return false;
460             }
461             final int totalAvailable = this.available.size();
462             if (totalAvailable > freeCapacity - 1) {
463                 final E lastUsed = this.available.removeLast();
464                 lastUsed.close();
465                 final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
466                 otherpool.remove(lastUsed);
467             }
468 
469             final SocketAddress localAddress;
470             final SocketAddress remoteAddress;
471             try {
472                 remoteAddress = this.addressResolver.resolveRemoteAddress(route);
473                 localAddress = this.addressResolver.resolveLocalAddress(route);
474             } catch (final IOException ex) {
475                 request.failed(ex);
476                 return false;
477             }
478 
479             final SessionRequest sessionRequest = this.ioReactor.connect(
480                     remoteAddress, localAddress, route, this.sessionRequestCallback);
481             request.attachSessionRequest(sessionRequest);
482             final long connectTimeout = request.getConnectTimeout();
483             if (connectTimeout >= 0) {
484                 sessionRequest.setConnectTimeout(connectTimeout < Integer.MAX_VALUE ? (int) connectTimeout : Integer.MAX_VALUE);
485             }
486             this.pending.add(sessionRequest);
487             pool.addPending(sessionRequest, request.getFuture());
488             return true;
489         }
490         return false;
491     }
492 
493     private void fireCallbacks() {
494         LeaseRequest<T, C, E> request;
495         while ((request = this.completedRequests.poll()) != null) {
496             final BasicFuture<E> future = request.getFuture();
497             final Exception ex = request.getException();
498             final E result = request.getResult();
499             boolean successfullyCompleted = false;
500             if (ex != null) {
501                 future.failed(ex);
502             } else if (result != null) {
503                 if (future.completed(result)) {
504                     successfullyCompleted = true;
505                 }
506             } else {
507                 future.cancel();
508             }
509             if (!successfullyCompleted) {
510                 release(result, true);
511             }
512         }
513     }
514 
515     public void validatePendingRequests() {
516         this.lock.lock();
517         try {
518             final long now = System.currentTimeMillis();
519             final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
520             while (it.hasNext()) {
521                 final LeaseRequest<T, C, E> request = it.next();
522                 final BasicFuture<E> future = request.getFuture();
523                 if (future.isCancelled() && !request.isDone()) {
524                     it.remove();
525                 } else {
526                     final long deadline = request.getDeadline();
527                     if (now > deadline) {
528                         request.failed(new TimeoutException("Connection lease request time out"));
529                     }
530                     if (request.isDone()) {
531                         it.remove();
532                         this.completedRequests.add(request);
533                     }
534                 }
535             }
536         } finally {
537             this.lock.unlock();
538         }
539         fireCallbacks();
540     }
541 
542     protected void requestCompleted(final SessionRequest request) {
543         if (this.isShutDown.get()) {
544             final IOSession session = request.getSession();
545             if (session != null) {
546                 session.close();
547             }
548             return;
549         }
550         @SuppressWarnings("unchecked")
551         final
552         T route = (T) request.getAttachment();
553         this.lock.lock();
554         try {
555             this.pending.remove(request);
556             final RouteSpecificPool<T, C, E> pool = getPool(route);
557             final IOSession session = request.getSession();
558             try {
559                 final C conn = this.connFactory.create(route, session);
560                 final E entry = pool.createEntry(request, conn);
561                 if (pool.completed(request, entry)) {
562                     this.leased.add(entry);
563                     onLease(entry);
564                 } else {
565                     this.available.add(entry);
566                     if (this.ioReactor.getStatus().compareTo(IOReactorStatus.ACTIVE) <= 0) {
567                         processNextPendingRequest();
568                     }
569                 }
570             } catch (final IOException ex) {
571                 pool.failed(request, ex);
572             }
573         } finally {
574             this.lock.unlock();
575         }
576         fireCallbacks();
577     }
578 
579     protected void requestCancelled(final SessionRequest request) {
580         if (this.isShutDown.get()) {
581             return;
582         }
583         @SuppressWarnings("unchecked")
584         final
585         T route = (T) request.getAttachment();
586         this.lock.lock();
587         try {
588             this.pending.remove(request);
589             final RouteSpecificPool<T, C, E> pool = getPool(route);
590             pool.cancelled(request);
591             if (this.ioReactor.getStatus().compareTo(IOReactorStatus.ACTIVE) <= 0) {
592                 processNextPendingRequest();
593             }
594         } finally {
595             this.lock.unlock();
596         }
597         fireCallbacks();
598     }
599 
600     protected void requestFailed(final SessionRequest request) {
601         if (this.isShutDown.get()) {
602             return;
603         }
604         @SuppressWarnings("unchecked")
605         final
606         T route = (T) request.getAttachment();
607         this.lock.lock();
608         try {
609             this.pending.remove(request);
610             final RouteSpecificPool<T, C, E> pool = getPool(route);
611             pool.failed(request, request.getException());
612             processNextPendingRequest();
613         } finally {
614             this.lock.unlock();
615         }
616         fireCallbacks();
617     }
618 
619     protected void requestTimeout(final SessionRequest request) {
620         if (this.isShutDown.get()) {
621             return;
622         }
623         @SuppressWarnings("unchecked")
624         final
625         T route = (T) request.getAttachment();
626         this.lock.lock();
627         try {
628             this.pending.remove(request);
629             final RouteSpecificPool<T, C, E> pool = getPool(route);
630             pool.timeout(request);
631             processNextPendingRequest();
632         } finally {
633             this.lock.unlock();
634         }
635         fireCallbacks();
636     }
637 
638     private int getMax(final T route) {
639         final Integer v = this.maxPerRoute.get(route);
640         return v != null ? v.intValue() : this.defaultMaxPerRoute;
641     }
642 
643     @Override
644     public void setMaxTotal(final int max) {
645         Args.positive(max, "Max value");
646         this.lock.lock();
647         try {
648             this.maxTotal = max;
649         } finally {
650             this.lock.unlock();
651         }
652     }
653 
654     @Override
655     public int getMaxTotal() {
656         this.lock.lock();
657         try {
658             return this.maxTotal;
659         } finally {
660             this.lock.unlock();
661         }
662     }
663 
664     @Override
665     public void setDefaultMaxPerRoute(final int max) {
666         Args.positive(max, "Max value");
667         this.lock.lock();
668         try {
669             this.defaultMaxPerRoute = max;
670         } finally {
671             this.lock.unlock();
672         }
673     }
674 
675     @Override
676     public int getDefaultMaxPerRoute() {
677         this.lock.lock();
678         try {
679             return this.defaultMaxPerRoute;
680         } finally {
681             this.lock.unlock();
682         }
683     }
684 
685     @Override
686     public void setMaxPerRoute(final T route, final int max) {
687         Args.notNull(route, "Route");
688         this.lock.lock();
689         try {
690             if (max > -1) {
691                 this.maxPerRoute.put(route, Integer.valueOf(max));
692             } else {
693                 this.maxPerRoute.remove(route);
694             }
695         } finally {
696             this.lock.unlock();
697         }
698     }
699 
700     @Override
701     public int getMaxPerRoute(final T route) {
702         Args.notNull(route, "Route");
703         this.lock.lock();
704         try {
705             return getMax(route);
706         } finally {
707             this.lock.unlock();
708         }
709     }
710 
711     @Override
712     public PoolStats getTotalStats() {
713         this.lock.lock();
714         try {
715             return new PoolStats(
716                     this.leased.size(),
717                     this.pending.size(),
718                     this.available.size(),
719                     this.maxTotal);
720         } finally {
721             this.lock.unlock();
722         }
723     }
724 
725     @Override
726     public PoolStats getStats(final T route) {
727         Args.notNull(route, "Route");
728         this.lock.lock();
729         try {
730             final RouteSpecificPool<T, C, E> pool = getPool(route);
731             int pendingCount = 0;
732             for (final LeaseRequest<T, C, E> request: leasingRequests) {
733                 if (LangUtils.equals(route, request.getRoute())) {
734                     pendingCount++;
735                 }
736             }
737             return new PoolStats(
738                     pool.getLeasedCount(),
739                     pendingCount + pool.getPendingCount(),
740                     pool.getAvailableCount(),
741                     getMax(route));
742         } finally {
743             this.lock.unlock();
744         }
745     }
746 
747     /**
748      * Returns snapshot of all knows routes
749      *
750      * @since 4.4
751      */
752     public Set<T> getRoutes() {
753         this.lock.lock();
754         try {
755             return new HashSet<T>(routeToPool.keySet());
756         } finally {
757             this.lock.unlock();
758         }
759     }
760 
761     /**
762      * Enumerates all available connections.
763      *
764      * @since 4.3
765      */
766     protected void enumAvailable(final PoolEntryCallback<T, C> callback) {
767         this.lock.lock();
768         try {
769             final Iterator<E> it = this.available.iterator();
770             while (it.hasNext()) {
771                 final E entry = it.next();
772                 callback.process(entry);
773                 if (entry.isClosed()) {
774                     final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
775                     pool.remove(entry);
776                     it.remove();
777                 }
778             }
779             processPendingRequests();
780             purgePoolMap();
781         } finally {
782             this.lock.unlock();
783         }
784     }
785 
786     /**
787      * Enumerates all leased connections.
788      *
789      * @since 4.3
790      */
791     protected void enumLeased(final PoolEntryCallback<T, C> callback) {
792         this.lock.lock();
793         try {
794             final Iterator<E> it = this.leased.iterator();
795             while (it.hasNext()) {
796                 final E entry = it.next();
797                 callback.process(entry);
798             }
799             processPendingRequests();
800         } finally {
801             this.lock.unlock();
802         }
803     }
804 
805     /**
806      * Use {@link #enumLeased(org.apache.http.pool.PoolEntryCallback)}
807      *  or {@link #enumAvailable(org.apache.http.pool.PoolEntryCallback)} instead.
808      *
809      * @deprecated (4.3.2)
810      */
811     @Deprecated
812     protected void enumEntries(final Iterator<E> it, final PoolEntryCallback<T, C> callback) {
813         while (it.hasNext()) {
814             final E entry = it.next();
815             callback.process(entry);
816         }
817         processPendingRequests();
818     }
819 
820     private void purgePoolMap() {
821         final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator();
822         while (it.hasNext()) {
823             final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next();
824             final RouteSpecificPool<T, C, E> pool = entry.getValue();
825             if (pool.getAllocatedCount() == 0) {
826                 it.remove();
827             }
828         }
829     }
830 
831     public void closeIdle(final long idletime, final TimeUnit timeUnit) {
832         Args.notNull(timeUnit, "Time unit");
833         long time = timeUnit.toMillis(idletime);
834         if (time < 0) {
835             time = 0;
836         }
837         final long deadline = System.currentTimeMillis() - time;
838         enumAvailable(new PoolEntryCallback<T, C>() {
839 
840             @Override
841             public void process(final PoolEntry<T, C> entry) {
842                 if (entry.getUpdated() <= deadline) {
843                     entry.close();
844                 }
845             }
846 
847         });
848     }
849 
850     public void closeExpired() {
851         final long now = System.currentTimeMillis();
852         enumAvailable(new PoolEntryCallback<T, C>() {
853 
854             @Override
855             public void process(final PoolEntry<T, C> entry) {
856                 if (entry.isExpired(now)) {
857                     entry.close();
858                 }
859             }
860 
861         });
862     }
863 
864     @Override
865     public String toString() {
866         final StringBuilder buffer = new StringBuilder();
867         buffer.append("[leased: ");
868         buffer.append(this.leased);
869         buffer.append("][available: ");
870         buffer.append(this.available);
871         buffer.append("][pending: ");
872         buffer.append(this.pending);
873         buffer.append("]");
874         return buffer.toString();
875     }
876 
877     class InternalSessionRequestCallback implements SessionRequestCallback {
878 
879         @Override
880         public void completed(final SessionRequest request) {
881             requestCompleted(request);
882         }
883 
884         @Override
885         public void cancelled(final SessionRequest request) {
886             requestCancelled(request);
887         }
888 
889         @Override
890         public void failed(final SessionRequest request) {
891             requestFailed(request);
892         }
893 
894         @Override
895         public void timeout(final SessionRequest request) {
896             requestTimeout(request);
897         }
898 
899     }
900 
901 }