View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.nio.pool;
28  
29  import java.io.IOException;
30  import java.net.SocketAddress;
31  import java.util.HashMap;
32  import java.util.HashSet;
33  import java.util.Iterator;
34  import java.util.LinkedList;
35  import java.util.ListIterator;
36  import java.util.Map;
37  import java.util.Set;
38  import java.util.concurrent.ConcurrentLinkedQueue;
39  import java.util.concurrent.ExecutionException;
40  import java.util.concurrent.Future;
41  import java.util.concurrent.TimeUnit;
42  import java.util.concurrent.TimeoutException;
43  import java.util.concurrent.atomic.AtomicBoolean;
44  import java.util.concurrent.locks.Lock;
45  import java.util.concurrent.locks.ReentrantLock;
46  
47  import org.apache.http.annotation.Contract;
48  import org.apache.http.annotation.ThreadingBehavior;
49  import org.apache.http.concurrent.BasicFuture;
50  import org.apache.http.concurrent.FutureCallback;
51  import org.apache.http.nio.reactor.ConnectingIOReactor;
52  import org.apache.http.nio.reactor.IOReactorStatus;
53  import org.apache.http.nio.reactor.IOSession;
54  import org.apache.http.nio.reactor.SessionRequest;
55  import org.apache.http.nio.reactor.SessionRequestCallback;
56  import org.apache.http.pool.ConnPool;
57  import org.apache.http.pool.ConnPoolControl;
58  import org.apache.http.pool.PoolEntry;
59  import org.apache.http.pool.PoolEntryCallback;
60  import org.apache.http.pool.PoolStats;
61  import org.apache.http.util.Args;
62  import org.apache.http.util.Asserts;
63  import org.apache.http.util.LangUtils;
64  
65  /**
66   * Abstract non-blocking connection pool.
67   *
68   * @param <T> route
69   * @param <C> connection object
70   * @param <E> pool entry
71   *
72   * @since 4.2
73   */
74  @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
75  public abstract class AbstractNIOConnPool<T, C, E extends PoolEntry<T, C>>
76                                                    implements ConnPool<T, E>, ConnPoolControl<T> {
77  
78      private final ConnectingIOReactor ioReactor;
79      private final NIOConnFactory<T, C> connFactory;
80      private final SocketAddressResolver<T> addressResolver;
81      private final SessionRequestCallback sessionRequestCallback;
82      private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
83      private final LinkedList<LeaseRequest<T, C, E>> leasingRequests;
84      private final Set<SessionRequest> pending;
85      private final Set<E> leased;
86      private final LinkedList<E> available;
87      private final ConcurrentLinkedQueue<LeaseRequest<T, C, E>> completedRequests;
88      private final Map<T, Integer> maxPerRoute;
89      private final Lock lock;
90      private final AtomicBoolean isShutDown;
91  
92      private volatile int defaultMaxPerRoute;
93      private volatile int maxTotal;
94  
95      /**
96       * @deprecated use {@link AbstractNIOConnPool#AbstractNIOConnPool(ConnectingIOReactor,
97       *   NIOConnFactory, SocketAddressResolver, int, int)}
98       */
99      @Deprecated
100     public AbstractNIOConnPool(
101             final ConnectingIOReactor ioReactor,
102             final NIOConnFactory<T, C> connFactory,
103             final int defaultMaxPerRoute,
104             final int maxTotal) {
105         super();
106         Args.notNull(ioReactor, "I/O reactor");
107         Args.notNull(connFactory, "Connection factory");
108         Args.positive(defaultMaxPerRoute, "Max per route value");
109         Args.positive(maxTotal, "Max total value");
110         this.ioReactor = ioReactor;
111         this.connFactory = connFactory;
112         this.addressResolver = new SocketAddressResolver<T>() {
113 
114             @Override
115             public SocketAddress resolveLocalAddress(final T route) throws IOException {
116                 return AbstractNIOConnPool.this.resolveLocalAddress(route);
117             }
118 
119             @Override
120             public SocketAddress resolveRemoteAddress(final T route) throws IOException {
121                 return AbstractNIOConnPool.this.resolveRemoteAddress(route);
122             }
123 
124         };
125         this.sessionRequestCallback = new InternalSessionRequestCallback();
126         this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
127         this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
128         this.pending = new HashSet<SessionRequest>();
129         this.leased = new HashSet<E>();
130         this.available = new LinkedList<E>();
131         this.maxPerRoute = new HashMap<T, Integer>();
132         this.completedRequests = new ConcurrentLinkedQueue<LeaseRequest<T, C, E>>();
133         this.lock = new ReentrantLock();
134         this.isShutDown = new AtomicBoolean(false);
135         this.defaultMaxPerRoute = defaultMaxPerRoute;
136         this.maxTotal = maxTotal;
137     }
138 
139     /**
140      * @since 4.3
141      */
142     public AbstractNIOConnPool(
143             final ConnectingIOReactor ioReactor,
144             final NIOConnFactory<T, C> connFactory,
145             final SocketAddressResolver<T> addressResolver,
146             final int defaultMaxPerRoute,
147             final int maxTotal) {
148         super();
149         Args.notNull(ioReactor, "I/O reactor");
150         Args.notNull(connFactory, "Connection factory");
151         Args.notNull(addressResolver, "Address resolver");
152         Args.positive(defaultMaxPerRoute, "Max per route value");
153         Args.positive(maxTotal, "Max total value");
154         this.ioReactor = ioReactor;
155         this.connFactory = connFactory;
156         this.addressResolver = addressResolver;
157         this.sessionRequestCallback = new InternalSessionRequestCallback();
158         this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
159         this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
160         this.pending = new HashSet<SessionRequest>();
161         this.leased = new HashSet<E>();
162         this.available = new LinkedList<E>();
163         this.completedRequests = new ConcurrentLinkedQueue<LeaseRequest<T, C, E>>();
164         this.maxPerRoute = new HashMap<T, Integer>();
165         this.lock = new ReentrantLock();
166         this.isShutDown = new AtomicBoolean(false);
167         this.defaultMaxPerRoute = defaultMaxPerRoute;
168         this.maxTotal = maxTotal;
169     }
170 
171     /**
172      * @deprecated (4.3) use {@link SocketAddressResolver}
173      */
174     @Deprecated
175     protected SocketAddress resolveRemoteAddress(final T route) {
176         return null;
177     }
178 
179     /**
180      * @deprecated (4.3) use {@link SocketAddressResolver}
181      */
182     @Deprecated
183     protected SocketAddress resolveLocalAddress(final T route) {
184         return null;
185     }
186 
187     protected abstract E createEntry(T route, C conn);
188 
189     /**
190      * @since 4.3
191      */
192     protected void onLease(final E entry) {
193     }
194 
195     /**
196      * @since 4.3
197      */
198     protected void onRelease(final E entry) {
199     }
200 
201     /**
202      * @since 4.4
203      */
204     protected void onReuse(final E entry) {
205     }
206 
207     public boolean isShutdown() {
208         return this.isShutDown.get();
209     }
210 
211     public void shutdown(final long waitMs) throws IOException {
212         if (this.isShutDown.compareAndSet(false, true)) {
213             fireCallbacks();
214             this.lock.lock();
215             try {
216                 for (final SessionRequest sessionRequest: this.pending) {
217                     sessionRequest.cancel();
218                 }
219                 for (final E entry: this.available) {
220                     entry.close();
221                 }
222                 for (final E entry: this.leased) {
223                     entry.close();
224                 }
225                 for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
226                     pool.shutdown();
227                 }
228                 this.routeToPool.clear();
229                 this.leased.clear();
230                 this.pending.clear();
231                 this.available.clear();
232                 this.leasingRequests.clear();
233                 this.ioReactor.shutdown(waitMs);
234             } finally {
235                 this.lock.unlock();
236             }
237         }
238     }
239 
240     private RouteSpecificPool<T, C, E> getPool(final T route) {
241         RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
242         if (pool == null) {
243             pool = new RouteSpecificPool<T, C, E>(route) {
244 
245                 @Override
246                 protected E createEntry(final T route, final C conn) {
247                     return AbstractNIOConnPool.this.createEntry(route, conn);
248                 }
249 
250             };
251             this.routeToPool.put(route, pool);
252         }
253         return pool;
254     }
255 
256     public Future<E> lease(
257             final T route, final Object state,
258             final long connectTimeout, final TimeUnit timeUnit,
259             final FutureCallback<E> callback) {
260         return this.lease(route, state, connectTimeout, connectTimeout, timeUnit, callback);
261     }
262 
263     /**
264      * @since 4.3
265      */
266     public Future<E> lease(
267             final T route, final Object state,
268             final long connectTimeout, final long leaseTimeout, final TimeUnit timeUnit,
269             final FutureCallback<E> callback) {
270         Args.notNull(route, "Route");
271         Args.notNull(timeUnit, "Time unit");
272         Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
273         final BasicFuture<E> future = new BasicFuture<E>(callback);
274         final LeaseRequest<T, C, E> leaseRequest = new LeaseRequest<T, C, E>(route, state,
275                 connectTimeout >= 0 ? timeUnit.toMillis(connectTimeout) : -1,
276                 leaseTimeout > 0 ? timeUnit.toMillis(leaseTimeout) : 0,
277                 future);
278         this.lock.lock();
279         try {
280             final boolean completed = processPendingRequest(leaseRequest);
281             if (!leaseRequest.isDone() && !completed) {
282                 this.leasingRequests.add(leaseRequest);
283             }
284             if (leaseRequest.isDone()) {
285                 this.completedRequests.add(leaseRequest);
286             }
287         } finally {
288             this.lock.unlock();
289         }
290         fireCallbacks();
291         return new Future<E>() {
292 
293             @Override
294             public E get() throws InterruptedException, ExecutionException {
295                 return future.get();
296             }
297 
298             @Override
299             public E get(
300                     final long timeout,
301                     final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
302                 return future.get(timeout, unit);
303             }
304 
305             @Override
306             public boolean cancel(final boolean mayInterruptIfRunning) {
307                 try {
308                     leaseRequest.cancel();
309                 } finally {
310                     return future.cancel(mayInterruptIfRunning);
311                 }
312             }
313 
314             @Override
315             public boolean isCancelled() {
316                 return future.isCancelled();
317             }
318 
319             @Override
320             public boolean isDone() {
321                 return future.isDone();
322             }
323 
324         };
325     }
326 
327     @Override
328     public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
329         return lease(route, state, -1, TimeUnit.MICROSECONDS, callback);
330     }
331 
332     public Future<E> lease(final T route, final Object state) {
333         return lease(route, state, -1, TimeUnit.MICROSECONDS, null);
334     }
335 
336     @Override
337     public void release(final E entry, final boolean reusable) {
338         if (entry == null) {
339             return;
340         }
341         if (this.isShutDown.get()) {
342             return;
343         }
344         this.lock.lock();
345         try {
346             if (this.leased.remove(entry)) {
347                 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
348                 pool.free(entry, reusable);
349                 if (reusable) {
350                     this.available.addFirst(entry);
351                     onRelease(entry);
352                 } else {
353                     entry.close();
354                 }
355                 processNextPendingRequest();
356             }
357         } finally {
358             this.lock.unlock();
359         }
360         fireCallbacks();
361     }
362 
363     private void processPendingRequests() {
364         final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
365         while (it.hasNext()) {
366             final LeaseRequest<T, C, E> request = it.next();
367             final BasicFuture<E> future = request.getFuture();
368             if (future.isCancelled()) {
369                 it.remove();
370                 continue;
371             }
372             final boolean completed = processPendingRequest(request);
373             if (request.isDone() || completed) {
374                 it.remove();
375             }
376             if (request.isDone()) {
377                 this.completedRequests.add(request);
378             }
379         }
380     }
381 
382     private void processNextPendingRequest() {
383         final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
384         while (it.hasNext()) {
385             final LeaseRequest<T, C, E> request = it.next();
386             final BasicFuture<E> future = request.getFuture();
387             if (future.isCancelled()) {
388                 it.remove();
389                 continue;
390             }
391             final boolean completed = processPendingRequest(request);
392             if (request.isDone() || completed) {
393                 it.remove();
394             }
395             if (request.isDone()) {
396                 this.completedRequests.add(request);
397             }
398             if (completed) {
399                 return;
400             }
401         }
402     }
403 
404     private boolean processPendingRequest(final LeaseRequest<T, C, E> request) {
405         final T route = request.getRoute();
406         final Object state = request.getState();
407         final long deadline = request.getDeadline();
408 
409         final long now = System.currentTimeMillis();
410         if (now > deadline) {
411             request.failed(new TimeoutException("Connection lease request time out"));
412             return false;
413         }
414 
415         final RouteSpecificPool<T, C, E> pool = getPool(route);
416         E entry;
417         for (;;) {
418             entry = pool.getFree(state);
419             if (entry == null) {
420                 break;
421             }
422             if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) {
423                 entry.close();
424                 this.available.remove(entry);
425                 pool.free(entry, false);
426             } else {
427                 break;
428             }
429         }
430         if (entry != null) {
431             this.available.remove(entry);
432             this.leased.add(entry);
433             request.completed(entry);
434             onReuse(entry);
435             onLease(entry);
436             return true;
437         }
438 
439         // New connection is needed
440         final int maxPerRoute = getMax(route);
441         // Shrink the pool prior to allocating a new connection
442         final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
443         if (excess > 0) {
444             for (int i = 0; i < excess; i++) {
445                 final E lastUsed = pool.getLastUsed();
446                 if (lastUsed == null) {
447                     break;
448                 }
449                 lastUsed.close();
450                 this.available.remove(lastUsed);
451                 pool.remove(lastUsed);
452             }
453         }
454 
455         if (pool.getAllocatedCount() < maxPerRoute) {
456             final int totalUsed = this.pending.size() + this.leased.size();
457             final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
458             if (freeCapacity == 0) {
459                 return false;
460             }
461             final int totalAvailable = this.available.size();
462             if (totalAvailable > freeCapacity - 1) {
463                 if (!this.available.isEmpty()) {
464                     final E lastUsed = this.available.removeLast();
465                     lastUsed.close();
466                     final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
467                     otherpool.remove(lastUsed);
468                 }
469             }
470 
471             final SocketAddress localAddress;
472             final SocketAddress remoteAddress;
473             try {
474                 remoteAddress = this.addressResolver.resolveRemoteAddress(route);
475                 localAddress = this.addressResolver.resolveLocalAddress(route);
476             } catch (final IOException ex) {
477                 request.failed(ex);
478                 return false;
479             }
480 
481             final SessionRequest sessionRequest = this.ioReactor.connect(
482                     remoteAddress, localAddress, route, this.sessionRequestCallback);
483             request.attachSessionRequest(sessionRequest);
484             final long connectTimeout = request.getConnectTimeout();
485             if (connectTimeout >= 0) {
486                 sessionRequest.setConnectTimeout(connectTimeout < Integer.MAX_VALUE ? (int) connectTimeout : Integer.MAX_VALUE);
487             }
488             this.pending.add(sessionRequest);
489             pool.addPending(sessionRequest, request.getFuture());
490             return true;
491         }
492         return false;
493     }
494 
495     private void fireCallbacks() {
496         LeaseRequest<T, C, E> request;
497         while ((request = this.completedRequests.poll()) != null) {
498             final BasicFuture<E> future = request.getFuture();
499             final Exception ex = request.getException();
500             final E result = request.getResult();
501             boolean successfullyCompleted = false;
502             if (ex != null) {
503                 future.failed(ex);
504             } else if (result != null) {
505                 if (future.completed(result)) {
506                     successfullyCompleted = true;
507                 }
508             } else {
509                 future.cancel();
510             }
511             if (!successfullyCompleted) {
512                 release(result, true);
513             }
514         }
515     }
516 
517     public void validatePendingRequests() {
518         this.lock.lock();
519         try {
520             final long now = System.currentTimeMillis();
521             final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
522             while (it.hasNext()) {
523                 final LeaseRequest<T, C, E> request = it.next();
524                 final BasicFuture<E> future = request.getFuture();
525                 if (future.isCancelled() && !request.isDone()) {
526                     it.remove();
527                 } else {
528                     final long deadline = request.getDeadline();
529                     if (now > deadline) {
530                         request.failed(new TimeoutException("Connection lease request time out"));
531                     }
532                     if (request.isDone()) {
533                         it.remove();
534                         this.completedRequests.add(request);
535                     }
536                 }
537             }
538         } finally {
539             this.lock.unlock();
540         }
541         fireCallbacks();
542     }
543 
544     protected void requestCompleted(final SessionRequest request) {
545         if (this.isShutDown.get()) {
546             final IOSession session = request.getSession();
547             if (session != null) {
548                 session.close();
549             }
550             return;
551         }
552         @SuppressWarnings("unchecked")
553         final
554         T route = (T) request.getAttachment();
555         this.lock.lock();
556         try {
557             this.pending.remove(request);
558             final RouteSpecificPool<T, C, E> pool = getPool(route);
559             final IOSession session = request.getSession();
560             try {
561                 final C conn = this.connFactory.create(route, session);
562                 final E entry = pool.createEntry(request, conn);
563                 if (pool.completed(request, entry)) {
564                     this.leased.add(entry);
565                     onLease(entry);
566                 } else {
567                     this.available.add(entry);
568                     if (this.ioReactor.getStatus().compareTo(IOReactorStatus.ACTIVE) <= 0) {
569                         processNextPendingRequest();
570                     }
571                 }
572             } catch (final IOException ex) {
573                 pool.failed(request, ex);
574             }
575         } finally {
576             this.lock.unlock();
577         }
578         fireCallbacks();
579     }
580 
581     protected void requestCancelled(final SessionRequest request) {
582         if (this.isShutDown.get()) {
583             return;
584         }
585         @SuppressWarnings("unchecked")
586         final
587         T route = (T) request.getAttachment();
588         this.lock.lock();
589         try {
590             this.pending.remove(request);
591             final RouteSpecificPool<T, C, E> pool = getPool(route);
592             pool.cancelled(request);
593             if (this.ioReactor.getStatus().compareTo(IOReactorStatus.ACTIVE) <= 0) {
594                 processNextPendingRequest();
595             }
596         } finally {
597             this.lock.unlock();
598         }
599         fireCallbacks();
600     }
601 
602     protected void requestFailed(final SessionRequest request) {
603         if (this.isShutDown.get()) {
604             return;
605         }
606         @SuppressWarnings("unchecked")
607         final
608         T route = (T) request.getAttachment();
609         this.lock.lock();
610         try {
611             this.pending.remove(request);
612             final RouteSpecificPool<T, C, E> pool = getPool(route);
613             pool.failed(request, request.getException());
614             processNextPendingRequest();
615         } finally {
616             this.lock.unlock();
617         }
618         fireCallbacks();
619     }
620 
621     protected void requestTimeout(final SessionRequest request) {
622         if (this.isShutDown.get()) {
623             return;
624         }
625         @SuppressWarnings("unchecked")
626         final
627         T route = (T) request.getAttachment();
628         this.lock.lock();
629         try {
630             this.pending.remove(request);
631             final RouteSpecificPool<T, C, E> pool = getPool(route);
632             pool.timeout(request);
633             processNextPendingRequest();
634         } finally {
635             this.lock.unlock();
636         }
637         fireCallbacks();
638     }
639 
640     private int getMax(final T route) {
641         final Integer v = this.maxPerRoute.get(route);
642         return v != null ? v.intValue() : this.defaultMaxPerRoute;
643     }
644 
645     @Override
646     public void setMaxTotal(final int max) {
647         Args.positive(max, "Max value");
648         this.lock.lock();
649         try {
650             this.maxTotal = max;
651         } finally {
652             this.lock.unlock();
653         }
654     }
655 
656     @Override
657     public int getMaxTotal() {
658         this.lock.lock();
659         try {
660             return this.maxTotal;
661         } finally {
662             this.lock.unlock();
663         }
664     }
665 
666     @Override
667     public void setDefaultMaxPerRoute(final int max) {
668         Args.positive(max, "Max value");
669         this.lock.lock();
670         try {
671             this.defaultMaxPerRoute = max;
672         } finally {
673             this.lock.unlock();
674         }
675     }
676 
677     @Override
678     public int getDefaultMaxPerRoute() {
679         this.lock.lock();
680         try {
681             return this.defaultMaxPerRoute;
682         } finally {
683             this.lock.unlock();
684         }
685     }
686 
687     @Override
688     public void setMaxPerRoute(final T route, final int max) {
689         Args.notNull(route, "Route");
690         this.lock.lock();
691         try {
692             if (max > -1) {
693                 this.maxPerRoute.put(route, Integer.valueOf(max));
694             } else {
695                 this.maxPerRoute.remove(route);
696             }
697         } finally {
698             this.lock.unlock();
699         }
700     }
701 
702     @Override
703     public int getMaxPerRoute(final T route) {
704         Args.notNull(route, "Route");
705         this.lock.lock();
706         try {
707             return getMax(route);
708         } finally {
709             this.lock.unlock();
710         }
711     }
712 
713     @Override
714     public PoolStats getTotalStats() {
715         this.lock.lock();
716         try {
717             return new PoolStats(
718                     this.leased.size(),
719                     this.pending.size(),
720                     this.available.size(),
721                     this.maxTotal);
722         } finally {
723             this.lock.unlock();
724         }
725     }
726 
727     @Override
728     public PoolStats getStats(final T route) {
729         Args.notNull(route, "Route");
730         this.lock.lock();
731         try {
732             final RouteSpecificPool<T, C, E> pool = getPool(route);
733             int pendingCount = 0;
734             for (final LeaseRequest<T, C, E> request: leasingRequests) {
735                 if (LangUtils.equals(route, request.getRoute())) {
736                     pendingCount++;
737                 }
738             }
739             return new PoolStats(
740                     pool.getLeasedCount(),
741                     pendingCount + pool.getPendingCount(),
742                     pool.getAvailableCount(),
743                     getMax(route));
744         } finally {
745             this.lock.unlock();
746         }
747     }
748 
749     /**
750      * Returns snapshot of all knows routes
751      *
752      * @since 4.4
753      */
754     public Set<T> getRoutes() {
755         this.lock.lock();
756         try {
757             return new HashSet<T>(routeToPool.keySet());
758         } finally {
759             this.lock.unlock();
760         }
761     }
762 
763     /**
764      * Enumerates all available connections.
765      *
766      * @since 4.3
767      */
768     protected void enumAvailable(final PoolEntryCallback<T, C> callback) {
769         this.lock.lock();
770         try {
771             final Iterator<E> it = this.available.iterator();
772             while (it.hasNext()) {
773                 final E entry = it.next();
774                 callback.process(entry);
775                 if (entry.isClosed()) {
776                     final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
777                     pool.remove(entry);
778                     it.remove();
779                 }
780             }
781             processPendingRequests();
782             purgePoolMap();
783         } finally {
784             this.lock.unlock();
785         }
786     }
787 
788     /**
789      * Enumerates all leased connections.
790      *
791      * @since 4.3
792      */
793     protected void enumLeased(final PoolEntryCallback<T, C> callback) {
794         this.lock.lock();
795         try {
796             final Iterator<E> it = this.leased.iterator();
797             while (it.hasNext()) {
798                 final E entry = it.next();
799                 callback.process(entry);
800             }
801             processPendingRequests();
802         } finally {
803             this.lock.unlock();
804         }
805     }
806 
807     /**
808      * Use {@link #enumLeased(org.apache.http.pool.PoolEntryCallback)}
809      *  or {@link #enumAvailable(org.apache.http.pool.PoolEntryCallback)} instead.
810      *
811      * @deprecated (4.3.2)
812      */
813     @Deprecated
814     protected void enumEntries(final Iterator<E> it, final PoolEntryCallback<T, C> callback) {
815         while (it.hasNext()) {
816             final E entry = it.next();
817             callback.process(entry);
818         }
819         processPendingRequests();
820     }
821 
822     private void purgePoolMap() {
823         final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator();
824         while (it.hasNext()) {
825             final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next();
826             final RouteSpecificPool<T, C, E> pool = entry.getValue();
827             if (pool.getAllocatedCount() == 0) {
828                 it.remove();
829             }
830         }
831     }
832 
833     public void closeIdle(final long idletime, final TimeUnit timeUnit) {
834         Args.notNull(timeUnit, "Time unit");
835         long time = timeUnit.toMillis(idletime);
836         if (time < 0) {
837             time = 0;
838         }
839         final long deadline = System.currentTimeMillis() - time;
840         enumAvailable(new PoolEntryCallback<T, C>() {
841 
842             @Override
843             public void process(final PoolEntry<T, C> entry) {
844                 if (entry.getUpdated() <= deadline) {
845                     entry.close();
846                 }
847             }
848 
849         });
850     }
851 
852     public void closeExpired() {
853         final long now = System.currentTimeMillis();
854         enumAvailable(new PoolEntryCallback<T, C>() {
855 
856             @Override
857             public void process(final PoolEntry<T, C> entry) {
858                 if (entry.isExpired(now)) {
859                     entry.close();
860                 }
861             }
862 
863         });
864     }
865 
866     @Override
867     public String toString() {
868         final StringBuilder buffer = new StringBuilder();
869         buffer.append("[leased: ");
870         buffer.append(this.leased);
871         buffer.append("][available: ");
872         buffer.append(this.available);
873         buffer.append("][pending: ");
874         buffer.append(this.pending);
875         buffer.append("]");
876         return buffer.toString();
877     }
878 
879     class InternalSessionRequestCallback implements SessionRequestCallback {
880 
881         @Override
882         public void completed(final SessionRequest request) {
883             requestCompleted(request);
884         }
885 
886         @Override
887         public void cancelled(final SessionRequest request) {
888             requestCancelled(request);
889         }
890 
891         @Override
892         public void failed(final SessionRequest request) {
893             requestFailed(request);
894         }
895 
896         @Override
897         public void timeout(final SessionRequest request) {
898             requestTimeout(request);
899         }
900 
901     }
902 
903 }