1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.pool;
28
29 import java.util.Deque;
30 import java.util.HashSet;
31 import java.util.Iterator;
32 import java.util.Objects;
33 import java.util.Set;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.ConcurrentLinkedDeque;
36 import java.util.concurrent.ConcurrentMap;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.Future;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.TimeoutException;
41 import java.util.concurrent.atomic.AtomicBoolean;
42 import java.util.concurrent.atomic.AtomicInteger;
43 import java.util.concurrent.atomic.AtomicLong;
44 import java.util.concurrent.atomic.AtomicMarkableReference;
45 import java.util.concurrent.locks.ReentrantLock;
46
47 import org.apache.hc.core5.annotation.Contract;
48 import org.apache.hc.core5.annotation.Experimental;
49 import org.apache.hc.core5.annotation.ThreadingBehavior;
50 import org.apache.hc.core5.concurrent.BasicFuture;
51 import org.apache.hc.core5.concurrent.Cancellable;
52 import org.apache.hc.core5.concurrent.FutureCallback;
53 import org.apache.hc.core5.function.Callback;
54 import org.apache.hc.core5.io.CloseMode;
55 import org.apache.hc.core5.io.ModalCloseable;
56 import org.apache.hc.core5.util.Args;
57 import org.apache.hc.core5.util.Asserts;
58 import org.apache.hc.core5.util.Deadline;
59 import org.apache.hc.core5.util.DeadlineTimeoutException;
60 import org.apache.hc.core5.util.TimeValue;
61 import org.apache.hc.core5.util.Timeout;
62
63
64
65
66
67
68
69
70
71 @Contract(threading = ThreadingBehavior.SAFE)
72 @Experimental
73 public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool<T, C> {
74
75 private final TimeValue timeToLive;
76 private final PoolReusePolicy policy;
77 private final DisposalCallback<C> disposalCallback;
78 private final ConnPoolListener<T> connPoolListener;
79 private final ConcurrentMap<T, PerRoutePool<T, C>> routeToPool;
80 private final AtomicBoolean isShutDown;
81
82 private volatile int defaultMaxPerRoute;
83
84
85
86
/**
 * Creates a pool with a per-route connection limit and the given policies.
 *
 * @param defaultMaxPerRoute initial per-route limit; validated with {@code Args.positive}.
 * @param timeToLive time-to-live handed to pool entries, normalized through
 *                   {@code TimeValue.defaultsToNegativeOneMillisecond}.
 * @param policy re-use policy; {@code PoolReusePolicy.LIFO} is used when {@code null}.
 * @param disposalCallback callback handed to pool entries; may be {@code null}.
 * @param connPoolListener listener notified on lease / release; may be {@code null}.
 */
public LaxConnPool(
        final int defaultMaxPerRoute,
        final TimeValue timeToLive,
        final PoolReusePolicy policy,
        final DisposalCallback<C> disposalCallback,
        final ConnPoolListener<T> connPoolListener) {
    // Argument validation comes first so that nothing is initialized on failure.
    Args.positive(defaultMaxPerRoute, "Max per route value");
    this.defaultMaxPerRoute = defaultMaxPerRoute;
    this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
    if (policy == null) {
        this.policy = PoolReusePolicy.LIFO;
    } else {
        this.policy = policy;
    }
    this.disposalCallback = disposalCallback;
    this.connPoolListener = connPoolListener;
    this.isShutDown = new AtomicBoolean(false);
    this.routeToPool = new ConcurrentHashMap<>();
}
103
104
105
106
/**
 * Creates a pool without a disposal callback; delegates to the five-argument
 * constructor passing {@code null} for the callback.
 *
 * @param defaultMaxPerRoute initial per-route limit.
 * @param timeToLive time-to-live handed to pool entries.
 * @param policy re-use policy.
 * @param connPoolListener listener notified on lease / release.
 */
public LaxConnPool(
        final int defaultMaxPerRoute,
        final TimeValue timeToLive,
        final PoolReusePolicy policy,
        final ConnPoolListener<T> connPoolListener) {
    this(
            defaultMaxPerRoute,
            timeToLive,
            policy,
            null, // no disposal callback
            connPoolListener);
}
114
/**
 * Creates a pool with the given per-route limit, a time-to-live of
 * {@code TimeValue.NEG_ONE_MILLISECOND}, LIFO re-use, and neither a disposal
 * callback nor a listener.
 *
 * @param defaultMaxPerRoute initial per-route limit.
 */
public LaxConnPool(final int defaultMaxPerRoute) {
    this(
            defaultMaxPerRoute,
            TimeValue.NEG_ONE_MILLISECOND,
            PoolReusePolicy.LIFO,
            null, // no disposal callback
            null); // no listener
}
118
119 public boolean isShutdown() {
120 return isShutDown.get();
121 }
122
123 @Override
124 public void close(final CloseMode closeMode) {
125 if (isShutDown.compareAndSet(false, true)) {
126 for (final Iterator<PerRoutePool<T, C>> it = routeToPool.values().iterator(); it.hasNext(); ) {
127 final PerRoutePool<T, C> routePool = it.next();
128 routePool.shutdown(closeMode);
129 }
130 routeToPool.clear();
131 }
132 }
133
134 @Override
135 public void close() {
136 close(CloseMode.GRACEFUL);
137 }
138
139 private PerRoutePool<T, C> getPool(final T route) {
140 PerRoutePool<T, C> routePool = routeToPool.get(route);
141 if (routePool == null) {
142 final PerRoutePool<T, C> newRoutePool = new PerRoutePool<>(
143 route,
144 defaultMaxPerRoute,
145 timeToLive,
146 policy,
147 this,
148 disposalCallback,
149 connPoolListener);
150 routePool = routeToPool.putIfAbsent(route, newRoutePool);
151 if (routePool == null) {
152 routePool = newRoutePool;
153 }
154 }
155 return routePool;
156 }
157
158 @Override
159 public Future<PoolEntry<T, C>> lease(
160 final T route, final Object state,
161 final Timeout requestTimeout,
162 final FutureCallback<PoolEntry<T, C>> callback) {
163 Args.notNull(route, "Route");
164 Asserts.check(!isShutDown.get(), "Connection pool shut down");
165 final PerRoutePool<T, C> routePool = getPool(route);
166 return routePool.lease(state, requestTimeout, callback);
167 }
168
169 public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
170 return lease(route, state, Timeout.DISABLED, null);
171 }
172
173 @Override
174 public void release(final PoolEntry<T, C> entry, final boolean reusable) {
175 if (entry == null) {
176 return;
177 }
178 if (isShutDown.get()) {
179 return;
180 }
181 final PerRoutePool<T, C> routePool = getPool(entry.getRoute());
182 routePool.release(entry, reusable);
183 }
184
185 public void validatePendingRequests() {
186 for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
187 routePool.validatePendingRequests();
188 }
189 }
190
191 @Override
192 public void setMaxTotal(final int max) {
193 }
194
195 @Override
196 public int getMaxTotal() {
197 return 0;
198 }
199
200 @Override
201 public void setDefaultMaxPerRoute(final int max) {
202 Args.positive(max, "Max value");
203 defaultMaxPerRoute = max;
204 }
205
206 @Override
207 public int getDefaultMaxPerRoute() {
208 return defaultMaxPerRoute;
209 }
210
211 @Override
212 public void setMaxPerRoute(final T route, final int max) {
213 Args.notNull(route, "Route");
214 final PerRoutePool<T, C> routePool = getPool(route);
215 routePool.setMax(max > -1 ? max : defaultMaxPerRoute);
216 }
217
218 @Override
219 public int getMaxPerRoute(final T route) {
220 Args.notNull(route, "Route");
221 final PerRoutePool<T, C> routePool = getPool(route);
222 return routePool.getMax();
223 }
224
225 @Override
226 public PoolStats getTotalStats() {
227 int leasedTotal = 0;
228 int pendingTotal = 0;
229 int availableTotal = 0;
230 int maxTotal = 0;
231 for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
232 leasedTotal += routePool.getLeasedCount();
233 pendingTotal += routePool.getPendingCount();
234 availableTotal += routePool.getAvailableCount();
235 maxTotal += routePool.getMax();
236 }
237 return new PoolStats(leasedTotal, pendingTotal, availableTotal, maxTotal);
238 }
239
240 @Override
241 public PoolStats getStats(final T route) {
242 Args.notNull(route, "Route");
243 final PerRoutePool<T, C> routePool = getPool(route);
244 return new PoolStats(
245 routePool.getLeasedCount(),
246 routePool.getPendingCount(),
247 routePool.getAvailableCount(),
248 routePool.getMax());
249 }
250
251 @Override
252 public Set<T> getRoutes() {
253 return new HashSet<>(routeToPool.keySet());
254 }
255
256 public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
257 for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
258 routePool.enumAvailable(callback);
259 }
260 }
261
262 public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
263 for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
264 routePool.enumLeased(callback);
265 }
266 }
267
268 @Override
269 public void closeIdle(final TimeValue idleTime) {
270 final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
271 enumAvailable(entry -> {
272 if (entry.getUpdated() <= deadline) {
273 entry.discardConnection(CloseMode.GRACEFUL);
274 }
275 });
276 }
277
278 @Override
279 public void closeExpired() {
280 final long now = System.currentTimeMillis();
281 enumAvailable(entry -> {
282 if (entry.getExpiryDeadline().isBefore(now)) {
283 entry.discardConnection(CloseMode.GRACEFUL);
284 }
285 });
286 }
287
288 @Override
289 public String toString() {
290 final PoolStats totalStats = getTotalStats();
291 final StringBuilder buffer = new StringBuilder();
292 buffer.append("[leased: ");
293 buffer.append(totalStats.getLeased());
294 buffer.append("][available: ");
295 buffer.append(totalStats.getAvailable());
296 buffer.append("][pending: ");
297 buffer.append(totalStats.getPending());
298 buffer.append("]");
299 return buffer.toString();
300 }
301
302 static class LeaseRequest<T, C extends ModalCloseable> implements Cancellable {
303
304 private final Object state;
305 private final Deadline deadline;
306 private final BasicFuture<PoolEntry<T, C>> future;
307
308 LeaseRequest(
309 final Object state,
310 final Timeout requestTimeout,
311 final BasicFuture<PoolEntry<T, C>> future) {
312 super();
313 this.state = state;
314 this.deadline = Deadline.calculate(requestTimeout);
315 this.future = future;
316 }
317
318 BasicFuture<PoolEntry<T, C>> getFuture() {
319 return this.future;
320 }
321
322 public Object getState() {
323 return this.state;
324 }
325
326 public Deadline getDeadline() {
327 return this.deadline;
328 }
329
330 public boolean isDone() {
331 return this.future.isDone();
332 }
333
334 public boolean completed(final PoolEntry<T, C> result) {
335 return future.completed(result);
336 }
337
338 public boolean failed(final Exception ex) {
339 return future.failed(ex);
340 }
341
342 @Override
343 public boolean cancel() {
344 return future.cancel();
345 }
346
347 }
348
349 static class PerRoutePool<T, C extends ModalCloseable> {
350
351 private enum RequestServiceStrategy { FIRST_SUCCESSFUL, ALL }
352
353 private final T route;
354 private final TimeValue timeToLive;
355 private final PoolReusePolicy policy;
356 private final DisposalCallback<C> disposalCallback;
357 private final ConnPoolListener<T> connPoolListener;
358 private final ConnPoolStats<T> connPoolStats;
359 private final ConcurrentMap<PoolEntry<T, C>, Boolean> leased;
360 private final Deque<AtomicMarkableReference<PoolEntry<T, C>>> available;
361 private final Deque<LeaseRequest<T, C>> pending;
362 private final AtomicBoolean terminated;
363 private final AtomicInteger allocated;
364 private final AtomicLong releaseSeqNum;
365
366 private final ReentrantLock lock;
367
368 private volatile int max;
369
/**
 * Creates an empty pool for a single route: no leased, available or pending
 * items, zero allocated entries, a release sequence number of zero, and not
 * terminated.
 *
 * @param route the route served by this pool.
 * @param max the initial limit on allocated entries.
 * @param timeToLive time-to-live handed to new pool entries.
 * @param policy re-use policy applied when entries are released.
 * @param connPoolStats stats object handed to the listener.
 * @param disposalCallback callback handed to new pool entries.
 * @param connPoolListener listener notified on lease / release; may be {@code null}.
 */
PerRoutePool(
        final T route,
        final int max,
        final TimeValue timeToLive,
        final PoolReusePolicy policy,
        final ConnPoolStats<T> connPoolStats,
        final DisposalCallback<C> disposalCallback,
        final ConnPoolListener<T> connPoolListener) {
    // Configuration supplied by the owner.
    this.route = route;
    this.max = max;
    this.timeToLive = timeToLive;
    this.policy = policy;
    this.disposalCallback = disposalCallback;
    this.connPoolListener = connPoolListener;
    this.connPoolStats = connPoolStats;
    // Mutable bookkeeping.
    this.lock = new ReentrantLock();
    this.terminated = new AtomicBoolean(false);
    this.allocated = new AtomicInteger(0);
    this.releaseSeqNum = new AtomicLong(0);
    this.leased = new ConcurrentHashMap<>();
    this.available = new ConcurrentLinkedDeque<>();
    this.pending = new ConcurrentLinkedDeque<>();
}
394
395 public void shutdown(final CloseMode closeMode) {
396 if (terminated.compareAndSet(false, true)) {
397 AtomicMarkableReference<PoolEntry<T, C>> entryRef;
398 while ((entryRef = available.poll()) != null) {
399 entryRef.getReference().discardConnection(closeMode);
400 }
401 for (final PoolEntry<T, C> entry : leased.keySet()) {
402 entry.discardConnection(closeMode);
403 }
404 leased.clear();
405 LeaseRequest<T, C> leaseRequest;
406 while ((leaseRequest = pending.poll()) != null) {
407 leaseRequest.cancel();
408 }
409 }
410 }
411
412 private PoolEntry<T, C> createPoolEntry() {
413 final int poolMax = max;
414 int prev, next;
415 do {
416 prev = allocated.get();
417 next = (prev<poolMax)? prev+1 : prev;
418 } while (!allocated.compareAndSet(prev, next));
419 return (prev < next)? new PoolEntry<>(route, timeToLive, disposalCallback) : null;
420 }
421
422 private void deallocatePoolEntry() {
423 allocated.decrementAndGet();
424 }
425
426 private void addLeased(final PoolEntry<T, C> entry) {
427 if (leased.putIfAbsent(entry, Boolean.TRUE) != null) {
428 throw new IllegalStateException("Pool entry already present in the set of leased entries");
429 } else if (connPoolListener != null) {
430 connPoolListener.onLease(route, connPoolStats);
431 }
432 }
433
434 private void removeLeased(final PoolEntry<T, C> entry) {
435 if (connPoolListener != null) {
436 connPoolListener.onRelease(route, connPoolStats);
437 }
438 if (!leased.remove(entry, Boolean.TRUE)) {
439 throw new IllegalStateException("Pool entry is not present in the set of leased entries");
440 }
441 }
442
443 private PoolEntry<T, C> getAvailableEntry(final Object state) {
444 for (final Iterator<AtomicMarkableReference<PoolEntry<T, C>>> it = available.iterator(); it.hasNext(); ) {
445 final AtomicMarkableReference<PoolEntry<T, C>> ref = it.next();
446 final PoolEntry<T, C> entry = ref.getReference();
447 if (ref.compareAndSet(entry, entry, false, true)) {
448 it.remove();
449 if (entry.getExpiryDeadline().isExpired()) {
450 entry.discardConnection(CloseMode.GRACEFUL);
451 }
452 if (!Objects.equals(entry.getState(), state)) {
453 entry.discardConnection(CloseMode.GRACEFUL);
454 }
455 return entry;
456 }
457 }
458 return null;
459 }
460
461 public Future<PoolEntry<T, C>> lease(
462 final Object state,
463 final Timeout requestTimeout,
464 final FutureCallback<PoolEntry<T, C>> callback) {
465 Asserts.check(!terminated.get(), "Connection pool shut down");
466 final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<PoolEntry<T, C>>(callback) {
467
468 @Override
469 public PoolEntry<T, C> get(
470 final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
471 try {
472 return super.get(timeout, unit);
473 } catch (final TimeoutException ex) {
474 cancel();
475 throw ex;
476 }
477 }
478
479 };
480 final long releaseState = releaseSeqNum.get();
481 PoolEntry<T, C> entry = null;
482 if (pending.isEmpty()) {
483 entry = getAvailableEntry(state);
484 if (entry == null) {
485 entry = createPoolEntry();
486 }
487 }
488 if (entry != null) {
489 addLeased(entry);
490 future.completed(entry);
491 } else {
492 pending.add(new LeaseRequest<>(state, requestTimeout, future));
493 if (releaseState != releaseSeqNum.get()) {
494 servicePendingRequest();
495 }
496 }
497 return future;
498 }
499
500 public void release(final PoolEntry<T, C> releasedEntry, final boolean reusable) {
501 removeLeased(releasedEntry);
502 if (!reusable || releasedEntry.getExpiryDeadline().isExpired()) {
503 releasedEntry.discardConnection(CloseMode.GRACEFUL);
504 }
505 if (releasedEntry.hasConnection()) {
506 switch (policy) {
507 case LIFO:
508 available.addFirst(new AtomicMarkableReference<>(releasedEntry, false));
509 break;
510 case FIFO:
511 available.addLast(new AtomicMarkableReference<>(releasedEntry, false));
512 break;
513 default:
514 throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
515 }
516 }
517 else {
518 deallocatePoolEntry();
519 }
520 releaseSeqNum.incrementAndGet();
521 servicePendingRequest();
522 }
523
524
525 private void servicePendingRequest() {
526 servicePendingRequests(RequestServiceStrategy.FIRST_SUCCESSFUL);
527 }
528
529 private void servicePendingRequests(final RequestServiceStrategy serviceStrategy) {
530 LeaseRequest<T, C> leaseRequest;
531 while ((leaseRequest = pending.poll()) != null) {
532 if (leaseRequest.isDone()) {
533 continue;
534 }
535 final Object state = leaseRequest.getState();
536 final Deadline deadline = leaseRequest.getDeadline();
537
538 if (deadline.isExpired()) {
539 leaseRequest.failed(DeadlineTimeoutException.from(deadline));
540 } else {
541 final long releaseState = releaseSeqNum.get();
542 PoolEntry<T, C> entry = getAvailableEntry(state);
543 if (entry == null) {
544 entry = createPoolEntry();
545 }
546 if (entry != null) {
547 addLeased(entry);
548 if (!leaseRequest.completed(entry)) {
549 release(entry, true);
550 }
551 if (serviceStrategy == RequestServiceStrategy.FIRST_SUCCESSFUL) {
552 break;
553 }
554 }
555 else {
556 pending.addFirst(leaseRequest);
557 if (releaseState == releaseSeqNum.get()) {
558 break;
559 }
560 }
561 }
562 }
563 }
564
565 public void validatePendingRequests() {
566 final Iterator<LeaseRequest<T, C>> it = pending.iterator();
567 while (it.hasNext()) {
568 final LeaseRequest<T, C> request = it.next();
569 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
570 if (future.isCancelled() && !request.isDone()) {
571 it.remove();
572 } else {
573 final Deadline deadline = request.getDeadline();
574 if (deadline.isExpired()) {
575 request.failed(DeadlineTimeoutException.from(deadline));
576 }
577 if (request.isDone()) {
578 it.remove();
579 }
580 }
581 }
582 }
583
584 public final T getRoute() {
585 return route;
586 }
587
588 public int getMax() {
589 return max;
590 }
591
592 public void setMax(final int max) {
593 this.max = max;
594 }
595
596 public int getPendingCount() {
597 return pending.size();
598 }
599
600 public int getLeasedCount() {
601 return leased.size();
602 }
603
604 public int getAvailableCount() {
605 return available.size();
606 }
607
608 public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
609 for (final Iterator<AtomicMarkableReference<PoolEntry<T, C>>> it = available.iterator(); it.hasNext(); ) {
610 final AtomicMarkableReference<PoolEntry<T, C>> ref = it.next();
611 final PoolEntry<T, C> entry = ref.getReference();
612 if (ref.compareAndSet(entry, entry, false, true)) {
613 callback.execute(entry);
614 if (!entry.hasConnection()) {
615 deallocatePoolEntry();
616 it.remove();
617 }
618 else {
619 ref.set(entry, false);
620 }
621 }
622 }
623 releaseSeqNum.incrementAndGet();
624 servicePendingRequests(RequestServiceStrategy.ALL);
625 }
626
627 public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
628 for (final Iterator<PoolEntry<T, C>> it = leased.keySet().iterator(); it.hasNext(); ) {
629 final PoolEntry<T, C> entry = it.next();
630 callback.execute(entry);
631 if (!entry.hasConnection()) {
632 deallocatePoolEntry();
633 it.remove();
634 }
635 }
636 }
637
638 @Override
639 public String toString() {
640 final StringBuilder buffer = new StringBuilder();
641 buffer.append("[route: ");
642 buffer.append(route);
643 buffer.append("][leased: ");
644 buffer.append(leased.size());
645 buffer.append("][available: ");
646 buffer.append(available.size());
647 buffer.append("][pending: ");
648 buffer.append(pending.size());
649 buffer.append("]");
650 return buffer.toString();
651 }
652
653 }
654
655 }