1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.pool;
28
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.Iterator;
32 import java.util.LinkedList;
33 import java.util.ListIterator;
34 import java.util.Map;
35 import java.util.Set;
36 import java.util.concurrent.ConcurrentLinkedQueue;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.Future;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.TimeoutException;
41 import java.util.concurrent.atomic.AtomicBoolean;
42 import java.util.concurrent.locks.Lock;
43 import java.util.concurrent.locks.ReentrantLock;
44
45 import org.apache.hc.core5.annotation.Contract;
46 import org.apache.hc.core5.annotation.ThreadingBehavior;
47 import org.apache.hc.core5.concurrent.BasicFuture;
48 import org.apache.hc.core5.concurrent.FutureCallback;
49 import org.apache.hc.core5.function.Callback;
50 import org.apache.hc.core5.io.CloseMode;
51 import org.apache.hc.core5.io.ModalCloseable;
52 import org.apache.hc.core5.util.Args;
53 import org.apache.hc.core5.util.Asserts;
54 import org.apache.hc.core5.util.Deadline;
55 import org.apache.hc.core5.util.DeadlineTimeoutException;
56 import org.apache.hc.core5.util.LangUtils;
57 import org.apache.hc.core5.util.TimeValue;
58 import org.apache.hc.core5.util.Timeout;
59
60
61
62
63
64
65
66
67
68 @Contract(threading = ThreadingBehavior.SAFE)
69 public class StrictConnPool<T, C extends ModalCloseable> implements ManagedConnPool<T, C> {
70
71 private final TimeValue timeToLive;
72 private final PoolReusePolicy policy;
73 private final DisposalCallback<C> disposalCallback;
74 private final ConnPoolListener<T> connPoolListener;
75 private final Map<T, PerRoutePool<T, C>> routeToPool;
76 private final LinkedList<LeaseRequest<T, C>> leasingRequests;
77 private final Set<PoolEntry<T, C>> leased;
78 private final LinkedList<PoolEntry<T, C>> available;
79 private final ConcurrentLinkedQueue<LeaseRequest<T, C>> completedRequests;
80 private final Map<T, Integer> maxPerRoute;
81 private final Lock lock;
82 private final AtomicBoolean isShutDown;
83
84 private volatile int defaultMaxPerRoute;
85 private volatile int maxTotal;
86
87
88
89
90 public StrictConnPool(
91 final int defaultMaxPerRoute,
92 final int maxTotal,
93 final TimeValue timeToLive,
94 final PoolReusePolicy policy,
95 final DisposalCallback<C> disposalCallback,
96 final ConnPoolListener<T> connPoolListener) {
97 super();
98 Args.positive(defaultMaxPerRoute, "Max per route value");
99 Args.positive(maxTotal, "Max total value");
100 this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
101 this.policy = policy != null ? policy : PoolReusePolicy.LIFO;
102 this.disposalCallback = disposalCallback;
103 this.connPoolListener = connPoolListener;
104 this.routeToPool = new HashMap<>();
105 this.leasingRequests = new LinkedList<>();
106 this.leased = new HashSet<>();
107 this.available = new LinkedList<>();
108 this.completedRequests = new ConcurrentLinkedQueue<>();
109 this.maxPerRoute = new HashMap<>();
110 this.lock = new ReentrantLock();
111 this.isShutDown = new AtomicBoolean(false);
112 this.defaultMaxPerRoute = defaultMaxPerRoute;
113 this.maxTotal = maxTotal;
114 }
115
116
117
118
119 public StrictConnPool(
120 final int defaultMaxPerRoute,
121 final int maxTotal,
122 final TimeValue timeToLive,
123 final PoolReusePolicy policy,
124 final ConnPoolListener<T> connPoolListener) {
125 this(defaultMaxPerRoute, maxTotal, timeToLive, policy, null, connPoolListener);
126 }
127
128 public StrictConnPool(final int defaultMaxPerRoute, final int maxTotal) {
129 this(defaultMaxPerRoute, maxTotal, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, null);
130 }
131
132 public boolean isShutdown() {
133 return this.isShutDown.get();
134 }
135
136 @Override
137 public void close(final CloseMode closeMode) {
138 if (this.isShutDown.compareAndSet(false, true)) {
139 fireCallbacks();
140 this.lock.lock();
141 try {
142 for (final PerRoutePool<T, C> pool: this.routeToPool.values()) {
143 pool.shutdown(closeMode);
144 }
145 this.routeToPool.clear();
146 this.leased.clear();
147 this.available.clear();
148 this.leasingRequests.clear();
149 } finally {
150 this.lock.unlock();
151 }
152 }
153 }
154
155 @Override
156 public void close() {
157 close(CloseMode.GRACEFUL);
158 }
159
160 private PerRoutePool<T, C> getPool(final T route) {
161 PerRoutePool<T, C> pool = this.routeToPool.get(route);
162 if (pool == null) {
163 pool = new PerRoutePool<>(route, this.disposalCallback);
164 this.routeToPool.put(route, pool);
165 }
166 return pool;
167 }
168
169 @Override
170 public Future<PoolEntry<T, C>> lease(
171 final T route, final Object state,
172 final Timeout requestTimeout,
173 final FutureCallback<PoolEntry<T, C>> callback) {
174 Args.notNull(route, "Route");
175 Args.notNull(requestTimeout, "Request timeout");
176 Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
177 final Deadline deadline = Deadline.calculate(requestTimeout);
178 final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<PoolEntry<T, C>>(callback) {
179
180 @Override
181 public synchronized PoolEntry<T, C> get(
182 final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
183 try {
184 return super.get(timeout, unit);
185 } catch (final TimeoutException ex) {
186 cancel();
187 throw ex;
188 }
189 }
190
191 };
192 final boolean acquiredLock;
193
194 try {
195 acquiredLock = this.lock.tryLock(requestTimeout.getDuration(), requestTimeout.getTimeUnit());
196 } catch (final InterruptedException interruptedException) {
197 Thread.currentThread().interrupt();
198 future.cancel();
199 return future;
200 }
201
202 if (acquiredLock) {
203 try {
204 final LeaseRequest<T, C> request = new LeaseRequest<>(route, state, requestTimeout, future);
205 final boolean completed = processPendingRequest(request);
206 if (!request.isDone() && !completed) {
207 this.leasingRequests.add(request);
208 }
209 if (request.isDone()) {
210 this.completedRequests.add(request);
211 }
212 } finally {
213 this.lock.unlock();
214 }
215 fireCallbacks();
216 } else {
217 future.failed(DeadlineTimeoutException.from(deadline));
218 }
219
220 return future;
221 }
222
223 public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
224 return lease(route, state, Timeout.DISABLED, null);
225 }
226
227 @Override
228 public void release(final PoolEntry<T, C> entry, final boolean reusable) {
229 if (entry == null) {
230 return;
231 }
232 if (this.isShutDown.get()) {
233 return;
234 }
235 if (!reusable) {
236 entry.discardConnection(CloseMode.GRACEFUL);
237 }
238 this.lock.lock();
239 try {
240 if (this.leased.remove(entry)) {
241 if (this.connPoolListener != null) {
242 this.connPoolListener.onRelease(entry.getRoute(), this);
243 }
244 final PerRoutePool<T, C> pool = getPool(entry.getRoute());
245 final boolean keepAlive = entry.hasConnection() && reusable;
246 pool.free(entry, keepAlive);
247 if (keepAlive) {
248 switch (policy) {
249 case LIFO:
250 this.available.addFirst(entry);
251 break;
252 case FIFO:
253 this.available.addLast(entry);
254 break;
255 default:
256 throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
257 }
258 } else {
259 entry.discardConnection(CloseMode.GRACEFUL);
260 }
261 processNextPendingRequest();
262 } else {
263 throw new IllegalStateException("Pool entry is not present in the set of leased entries");
264 }
265 } finally {
266 this.lock.unlock();
267 }
268 fireCallbacks();
269 }
270
271 private void processPendingRequests() {
272 final ListIterator<LeaseRequest<T, C>> it = this.leasingRequests.listIterator();
273 while (it.hasNext()) {
274 final LeaseRequest<T, C> request = it.next();
275 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
276 if (future.isCancelled()) {
277 it.remove();
278 continue;
279 }
280 final boolean completed = processPendingRequest(request);
281 if (request.isDone() || completed) {
282 it.remove();
283 }
284 if (request.isDone()) {
285 this.completedRequests.add(request);
286 }
287 }
288 }
289
290 private void processNextPendingRequest() {
291 final ListIterator<LeaseRequest<T, C>> it = this.leasingRequests.listIterator();
292 while (it.hasNext()) {
293 final LeaseRequest<T, C> request = it.next();
294 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
295 if (future.isCancelled()) {
296 it.remove();
297 continue;
298 }
299 final boolean completed = processPendingRequest(request);
300 if (request.isDone() || completed) {
301 it.remove();
302 }
303 if (request.isDone()) {
304 this.completedRequests.add(request);
305 }
306 if (completed) {
307 return;
308 }
309 }
310 }
311
312 private boolean processPendingRequest(final LeaseRequest<T, C> request) {
313 final T route = request.getRoute();
314 final Object state = request.getState();
315 final Deadline deadline = request.getDeadline();
316
317 if (deadline.isExpired()) {
318 request.failed(DeadlineTimeoutException.from(deadline));
319 return false;
320 }
321
322 final PerRoutePool<T, C> pool = getPool(route);
323 PoolEntry<T, C> entry;
324 for (;;) {
325 entry = pool.getFree(state);
326 if (entry == null) {
327 break;
328 }
329 if (entry.getExpiryDeadline().isExpired()) {
330 entry.discardConnection(CloseMode.GRACEFUL);
331 this.available.remove(entry);
332 pool.free(entry, false);
333 } else {
334 break;
335 }
336 }
337 if (entry != null) {
338 this.available.remove(entry);
339 this.leased.add(entry);
340 request.completed(entry);
341 if (this.connPoolListener != null) {
342 this.connPoolListener.onLease(entry.getRoute(), this);
343 }
344 return true;
345 }
346
347
348 final int maxPerRoute = getMax(route);
349
350 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
351 if (excess > 0) {
352 for (int i = 0; i < excess; i++) {
353 final PoolEntry<T, C> lastUsed = pool.getLastUsed();
354 if (lastUsed == null) {
355 break;
356 }
357 lastUsed.discardConnection(CloseMode.GRACEFUL);
358 this.available.remove(lastUsed);
359 pool.remove(lastUsed);
360 }
361 }
362
363 if (pool.getAllocatedCount() < maxPerRoute) {
364 final int freeCapacity = Math.max(this.maxTotal - this.leased.size(), 0);
365 if (freeCapacity == 0) {
366 return false;
367 }
368 final int totalAvailable = this.available.size();
369 if (totalAvailable > freeCapacity - 1) {
370 final PoolEntry<T, C> lastUsed = this.available.removeLast();
371 lastUsed.discardConnection(CloseMode.GRACEFUL);
372 final PerRoutePool<T, C> otherpool = getPool(lastUsed.getRoute());
373 otherpool.remove(lastUsed);
374 }
375
376 entry = pool.createEntry(this.timeToLive);
377 this.leased.add(entry);
378 request.completed(entry);
379 if (this.connPoolListener != null) {
380 this.connPoolListener.onLease(entry.getRoute(), this);
381 }
382 return true;
383 }
384 return false;
385 }
386
387 private void fireCallbacks() {
388 LeaseRequest<T, C> request;
389 while ((request = this.completedRequests.poll()) != null) {
390 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
391 final Exception ex = request.getException();
392 final PoolEntry<T, C> result = request.getResult();
393 boolean successfullyCompleted = false;
394 if (ex != null) {
395 future.failed(ex);
396 } else if (result != null) {
397 if (future.completed(result)) {
398 successfullyCompleted = true;
399 }
400 } else {
401 future.cancel();
402 }
403 if (!successfullyCompleted) {
404 release(result, true);
405 }
406 }
407 }
408
409 public void validatePendingRequests() {
410 this.lock.lock();
411 try {
412 final long now = System.currentTimeMillis();
413 final ListIterator<LeaseRequest<T, C>> it = this.leasingRequests.listIterator();
414 while (it.hasNext()) {
415 final LeaseRequest<T, C> request = it.next();
416 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
417 if (future.isCancelled() && !request.isDone()) {
418 it.remove();
419 } else {
420 final Deadline deadline = request.getDeadline();
421 if (deadline.isBefore(now)) {
422 request.failed(DeadlineTimeoutException.from(deadline));
423 }
424 if (request.isDone()) {
425 it.remove();
426 this.completedRequests.add(request);
427 }
428 }
429 }
430 } finally {
431 this.lock.unlock();
432 }
433 fireCallbacks();
434 }
435
436 private int getMax(final T route) {
437 final Integer v = this.maxPerRoute.get(route);
438 if (v != null) {
439 return v;
440 }
441 return this.defaultMaxPerRoute;
442 }
443
444 @Override
445 public void setMaxTotal(final int max) {
446 Args.positive(max, "Max value");
447 this.lock.lock();
448 try {
449 this.maxTotal = max;
450 } finally {
451 this.lock.unlock();
452 }
453 }
454
455 @Override
456 public int getMaxTotal() {
457 this.lock.lock();
458 try {
459 return this.maxTotal;
460 } finally {
461 this.lock.unlock();
462 }
463 }
464
465 @Override
466 public void setDefaultMaxPerRoute(final int max) {
467 Args.positive(max, "Max value");
468 this.lock.lock();
469 try {
470 this.defaultMaxPerRoute = max;
471 } finally {
472 this.lock.unlock();
473 }
474 }
475
476 @Override
477 public int getDefaultMaxPerRoute() {
478 this.lock.lock();
479 try {
480 return this.defaultMaxPerRoute;
481 } finally {
482 this.lock.unlock();
483 }
484 }
485
486 @Override
487 public void setMaxPerRoute(final T route, final int max) {
488 Args.notNull(route, "Route");
489 this.lock.lock();
490 try {
491 if (max > -1) {
492 this.maxPerRoute.put(route, max);
493 } else {
494 this.maxPerRoute.remove(route);
495 }
496 } finally {
497 this.lock.unlock();
498 }
499 }
500
501 @Override
502 public int getMaxPerRoute(final T route) {
503 Args.notNull(route, "Route");
504 this.lock.lock();
505 try {
506 return getMax(route);
507 } finally {
508 this.lock.unlock();
509 }
510 }
511
512 @Override
513 public PoolStats getTotalStats() {
514 this.lock.lock();
515 try {
516 return new PoolStats(
517 this.leased.size(),
518 this.leasingRequests.size(),
519 this.available.size(),
520 this.maxTotal);
521 } finally {
522 this.lock.unlock();
523 }
524 }
525
526 @Override
527 public PoolStats getStats(final T route) {
528 Args.notNull(route, "Route");
529 this.lock.lock();
530 try {
531 final PerRoutePool<T, C> pool = getPool(route);
532 int pendingCount = 0;
533 for (final LeaseRequest<T, C> request: leasingRequests) {
534 if (LangUtils.equals(route, request.getRoute())) {
535 pendingCount++;
536 }
537 }
538 return new PoolStats(
539 pool.getLeasedCount(),
540 pendingCount,
541 pool.getAvailableCount(),
542 getMax(route));
543 } finally {
544 this.lock.unlock();
545 }
546 }
547
548
549
550
551
552
553 @Override
554 public Set<T> getRoutes() {
555 this.lock.lock();
556 try {
557 return new HashSet<>(routeToPool.keySet());
558 } finally {
559 this.lock.unlock();
560 }
561 }
562
563
564
565
566
567
568 public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
569 this.lock.lock();
570 try {
571 final Iterator<PoolEntry<T, C>> it = this.available.iterator();
572 while (it.hasNext()) {
573 final PoolEntry<T, C> entry = it.next();
574 callback.execute(entry);
575 if (!entry.hasConnection()) {
576 final PerRoutePool<T, C> pool = getPool(entry.getRoute());
577 pool.remove(entry);
578 it.remove();
579 }
580 }
581 processPendingRequests();
582 purgePoolMap();
583 } finally {
584 this.lock.unlock();
585 }
586 }
587
588
589
590
591
592
593 public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
594 this.lock.lock();
595 try {
596 final Iterator<PoolEntry<T, C>> it = this.leased.iterator();
597 while (it.hasNext()) {
598 final PoolEntry<T, C> entry = it.next();
599 callback.execute(entry);
600 }
601 processPendingRequests();
602 } finally {
603 this.lock.unlock();
604 }
605 }
606
607 private void purgePoolMap() {
608 final Iterator<Map.Entry<T, PerRoutePool<T, C>>> it = this.routeToPool.entrySet().iterator();
609 while (it.hasNext()) {
610 final Map.Entry<T, PerRoutePool<T, C>> entry = it.next();
611 final PerRoutePool<T, C> pool = entry.getValue();
612 if (pool.getAllocatedCount() == 0) {
613 it.remove();
614 }
615 }
616 }
617
618 @Override
619 public void closeIdle(final TimeValue idleTime) {
620 final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
621 enumAvailable(new Callback<PoolEntry<T, C>>() {
622
623 @Override
624 public void execute(final PoolEntry<T, C> entry) {
625 if (entry.getUpdated() <= deadline) {
626 entry.discardConnection(CloseMode.GRACEFUL);
627 }
628 }
629
630 });
631 }
632
633 @Override
634 public void closeExpired() {
635 final long now = System.currentTimeMillis();
636 enumAvailable(new Callback<PoolEntry<T, C>>() {
637
638 @Override
639 public void execute(final PoolEntry<T, C> entry) {
640 if (entry.getExpiryDeadline().isBefore(now)) {
641 entry.discardConnection(CloseMode.GRACEFUL);
642 }
643 }
644
645 });
646 }
647
648 @Override
649 public String toString() {
650 final StringBuilder buffer = new StringBuilder();
651 buffer.append("[leased: ");
652 buffer.append(this.leased.size());
653 buffer.append("][available: ");
654 buffer.append(this.available.size());
655 buffer.append("][pending: ");
656 buffer.append(this.leasingRequests.size());
657 buffer.append("]");
658 return buffer.toString();
659 }
660
661
662 static class LeaseRequest<T, C extends ModalCloseable> {
663
664 private final T route;
665 private final Object state;
666 private final Deadline deadline;
667 private final BasicFuture<PoolEntry<T, C>> future;
668 private final AtomicBoolean completed;
669 private volatile PoolEntry<T, C> result;
670 private volatile Exception ex;
671
672
673
674
675
676
677
678
679
680 public LeaseRequest(
681 final T route,
682 final Object state,
683 final Timeout requestTimeout,
684 final BasicFuture<PoolEntry<T, C>> future) {
685 super();
686 this.route = route;
687 this.state = state;
688 this.deadline = Deadline.calculate(requestTimeout);
689 this.future = future;
690 this.completed = new AtomicBoolean(false);
691 }
692
693 public T getRoute() {
694 return this.route;
695 }
696
697 public Object getState() {
698 return this.state;
699 }
700
701 public Deadline getDeadline() {
702 return this.deadline;
703 }
704
705 public boolean isDone() {
706 return this.completed.get();
707 }
708
709 public void failed(final Exception ex) {
710 if (this.completed.compareAndSet(false, true)) {
711 this.ex = ex;
712 }
713 }
714
715 public void completed(final PoolEntry<T, C> result) {
716 if (this.completed.compareAndSet(false, true)) {
717 this.result = result;
718 }
719 }
720
721 public BasicFuture<PoolEntry<T, C>> getFuture() {
722 return this.future;
723 }
724
725 public PoolEntry<T, C> getResult() {
726 return this.result;
727 }
728
729 public Exception getException() {
730 return this.ex;
731 }
732
733 @Override
734 public String toString() {
735 final StringBuilder buffer = new StringBuilder();
736 buffer.append("[");
737 buffer.append(this.route);
738 buffer.append("][");
739 buffer.append(this.state);
740 buffer.append("]");
741 return buffer.toString();
742 }
743
744 }
745
746 static class PerRoutePool<T, C extends ModalCloseable> {
747
748 private final T route;
749 private final Set<PoolEntry<T, C>> leased;
750 private final LinkedList<PoolEntry<T, C>> available;
751 private final DisposalCallback<C> disposalCallback;
752
753 PerRoutePool(final T route, final DisposalCallback<C> disposalCallback) {
754 super();
755 this.route = route;
756 this.disposalCallback = disposalCallback;
757 this.leased = new HashSet<>();
758 this.available = new LinkedList<>();
759 }
760
761 public final T getRoute() {
762 return route;
763 }
764
765 public int getLeasedCount() {
766 return this.leased.size();
767 }
768
769 public int getAvailableCount() {
770 return this.available.size();
771 }
772
773 public int getAllocatedCount() {
774 return this.available.size() + this.leased.size();
775 }
776
777 public PoolEntry<T, C> getFree(final Object state) {
778 if (!this.available.isEmpty()) {
779 if (state != null) {
780 final Iterator<PoolEntry<T, C>> it = this.available.iterator();
781 while (it.hasNext()) {
782 final PoolEntry<T, C> entry = it.next();
783 if (state.equals(entry.getState())) {
784 it.remove();
785 this.leased.add(entry);
786 return entry;
787 }
788 }
789 }
790 final Iterator<PoolEntry<T, C>> it = this.available.iterator();
791 while (it.hasNext()) {
792 final PoolEntry<T, C> entry = it.next();
793 if (entry.getState() == null) {
794 it.remove();
795 this.leased.add(entry);
796 return entry;
797 }
798 }
799 }
800 return null;
801 }
802
803 public PoolEntry<T, C> getLastUsed() {
804 return this.available.peekLast();
805 }
806
807 public boolean remove(final PoolEntry<T, C> entry) {
808 return this.available.remove(entry) || this.leased.remove(entry);
809 }
810
811 public void free(final PoolEntry<T, C> entry, final boolean reusable) {
812 final boolean found = this.leased.remove(entry);
813 Asserts.check(found, "Entry %s has not been leased from this pool", entry);
814 if (reusable) {
815 this.available.addFirst(entry);
816 }
817 }
818
819 public PoolEntry<T, C> createEntry(final TimeValue timeToLive) {
820 final PoolEntry<T, C> entry = new PoolEntry<>(this.route, timeToLive, disposalCallback);
821 this.leased.add(entry);
822 return entry;
823 }
824
825 public void shutdown(final CloseMode closeMode) {
826 PoolEntry<T, C> availableEntry;
827 while ((availableEntry = available.poll()) != null) {
828 availableEntry.discardConnection(closeMode);
829 }
830 for (final PoolEntry<T, C> entry: this.leased) {
831 entry.discardConnection(closeMode);
832 }
833 this.leased.clear();
834 }
835
836 @Override
837 public String toString() {
838 final StringBuilder buffer = new StringBuilder();
839 buffer.append("[route: ");
840 buffer.append(this.route);
841 buffer.append("][leased: ");
842 buffer.append(this.leased.size());
843 buffer.append("][available: ");
844 buffer.append(this.available.size());
845 buffer.append("]");
846 return buffer.toString();
847 }
848
849 }
850 }