1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.nio.pool;
28
29 import java.io.IOException;
30 import java.net.SocketAddress;
31 import java.util.HashMap;
32 import java.util.HashSet;
33 import java.util.Iterator;
34 import java.util.LinkedList;
35 import java.util.ListIterator;
36 import java.util.Map;
37 import java.util.Set;
38 import java.util.concurrent.ConcurrentLinkedQueue;
39 import java.util.concurrent.ExecutionException;
40 import java.util.concurrent.Future;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.TimeoutException;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.concurrent.locks.Lock;
45 import java.util.concurrent.locks.ReentrantLock;
46
47 import org.apache.http.annotation.Contract;
48 import org.apache.http.annotation.ThreadingBehavior;
49 import org.apache.http.concurrent.BasicFuture;
50 import org.apache.http.concurrent.FutureCallback;
51 import org.apache.http.nio.reactor.ConnectingIOReactor;
52 import org.apache.http.nio.reactor.IOReactorStatus;
53 import org.apache.http.nio.reactor.IOSession;
54 import org.apache.http.nio.reactor.SessionRequest;
55 import org.apache.http.nio.reactor.SessionRequestCallback;
56 import org.apache.http.pool.ConnPool;
57 import org.apache.http.pool.ConnPoolControl;
58 import org.apache.http.pool.PoolEntry;
59 import org.apache.http.pool.PoolEntryCallback;
60 import org.apache.http.pool.PoolStats;
61 import org.apache.http.util.Args;
62 import org.apache.http.util.Asserts;
63 import org.apache.http.util.LangUtils;
64
65
66
67
68
69
70
71
72
73
74 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
75 public abstract class AbstractNIOConnPool<T, C, E extends PoolEntry<T, C>>
76 implements ConnPool<T, E>, ConnPoolControl<T> {
77
78 private final ConnectingIOReactor ioReactor;
79 private final NIOConnFactory<T, C> connFactory;
80 private final SocketAddressResolver<T> addressResolver;
81 private final SessionRequestCallback sessionRequestCallback;
82 private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
83 private final LinkedList<LeaseRequest<T, C, E>> leasingRequests;
84 private final Set<SessionRequest> pending;
85 private final Set<E> leased;
86 private final LinkedList<E> available;
87 private final ConcurrentLinkedQueue<LeaseRequest<T, C, E>> completedRequests;
88 private final Map<T, Integer> maxPerRoute;
89 private final Lock lock;
90 private final AtomicBoolean isShutDown;
91
92 private volatile int defaultMaxPerRoute;
93 private volatile int maxTotal;
94
95
96
97
98
99 @Deprecated
100 public AbstractNIOConnPool(
101 final ConnectingIOReactor ioReactor,
102 final NIOConnFactory<T, C> connFactory,
103 final int defaultMaxPerRoute,
104 final int maxTotal) {
105 super();
106 Args.notNull(ioReactor, "I/O reactor");
107 Args.notNull(connFactory, "Connection factory");
108 Args.positive(defaultMaxPerRoute, "Max per route value");
109 Args.positive(maxTotal, "Max total value");
110 this.ioReactor = ioReactor;
111 this.connFactory = connFactory;
112 this.addressResolver = new SocketAddressResolver<T>() {
113
114 @Override
115 public SocketAddress resolveLocalAddress(final T route) throws IOException {
116 return AbstractNIOConnPool.this.resolveLocalAddress(route);
117 }
118
119 @Override
120 public SocketAddress resolveRemoteAddress(final T route) throws IOException {
121 return AbstractNIOConnPool.this.resolveRemoteAddress(route);
122 }
123
124 };
125 this.sessionRequestCallback = new InternalSessionRequestCallback();
126 this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
127 this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
128 this.pending = new HashSet<SessionRequest>();
129 this.leased = new HashSet<E>();
130 this.available = new LinkedList<E>();
131 this.maxPerRoute = new HashMap<T, Integer>();
132 this.completedRequests = new ConcurrentLinkedQueue<LeaseRequest<T, C, E>>();
133 this.lock = new ReentrantLock();
134 this.isShutDown = new AtomicBoolean(false);
135 this.defaultMaxPerRoute = defaultMaxPerRoute;
136 this.maxTotal = maxTotal;
137 }
138
139
140
141
142 public AbstractNIOConnPool(
143 final ConnectingIOReactor ioReactor,
144 final NIOConnFactory<T, C> connFactory,
145 final SocketAddressResolver<T> addressResolver,
146 final int defaultMaxPerRoute,
147 final int maxTotal) {
148 super();
149 Args.notNull(ioReactor, "I/O reactor");
150 Args.notNull(connFactory, "Connection factory");
151 Args.notNull(addressResolver, "Address resolver");
152 Args.positive(defaultMaxPerRoute, "Max per route value");
153 Args.positive(maxTotal, "Max total value");
154 this.ioReactor = ioReactor;
155 this.connFactory = connFactory;
156 this.addressResolver = addressResolver;
157 this.sessionRequestCallback = new InternalSessionRequestCallback();
158 this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
159 this.leasingRequests = new LinkedList<LeaseRequest<T, C, E>>();
160 this.pending = new HashSet<SessionRequest>();
161 this.leased = new HashSet<E>();
162 this.available = new LinkedList<E>();
163 this.completedRequests = new ConcurrentLinkedQueue<LeaseRequest<T, C, E>>();
164 this.maxPerRoute = new HashMap<T, Integer>();
165 this.lock = new ReentrantLock();
166 this.isShutDown = new AtomicBoolean(false);
167 this.defaultMaxPerRoute = defaultMaxPerRoute;
168 this.maxTotal = maxTotal;
169 }
170
171
172
173
174 @Deprecated
175 protected SocketAddress resolveRemoteAddress(final T route) {
176 return null;
177 }
178
179
180
181
182 @Deprecated
183 protected SocketAddress resolveLocalAddress(final T route) {
184 return null;
185 }
186
187 protected abstract E createEntry(T route, C conn);
188
189
190
191
192 protected void onLease(final E entry) {
193 }
194
195
196
197
198 protected void onRelease(final E entry) {
199 }
200
201
202
203
204 protected void onReuse(final E entry) {
205 }
206
207 public boolean isShutdown() {
208 return this.isShutDown.get();
209 }
210
211 public void shutdown(final long waitMs) throws IOException {
212 if (this.isShutDown.compareAndSet(false, true)) {
213 fireCallbacks();
214 this.lock.lock();
215 try {
216 for (final SessionRequest sessionRequest: this.pending) {
217 sessionRequest.cancel();
218 }
219 for (final E entry: this.available) {
220 entry.close();
221 }
222 for (final E entry: this.leased) {
223 entry.close();
224 }
225 for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
226 pool.shutdown();
227 }
228 this.routeToPool.clear();
229 this.leased.clear();
230 this.pending.clear();
231 this.available.clear();
232 this.leasingRequests.clear();
233 this.ioReactor.shutdown(waitMs);
234 } finally {
235 this.lock.unlock();
236 }
237 }
238 }
239
240 private RouteSpecificPool<T, C, E> getPool(final T route) {
241 RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
242 if (pool == null) {
243 pool = new RouteSpecificPool<T, C, E>(route) {
244
245 @Override
246 protected E createEntry(final T route, final C conn) {
247 return AbstractNIOConnPool.this.createEntry(route, conn);
248 }
249
250 };
251 this.routeToPool.put(route, pool);
252 }
253 return pool;
254 }
255
256 public Future<E> lease(
257 final T route, final Object state,
258 final long connectTimeout, final TimeUnit timeUnit,
259 final FutureCallback<E> callback) {
260 return this.lease(route, state, connectTimeout, connectTimeout, timeUnit, callback);
261 }
262
263
264
265
266 public Future<E> lease(
267 final T route, final Object state,
268 final long connectTimeout, final long leaseTimeout, final TimeUnit timeUnit,
269 final FutureCallback<E> callback) {
270 Args.notNull(route, "Route");
271 Args.notNull(timeUnit, "Time unit");
272 Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
273 final BasicFuture<E> future = new BasicFuture<E>(callback);
274 final LeaseRequest<T, C, E> leaseRequest = new LeaseRequest<T, C, E>(route, state,
275 connectTimeout >= 0 ? timeUnit.toMillis(connectTimeout) : -1,
276 leaseTimeout > 0 ? timeUnit.toMillis(leaseTimeout) : 0,
277 future);
278 this.lock.lock();
279 try {
280 final boolean completed = processPendingRequest(leaseRequest);
281 if (!leaseRequest.isDone() && !completed) {
282 this.leasingRequests.add(leaseRequest);
283 }
284 if (leaseRequest.isDone()) {
285 this.completedRequests.add(leaseRequest);
286 }
287 } finally {
288 this.lock.unlock();
289 }
290 fireCallbacks();
291 return new Future<E>() {
292
293 @Override
294 public E get() throws InterruptedException, ExecutionException {
295 return future.get();
296 }
297
298 @Override
299 public E get(
300 final long timeout,
301 final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
302 return future.get(timeout, unit);
303 }
304
305 @Override
306 public boolean cancel(final boolean mayInterruptIfRunning) {
307 try {
308 leaseRequest.cancel();
309 } finally {
310 return future.cancel(mayInterruptIfRunning);
311 }
312 }
313
314 @Override
315 public boolean isCancelled() {
316 return future.isCancelled();
317 }
318
319 @Override
320 public boolean isDone() {
321 return future.isDone();
322 }
323
324 };
325 }
326
327 @Override
328 public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
329 return lease(route, state, -1, TimeUnit.MICROSECONDS, callback);
330 }
331
332 public Future<E> lease(final T route, final Object state) {
333 return lease(route, state, -1, TimeUnit.MICROSECONDS, null);
334 }
335
336 @Override
337 public void release(final E entry, final boolean reusable) {
338 if (entry == null) {
339 return;
340 }
341 if (this.isShutDown.get()) {
342 return;
343 }
344 this.lock.lock();
345 try {
346 if (this.leased.remove(entry)) {
347 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
348 pool.free(entry, reusable);
349 if (reusable) {
350 this.available.addFirst(entry);
351 onRelease(entry);
352 } else {
353 entry.close();
354 }
355 processNextPendingRequest();
356 }
357 } finally {
358 this.lock.unlock();
359 }
360 fireCallbacks();
361 }
362
363 private void processPendingRequests() {
364 final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
365 while (it.hasNext()) {
366 final LeaseRequest<T, C, E> request = it.next();
367 final BasicFuture<E> future = request.getFuture();
368 if (future.isCancelled()) {
369 it.remove();
370 continue;
371 }
372 final boolean completed = processPendingRequest(request);
373 if (request.isDone() || completed) {
374 it.remove();
375 }
376 if (request.isDone()) {
377 this.completedRequests.add(request);
378 }
379 }
380 }
381
382 private void processNextPendingRequest() {
383 final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
384 while (it.hasNext()) {
385 final LeaseRequest<T, C, E> request = it.next();
386 final BasicFuture<E> future = request.getFuture();
387 if (future.isCancelled()) {
388 it.remove();
389 continue;
390 }
391 final boolean completed = processPendingRequest(request);
392 if (request.isDone() || completed) {
393 it.remove();
394 }
395 if (request.isDone()) {
396 this.completedRequests.add(request);
397 }
398 if (completed) {
399 return;
400 }
401 }
402 }
403
404 private boolean processPendingRequest(final LeaseRequest<T, C, E> request) {
405 final T route = request.getRoute();
406 final Object state = request.getState();
407 final long deadline = request.getDeadline();
408
409 final long now = System.currentTimeMillis();
410 if (now > deadline) {
411 request.failed(new TimeoutException("Connection lease request time out"));
412 return false;
413 }
414
415 final RouteSpecificPool<T, C, E> pool = getPool(route);
416 E entry;
417 for (;;) {
418 entry = pool.getFree(state);
419 if (entry == null) {
420 break;
421 }
422 if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) {
423 entry.close();
424 this.available.remove(entry);
425 pool.free(entry, false);
426 } else {
427 break;
428 }
429 }
430 if (entry != null) {
431 this.available.remove(entry);
432 this.leased.add(entry);
433 request.completed(entry);
434 onReuse(entry);
435 onLease(entry);
436 return true;
437 }
438
439
440 final int maxPerRoute = getMax(route);
441
442 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
443 if (excess > 0) {
444 for (int i = 0; i < excess; i++) {
445 final E lastUsed = pool.getLastUsed();
446 if (lastUsed == null) {
447 break;
448 }
449 lastUsed.close();
450 this.available.remove(lastUsed);
451 pool.remove(lastUsed);
452 }
453 }
454
455 if (pool.getAllocatedCount() < maxPerRoute) {
456 final int totalUsed = this.pending.size() + this.leased.size();
457 final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
458 if (freeCapacity == 0) {
459 return false;
460 }
461 final int totalAvailable = this.available.size();
462 if (totalAvailable > freeCapacity - 1) {
463 final E lastUsed = this.available.removeLast();
464 lastUsed.close();
465 final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
466 otherpool.remove(lastUsed);
467 }
468
469 final SocketAddress localAddress;
470 final SocketAddress remoteAddress;
471 try {
472 remoteAddress = this.addressResolver.resolveRemoteAddress(route);
473 localAddress = this.addressResolver.resolveLocalAddress(route);
474 } catch (final IOException ex) {
475 request.failed(ex);
476 return false;
477 }
478
479 final SessionRequest sessionRequest = this.ioReactor.connect(
480 remoteAddress, localAddress, route, this.sessionRequestCallback);
481 request.attachSessionRequest(sessionRequest);
482 final long connectTimeout = request.getConnectTimeout();
483 if (connectTimeout >= 0) {
484 sessionRequest.setConnectTimeout(connectTimeout < Integer.MAX_VALUE ? (int) connectTimeout : Integer.MAX_VALUE);
485 }
486 this.pending.add(sessionRequest);
487 pool.addPending(sessionRequest, request.getFuture());
488 return true;
489 }
490 return false;
491 }
492
493 private void fireCallbacks() {
494 LeaseRequest<T, C, E> request;
495 while ((request = this.completedRequests.poll()) != null) {
496 final BasicFuture<E> future = request.getFuture();
497 final Exception ex = request.getException();
498 final E result = request.getResult();
499 boolean successfullyCompleted = false;
500 if (ex != null) {
501 future.failed(ex);
502 } else if (result != null) {
503 if (future.completed(result)) {
504 successfullyCompleted = true;
505 }
506 } else {
507 future.cancel();
508 }
509 if (!successfullyCompleted) {
510 release(result, true);
511 }
512 }
513 }
514
515 public void validatePendingRequests() {
516 this.lock.lock();
517 try {
518 final long now = System.currentTimeMillis();
519 final ListIterator<LeaseRequest<T, C, E>> it = this.leasingRequests.listIterator();
520 while (it.hasNext()) {
521 final LeaseRequest<T, C, E> request = it.next();
522 final BasicFuture<E> future = request.getFuture();
523 if (future.isCancelled() && !request.isDone()) {
524 it.remove();
525 } else {
526 final long deadline = request.getDeadline();
527 if (now > deadline) {
528 request.failed(new TimeoutException("Connection lease request time out"));
529 }
530 if (request.isDone()) {
531 it.remove();
532 this.completedRequests.add(request);
533 }
534 }
535 }
536 } finally {
537 this.lock.unlock();
538 }
539 fireCallbacks();
540 }
541
542 protected void requestCompleted(final SessionRequest request) {
543 if (this.isShutDown.get()) {
544 final IOSession session = request.getSession();
545 if (session != null) {
546 session.close();
547 }
548 return;
549 }
550 @SuppressWarnings("unchecked")
551 final
552 T route = (T) request.getAttachment();
553 this.lock.lock();
554 try {
555 this.pending.remove(request);
556 final RouteSpecificPool<T, C, E> pool = getPool(route);
557 final IOSession session = request.getSession();
558 try {
559 final C conn = this.connFactory.create(route, session);
560 final E entry = pool.createEntry(request, conn);
561 if (pool.completed(request, entry)) {
562 this.leased.add(entry);
563 onLease(entry);
564 } else {
565 this.available.add(entry);
566 if (this.ioReactor.getStatus().compareTo(IOReactorStatus.ACTIVE) <= 0) {
567 processNextPendingRequest();
568 }
569 }
570 } catch (final IOException ex) {
571 pool.failed(request, ex);
572 }
573 } finally {
574 this.lock.unlock();
575 }
576 fireCallbacks();
577 }
578
579 protected void requestCancelled(final SessionRequest request) {
580 if (this.isShutDown.get()) {
581 return;
582 }
583 @SuppressWarnings("unchecked")
584 final
585 T route = (T) request.getAttachment();
586 this.lock.lock();
587 try {
588 this.pending.remove(request);
589 final RouteSpecificPool<T, C, E> pool = getPool(route);
590 pool.cancelled(request);
591 if (this.ioReactor.getStatus().compareTo(IOReactorStatus.ACTIVE) <= 0) {
592 processNextPendingRequest();
593 }
594 } finally {
595 this.lock.unlock();
596 }
597 fireCallbacks();
598 }
599
600 protected void requestFailed(final SessionRequest request) {
601 if (this.isShutDown.get()) {
602 return;
603 }
604 @SuppressWarnings("unchecked")
605 final
606 T route = (T) request.getAttachment();
607 this.lock.lock();
608 try {
609 this.pending.remove(request);
610 final RouteSpecificPool<T, C, E> pool = getPool(route);
611 pool.failed(request, request.getException());
612 processNextPendingRequest();
613 } finally {
614 this.lock.unlock();
615 }
616 fireCallbacks();
617 }
618
619 protected void requestTimeout(final SessionRequest request) {
620 if (this.isShutDown.get()) {
621 return;
622 }
623 @SuppressWarnings("unchecked")
624 final
625 T route = (T) request.getAttachment();
626 this.lock.lock();
627 try {
628 this.pending.remove(request);
629 final RouteSpecificPool<T, C, E> pool = getPool(route);
630 pool.timeout(request);
631 processNextPendingRequest();
632 } finally {
633 this.lock.unlock();
634 }
635 fireCallbacks();
636 }
637
638 private int getMax(final T route) {
639 final Integer v = this.maxPerRoute.get(route);
640 return v != null ? v.intValue() : this.defaultMaxPerRoute;
641 }
642
643 @Override
644 public void setMaxTotal(final int max) {
645 Args.positive(max, "Max value");
646 this.lock.lock();
647 try {
648 this.maxTotal = max;
649 } finally {
650 this.lock.unlock();
651 }
652 }
653
654 @Override
655 public int getMaxTotal() {
656 this.lock.lock();
657 try {
658 return this.maxTotal;
659 } finally {
660 this.lock.unlock();
661 }
662 }
663
664 @Override
665 public void setDefaultMaxPerRoute(final int max) {
666 Args.positive(max, "Max value");
667 this.lock.lock();
668 try {
669 this.defaultMaxPerRoute = max;
670 } finally {
671 this.lock.unlock();
672 }
673 }
674
675 @Override
676 public int getDefaultMaxPerRoute() {
677 this.lock.lock();
678 try {
679 return this.defaultMaxPerRoute;
680 } finally {
681 this.lock.unlock();
682 }
683 }
684
685 @Override
686 public void setMaxPerRoute(final T route, final int max) {
687 Args.notNull(route, "Route");
688 this.lock.lock();
689 try {
690 if (max > -1) {
691 this.maxPerRoute.put(route, Integer.valueOf(max));
692 } else {
693 this.maxPerRoute.remove(route);
694 }
695 } finally {
696 this.lock.unlock();
697 }
698 }
699
700 @Override
701 public int getMaxPerRoute(final T route) {
702 Args.notNull(route, "Route");
703 this.lock.lock();
704 try {
705 return getMax(route);
706 } finally {
707 this.lock.unlock();
708 }
709 }
710
711 @Override
712 public PoolStats getTotalStats() {
713 this.lock.lock();
714 try {
715 return new PoolStats(
716 this.leased.size(),
717 this.pending.size(),
718 this.available.size(),
719 this.maxTotal);
720 } finally {
721 this.lock.unlock();
722 }
723 }
724
725 @Override
726 public PoolStats getStats(final T route) {
727 Args.notNull(route, "Route");
728 this.lock.lock();
729 try {
730 final RouteSpecificPool<T, C, E> pool = getPool(route);
731 int pendingCount = 0;
732 for (final LeaseRequest<T, C, E> request: leasingRequests) {
733 if (LangUtils.equals(route, request.getRoute())) {
734 pendingCount++;
735 }
736 }
737 return new PoolStats(
738 pool.getLeasedCount(),
739 pendingCount + pool.getPendingCount(),
740 pool.getAvailableCount(),
741 getMax(route));
742 } finally {
743 this.lock.unlock();
744 }
745 }
746
747
748
749
750
751
752 public Set<T> getRoutes() {
753 this.lock.lock();
754 try {
755 return new HashSet<T>(routeToPool.keySet());
756 } finally {
757 this.lock.unlock();
758 }
759 }
760
761
762
763
764
765
766 protected void enumAvailable(final PoolEntryCallback<T, C> callback) {
767 this.lock.lock();
768 try {
769 final Iterator<E> it = this.available.iterator();
770 while (it.hasNext()) {
771 final E entry = it.next();
772 callback.process(entry);
773 if (entry.isClosed()) {
774 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
775 pool.remove(entry);
776 it.remove();
777 }
778 }
779 processPendingRequests();
780 purgePoolMap();
781 } finally {
782 this.lock.unlock();
783 }
784 }
785
786
787
788
789
790
791 protected void enumLeased(final PoolEntryCallback<T, C> callback) {
792 this.lock.lock();
793 try {
794 final Iterator<E> it = this.leased.iterator();
795 while (it.hasNext()) {
796 final E entry = it.next();
797 callback.process(entry);
798 }
799 processPendingRequests();
800 } finally {
801 this.lock.unlock();
802 }
803 }
804
805
806
807
808
809
810
811 @Deprecated
812 protected void enumEntries(final Iterator<E> it, final PoolEntryCallback<T, C> callback) {
813 while (it.hasNext()) {
814 final E entry = it.next();
815 callback.process(entry);
816 }
817 processPendingRequests();
818 }
819
820 private void purgePoolMap() {
821 final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator();
822 while (it.hasNext()) {
823 final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next();
824 final RouteSpecificPool<T, C, E> pool = entry.getValue();
825 if (pool.getAllocatedCount() == 0) {
826 it.remove();
827 }
828 }
829 }
830
831 public void closeIdle(final long idletime, final TimeUnit timeUnit) {
832 Args.notNull(timeUnit, "Time unit");
833 long time = timeUnit.toMillis(idletime);
834 if (time < 0) {
835 time = 0;
836 }
837 final long deadline = System.currentTimeMillis() - time;
838 enumAvailable(new PoolEntryCallback<T, C>() {
839
840 @Override
841 public void process(final PoolEntry<T, C> entry) {
842 if (entry.getUpdated() <= deadline) {
843 entry.close();
844 }
845 }
846
847 });
848 }
849
850 public void closeExpired() {
851 final long now = System.currentTimeMillis();
852 enumAvailable(new PoolEntryCallback<T, C>() {
853
854 @Override
855 public void process(final PoolEntry<T, C> entry) {
856 if (entry.isExpired(now)) {
857 entry.close();
858 }
859 }
860
861 });
862 }
863
864 @Override
865 public String toString() {
866 final StringBuilder buffer = new StringBuilder();
867 buffer.append("[leased: ");
868 buffer.append(this.leased);
869 buffer.append("][available: ");
870 buffer.append(this.available);
871 buffer.append("][pending: ");
872 buffer.append(this.pending);
873 buffer.append("]");
874 return buffer.toString();
875 }
876
877 class InternalSessionRequestCallback implements SessionRequestCallback {
878
879 @Override
880 public void completed(final SessionRequest request) {
881 requestCompleted(request);
882 }
883
884 @Override
885 public void cancelled(final SessionRequest request) {
886 requestCancelled(request);
887 }
888
889 @Override
890 public void failed(final SessionRequest request) {
891 requestFailed(request);
892 }
893
894 @Override
895 public void timeout(final SessionRequest request) {
896 requestTimeout(request);
897 }
898
899 }
900
901 }