1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.pool;
28
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.Iterator;
32 import java.util.LinkedList;
33 import java.util.ListIterator;
34 import java.util.Map;
35 import java.util.Set;
36 import java.util.concurrent.ConcurrentLinkedQueue;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.Future;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.TimeoutException;
41 import java.util.concurrent.atomic.AtomicBoolean;
42 import java.util.concurrent.locks.Lock;
43 import java.util.concurrent.locks.ReentrantLock;
44
45 import org.apache.hc.core5.annotation.Contract;
46 import org.apache.hc.core5.annotation.ThreadingBehavior;
47 import org.apache.hc.core5.concurrent.BasicFuture;
48 import org.apache.hc.core5.concurrent.FutureCallback;
49 import org.apache.hc.core5.function.Callback;
50 import org.apache.hc.core5.io.CloseMode;
51 import org.apache.hc.core5.io.ModalCloseable;
52 import org.apache.hc.core5.util.Args;
53 import org.apache.hc.core5.util.Asserts;
54 import org.apache.hc.core5.util.Deadline;
55 import org.apache.hc.core5.util.DeadlineTimeoutException;
56 import org.apache.hc.core5.util.LangUtils;
57 import org.apache.hc.core5.util.TimeValue;
58 import org.apache.hc.core5.util.Timeout;
59
60
61
62
63
64
65
66
67
68 @Contract(threading = ThreadingBehavior.SAFE)
69 public class StrictConnPool<T, C extends ModalCloseable> implements ManagedConnPool<T, C> {
70
// Life time handed to PerRoutePool.createEntry for every new entry; normalised in the constructor.
private final TimeValue timeToLive;
// Selects the end of the global available list a released entry is queued at (LIFO or FIFO).
private final PoolReusePolicy policy;
// Passed on to every per-route pool and from there to new entries; may be null.
private final DisposalCallback<C> disposalCallback;
// Notified on lease and release; null-checked before every use.
private final ConnPoolListener<T> connPoolListener;
// Per-route pools keyed by route, created lazily by getPool.
private final Map<T, PerRoutePool<T, C>> routeToPool;
// Lease requests that could not be served yet, in arrival order.
private final LinkedList<LeaseRequest<T, C>> leasingRequests;
// Entries currently handed out, across all routes.
private final Set<PoolEntry<T, C>> leased;
// Idle entries across all routes.
private final LinkedList<PoolEntry<T, C>> available;
// Requests finished while the lock was held; their futures are completed later by fireCallbacks.
private final ConcurrentLinkedQueue<LeaseRequest<T, C>> completedRequests;
// Per-route limit overrides; getMax falls back to defaultMaxPerRoute when a route has none.
private final Map<T, Integer> maxPerRoute;
// Acquired by the pool operations before they touch the collections above
// (toString() reads the sizes without it).
private final Lock lock;
// Flipped to true once by close(CloseMode).
private final AtomicBoolean isShutDown;

// Limit used for routes without an entry in maxPerRoute.
private volatile int defaultMaxPerRoute;
// Limit compared against the number of leased entries before a new entry is created.
private volatile int maxTotal;
86
87
88
89
/**
 * Creates a pool with explicit limits, entry life time, reuse policy and optional callbacks.
 *
 * @param defaultMaxPerRoute limit for routes without an individual setting; checked with
 *                           {@code Args.positive}.
 * @param maxTotal limit for the pool as a whole; checked with {@code Args.positive}.
 * @param timeToLive life time given to newly created entries; passed through
 *                   {@code TimeValue.defaultsToNegativeOneMillisecond}.
 * @param policy reuse policy; {@code null} selects {@code PoolReusePolicy.LIFO}.
 * @param disposalCallback handed to every per-route pool; may be {@code null}.
 * @param connPoolListener notified about lease and release events; may be {@code null}.
 */
public StrictConnPool(
        final int defaultMaxPerRoute,
        final int maxTotal,
        final TimeValue timeToLive,
        final PoolReusePolicy policy,
        final DisposalCallback<C> disposalCallback,
        final ConnPoolListener<T> connPoolListener) {
    // Validate the limits first so that nothing is set up for an invalid configuration.
    Args.positive(defaultMaxPerRoute, "Max per route value");
    Args.positive(maxTotal, "Max total value");
    this.defaultMaxPerRoute = defaultMaxPerRoute;
    this.maxTotal = maxTotal;

    // Configuration and collaborators.
    this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
    if (policy == null) {
        this.policy = PoolReusePolicy.LIFO;
    } else {
        this.policy = policy;
    }
    this.disposalCallback = disposalCallback;
    this.connPoolListener = connPoolListener;

    // Mutable pool state.
    this.lock = new ReentrantLock();
    this.isShutDown = new AtomicBoolean(false);
    this.routeToPool = new HashMap<>();
    this.maxPerRoute = new HashMap<>();
    this.leased = new HashSet<>();
    this.available = new LinkedList<>();
    this.leasingRequests = new LinkedList<>();
    this.completedRequests = new ConcurrentLinkedQueue<>();
}
115
116
117
118
/**
 * Creates a pool without a disposal callback; delegates to the six-argument constructor
 * with {@code null} in the callback position.
 *
 * @param defaultMaxPerRoute limit for routes without an individual setting.
 * @param maxTotal limit for the pool as a whole.
 * @param timeToLive life time given to newly created entries.
 * @param policy reuse policy.
 * @param connPoolListener listener for lease and release events.
 */
public StrictConnPool(
        final int defaultMaxPerRoute,
        final int maxTotal,
        final TimeValue timeToLive,
        final PoolReusePolicy policy,
        final ConnPoolListener<T> connPoolListener) {
    this(
            defaultMaxPerRoute,
            maxTotal,
            timeToLive,
            policy,
            (DisposalCallback<C>) null,
            connPoolListener);
}
127
/**
 * Creates a pool with the given limits, {@code TimeValue.NEG_ONE_MILLISECOND} as entry
 * life time, {@code PoolReusePolicy.LIFO} as reuse policy and no listener.
 *
 * @param defaultMaxPerRoute limit for routes without an individual setting.
 * @param maxTotal limit for the pool as a whole.
 */
public StrictConnPool(final int defaultMaxPerRoute, final int maxTotal) {
    this(
            defaultMaxPerRoute,
            maxTotal,
            TimeValue.NEG_ONE_MILLISECOND,
            PoolReusePolicy.LIFO,
            (ConnPoolListener<T>) null);
}
131
132 public boolean isShutdown() {
133 return this.isShutDown.get();
134 }
135
136 @Override
137 public void close(final CloseMode closeMode) {
138 if (this.isShutDown.compareAndSet(false, true)) {
139 fireCallbacks();
140 this.lock.lock();
141 try {
142 for (final PerRoutePool<T, C> pool: this.routeToPool.values()) {
143 pool.shutdown(closeMode);
144 }
145 this.routeToPool.clear();
146 this.leased.clear();
147 this.available.clear();
148 this.leasingRequests.clear();
149 } finally {
150 this.lock.unlock();
151 }
152 }
153 }
154
155 @Override
156 public void close() {
157 close(CloseMode.GRACEFUL);
158 }
159
160 private PerRoutePool<T, C> getPool(final T route) {
161 PerRoutePool<T, C> pool = this.routeToPool.get(route);
162 if (pool == null) {
163 pool = new PerRoutePool<>(route, this.disposalCallback);
164 this.routeToPool.put(route, pool);
165 }
166 return pool;
167 }
168
169 @Override
170 public Future<PoolEntry<T, C>> lease(
171 final T route, final Object state,
172 final Timeout requestTimeout,
173 final FutureCallback<PoolEntry<T, C>> callback) {
174 Args.notNull(route, "Route");
175 Args.notNull(requestTimeout, "Request timeout");
176 Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
177 final Deadline deadline = Deadline.calculate(requestTimeout);
178 final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<PoolEntry<T, C>>(callback) {
179
180 @Override
181 public synchronized PoolEntry<T, C> get(
182 final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
183 try {
184 return super.get(timeout, unit);
185 } catch (final TimeoutException ex) {
186 cancel();
187 throw ex;
188 }
189 }
190
191 };
192 final boolean acquiredLock;
193
194 try {
195 acquiredLock = this.lock.tryLock(requestTimeout.getDuration(), requestTimeout.getTimeUnit());
196 } catch (final InterruptedException interruptedException) {
197 Thread.currentThread().interrupt();
198 future.cancel();
199 return future;
200 }
201
202 if (acquiredLock) {
203 try {
204 final LeaseRequest<T, C> request = new LeaseRequest<>(route, state, requestTimeout, future);
205 final boolean completed = processPendingRequest(request);
206 if (!request.isDone() && !completed) {
207 this.leasingRequests.add(request);
208 }
209 if (request.isDone()) {
210 this.completedRequests.add(request);
211 }
212 } finally {
213 this.lock.unlock();
214 }
215 fireCallbacks();
216 } else {
217 future.failed(DeadlineTimeoutException.from(deadline));
218 }
219
220 return future;
221 }
222
223 public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
224 return lease(route, state, Timeout.DISABLED, null);
225 }
226
227 @Override
228 public void release(final PoolEntry<T, C> entry, final boolean reusable) {
229 if (entry == null) {
230 return;
231 }
232 if (this.isShutDown.get()) {
233 return;
234 }
235 if (!reusable) {
236 entry.discardConnection(CloseMode.GRACEFUL);
237 }
238 this.lock.lock();
239 try {
240 if (this.leased.remove(entry)) {
241 if (this.connPoolListener != null) {
242 this.connPoolListener.onRelease(entry.getRoute(), this);
243 }
244 final PerRoutePool<T, C> pool = getPool(entry.getRoute());
245 final boolean keepAlive = entry.hasConnection() && reusable;
246 pool.free(entry, keepAlive);
247 if (keepAlive) {
248 switch (policy) {
249 case LIFO:
250 this.available.addFirst(entry);
251 break;
252 case FIFO:
253 this.available.addLast(entry);
254 break;
255 default:
256 throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
257 }
258 } else {
259 entry.discardConnection(CloseMode.GRACEFUL);
260 }
261 processNextPendingRequest();
262 } else {
263 throw new IllegalStateException("Pool entry is not present in the set of leased entries");
264 }
265 } finally {
266 this.lock.unlock();
267 }
268 fireCallbacks();
269 }
270
271 private void processPendingRequests() {
272 final ListIterator<LeaseRequest<T, C>> it = this.leasingRequests.listIterator();
273 while (it.hasNext()) {
274 final LeaseRequest<T, C> request = it.next();
275 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
276 if (future.isCancelled()) {
277 it.remove();
278 continue;
279 }
280 final boolean completed = processPendingRequest(request);
281 if (request.isDone() || completed) {
282 it.remove();
283 }
284 if (request.isDone()) {
285 this.completedRequests.add(request);
286 }
287 }
288 }
289
290 private void processNextPendingRequest() {
291 final ListIterator<LeaseRequest<T, C>> it = this.leasingRequests.listIterator();
292 while (it.hasNext()) {
293 final LeaseRequest<T, C> request = it.next();
294 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
295 if (future.isCancelled()) {
296 it.remove();
297 continue;
298 }
299 final boolean completed = processPendingRequest(request);
300 if (request.isDone() || completed) {
301 it.remove();
302 }
303 if (request.isDone()) {
304 this.completedRequests.add(request);
305 }
306 if (completed) {
307 return;
308 }
309 }
310 }
311
312 private boolean processPendingRequest(final LeaseRequest<T, C> request) {
313 final T route = request.getRoute();
314 final Object state = request.getState();
315 final Deadline deadline = request.getDeadline();
316
317 if (deadline.isExpired()) {
318 request.failed(DeadlineTimeoutException.from(deadline));
319 return false;
320 }
321
322 final PerRoutePool<T, C> pool = getPool(route);
323 PoolEntry<T, C> entry;
324 for (;;) {
325 entry = pool.getFree(state);
326 if (entry == null) {
327 break;
328 }
329 if (entry.getExpiryDeadline().isExpired()) {
330 entry.discardConnection(CloseMode.GRACEFUL);
331 this.available.remove(entry);
332 pool.free(entry, false);
333 } else {
334 break;
335 }
336 }
337 if (entry != null) {
338 this.available.remove(entry);
339 this.leased.add(entry);
340 request.completed(entry);
341 if (this.connPoolListener != null) {
342 this.connPoolListener.onLease(entry.getRoute(), this);
343 }
344 return true;
345 }
346
347
348 final int maxPerRoute = getMax(route);
349
350 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
351 if (excess > 0) {
352 for (int i = 0; i < excess; i++) {
353 final PoolEntry<T, C> lastUsed = pool.getLastUsed();
354 if (lastUsed == null) {
355 break;
356 }
357 lastUsed.discardConnection(CloseMode.GRACEFUL);
358 this.available.remove(lastUsed);
359 pool.remove(lastUsed);
360 }
361 }
362
363 if (pool.getAllocatedCount() < maxPerRoute) {
364 final int freeCapacity = Math.max(this.maxTotal - this.leased.size(), 0);
365 if (freeCapacity == 0) {
366 return false;
367 }
368 final int totalAvailable = this.available.size();
369 if (totalAvailable > freeCapacity - 1) {
370 if (!this.available.isEmpty()) {
371 final PoolEntry<T, C> lastUsed = this.available.removeLast();
372 lastUsed.discardConnection(CloseMode.GRACEFUL);
373 final PerRoutePool<T, C> otherpool = getPool(lastUsed.getRoute());
374 otherpool.remove(lastUsed);
375 }
376 }
377
378 entry = pool.createEntry(this.timeToLive);
379 this.leased.add(entry);
380 request.completed(entry);
381 if (this.connPoolListener != null) {
382 this.connPoolListener.onLease(entry.getRoute(), this);
383 }
384 return true;
385 }
386 return false;
387 }
388
389 private void fireCallbacks() {
390 LeaseRequest<T, C> request;
391 while ((request = this.completedRequests.poll()) != null) {
392 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
393 final Exception ex = request.getException();
394 final PoolEntry<T, C> result = request.getResult();
395 boolean successfullyCompleted = false;
396 if (ex != null) {
397 future.failed(ex);
398 } else if (result != null) {
399 if (future.completed(result)) {
400 successfullyCompleted = true;
401 }
402 } else {
403 future.cancel();
404 }
405 if (!successfullyCompleted) {
406 release(result, true);
407 }
408 }
409 }
410
411 public void validatePendingRequests() {
412 this.lock.lock();
413 try {
414 final long now = System.currentTimeMillis();
415 final ListIterator<LeaseRequest<T, C>> it = this.leasingRequests.listIterator();
416 while (it.hasNext()) {
417 final LeaseRequest<T, C> request = it.next();
418 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
419 if (future.isCancelled() && !request.isDone()) {
420 it.remove();
421 } else {
422 final Deadline deadline = request.getDeadline();
423 if (deadline.isBefore(now)) {
424 request.failed(DeadlineTimeoutException.from(deadline));
425 }
426 if (request.isDone()) {
427 it.remove();
428 this.completedRequests.add(request);
429 }
430 }
431 }
432 } finally {
433 this.lock.unlock();
434 }
435 fireCallbacks();
436 }
437
438 private int getMax(final T route) {
439 final Integer v = this.maxPerRoute.get(route);
440 if (v != null) {
441 return v;
442 }
443 return this.defaultMaxPerRoute;
444 }
445
446 @Override
447 public void setMaxTotal(final int max) {
448 Args.positive(max, "Max value");
449 this.lock.lock();
450 try {
451 this.maxTotal = max;
452 } finally {
453 this.lock.unlock();
454 }
455 }
456
457 @Override
458 public int getMaxTotal() {
459 this.lock.lock();
460 try {
461 return this.maxTotal;
462 } finally {
463 this.lock.unlock();
464 }
465 }
466
467 @Override
468 public void setDefaultMaxPerRoute(final int max) {
469 Args.positive(max, "Max value");
470 this.lock.lock();
471 try {
472 this.defaultMaxPerRoute = max;
473 } finally {
474 this.lock.unlock();
475 }
476 }
477
478 @Override
479 public int getDefaultMaxPerRoute() {
480 this.lock.lock();
481 try {
482 return this.defaultMaxPerRoute;
483 } finally {
484 this.lock.unlock();
485 }
486 }
487
488 @Override
489 public void setMaxPerRoute(final T route, final int max) {
490 Args.notNull(route, "Route");
491 this.lock.lock();
492 try {
493 if (max > -1) {
494 this.maxPerRoute.put(route, max);
495 } else {
496 this.maxPerRoute.remove(route);
497 }
498 } finally {
499 this.lock.unlock();
500 }
501 }
502
503 @Override
504 public int getMaxPerRoute(final T route) {
505 Args.notNull(route, "Route");
506 this.lock.lock();
507 try {
508 return getMax(route);
509 } finally {
510 this.lock.unlock();
511 }
512 }
513
514 @Override
515 public PoolStats getTotalStats() {
516 this.lock.lock();
517 try {
518 return new PoolStats(
519 this.leased.size(),
520 this.leasingRequests.size(),
521 this.available.size(),
522 this.maxTotal);
523 } finally {
524 this.lock.unlock();
525 }
526 }
527
528 @Override
529 public PoolStats getStats(final T route) {
530 Args.notNull(route, "Route");
531 this.lock.lock();
532 try {
533 final PerRoutePool<T, C> pool = getPool(route);
534 int pendingCount = 0;
535 for (final LeaseRequest<T, C> request: leasingRequests) {
536 if (LangUtils.equals(route, request.getRoute())) {
537 pendingCount++;
538 }
539 }
540 return new PoolStats(
541 pool.getLeasedCount(),
542 pendingCount,
543 pool.getAvailableCount(),
544 getMax(route));
545 } finally {
546 this.lock.unlock();
547 }
548 }
549
550
551
552
553
554
555 @Override
556 public Set<T> getRoutes() {
557 this.lock.lock();
558 try {
559 return new HashSet<>(routeToPool.keySet());
560 } finally {
561 this.lock.unlock();
562 }
563 }
564
565
566
567
568
569
570 public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
571 this.lock.lock();
572 try {
573 final Iterator<PoolEntry<T, C>> it = this.available.iterator();
574 while (it.hasNext()) {
575 final PoolEntry<T, C> entry = it.next();
576 callback.execute(entry);
577 if (!entry.hasConnection()) {
578 final PerRoutePool<T, C> pool = getPool(entry.getRoute());
579 pool.remove(entry);
580 it.remove();
581 }
582 }
583 processPendingRequests();
584 purgePoolMap();
585 } finally {
586 this.lock.unlock();
587 }
588 }
589
590
591
592
593
594
595 public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
596 this.lock.lock();
597 try {
598 final Iterator<PoolEntry<T, C>> it = this.leased.iterator();
599 while (it.hasNext()) {
600 final PoolEntry<T, C> entry = it.next();
601 callback.execute(entry);
602 }
603 processPendingRequests();
604 } finally {
605 this.lock.unlock();
606 }
607 }
608
609 private void purgePoolMap() {
610 final Iterator<Map.Entry<T, PerRoutePool<T, C>>> it = this.routeToPool.entrySet().iterator();
611 while (it.hasNext()) {
612 final Map.Entry<T, PerRoutePool<T, C>> entry = it.next();
613 final PerRoutePool<T, C> pool = entry.getValue();
614 if (pool.getAllocatedCount() == 0) {
615 it.remove();
616 }
617 }
618 }
619
620 @Override
621 public void closeIdle(final TimeValue idleTime) {
622 final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
623 enumAvailable(new Callback<PoolEntry<T, C>>() {
624
625 @Override
626 public void execute(final PoolEntry<T, C> entry) {
627 if (entry.getUpdated() <= deadline) {
628 entry.discardConnection(CloseMode.GRACEFUL);
629 }
630 }
631
632 });
633 }
634
635 @Override
636 public void closeExpired() {
637 final long now = System.currentTimeMillis();
638 enumAvailable(new Callback<PoolEntry<T, C>>() {
639
640 @Override
641 public void execute(final PoolEntry<T, C> entry) {
642 if (entry.getExpiryDeadline().isBefore(now)) {
643 entry.discardConnection(CloseMode.GRACEFUL);
644 }
645 }
646
647 });
648 }
649
650 @Override
651 public String toString() {
652 final StringBuilder buffer = new StringBuilder();
653 buffer.append("[leased: ");
654 buffer.append(this.leased.size());
655 buffer.append("][available: ");
656 buffer.append(this.available.size());
657 buffer.append("][pending: ");
658 buffer.append(this.leasingRequests.size());
659 buffer.append("]");
660 return buffer.toString();
661 }
662
663
664 static class LeaseRequest<T, C extends ModalCloseable> {
665
666 private final T route;
667 private final Object state;
668 private final Deadline deadline;
669 private final BasicFuture<PoolEntry<T, C>> future;
670 private final AtomicBoolean completed;
671 private volatile PoolEntry<T, C> result;
672 private volatile Exception ex;
673
674
675
676
677
678
679
680
681
682 public LeaseRequest(
683 final T route,
684 final Object state,
685 final Timeout requestTimeout,
686 final BasicFuture<PoolEntry<T, C>> future) {
687 super();
688 this.route = route;
689 this.state = state;
690 this.deadline = Deadline.calculate(requestTimeout);
691 this.future = future;
692 this.completed = new AtomicBoolean(false);
693 }
694
695 public T getRoute() {
696 return this.route;
697 }
698
699 public Object getState() {
700 return this.state;
701 }
702
703 public Deadline getDeadline() {
704 return this.deadline;
705 }
706
707 public boolean isDone() {
708 return this.completed.get();
709 }
710
711 public void failed(final Exception ex) {
712 if (this.completed.compareAndSet(false, true)) {
713 this.ex = ex;
714 }
715 }
716
717 public void completed(final PoolEntry<T, C> result) {
718 if (this.completed.compareAndSet(false, true)) {
719 this.result = result;
720 }
721 }
722
723 public BasicFuture<PoolEntry<T, C>> getFuture() {
724 return this.future;
725 }
726
727 public PoolEntry<T, C> getResult() {
728 return this.result;
729 }
730
731 public Exception getException() {
732 return this.ex;
733 }
734
735 @Override
736 public String toString() {
737 final StringBuilder buffer = new StringBuilder();
738 buffer.append("[");
739 buffer.append(this.route);
740 buffer.append("][");
741 buffer.append(this.state);
742 buffer.append("]");
743 return buffer.toString();
744 }
745
746 }
747
748 static class PerRoutePool<T, C extends ModalCloseable> {
749
750 private final T route;
751 private final Set<PoolEntry<T, C>> leased;
752 private final LinkedList<PoolEntry<T, C>> available;
753 private final DisposalCallback<C> disposalCallback;
754
755 PerRoutePool(final T route, final DisposalCallback<C> disposalCallback) {
756 super();
757 this.route = route;
758 this.disposalCallback = disposalCallback;
759 this.leased = new HashSet<>();
760 this.available = new LinkedList<>();
761 }
762
763 public final T getRoute() {
764 return route;
765 }
766
767 public int getLeasedCount() {
768 return this.leased.size();
769 }
770
771 public int getAvailableCount() {
772 return this.available.size();
773 }
774
775 public int getAllocatedCount() {
776 return this.available.size() + this.leased.size();
777 }
778
779 public PoolEntry<T, C> getFree(final Object state) {
780 if (!this.available.isEmpty()) {
781 if (state != null) {
782 final Iterator<PoolEntry<T, C>> it = this.available.iterator();
783 while (it.hasNext()) {
784 final PoolEntry<T, C> entry = it.next();
785 if (state.equals(entry.getState())) {
786 it.remove();
787 this.leased.add(entry);
788 return entry;
789 }
790 }
791 }
792 final Iterator<PoolEntry<T, C>> it = this.available.iterator();
793 while (it.hasNext()) {
794 final PoolEntry<T, C> entry = it.next();
795 if (entry.getState() == null) {
796 it.remove();
797 this.leased.add(entry);
798 return entry;
799 }
800 }
801 }
802 return null;
803 }
804
805 public PoolEntry<T, C> getLastUsed() {
806 return this.available.peekLast();
807 }
808
809 public boolean remove(final PoolEntry<T, C> entry) {
810 return this.available.remove(entry) || this.leased.remove(entry);
811 }
812
813 public void free(final PoolEntry<T, C> entry, final boolean reusable) {
814 final boolean found = this.leased.remove(entry);
815 Asserts.check(found, "Entry %s has not been leased from this pool", entry);
816 if (reusable) {
817 this.available.addFirst(entry);
818 }
819 }
820
821 public PoolEntry<T, C> createEntry(final TimeValue timeToLive) {
822 final PoolEntry<T, C> entry = new PoolEntry<>(this.route, timeToLive, disposalCallback);
823 this.leased.add(entry);
824 return entry;
825 }
826
827 public void shutdown(final CloseMode closeMode) {
828 PoolEntry<T, C> availableEntry;
829 while ((availableEntry = available.poll()) != null) {
830 availableEntry.discardConnection(closeMode);
831 }
832 for (final PoolEntry<T, C> entry: this.leased) {
833 entry.discardConnection(closeMode);
834 }
835 this.leased.clear();
836 }
837
838 @Override
839 public String toString() {
840 final StringBuilder buffer = new StringBuilder();
841 buffer.append("[route: ");
842 buffer.append(this.route);
843 buffer.append("][leased: ");
844 buffer.append(this.leased.size());
845 buffer.append("][available: ");
846 buffer.append(this.available.size());
847 buffer.append("]");
848 return buffer.toString();
849 }
850
851 }
852 }