1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.pool;
28
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.Iterator;
32 import java.util.LinkedList;
33 import java.util.ListIterator;
34 import java.util.Map;
35 import java.util.Objects;
36 import java.util.Set;
37 import java.util.concurrent.ConcurrentLinkedQueue;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.TimeoutException;
42 import java.util.concurrent.atomic.AtomicBoolean;
43 import java.util.concurrent.locks.ReentrantLock;
44
45 import org.apache.hc.core5.annotation.Contract;
46 import org.apache.hc.core5.annotation.ThreadingBehavior;
47 import org.apache.hc.core5.concurrent.BasicFuture;
48 import org.apache.hc.core5.concurrent.FutureCallback;
49 import org.apache.hc.core5.function.Callback;
50 import org.apache.hc.core5.io.CloseMode;
51 import org.apache.hc.core5.io.ModalCloseable;
52 import org.apache.hc.core5.util.Args;
53 import org.apache.hc.core5.util.Asserts;
54 import org.apache.hc.core5.util.Deadline;
55 import org.apache.hc.core5.util.DeadlineTimeoutException;
56 import org.apache.hc.core5.util.TimeValue;
57 import org.apache.hc.core5.util.Timeout;
58
59
60
61
62
63
64
65
66
67 @Contract(threading = ThreadingBehavior.SAFE)
68 public class StrictConnPool<T, C extends ModalCloseable> implements ManagedConnPool<T, C> {
69
70 private final TimeValue timeToLive;
71 private final PoolReusePolicy policy;
72 private final DisposalCallback<C> disposalCallback;
73 private final ConnPoolListener<T> connPoolListener;
74 private final Map<T, PerRoutePool<T, C>> routeToPool;
75 private final LinkedList<LeaseRequest<T, C>> pendingRequests;
76 private final Set<PoolEntry<T, C>> leased;
77 private final LinkedList<PoolEntry<T, C>> available;
78 private final ConcurrentLinkedQueue<LeaseRequest<T, C>> completedRequests;
79 private final Map<T, Integer> maxPerRoute;
80 private final ReentrantLock lock;
81 private final AtomicBoolean isShutDown;
82
83 private volatile int defaultMaxPerRoute;
84 private volatile int maxTotal;
85
86
87
88
/**
 * Creates a pool with the given limits, entry time-to-live, reuse policy,
 * disposal callback and listener.
 *
 * @param defaultMaxPerRoute limit applied to routes that have no route-specific
 *                           limit; checked with {@code Args.positive}.
 * @param maxTotal limit on the total number of entries; checked with {@code Args.positive}.
 * @param timeToLive time-to-live handed to newly created entries, after being passed
 *                   through {@code TimeValue.defaultsToNegativeOneMillisecond}.
 * @param policy decides at which end of the idle list released entries are queued;
 *               {@code null} selects {@code PoolReusePolicy.LIFO}.
 * @param disposalCallback handed to every per-route pool and from there to new entries;
 *                         may be {@code null}.
 * @param connPoolListener notified on lease and release; may be {@code null}.
 */
public StrictConnPool(
        final int defaultMaxPerRoute,
        final int maxTotal,
        final TimeValue timeToLive,
        final PoolReusePolicy policy,
        final DisposalCallback<C> disposalCallback,
        final ConnPoolListener<T> connPoolListener) {
    super();
    // Validate both limits before any state is set up.
    Args.positive(defaultMaxPerRoute, "Max per route value");
    Args.positive(maxTotal, "Max total value");

    // Limits.
    this.defaultMaxPerRoute = defaultMaxPerRoute;
    this.maxTotal = maxTotal;
    this.maxPerRoute = new HashMap<>();

    // Entry configuration and collaborators.
    this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
    if (policy == null) {
        this.policy = PoolReusePolicy.LIFO;
    } else {
        this.policy = policy;
    }
    this.disposalCallback = disposalCallback;
    this.connPoolListener = connPoolListener;

    // Bookkeeping; everything except completedRequests is only touched under the lock.
    this.lock = new ReentrantLock();
    this.isShutDown = new AtomicBoolean(false);
    this.routeToPool = new HashMap<>();
    this.leased = new HashSet<>();
    this.available = new LinkedList<>();
    this.pendingRequests = new LinkedList<>();
    this.completedRequests = new ConcurrentLinkedQueue<>();
}
114
115
116
117
/**
 * Creates a pool without a disposal callback; delegates to the six-argument
 * constructor with {@code null} in that position.
 *
 * @param defaultMaxPerRoute limit applied to routes that have no route-specific limit.
 * @param maxTotal limit on the total number of entries.
 * @param timeToLive time-to-live handed to newly created entries.
 * @param policy reuse policy for released entries.
 * @param connPoolListener notified on lease and release.
 */
public StrictConnPool(
        final int defaultMaxPerRoute,
        final int maxTotal,
        final TimeValue timeToLive,
        final PoolReusePolicy policy,
        final ConnPoolListener<T> connPoolListener) {
    this(
            defaultMaxPerRoute,
            maxTotal,
            timeToLive,
            policy,
            null, // no disposal callback
            connPoolListener);
}
126
/**
 * Creates a pool with the given limits, {@code TimeValue.NEG_ONE_MILLISECOND} as
 * time-to-live, LIFO reuse and no listener.
 *
 * @param defaultMaxPerRoute limit applied to routes that have no route-specific limit.
 * @param maxTotal limit on the total number of entries.
 */
public StrictConnPool(final int defaultMaxPerRoute, final int maxTotal) {
    this(
            defaultMaxPerRoute,
            maxTotal,
            TimeValue.NEG_ONE_MILLISECOND,
            PoolReusePolicy.LIFO,
            null);
}
130
131 public boolean isShutdown() {
132 return this.isShutDown.get();
133 }
134
135 @Override
136 public void close(final CloseMode closeMode) {
137 if (this.isShutDown.compareAndSet(false, true)) {
138 fireCallbacks();
139 this.lock.lock();
140 try {
141 for (final PerRoutePool<T, C> pool: this.routeToPool.values()) {
142 pool.shutdown(closeMode);
143 }
144 this.routeToPool.clear();
145 this.leased.clear();
146 this.available.clear();
147 this.pendingRequests.clear();
148 } finally {
149 this.lock.unlock();
150 }
151 }
152 }
153
154 @Override
155 public void close() {
156 close(CloseMode.GRACEFUL);
157 }
158
159 private PerRoutePool<T, C> getPool(final T route) {
160 PerRoutePool<T, C> pool = this.routeToPool.get(route);
161 if (pool == null) {
162 pool = new PerRoutePool<>(route, this.disposalCallback);
163 this.routeToPool.put(route, pool);
164 }
165 return pool;
166 }
167
168 @Override
169 public Future<PoolEntry<T, C>> lease(
170 final T route, final Object state,
171 final Timeout requestTimeout,
172 final FutureCallback<PoolEntry<T, C>> callback) {
173 Args.notNull(route, "Route");
174 Args.notNull(requestTimeout, "Request timeout");
175 Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
176 final Deadline deadline = Deadline.calculate(requestTimeout);
177 final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<PoolEntry<T, C>>(callback) {
178
179 @Override
180 public PoolEntry<T, C> get(
181 final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
182 try {
183 return super.get(timeout, unit);
184 } catch (final TimeoutException ex) {
185 cancel();
186 throw ex;
187 }
188 }
189
190 };
191 final boolean acquiredLock;
192
193 try {
194 if (Timeout.isPositive(requestTimeout)) {
195 acquiredLock = this.lock.tryLock(requestTimeout.getDuration(), requestTimeout.getTimeUnit());
196 } else {
197 this.lock.lockInterruptibly();
198 acquiredLock = true;
199 }
200 } catch (final InterruptedException interruptedException) {
201 Thread.currentThread().interrupt();
202 future.cancel();
203 return future;
204 }
205
206 if (acquiredLock) {
207 try {
208 final LeaseRequest<T, C> request = new LeaseRequest<>(route, state, requestTimeout, future);
209 final boolean completed = processPendingRequest(request);
210 if (!request.isDone() && !completed) {
211 this.pendingRequests.add(request);
212 }
213 if (request.isDone()) {
214 this.completedRequests.add(request);
215 }
216 } finally {
217 this.lock.unlock();
218 }
219 fireCallbacks();
220 } else {
221 future.failed(DeadlineTimeoutException.from(deadline));
222 }
223
224 return future;
225 }
226
227 public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
228 return lease(route, state, Timeout.DISABLED, null);
229 }
230
231 @Override
232 public void release(final PoolEntry<T, C> entry, final boolean reusable) {
233 if (entry == null) {
234 return;
235 }
236 if (this.isShutDown.get()) {
237 return;
238 }
239 if (!reusable) {
240 entry.discardConnection(CloseMode.GRACEFUL);
241 }
242 this.lock.lock();
243 try {
244 if (this.leased.remove(entry)) {
245 if (this.connPoolListener != null) {
246 this.connPoolListener.onRelease(entry.getRoute(), this);
247 }
248 final PerRoutePool<T, C> pool = getPool(entry.getRoute());
249 final boolean keepAlive = entry.hasConnection() && reusable;
250 pool.free(entry, keepAlive);
251 if (keepAlive) {
252 switch (policy) {
253 case LIFO:
254 this.available.addFirst(entry);
255 break;
256 case FIFO:
257 this.available.addLast(entry);
258 break;
259 default:
260 throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
261 }
262 } else {
263 entry.discardConnection(CloseMode.GRACEFUL);
264 }
265 processNextPendingRequest();
266 } else {
267 throw new IllegalStateException("Pool entry is not present in the set of leased entries");
268 }
269 } finally {
270 this.lock.unlock();
271 }
272 fireCallbacks();
273 }
274
275 private void processPendingRequests() {
276 final ListIterator<LeaseRequest<T, C>> it = this.pendingRequests.listIterator();
277 while (it.hasNext()) {
278 final LeaseRequest<T, C> request = it.next();
279 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
280 if (future.isCancelled()) {
281 it.remove();
282 continue;
283 }
284 final boolean completed = processPendingRequest(request);
285 if (request.isDone() || completed) {
286 it.remove();
287 }
288 if (request.isDone()) {
289 this.completedRequests.add(request);
290 }
291 }
292 }
293
294 private void processNextPendingRequest() {
295 final ListIterator<LeaseRequest<T, C>> it = this.pendingRequests.listIterator();
296 while (it.hasNext()) {
297 final LeaseRequest<T, C> request = it.next();
298 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
299 if (future.isCancelled()) {
300 it.remove();
301 continue;
302 }
303 final boolean completed = processPendingRequest(request);
304 if (request.isDone() || completed) {
305 it.remove();
306 }
307 if (request.isDone()) {
308 this.completedRequests.add(request);
309 }
310 if (completed) {
311 return;
312 }
313 }
314 }
315
316 private boolean processPendingRequest(final LeaseRequest<T, C> request) {
317 final T route = request.getRoute();
318 final Object state = request.getState();
319 final Deadline deadline = request.getDeadline();
320
321 if (deadline.isExpired()) {
322 request.failed(DeadlineTimeoutException.from(deadline));
323 return false;
324 }
325
326 final PerRoutePool<T, C> pool = getPool(route);
327 PoolEntry<T, C> entry;
328 for (;;) {
329 entry = pool.getFree(state);
330 if (entry == null) {
331 break;
332 }
333 if (entry.getExpiryDeadline().isExpired()) {
334 entry.discardConnection(CloseMode.GRACEFUL);
335 this.available.remove(entry);
336 pool.free(entry, false);
337 } else {
338 break;
339 }
340 }
341 if (entry != null) {
342 this.available.remove(entry);
343 this.leased.add(entry);
344 request.completed(entry);
345 if (this.connPoolListener != null) {
346 this.connPoolListener.onLease(entry.getRoute(), this);
347 }
348 return true;
349 }
350
351
352 final int maxPerRoute = getMax(route);
353
354 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
355 if (excess > 0) {
356 for (int i = 0; i < excess; i++) {
357 final PoolEntry<T, C> lastUsed = pool.getLastUsed();
358 if (lastUsed == null) {
359 break;
360 }
361 lastUsed.discardConnection(CloseMode.GRACEFUL);
362 this.available.remove(lastUsed);
363 pool.remove(lastUsed);
364 }
365 }
366
367 if (pool.getAllocatedCount() < maxPerRoute) {
368 final int freeCapacity = Math.max(this.maxTotal - this.leased.size(), 0);
369 if (freeCapacity == 0) {
370 return false;
371 }
372 final int totalAvailable = this.available.size();
373 if (totalAvailable > freeCapacity - 1) {
374 final PoolEntry<T, C> lastUsed = this.available.removeLast();
375 lastUsed.discardConnection(CloseMode.GRACEFUL);
376 final PerRoutePool<T, C> otherpool = getPool(lastUsed.getRoute());
377 otherpool.remove(lastUsed);
378 }
379
380 entry = pool.createEntry(this.timeToLive);
381 this.leased.add(entry);
382 request.completed(entry);
383 if (this.connPoolListener != null) {
384 this.connPoolListener.onLease(entry.getRoute(), this);
385 }
386 return true;
387 }
388 return false;
389 }
390
391 private void fireCallbacks() {
392 LeaseRequest<T, C> request;
393 while ((request = this.completedRequests.poll()) != null) {
394 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
395 final Exception ex = request.getException();
396 final PoolEntry<T, C> result = request.getResult();
397 boolean successfullyCompleted = false;
398 if (ex != null) {
399 future.failed(ex);
400 } else if (result != null) {
401 if (future.completed(result)) {
402 successfullyCompleted = true;
403 }
404 } else {
405 future.cancel();
406 }
407 if (!successfullyCompleted) {
408 release(result, true);
409 }
410 }
411 }
412
413 public void validatePendingRequests() {
414 this.lock.lock();
415 try {
416 final long now = System.currentTimeMillis();
417 final ListIterator<LeaseRequest<T, C>> it = this.pendingRequests.listIterator();
418 while (it.hasNext()) {
419 final LeaseRequest<T, C> request = it.next();
420 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
421 if (future.isCancelled() && !request.isDone()) {
422 it.remove();
423 } else {
424 final Deadline deadline = request.getDeadline();
425 if (deadline.isBefore(now)) {
426 request.failed(DeadlineTimeoutException.from(deadline));
427 }
428 if (request.isDone()) {
429 it.remove();
430 this.completedRequests.add(request);
431 }
432 }
433 }
434 } finally {
435 this.lock.unlock();
436 }
437 fireCallbacks();
438 }
439
440 private int getMax(final T route) {
441 final Integer v = this.maxPerRoute.get(route);
442 if (v != null) {
443 return v;
444 }
445 return this.defaultMaxPerRoute;
446 }
447
448 @Override
449 public void setMaxTotal(final int max) {
450 Args.positive(max, "Max value");
451 this.lock.lock();
452 try {
453 this.maxTotal = max;
454 } finally {
455 this.lock.unlock();
456 }
457 }
458
459 @Override
460 public int getMaxTotal() {
461 this.lock.lock();
462 try {
463 return this.maxTotal;
464 } finally {
465 this.lock.unlock();
466 }
467 }
468
469 @Override
470 public void setDefaultMaxPerRoute(final int max) {
471 Args.positive(max, "Max value");
472 this.lock.lock();
473 try {
474 this.defaultMaxPerRoute = max;
475 } finally {
476 this.lock.unlock();
477 }
478 }
479
480 @Override
481 public int getDefaultMaxPerRoute() {
482 this.lock.lock();
483 try {
484 return this.defaultMaxPerRoute;
485 } finally {
486 this.lock.unlock();
487 }
488 }
489
490 @Override
491 public void setMaxPerRoute(final T route, final int max) {
492 Args.notNull(route, "Route");
493 this.lock.lock();
494 try {
495 if (max > -1) {
496 this.maxPerRoute.put(route, max);
497 } else {
498 this.maxPerRoute.remove(route);
499 }
500 } finally {
501 this.lock.unlock();
502 }
503 }
504
505 @Override
506 public int getMaxPerRoute(final T route) {
507 Args.notNull(route, "Route");
508 this.lock.lock();
509 try {
510 return getMax(route);
511 } finally {
512 this.lock.unlock();
513 }
514 }
515
516 @Override
517 public PoolStats getTotalStats() {
518 this.lock.lock();
519 try {
520 int pendingCount = 0;
521 for (final LeaseRequest<T, C> request: pendingRequests) {
522 if (!request.isDone()) {
523 final Deadline deadline = request.getDeadline();
524 if (!deadline.isExpired()) {
525 pendingCount++;
526 }
527 }
528 }
529 return new PoolStats(
530 this.leased.size(),
531 pendingCount,
532 this.available.size(),
533 this.maxTotal);
534 } finally {
535 this.lock.unlock();
536 }
537 }
538
539 @Override
540 public PoolStats getStats(final T route) {
541 Args.notNull(route, "Route");
542 this.lock.lock();
543 try {
544 final PerRoutePool<T, C> pool = getPool(route);
545 int pendingCount = 0;
546 for (final LeaseRequest<T, C> request: pendingRequests) {
547 if (!request.isDone() && Objects.equals(route, request.getRoute())) {
548 final Deadline deadline = request.getDeadline();
549 if (!deadline.isExpired()) {
550 pendingCount++;
551 }
552 }
553 }
554 return new PoolStats(
555 pool.getLeasedCount(),
556 pendingCount,
557 pool.getAvailableCount(),
558 getMax(route));
559 } finally {
560 this.lock.unlock();
561 }
562 }
563
564
565
566
567
568
569 @Override
570 public Set<T> getRoutes() {
571 this.lock.lock();
572 try {
573 return new HashSet<>(routeToPool.keySet());
574 } finally {
575 this.lock.unlock();
576 }
577 }
578
579
580
581
582
583
584 public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
585 this.lock.lock();
586 try {
587 final Iterator<PoolEntry<T, C>> it = this.available.iterator();
588 while (it.hasNext()) {
589 final PoolEntry<T, C> entry = it.next();
590 callback.execute(entry);
591 if (!entry.hasConnection()) {
592 final PerRoutePool<T, C> pool = getPool(entry.getRoute());
593 pool.remove(entry);
594 it.remove();
595 }
596 }
597 processPendingRequests();
598 purgePoolMap();
599 } finally {
600 this.lock.unlock();
601 }
602 }
603
604
605
606
607
608
609 public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
610 this.lock.lock();
611 try {
612 final Iterator<PoolEntry<T, C>> it = this.leased.iterator();
613 while (it.hasNext()) {
614 final PoolEntry<T, C> entry = it.next();
615 callback.execute(entry);
616 }
617 processPendingRequests();
618 } finally {
619 this.lock.unlock();
620 }
621 }
622
623 private void purgePoolMap() {
624 final Iterator<Map.Entry<T, PerRoutePool<T, C>>> it = this.routeToPool.entrySet().iterator();
625 while (it.hasNext()) {
626 final Map.Entry<T, PerRoutePool<T, C>> entry = it.next();
627 final PerRoutePool<T, C> pool = entry.getValue();
628 if (pool.getAllocatedCount() == 0) {
629 it.remove();
630 }
631 }
632 }
633
634 @Override
635 public void closeIdle(final TimeValue idleTime) {
636 final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
637 enumAvailable(entry -> {
638 if (entry.getUpdated() <= deadline) {
639 entry.discardConnection(CloseMode.GRACEFUL);
640 }
641 });
642 }
643
644 @Override
645 public void closeExpired() {
646 final long now = System.currentTimeMillis();
647 enumAvailable(entry -> {
648 if (entry.getExpiryDeadline().isBefore(now)) {
649 entry.discardConnection(CloseMode.GRACEFUL);
650 }
651 });
652 }
653
654 @Override
655 public String toString() {
656 final StringBuilder buffer = new StringBuilder();
657 buffer.append("[leased: ");
658 buffer.append(this.leased.size());
659 buffer.append("][available: ");
660 buffer.append(this.available.size());
661 buffer.append("][pending: ");
662 buffer.append(this.pendingRequests.size());
663 buffer.append("]");
664 return buffer.toString();
665 }
666
667
668 static class LeaseRequest<T, C extends ModalCloseable> {
669
670 private final T route;
671 private final Object state;
672 private final Deadline deadline;
673 private final BasicFuture<PoolEntry<T, C>> future;
674
675
676 private final AtomicBoolean completed;
677 private volatile PoolEntry<T, C> result;
678 private volatile Exception ex;
679
680
681
682
683
684
685
686
687
688 public LeaseRequest(
689 final T route,
690 final Object state,
691 final Timeout requestTimeout,
692 final BasicFuture<PoolEntry<T, C>> future) {
693 super();
694 this.route = route;
695 this.state = state;
696 this.deadline = Deadline.calculate(requestTimeout);
697 this.future = future;
698 this.completed = new AtomicBoolean(false);
699 }
700
701 public T getRoute() {
702 return this.route;
703 }
704
705 public Object getState() {
706 return this.state;
707 }
708
709 public Deadline getDeadline() {
710 return this.deadline;
711 }
712
713 public boolean isDone() {
714
715
716
717 return ex != null || result != null;
718 }
719
720 public void failed(final Exception ex) {
721 if (this.completed.compareAndSet(false, true)) {
722 this.ex = ex;
723 }
724 }
725
726 public void completed(final PoolEntry<T, C> result) {
727 if (this.completed.compareAndSet(false, true)) {
728 this.result = result;
729 }
730 }
731
732 public BasicFuture<PoolEntry<T, C>> getFuture() {
733 return this.future;
734 }
735
736 public PoolEntry<T, C> getResult() {
737 return this.result;
738 }
739
740 public Exception getException() {
741 return this.ex;
742 }
743
744 @Override
745 public String toString() {
746 final StringBuilder buffer = new StringBuilder();
747 buffer.append("[");
748 buffer.append(this.route);
749 buffer.append("][");
750 buffer.append(this.state);
751 buffer.append("]");
752 return buffer.toString();
753 }
754
755 }
756
757 static class PerRoutePool<T, C extends ModalCloseable> {
758
759 private final T route;
760 private final Set<PoolEntry<T, C>> leased;
761 private final LinkedList<PoolEntry<T, C>> available;
762 private final DisposalCallback<C> disposalCallback;
763
764 PerRoutePool(final T route, final DisposalCallback<C> disposalCallback) {
765 super();
766 this.route = route;
767 this.disposalCallback = disposalCallback;
768 this.leased = new HashSet<>();
769 this.available = new LinkedList<>();
770 }
771
772 public final T getRoute() {
773 return route;
774 }
775
776 public int getLeasedCount() {
777 return this.leased.size();
778 }
779
780 public int getAvailableCount() {
781 return this.available.size();
782 }
783
784 public int getAllocatedCount() {
785 return this.available.size() + this.leased.size();
786 }
787
788 public PoolEntry<T, C> getFree(final Object state) {
789 if (!this.available.isEmpty()) {
790 if (state != null) {
791 final Iterator<PoolEntry<T, C>> it = this.available.iterator();
792 while (it.hasNext()) {
793 final PoolEntry<T, C> entry = it.next();
794 if (state.equals(entry.getState())) {
795 it.remove();
796 this.leased.add(entry);
797 return entry;
798 }
799 }
800 }
801 final Iterator<PoolEntry<T, C>> it = this.available.iterator();
802 while (it.hasNext()) {
803 final PoolEntry<T, C> entry = it.next();
804 if (entry.getState() == null) {
805 it.remove();
806 this.leased.add(entry);
807 return entry;
808 }
809 }
810 }
811 return null;
812 }
813
814 public PoolEntry<T, C> getLastUsed() {
815 return this.available.peekLast();
816 }
817
818 public boolean remove(final PoolEntry<T, C> entry) {
819 return this.available.remove(entry) || this.leased.remove(entry);
820 }
821
822 public void free(final PoolEntry<T, C> entry, final boolean reusable) {
823 final boolean found = this.leased.remove(entry);
824 Asserts.check(found, "Entry %s has not been leased from this pool", entry);
825 if (reusable) {
826 this.available.addFirst(entry);
827 }
828 }
829
830 public PoolEntry<T, C> createEntry(final TimeValue timeToLive) {
831 final PoolEntry<T, C> entry = new PoolEntry<>(this.route, timeToLive, disposalCallback);
832 this.leased.add(entry);
833 return entry;
834 }
835
836 public void shutdown(final CloseMode closeMode) {
837 PoolEntry<T, C> availableEntry;
838 while ((availableEntry = available.poll()) != null) {
839 availableEntry.discardConnection(closeMode);
840 }
841 for (final PoolEntry<T, C> entry: this.leased) {
842 entry.discardConnection(closeMode);
843 }
844 this.leased.clear();
845 }
846
847 @Override
848 public String toString() {
849 final StringBuilder buffer = new StringBuilder();
850 buffer.append("[route: ");
851 buffer.append(this.route);
852 buffer.append("][leased: ");
853 buffer.append(this.leased.size());
854 buffer.append("][available: ");
855 buffer.append(this.available.size());
856 buffer.append("]");
857 return buffer.toString();
858 }
859
860 }
861 }