View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.pool;
28  
29  import java.util.HashMap;
30  import java.util.HashSet;
31  import java.util.Iterator;
32  import java.util.LinkedList;
33  import java.util.ListIterator;
34  import java.util.Map;
35  import java.util.Objects;
36  import java.util.Set;
37  import java.util.concurrent.ConcurrentLinkedQueue;
38  import java.util.concurrent.ExecutionException;
39  import java.util.concurrent.Future;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.TimeoutException;
42  import java.util.concurrent.atomic.AtomicBoolean;
43  import java.util.concurrent.locks.ReentrantLock;
44  
45  import org.apache.hc.core5.annotation.Contract;
46  import org.apache.hc.core5.annotation.ThreadingBehavior;
47  import org.apache.hc.core5.concurrent.BasicFuture;
48  import org.apache.hc.core5.concurrent.FutureCallback;
49  import org.apache.hc.core5.function.Callback;
50  import org.apache.hc.core5.io.CloseMode;
51  import org.apache.hc.core5.io.ModalCloseable;
52  import org.apache.hc.core5.util.Args;
53  import org.apache.hc.core5.util.Asserts;
54  import org.apache.hc.core5.util.Deadline;
55  import org.apache.hc.core5.util.DeadlineTimeoutException;
56  import org.apache.hc.core5.util.TimeValue;
57  import org.apache.hc.core5.util.Timeout;
58  
59  /**
60   * Connection pool with strict connection limit guarantees.
61   *
62   * @param <T> route
63   * @param <C> connection object
64   *
65   * @since 4.2
66   */
67  @Contract(threading = ThreadingBehavior.SAFE)
68  public class StrictConnPool<T, C extends ModalCloseable> implements ManagedConnPool<T, C> {
69  
70      private final TimeValue timeToLive;
71      private final PoolReusePolicy policy;
72      private final DisposalCallback<C> disposalCallback;
73      private final ConnPoolListener<T> connPoolListener;
74      private final Map<T, PerRoutePool<T, C>> routeToPool;
75      private final LinkedList<LeaseRequest<T, C>> pendingRequests;
76      private final Set<PoolEntry<T, C>> leased;
77      private final LinkedList<PoolEntry<T, C>> available;
78      private final ConcurrentLinkedQueue<LeaseRequest<T, C>> completedRequests;
79      private final Map<T, Integer> maxPerRoute;
80      private final ReentrantLock lock;
81      private final AtomicBoolean isShutDown;
82  
83      private volatile int defaultMaxPerRoute;
84      private volatile int maxTotal;
85  
86      /**
87       * @since 5.0
88       */
89      public StrictConnPool(
90              final int defaultMaxPerRoute,
91              final int maxTotal,
92              final TimeValue timeToLive,
93              final PoolReusePolicy policy,
94              final DisposalCallback<C> disposalCallback,
95              final ConnPoolListener<T> connPoolListener) {
96          super();
97          Args.positive(defaultMaxPerRoute, "Max per route value");
98          Args.positive(maxTotal, "Max total value");
99          this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
100         this.policy = policy != null ? policy : PoolReusePolicy.LIFO;
101         this.disposalCallback = disposalCallback;
102         this.connPoolListener = connPoolListener;
103         this.routeToPool = new HashMap<>();
104         this.pendingRequests = new LinkedList<>();
105         this.leased = new HashSet<>();
106         this.available = new LinkedList<>();
107         this.completedRequests = new ConcurrentLinkedQueue<>();
108         this.maxPerRoute = new HashMap<>();
109         this.lock = new ReentrantLock();
110         this.isShutDown = new AtomicBoolean(false);
111         this.defaultMaxPerRoute = defaultMaxPerRoute;
112         this.maxTotal = maxTotal;
113     }
114 
115     /**
116      * @since 5.0
117      */
118     public StrictConnPool(
119             final int defaultMaxPerRoute,
120             final int maxTotal,
121             final TimeValue timeToLive,
122             final PoolReusePolicy policy,
123             final ConnPoolListener<T> connPoolListener) {
124         this(defaultMaxPerRoute, maxTotal, timeToLive, policy, null, connPoolListener);
125     }
126 
127     public StrictConnPool(final int defaultMaxPerRoute, final int maxTotal) {
128         this(defaultMaxPerRoute, maxTotal, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, null);
129     }
130 
131     public boolean isShutdown() {
132         return this.isShutDown.get();
133     }
134 
135     @Override
136     public void close(final CloseMode closeMode) {
137         if (this.isShutDown.compareAndSet(false, true)) {
138             fireCallbacks();
139             this.lock.lock();
140             try {
141                 for (final PerRoutePool<T, C> pool: this.routeToPool.values()) {
142                     pool.shutdown(closeMode);
143                 }
144                 this.routeToPool.clear();
145                 this.leased.clear();
146                 this.available.clear();
147                 this.pendingRequests.clear();
148             } finally {
149                 this.lock.unlock();
150             }
151         }
152     }
153 
154     @Override
155     public void close() {
156         close(CloseMode.GRACEFUL);
157     }
158 
159     private PerRoutePool<T, C> getPool(final T route) {
160         PerRoutePool<T, C> pool = this.routeToPool.get(route);
161         if (pool == null) {
162             pool = new PerRoutePool<>(route, this.disposalCallback);
163             this.routeToPool.put(route, pool);
164         }
165         return pool;
166     }
167 
168     @Override
169     public Future<PoolEntry<T, C>> lease(
170             final T route, final Object state,
171             final Timeout requestTimeout,
172             final FutureCallback<PoolEntry<T, C>> callback) {
173         Args.notNull(route, "Route");
174         Args.notNull(requestTimeout, "Request timeout");
175         Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
176         final Deadline deadline = Deadline.calculate(requestTimeout);
177         final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<PoolEntry<T, C>>(callback) {
178 
179             @Override
180             public PoolEntry<T, C> get(
181                     final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
182                 try {
183                     return super.get(timeout, unit);
184                 } catch (final TimeoutException ex) {
185                     cancel();
186                     throw ex;
187                 }
188             }
189 
190         };
191         final boolean acquiredLock;
192 
193         try {
194             if (Timeout.isPositive(requestTimeout)) {
195                 acquiredLock = this.lock.tryLock(requestTimeout.getDuration(), requestTimeout.getTimeUnit());
196             } else {
197                 this.lock.lockInterruptibly();
198                 acquiredLock = true;
199             }
200         } catch (final InterruptedException interruptedException) {
201             Thread.currentThread().interrupt();
202             future.cancel();
203             return future;
204         }
205 
206         if (acquiredLock) {
207             try {
208                 final LeaseRequest<T, C> request = new LeaseRequest<>(route, state, requestTimeout, future);
209                 final boolean completed = processPendingRequest(request);
210                 if (!request.isDone() && !completed) {
211                     this.pendingRequests.add(request);
212                 }
213                 if (request.isDone()) {
214                     this.completedRequests.add(request);
215                 }
216             } finally {
217                 this.lock.unlock();
218             }
219             fireCallbacks();
220         } else {
221             future.failed(DeadlineTimeoutException.from(deadline));
222         }
223 
224         return future;
225     }
226 
227     public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
228         return lease(route, state, Timeout.DISABLED, null);
229     }
230 
231     @Override
232     public void release(final PoolEntry<T, C> entry, final boolean reusable) {
233         if (entry == null) {
234             return;
235         }
236         if (this.isShutDown.get()) {
237             return;
238         }
239         if (!reusable) {
240             entry.discardConnection(CloseMode.GRACEFUL);
241         }
242         this.lock.lock();
243         try {
244             if (this.leased.remove(entry)) {
245                 if (this.connPoolListener != null) {
246                     this.connPoolListener.onRelease(entry.getRoute(), this);
247                 }
248                 final PerRoutePool<T, C> pool = getPool(entry.getRoute());
249                 final boolean keepAlive = entry.hasConnection() && reusable;
250                 pool.free(entry, keepAlive);
251                 if (keepAlive) {
252                     switch (policy) {
253                         case LIFO:
254                             this.available.addFirst(entry);
255                             break;
256                         case FIFO:
257                             this.available.addLast(entry);
258                             break;
259                         default:
260                             throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
261                     }
262                 } else {
263                     entry.discardConnection(CloseMode.GRACEFUL);
264                 }
265                 processNextPendingRequest();
266             } else {
267                 throw new IllegalStateException("Pool entry is not present in the set of leased entries");
268             }
269         } finally {
270             this.lock.unlock();
271         }
272         fireCallbacks();
273     }
274 
275     private void processPendingRequests() {
276         final ListIterator<LeaseRequest<T, C>> it = this.pendingRequests.listIterator();
277         while (it.hasNext()) {
278             final LeaseRequest<T, C> request = it.next();
279             final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
280             if (future.isCancelled()) {
281                 it.remove();
282                 continue;
283             }
284             final boolean completed = processPendingRequest(request);
285             if (request.isDone() || completed) {
286                 it.remove();
287             }
288             if (request.isDone()) {
289                 this.completedRequests.add(request);
290             }
291         }
292     }
293 
294     private void processNextPendingRequest() {
295         final ListIterator<LeaseRequest<T, C>> it = this.pendingRequests.listIterator();
296         while (it.hasNext()) {
297             final LeaseRequest<T, C> request = it.next();
298             final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
299             if (future.isCancelled()) {
300                 it.remove();
301                 continue;
302             }
303             final boolean completed = processPendingRequest(request);
304             if (request.isDone() || completed) {
305                 it.remove();
306             }
307             if (request.isDone()) {
308                 this.completedRequests.add(request);
309             }
310             if (completed) {
311                 return;
312             }
313         }
314     }
315 
316     private boolean processPendingRequest(final LeaseRequest<T, C> request) {
317         final T route = request.getRoute();
318         final Object state = request.getState();
319         final Deadline deadline = request.getDeadline();
320 
321         if (deadline.isExpired()) {
322             request.failed(DeadlineTimeoutException.from(deadline));
323             return false;
324         }
325 
326         final PerRoutePool<T, C> pool = getPool(route);
327         PoolEntry<T, C> entry;
328         for (;;) {
329             entry = pool.getFree(state);
330             if (entry == null) {
331                 break;
332             }
333             if (entry.getExpiryDeadline().isExpired()) {
334                 entry.discardConnection(CloseMode.GRACEFUL);
335                 this.available.remove(entry);
336                 pool.free(entry, false);
337             } else {
338                 break;
339             }
340         }
341         if (entry != null) {
342             this.available.remove(entry);
343             this.leased.add(entry);
344             request.completed(entry);
345             if (this.connPoolListener != null) {
346                 this.connPoolListener.onLease(entry.getRoute(), this);
347             }
348             return true;
349         }
350 
351         // New connection is needed
352         final int maxPerRoute = getMax(route);
353         // Shrink the pool prior to allocating a new connection
354         final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
355         if (excess > 0) {
356             for (int i = 0; i < excess; i++) {
357                 final PoolEntry<T, C> lastUsed = pool.getLastUsed();
358                 if (lastUsed == null) {
359                     break;
360                 }
361                 lastUsed.discardConnection(CloseMode.GRACEFUL);
362                 this.available.remove(lastUsed);
363                 pool.remove(lastUsed);
364             }
365         }
366 
367         if (pool.getAllocatedCount() < maxPerRoute) {
368             final int freeCapacity = Math.max(this.maxTotal - this.leased.size(), 0);
369             if (freeCapacity == 0) {
370                 return false;
371             }
372             final int totalAvailable = this.available.size();
373             if (totalAvailable > freeCapacity - 1) {
374                 final PoolEntry<T, C> lastUsed = this.available.removeLast();
375                 lastUsed.discardConnection(CloseMode.GRACEFUL);
376                 final PerRoutePool<T, C> otherpool = getPool(lastUsed.getRoute());
377                 otherpool.remove(lastUsed);
378             }
379 
380             entry = pool.createEntry(this.timeToLive);
381             this.leased.add(entry);
382             request.completed(entry);
383             if (this.connPoolListener != null) {
384                 this.connPoolListener.onLease(entry.getRoute(), this);
385             }
386             return true;
387         }
388         return false;
389     }
390 
391     private void fireCallbacks() {
392         LeaseRequest<T, C> request;
393         while ((request = this.completedRequests.poll()) != null) {
394             final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
395             final Exception ex = request.getException();
396             final PoolEntry<T, C> result = request.getResult();
397             boolean successfullyCompleted = false;
398             if (ex != null) {
399                 future.failed(ex);
400             } else if (result != null) {
401                 if (future.completed(result)) {
402                     successfullyCompleted = true;
403                 }
404             } else {
405                 future.cancel();
406             }
407             if (!successfullyCompleted) {
408                 release(result, true);
409             }
410         }
411     }
412 
413     public void validatePendingRequests() {
414         this.lock.lock();
415         try {
416             final long now = System.currentTimeMillis();
417             final ListIterator<LeaseRequest<T, C>> it = this.pendingRequests.listIterator();
418             while (it.hasNext()) {
419                 final LeaseRequest<T, C> request = it.next();
420                 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
421                 if (future.isCancelled() && !request.isDone()) {
422                     it.remove();
423                 } else {
424                     final Deadline deadline = request.getDeadline();
425                     if (deadline.isBefore(now)) {
426                         request.failed(DeadlineTimeoutException.from(deadline));
427                     }
428                     if (request.isDone()) {
429                         it.remove();
430                         this.completedRequests.add(request);
431                     }
432                 }
433             }
434         } finally {
435             this.lock.unlock();
436         }
437         fireCallbacks();
438     }
439 
440     private int getMax(final T route) {
441         final Integer v = this.maxPerRoute.get(route);
442         if (v != null) {
443             return v;
444         }
445         return this.defaultMaxPerRoute;
446     }
447 
448     @Override
449     public void setMaxTotal(final int max) {
450         Args.positive(max, "Max value");
451         this.lock.lock();
452         try {
453             this.maxTotal = max;
454         } finally {
455             this.lock.unlock();
456         }
457     }
458 
459     @Override
460     public int getMaxTotal() {
461         this.lock.lock();
462         try {
463             return this.maxTotal;
464         } finally {
465             this.lock.unlock();
466         }
467     }
468 
469     @Override
470     public void setDefaultMaxPerRoute(final int max) {
471         Args.positive(max, "Max value");
472         this.lock.lock();
473         try {
474             this.defaultMaxPerRoute = max;
475         } finally {
476             this.lock.unlock();
477         }
478     }
479 
480     @Override
481     public int getDefaultMaxPerRoute() {
482         this.lock.lock();
483         try {
484             return this.defaultMaxPerRoute;
485         } finally {
486             this.lock.unlock();
487         }
488     }
489 
490     @Override
491     public void setMaxPerRoute(final T route, final int max) {
492         Args.notNull(route, "Route");
493         this.lock.lock();
494         try {
495             if (max > -1) {
496                 this.maxPerRoute.put(route, max);
497             } else {
498                 this.maxPerRoute.remove(route);
499             }
500         } finally {
501             this.lock.unlock();
502         }
503     }
504 
505     @Override
506     public int getMaxPerRoute(final T route) {
507         Args.notNull(route, "Route");
508         this.lock.lock();
509         try {
510             return getMax(route);
511         } finally {
512             this.lock.unlock();
513         }
514     }
515 
516     @Override
517     public PoolStats getTotalStats() {
518         this.lock.lock();
519         try {
520             int pendingCount = 0;
521             for (final LeaseRequest<T, C> request: pendingRequests) {
522                 if (!request.isDone()) {
523                     final Deadline deadline = request.getDeadline();
524                     if (!deadline.isExpired()) {
525                         pendingCount++;
526                     }
527                 }
528             }
529             return new PoolStats(
530                     this.leased.size(),
531                     pendingCount,
532                     this.available.size(),
533                     this.maxTotal);
534         } finally {
535             this.lock.unlock();
536         }
537     }
538 
539     @Override
540     public PoolStats getStats(final T route) {
541         Args.notNull(route, "Route");
542         this.lock.lock();
543         try {
544             final PerRoutePool<T, C> pool = getPool(route);
545             int pendingCount = 0;
546             for (final LeaseRequest<T, C> request: pendingRequests) {
547                 if (!request.isDone() && Objects.equals(route, request.getRoute())) {
548                     final Deadline deadline = request.getDeadline();
549                     if (!deadline.isExpired()) {
550                         pendingCount++;
551                     }
552                 }
553             }
554             return new PoolStats(
555                     pool.getLeasedCount(),
556                     pendingCount,
557                     pool.getAvailableCount(),
558                     getMax(route));
559         } finally {
560             this.lock.unlock();
561         }
562     }
563 
564     /**
565      * Returns snapshot of all knows routes
566      *
567      * @since 4.4
568      */
569     @Override
570     public Set<T> getRoutes() {
571         this.lock.lock();
572         try {
573             return new HashSet<>(routeToPool.keySet());
574         } finally {
575             this.lock.unlock();
576         }
577     }
578 
579     /**
580      * Enumerates all available connections.
581      *
582      * @since 4.3
583      */
584     public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
585         this.lock.lock();
586         try {
587             final Iterator<PoolEntry<T, C>> it = this.available.iterator();
588             while (it.hasNext()) {
589                 final PoolEntry<T, C> entry = it.next();
590                 callback.execute(entry);
591                 if (!entry.hasConnection()) {
592                     final PerRoutePool<T, C> pool = getPool(entry.getRoute());
593                     pool.remove(entry);
594                     it.remove();
595                 }
596             }
597             processPendingRequests();
598             purgePoolMap();
599         } finally {
600             this.lock.unlock();
601         }
602     }
603 
604     /**
605      * Enumerates all leased connections.
606      *
607      * @since 4.3
608      */
609     public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
610         this.lock.lock();
611         try {
612             final Iterator<PoolEntry<T, C>> it = this.leased.iterator();
613             while (it.hasNext()) {
614                 final PoolEntry<T, C> entry = it.next();
615                 callback.execute(entry);
616             }
617             processPendingRequests();
618         } finally {
619             this.lock.unlock();
620         }
621     }
622 
623     private void purgePoolMap() {
624         final Iterator<Map.Entry<T, PerRoutePool<T, C>>> it = this.routeToPool.entrySet().iterator();
625         while (it.hasNext()) {
626             final Map.Entry<T, PerRoutePool<T, C>> entry = it.next();
627             final PerRoutePool<T, C> pool = entry.getValue();
628             if (pool.getAllocatedCount() == 0) {
629                 it.remove();
630             }
631         }
632     }
633 
634     @Override
635     public void closeIdle(final TimeValue idleTime) {
636         final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
637         enumAvailable(entry -> {
638             if (entry.getUpdated() <= deadline) {
639                 entry.discardConnection(CloseMode.GRACEFUL);
640             }
641         });
642     }
643 
644     @Override
645     public void closeExpired() {
646         final long now = System.currentTimeMillis();
647         enumAvailable(entry -> {
648             if (entry.getExpiryDeadline().isBefore(now)) {
649                 entry.discardConnection(CloseMode.GRACEFUL);
650             }
651         });
652     }
653 
654     @Override
655     public String toString() {
656         final StringBuilder buffer = new StringBuilder();
657         buffer.append("[leased: ");
658         buffer.append(this.leased.size());
659         buffer.append("][available: ");
660         buffer.append(this.available.size());
661         buffer.append("][pending: ");
662         buffer.append(this.pendingRequests.size());
663         buffer.append("]");
664         return buffer.toString();
665     }
666 
667 
668     static class LeaseRequest<T, C extends ModalCloseable> {
669 
670         private final T route;
671         private final Object state;
672         private final Deadline deadline;
673         private final BasicFuture<PoolEntry<T, C>> future;
674         // 'completed' is used internally to guard setting
675         // 'result' and 'ex', but mustn't be used by 'isDone()'.
676         private final AtomicBoolean completed;
677         private volatile PoolEntry<T, C> result;
678         private volatile Exception ex;
679 
680         /**
681          * Constructor
682          *
683          * @param route route
684          * @param state state
685          * @param requestTimeout timeout to wait in a request queue until kicked off
686          * @param future future callback
687          */
688         public LeaseRequest(
689                 final T route,
690                 final Object state,
691                 final Timeout requestTimeout,
692                 final BasicFuture<PoolEntry<T, C>> future) {
693             super();
694             this.route = route;
695             this.state = state;
696             this.deadline = Deadline.calculate(requestTimeout);
697             this.future = future;
698             this.completed = new AtomicBoolean(false);
699         }
700 
701         public T getRoute() {
702             return this.route;
703         }
704 
705         public Object getState() {
706             return this.state;
707         }
708 
709         public Deadline getDeadline() {
710             return this.deadline;
711         }
712 
713         public boolean isDone() {
714             // This method must not use 'completed.get()' which would result in a race
715             // where a caller may observe completed=true while neither result nor ex
716             // have been set yet.
717             return ex != null || result != null;
718         }
719 
720         public void failed(final Exception ex) {
721             if (this.completed.compareAndSet(false, true)) {
722                 this.ex = ex;
723             }
724         }
725 
726         public void completed(final PoolEntry<T, C> result) {
727             if (this.completed.compareAndSet(false, true)) {
728                 this.result = result;
729             }
730         }
731 
732         public BasicFuture<PoolEntry<T, C>> getFuture() {
733             return this.future;
734         }
735 
736         public PoolEntry<T, C> getResult() {
737             return this.result;
738         }
739 
740         public Exception getException() {
741             return this.ex;
742         }
743 
744         @Override
745         public String toString() {
746             final StringBuilder buffer = new StringBuilder();
747             buffer.append("[");
748             buffer.append(this.route);
749             buffer.append("][");
750             buffer.append(this.state);
751             buffer.append("]");
752             return buffer.toString();
753         }
754 
755     }
756 
757     static class PerRoutePool<T, C extends ModalCloseable> {
758 
759         private final T route;
760         private final Set<PoolEntry<T, C>> leased;
761         private final LinkedList<PoolEntry<T, C>> available;
762         private final DisposalCallback<C> disposalCallback;
763 
764         PerRoutePool(final T route, final DisposalCallback<C> disposalCallback) {
765             super();
766             this.route = route;
767             this.disposalCallback = disposalCallback;
768             this.leased = new HashSet<>();
769             this.available = new LinkedList<>();
770         }
771 
772         public final T getRoute() {
773             return route;
774         }
775 
776         public int getLeasedCount() {
777             return this.leased.size();
778         }
779 
780         public int getAvailableCount() {
781             return this.available.size();
782         }
783 
784         public int getAllocatedCount() {
785             return this.available.size() + this.leased.size();
786         }
787 
788         public PoolEntry<T, C> getFree(final Object state) {
789             if (!this.available.isEmpty()) {
790                 if (state != null) {
791                     final Iterator<PoolEntry<T, C>> it = this.available.iterator();
792                     while (it.hasNext()) {
793                         final PoolEntry<T, C> entry = it.next();
794                         if (state.equals(entry.getState())) {
795                             it.remove();
796                             this.leased.add(entry);
797                             return entry;
798                         }
799                     }
800                 }
801                 final Iterator<PoolEntry<T, C>> it = this.available.iterator();
802                 while (it.hasNext()) {
803                     final PoolEntry<T, C> entry = it.next();
804                     if (entry.getState() == null) {
805                         it.remove();
806                         this.leased.add(entry);
807                         return entry;
808                     }
809                 }
810             }
811             return null;
812         }
813 
814         public PoolEntry<T, C> getLastUsed() {
815             return this.available.peekLast();
816         }
817 
818         public boolean remove(final PoolEntry<T, C> entry) {
819             return this.available.remove(entry) || this.leased.remove(entry);
820         }
821 
822         public void free(final PoolEntry<T, C> entry, final boolean reusable) {
823             final boolean found = this.leased.remove(entry);
824             Asserts.check(found, "Entry %s has not been leased from this pool", entry);
825             if (reusable) {
826                 this.available.addFirst(entry);
827             }
828         }
829 
830         public PoolEntry<T, C> createEntry(final TimeValue timeToLive) {
831             final PoolEntry<T, C> entry = new PoolEntry<>(this.route, timeToLive, disposalCallback);
832             this.leased.add(entry);
833             return entry;
834         }
835 
836         public void shutdown(final CloseMode closeMode) {
837             PoolEntry<T, C> availableEntry;
838             while ((availableEntry = available.poll()) != null) {
839                 availableEntry.discardConnection(closeMode);
840             }
841             for (final PoolEntry<T, C> entry: this.leased) {
842                 entry.discardConnection(closeMode);
843             }
844             this.leased.clear();
845         }
846 
847         @Override
848         public String toString() {
849             final StringBuilder buffer = new StringBuilder();
850             buffer.append("[route: ");
851             buffer.append(this.route);
852             buffer.append("][leased: ");
853             buffer.append(this.leased.size());
854             buffer.append("][available: ");
855             buffer.append(this.available.size());
856             buffer.append("]");
857             return buffer.toString();
858         }
859 
860     }
861 }