View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.pool;
28  
29  import java.util.HashMap;
30  import java.util.HashSet;
31  import java.util.Iterator;
32  import java.util.LinkedList;
33  import java.util.ListIterator;
34  import java.util.Map;
35  import java.util.Objects;
36  import java.util.Set;
37  import java.util.concurrent.ConcurrentLinkedQueue;
38  import java.util.concurrent.ExecutionException;
39  import java.util.concurrent.Future;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.TimeoutException;
42  import java.util.concurrent.atomic.AtomicBoolean;
43  import java.util.concurrent.locks.Lock;
44  import java.util.concurrent.locks.ReentrantLock;
45  
46  import org.apache.hc.core5.annotation.Contract;
47  import org.apache.hc.core5.annotation.ThreadingBehavior;
48  import org.apache.hc.core5.concurrent.BasicFuture;
49  import org.apache.hc.core5.concurrent.FutureCallback;
50  import org.apache.hc.core5.function.Callback;
51  import org.apache.hc.core5.io.CloseMode;
52  import org.apache.hc.core5.io.ModalCloseable;
53  import org.apache.hc.core5.util.Args;
54  import org.apache.hc.core5.util.Asserts;
55  import org.apache.hc.core5.util.Deadline;
56  import org.apache.hc.core5.util.DeadlineTimeoutException;
57  import org.apache.hc.core5.util.TimeValue;
58  import org.apache.hc.core5.util.Timeout;
59  
60  /**
61   * Connection pool with strict connection limit guarantees.
62   *
63   * @param <T> route
64   * @param <C> connection object
65   *
66   * @since 4.2
67   */
68  @Contract(threading = ThreadingBehavior.SAFE)
69  public class StrictConnPool<T, C extends ModalCloseable> implements ManagedConnPool<T, C> {
70  
71      private final TimeValue timeToLive;
72      private final PoolReusePolicy policy;
73      private final DisposalCallback<C> disposalCallback;
74      private final ConnPoolListener<T> connPoolListener;
75      private final Map<T, PerRoutePool<T, C>> routeToPool;
76      private final LinkedList<LeaseRequest<T, C>> pendingRequests;
77      private final Set<PoolEntry<T, C>> leased;
78      private final LinkedList<PoolEntry<T, C>> available;
79      private final ConcurrentLinkedQueue<LeaseRequest<T, C>> completedRequests;
80      private final Map<T, Integer> maxPerRoute;
81      private final Lock lock;
82      private final AtomicBoolean isShutDown;
83  
84      private volatile int defaultMaxPerRoute;
85      private volatile int maxTotal;
86  
87      /**
88       * @since 5.0
89       */
90      public StrictConnPool(
91              final int defaultMaxPerRoute,
92              final int maxTotal,
93              final TimeValue timeToLive,
94              final PoolReusePolicy policy,
95              final DisposalCallback<C> disposalCallback,
96              final ConnPoolListener<T> connPoolListener) {
97          super();
98          Args.positive(defaultMaxPerRoute, "Max per route value");
99          Args.positive(maxTotal, "Max total value");
100         this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
101         this.policy = policy != null ? policy : PoolReusePolicy.LIFO;
102         this.disposalCallback = disposalCallback;
103         this.connPoolListener = connPoolListener;
104         this.routeToPool = new HashMap<>();
105         this.pendingRequests = new LinkedList<>();
106         this.leased = new HashSet<>();
107         this.available = new LinkedList<>();
108         this.completedRequests = new ConcurrentLinkedQueue<>();
109         this.maxPerRoute = new HashMap<>();
110         this.lock = new ReentrantLock();
111         this.isShutDown = new AtomicBoolean(false);
112         this.defaultMaxPerRoute = defaultMaxPerRoute;
113         this.maxTotal = maxTotal;
114     }
115 
116     /**
117      * @since 5.0
118      */
119     public StrictConnPool(
120             final int defaultMaxPerRoute,
121             final int maxTotal,
122             final TimeValue timeToLive,
123             final PoolReusePolicy policy,
124             final ConnPoolListener<T> connPoolListener) {
125         this(defaultMaxPerRoute, maxTotal, timeToLive, policy, null, connPoolListener);
126     }
127 
128     public StrictConnPool(final int defaultMaxPerRoute, final int maxTotal) {
129         this(defaultMaxPerRoute, maxTotal, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, null);
130     }
131 
132     public boolean isShutdown() {
133         return this.isShutDown.get();
134     }
135 
136     @Override
137     public void close(final CloseMode closeMode) {
138         if (this.isShutDown.compareAndSet(false, true)) {
139             fireCallbacks();
140             this.lock.lock();
141             try {
142                 for (final PerRoutePool<T, C> pool: this.routeToPool.values()) {
143                     pool.shutdown(closeMode);
144                 }
145                 this.routeToPool.clear();
146                 this.leased.clear();
147                 this.available.clear();
148                 this.pendingRequests.clear();
149             } finally {
150                 this.lock.unlock();
151             }
152         }
153     }
154 
155     @Override
156     public void close() {
157         close(CloseMode.GRACEFUL);
158     }
159 
160     private PerRoutePool<T, C> getPool(final T route) {
161         PerRoutePool<T, C> pool = this.routeToPool.get(route);
162         if (pool == null) {
163             pool = new PerRoutePool<>(route, this.disposalCallback);
164             this.routeToPool.put(route, pool);
165         }
166         return pool;
167     }
168 
169     @Override
170     public Future<PoolEntry<T, C>> lease(
171             final T route, final Object state,
172             final Timeout requestTimeout,
173             final FutureCallback<PoolEntry<T, C>> callback) {
174         Args.notNull(route, "Route");
175         Args.notNull(requestTimeout, "Request timeout");
176         Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
177         final Deadline deadline = Deadline.calculate(requestTimeout);
178         final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<PoolEntry<T, C>>(callback) {
179 
180             @Override
181             public synchronized PoolEntry<T, C> get(
182                     final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
183                 try {
184                     return super.get(timeout, unit);
185                 } catch (final TimeoutException ex) {
186                     cancel();
187                     throw ex;
188                 }
189             }
190 
191         };
192         final boolean acquiredLock;
193 
194         try {
195             if (Timeout.isPositive(requestTimeout)) {
196                 acquiredLock = this.lock.tryLock(requestTimeout.getDuration(), requestTimeout.getTimeUnit());
197             } else {
198                 this.lock.lockInterruptibly();
199                 acquiredLock = true;
200             }
201         } catch (final InterruptedException interruptedException) {
202             Thread.currentThread().interrupt();
203             future.cancel();
204             return future;
205         }
206 
207         if (acquiredLock) {
208             try {
209                 final LeaseRequest<T, C> request = new LeaseRequest<>(route, state, requestTimeout, future);
210                 final boolean completed = processPendingRequest(request);
211                 if (!request.isDone() && !completed) {
212                     this.pendingRequests.add(request);
213                 }
214                 if (request.isDone()) {
215                     this.completedRequests.add(request);
216                 }
217             } finally {
218                 this.lock.unlock();
219             }
220             fireCallbacks();
221         } else {
222             future.failed(DeadlineTimeoutException.from(deadline));
223         }
224 
225         return future;
226     }
227 
228     public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
229         return lease(route, state, Timeout.DISABLED, null);
230     }
231 
232     @Override
233     public void release(final PoolEntry<T, C> entry, final boolean reusable) {
234         if (entry == null) {
235             return;
236         }
237         if (this.isShutDown.get()) {
238             return;
239         }
240         if (!reusable) {
241             entry.discardConnection(CloseMode.GRACEFUL);
242         }
243         this.lock.lock();
244         try {
245             if (this.leased.remove(entry)) {
246                 if (this.connPoolListener != null) {
247                     this.connPoolListener.onRelease(entry.getRoute(), this);
248                 }
249                 final PerRoutePool<T, C> pool = getPool(entry.getRoute());
250                 final boolean keepAlive = entry.hasConnection() && reusable;
251                 pool.free(entry, keepAlive);
252                 if (keepAlive) {
253                     switch (policy) {
254                         case LIFO:
255                             this.available.addFirst(entry);
256                             break;
257                         case FIFO:
258                             this.available.addLast(entry);
259                             break;
260                         default:
261                             throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
262                     }
263                 } else {
264                     entry.discardConnection(CloseMode.GRACEFUL);
265                 }
266                 processNextPendingRequest();
267             } else {
268                 throw new IllegalStateException("Pool entry is not present in the set of leased entries");
269             }
270         } finally {
271             this.lock.unlock();
272         }
273         fireCallbacks();
274     }
275 
276     private void processPendingRequests() {
277         final ListIterator<LeaseRequest<T, C>> it = this.pendingRequests.listIterator();
278         while (it.hasNext()) {
279             final LeaseRequest<T, C> request = it.next();
280             final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
281             if (future.isCancelled()) {
282                 it.remove();
283                 continue;
284             }
285             final boolean completed = processPendingRequest(request);
286             if (request.isDone() || completed) {
287                 it.remove();
288             }
289             if (request.isDone()) {
290                 this.completedRequests.add(request);
291             }
292         }
293     }
294 
295     private void processNextPendingRequest() {
296         final ListIterator<LeaseRequest<T, C>> it = this.pendingRequests.listIterator();
297         while (it.hasNext()) {
298             final LeaseRequest<T, C> request = it.next();
299             final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
300             if (future.isCancelled()) {
301                 it.remove();
302                 continue;
303             }
304             final boolean completed = processPendingRequest(request);
305             if (request.isDone() || completed) {
306                 it.remove();
307             }
308             if (request.isDone()) {
309                 this.completedRequests.add(request);
310             }
311             if (completed) {
312                 return;
313             }
314         }
315     }
316 
317     private boolean processPendingRequest(final LeaseRequest<T, C> request) {
318         final T route = request.getRoute();
319         final Object state = request.getState();
320         final Deadline deadline = request.getDeadline();
321 
322         if (deadline.isExpired()) {
323             request.failed(DeadlineTimeoutException.from(deadline));
324             return false;
325         }
326 
327         final PerRoutePool<T, C> pool = getPool(route);
328         PoolEntry<T, C> entry;
329         for (;;) {
330             entry = pool.getFree(state);
331             if (entry == null) {
332                 break;
333             }
334             if (entry.getExpiryDeadline().isExpired()) {
335                 entry.discardConnection(CloseMode.GRACEFUL);
336                 this.available.remove(entry);
337                 pool.free(entry, false);
338             } else {
339                 break;
340             }
341         }
342         if (entry != null) {
343             this.available.remove(entry);
344             this.leased.add(entry);
345             request.completed(entry);
346             if (this.connPoolListener != null) {
347                 this.connPoolListener.onLease(entry.getRoute(), this);
348             }
349             return true;
350         }
351 
352         // New connection is needed
353         final int maxPerRoute = getMax(route);
354         // Shrink the pool prior to allocating a new connection
355         final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
356         if (excess > 0) {
357             for (int i = 0; i < excess; i++) {
358                 final PoolEntry<T, C> lastUsed = pool.getLastUsed();
359                 if (lastUsed == null) {
360                     break;
361                 }
362                 lastUsed.discardConnection(CloseMode.GRACEFUL);
363                 this.available.remove(lastUsed);
364                 pool.remove(lastUsed);
365             }
366         }
367 
368         if (pool.getAllocatedCount() < maxPerRoute) {
369             final int freeCapacity = Math.max(this.maxTotal - this.leased.size(), 0);
370             if (freeCapacity == 0) {
371                 return false;
372             }
373             final int totalAvailable = this.available.size();
374             if (totalAvailable > freeCapacity - 1) {
375                 final PoolEntry<T, C> lastUsed = this.available.removeLast();
376                 lastUsed.discardConnection(CloseMode.GRACEFUL);
377                 final PerRoutePool<T, C> otherpool = getPool(lastUsed.getRoute());
378                 otherpool.remove(lastUsed);
379             }
380 
381             entry = pool.createEntry(this.timeToLive);
382             this.leased.add(entry);
383             request.completed(entry);
384             if (this.connPoolListener != null) {
385                 this.connPoolListener.onLease(entry.getRoute(), this);
386             }
387             return true;
388         }
389         return false;
390     }
391 
392     private void fireCallbacks() {
393         LeaseRequest<T, C> request;
394         while ((request = this.completedRequests.poll()) != null) {
395             final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
396             final Exception ex = request.getException();
397             final PoolEntry<T, C> result = request.getResult();
398             boolean successfullyCompleted = false;
399             if (ex != null) {
400                 future.failed(ex);
401             } else if (result != null) {
402                 if (future.completed(result)) {
403                     successfullyCompleted = true;
404                 }
405             } else {
406                 future.cancel();
407             }
408             if (!successfullyCompleted) {
409                 release(result, true);
410             }
411         }
412     }
413 
414     public void validatePendingRequests() {
415         this.lock.lock();
416         try {
417             final long now = System.currentTimeMillis();
418             final ListIterator<LeaseRequest<T, C>> it = this.pendingRequests.listIterator();
419             while (it.hasNext()) {
420                 final LeaseRequest<T, C> request = it.next();
421                 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
422                 if (future.isCancelled() && !request.isDone()) {
423                     it.remove();
424                 } else {
425                     final Deadline deadline = request.getDeadline();
426                     if (deadline.isBefore(now)) {
427                         request.failed(DeadlineTimeoutException.from(deadline));
428                     }
429                     if (request.isDone()) {
430                         it.remove();
431                         this.completedRequests.add(request);
432                     }
433                 }
434             }
435         } finally {
436             this.lock.unlock();
437         }
438         fireCallbacks();
439     }
440 
441     private int getMax(final T route) {
442         final Integer v = this.maxPerRoute.get(route);
443         if (v != null) {
444             return v;
445         }
446         return this.defaultMaxPerRoute;
447     }
448 
449     @Override
450     public void setMaxTotal(final int max) {
451         Args.positive(max, "Max value");
452         this.lock.lock();
453         try {
454             this.maxTotal = max;
455         } finally {
456             this.lock.unlock();
457         }
458     }
459 
460     @Override
461     public int getMaxTotal() {
462         this.lock.lock();
463         try {
464             return this.maxTotal;
465         } finally {
466             this.lock.unlock();
467         }
468     }
469 
470     @Override
471     public void setDefaultMaxPerRoute(final int max) {
472         Args.positive(max, "Max value");
473         this.lock.lock();
474         try {
475             this.defaultMaxPerRoute = max;
476         } finally {
477             this.lock.unlock();
478         }
479     }
480 
481     @Override
482     public int getDefaultMaxPerRoute() {
483         this.lock.lock();
484         try {
485             return this.defaultMaxPerRoute;
486         } finally {
487             this.lock.unlock();
488         }
489     }
490 
491     @Override
492     public void setMaxPerRoute(final T route, final int max) {
493         Args.notNull(route, "Route");
494         this.lock.lock();
495         try {
496             if (max > -1) {
497                 this.maxPerRoute.put(route, max);
498             } else {
499                 this.maxPerRoute.remove(route);
500             }
501         } finally {
502             this.lock.unlock();
503         }
504     }
505 
506     @Override
507     public int getMaxPerRoute(final T route) {
508         Args.notNull(route, "Route");
509         this.lock.lock();
510         try {
511             return getMax(route);
512         } finally {
513             this.lock.unlock();
514         }
515     }
516 
517     @Override
518     public PoolStats getTotalStats() {
519         this.lock.lock();
520         try {
521             int pendingCount = 0;
522             for (final LeaseRequest<T, C> request: pendingRequests) {
523                 if (!request.isDone()) {
524                     final Deadline deadline = request.getDeadline();
525                     if (!deadline.isExpired()) {
526                         pendingCount++;
527                     }
528                 }
529             }
530             return new PoolStats(
531                     this.leased.size(),
532                     pendingCount,
533                     this.available.size(),
534                     this.maxTotal);
535         } finally {
536             this.lock.unlock();
537         }
538     }
539 
540     @Override
541     public PoolStats getStats(final T route) {
542         Args.notNull(route, "Route");
543         this.lock.lock();
544         try {
545             final PerRoutePool<T, C> pool = getPool(route);
546             int pendingCount = 0;
547             for (final LeaseRequest<T, C> request: pendingRequests) {
548                 if (!request.isDone() && Objects.equals(route, request.getRoute())) {
549                     final Deadline deadline = request.getDeadline();
550                     if (!deadline.isExpired()) {
551                         pendingCount++;
552                     }
553                 }
554             }
555             return new PoolStats(
556                     pool.getLeasedCount(),
557                     pendingCount,
558                     pool.getAvailableCount(),
559                     getMax(route));
560         } finally {
561             this.lock.unlock();
562         }
563     }
564 
565     /**
566      * Returns snapshot of all knows routes
567      *
568      * @since 4.4
569      */
570     @Override
571     public Set<T> getRoutes() {
572         this.lock.lock();
573         try {
574             return new HashSet<>(routeToPool.keySet());
575         } finally {
576             this.lock.unlock();
577         }
578     }
579 
580     /**
581      * Enumerates all available connections.
582      *
583      * @since 4.3
584      */
585     public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
586         this.lock.lock();
587         try {
588             final Iterator<PoolEntry<T, C>> it = this.available.iterator();
589             while (it.hasNext()) {
590                 final PoolEntry<T, C> entry = it.next();
591                 callback.execute(entry);
592                 if (!entry.hasConnection()) {
593                     final PerRoutePool<T, C> pool = getPool(entry.getRoute());
594                     pool.remove(entry);
595                     it.remove();
596                 }
597             }
598             processPendingRequests();
599             purgePoolMap();
600         } finally {
601             this.lock.unlock();
602         }
603     }
604 
605     /**
606      * Enumerates all leased connections.
607      *
608      * @since 4.3
609      */
610     public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
611         this.lock.lock();
612         try {
613             final Iterator<PoolEntry<T, C>> it = this.leased.iterator();
614             while (it.hasNext()) {
615                 final PoolEntry<T, C> entry = it.next();
616                 callback.execute(entry);
617             }
618             processPendingRequests();
619         } finally {
620             this.lock.unlock();
621         }
622     }
623 
624     private void purgePoolMap() {
625         final Iterator<Map.Entry<T, PerRoutePool<T, C>>> it = this.routeToPool.entrySet().iterator();
626         while (it.hasNext()) {
627             final Map.Entry<T, PerRoutePool<T, C>> entry = it.next();
628             final PerRoutePool<T, C> pool = entry.getValue();
629             if (pool.getAllocatedCount() == 0) {
630                 it.remove();
631             }
632         }
633     }
634 
635     @Override
636     public void closeIdle(final TimeValue idleTime) {
637         final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
638         enumAvailable(entry -> {
639             if (entry.getUpdated() <= deadline) {
640                 entry.discardConnection(CloseMode.GRACEFUL);
641             }
642         });
643     }
644 
645     @Override
646     public void closeExpired() {
647         final long now = System.currentTimeMillis();
648         enumAvailable(entry -> {
649             if (entry.getExpiryDeadline().isBefore(now)) {
650                 entry.discardConnection(CloseMode.GRACEFUL);
651             }
652         });
653     }
654 
655     @Override
656     public String toString() {
657         final StringBuilder buffer = new StringBuilder();
658         buffer.append("[leased: ");
659         buffer.append(this.leased.size());
660         buffer.append("][available: ");
661         buffer.append(this.available.size());
662         buffer.append("][pending: ");
663         buffer.append(this.pendingRequests.size());
664         buffer.append("]");
665         return buffer.toString();
666     }
667 
668 
669     static class LeaseRequest<T, C extends ModalCloseable> {
670 
671         private final T route;
672         private final Object state;
673         private final Deadline deadline;
674         private final BasicFuture<PoolEntry<T, C>> future;
675         // 'completed' is used internally to guard setting
676         // 'result' and 'ex', but mustn't be used by 'isDone()'.
677         private final AtomicBoolean completed;
678         private volatile PoolEntry<T, C> result;
679         private volatile Exception ex;
680 
681         /**
682          * Constructor
683          *
684          * @param route route
685          * @param state state
686          * @param requestTimeout timeout to wait in a request queue until kicked off
687          * @param future future callback
688          */
689         public LeaseRequest(
690                 final T route,
691                 final Object state,
692                 final Timeout requestTimeout,
693                 final BasicFuture<PoolEntry<T, C>> future) {
694             super();
695             this.route = route;
696             this.state = state;
697             this.deadline = Deadline.calculate(requestTimeout);
698             this.future = future;
699             this.completed = new AtomicBoolean(false);
700         }
701 
702         public T getRoute() {
703             return this.route;
704         }
705 
706         public Object getState() {
707             return this.state;
708         }
709 
710         public Deadline getDeadline() {
711             return this.deadline;
712         }
713 
714         public boolean isDone() {
715             // This method must not use 'completed.get()' which would result in a race
716             // where a caller may observe completed=true while neither result nor ex
717             // have been set yet.
718             return ex != null || result != null;
719         }
720 
721         public void failed(final Exception ex) {
722             if (this.completed.compareAndSet(false, true)) {
723                 this.ex = ex;
724             }
725         }
726 
727         public void completed(final PoolEntry<T, C> result) {
728             if (this.completed.compareAndSet(false, true)) {
729                 this.result = result;
730             }
731         }
732 
733         public BasicFuture<PoolEntry<T, C>> getFuture() {
734             return this.future;
735         }
736 
737         public PoolEntry<T, C> getResult() {
738             return this.result;
739         }
740 
741         public Exception getException() {
742             return this.ex;
743         }
744 
745         @Override
746         public String toString() {
747             final StringBuilder buffer = new StringBuilder();
748             buffer.append("[");
749             buffer.append(this.route);
750             buffer.append("][");
751             buffer.append(this.state);
752             buffer.append("]");
753             return buffer.toString();
754         }
755 
756     }
757 
758     static class PerRoutePool<T, C extends ModalCloseable> {
759 
760         private final T route;
761         private final Set<PoolEntry<T, C>> leased;
762         private final LinkedList<PoolEntry<T, C>> available;
763         private final DisposalCallback<C> disposalCallback;
764 
765         PerRoutePool(final T route, final DisposalCallback<C> disposalCallback) {
766             super();
767             this.route = route;
768             this.disposalCallback = disposalCallback;
769             this.leased = new HashSet<>();
770             this.available = new LinkedList<>();
771         }
772 
773         public final T getRoute() {
774             return route;
775         }
776 
777         public int getLeasedCount() {
778             return this.leased.size();
779         }
780 
781         public int getAvailableCount() {
782             return this.available.size();
783         }
784 
785         public int getAllocatedCount() {
786             return this.available.size() + this.leased.size();
787         }
788 
789         public PoolEntry<T, C> getFree(final Object state) {
790             if (!this.available.isEmpty()) {
791                 if (state != null) {
792                     final Iterator<PoolEntry<T, C>> it = this.available.iterator();
793                     while (it.hasNext()) {
794                         final PoolEntry<T, C> entry = it.next();
795                         if (state.equals(entry.getState())) {
796                             it.remove();
797                             this.leased.add(entry);
798                             return entry;
799                         }
800                     }
801                 }
802                 final Iterator<PoolEntry<T, C>> it = this.available.iterator();
803                 while (it.hasNext()) {
804                     final PoolEntry<T, C> entry = it.next();
805                     if (entry.getState() == null) {
806                         it.remove();
807                         this.leased.add(entry);
808                         return entry;
809                     }
810                 }
811             }
812             return null;
813         }
814 
815         public PoolEntry<T, C> getLastUsed() {
816             return this.available.peekLast();
817         }
818 
819         public boolean remove(final PoolEntry<T, C> entry) {
820             return this.available.remove(entry) || this.leased.remove(entry);
821         }
822 
823         public void free(final PoolEntry<T, C> entry, final boolean reusable) {
824             final boolean found = this.leased.remove(entry);
825             Asserts.check(found, "Entry %s has not been leased from this pool", entry);
826             if (reusable) {
827                 this.available.addFirst(entry);
828             }
829         }
830 
831         public PoolEntry<T, C> createEntry(final TimeValue timeToLive) {
832             final PoolEntry<T, C> entry = new PoolEntry<>(this.route, timeToLive, disposalCallback);
833             this.leased.add(entry);
834             return entry;
835         }
836 
837         public void shutdown(final CloseMode closeMode) {
838             PoolEntry<T, C> availableEntry;
839             while ((availableEntry = available.poll()) != null) {
840                 availableEntry.discardConnection(closeMode);
841             }
842             for (final PoolEntry<T, C> entry: this.leased) {
843                 entry.discardConnection(closeMode);
844             }
845             this.leased.clear();
846         }
847 
848         @Override
849         public String toString() {
850             final StringBuilder buffer = new StringBuilder();
851             buffer.append("[route: ");
852             buffer.append(this.route);
853             buffer.append("][leased: ");
854             buffer.append(this.leased.size());
855             buffer.append("][available: ");
856             buffer.append(this.available.size());
857             buffer.append("]");
858             return buffer.toString();
859         }
860 
861     }
862 }