1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.pool;
28
29 import java.util.Deque;
30 import java.util.HashSet;
31 import java.util.Iterator;
32 import java.util.Set;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentLinkedDeque;
35 import java.util.concurrent.ConcurrentMap;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.Future;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.TimeoutException;
40 import java.util.concurrent.atomic.AtomicBoolean;
41 import java.util.concurrent.atomic.AtomicInteger;
42 import java.util.concurrent.atomic.AtomicLong;
43 import java.util.concurrent.atomic.AtomicMarkableReference;
44
45 import org.apache.hc.core5.annotation.Contract;
46 import org.apache.hc.core5.annotation.Experimental;
47 import org.apache.hc.core5.annotation.ThreadingBehavior;
48 import org.apache.hc.core5.concurrent.BasicFuture;
49 import org.apache.hc.core5.concurrent.Cancellable;
50 import org.apache.hc.core5.concurrent.FutureCallback;
51 import org.apache.hc.core5.function.Callback;
52 import org.apache.hc.core5.io.CloseMode;
53 import org.apache.hc.core5.io.ModalCloseable;
54 import org.apache.hc.core5.util.Args;
55 import org.apache.hc.core5.util.Asserts;
56 import org.apache.hc.core5.util.Deadline;
57 import org.apache.hc.core5.util.DeadlineTimeoutException;
58 import org.apache.hc.core5.util.LangUtils;
59 import org.apache.hc.core5.util.TimeValue;
60 import org.apache.hc.core5.util.Timeout;
61
62
63
64
65
66
67
68
69
70 @Contract(threading = ThreadingBehavior.SAFE)
71 @Experimental
72 public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool<T, C> {
73
74 private final TimeValue timeToLive;
75 private final PoolReusePolicy policy;
76 private final DisposalCallback<C> disposalCallback;
77 private final ConnPoolListener<T> connPoolListener;
78 private final ConcurrentMap<T, PerRoutePool<T, C>> routeToPool;
79 private final AtomicBoolean isShutDown;
80
81 private volatile int defaultMaxPerRoute;
82
83
84
85
86 public LaxConnPool(
87 final int defaultMaxPerRoute,
88 final TimeValue timeToLive,
89 final PoolReusePolicy policy,
90 final DisposalCallback<C> disposalCallback,
91 final ConnPoolListener<T> connPoolListener) {
92 super();
93 Args.positive(defaultMaxPerRoute, "Max per route value");
94 this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
95 this.policy = policy != null ? policy : PoolReusePolicy.LIFO;
96 this.disposalCallback = disposalCallback;
97 this.connPoolListener = connPoolListener;
98 this.routeToPool = new ConcurrentHashMap<>();
99 this.isShutDown = new AtomicBoolean(false);
100 this.defaultMaxPerRoute = defaultMaxPerRoute;
101 }
102
103
104
105
106 public LaxConnPool(
107 final int defaultMaxPerRoute,
108 final TimeValue timeToLive,
109 final PoolReusePolicy policy,
110 final ConnPoolListener<T> connPoolListener) {
111 this(defaultMaxPerRoute, timeToLive, policy, null, connPoolListener);
112 }
113
114 public LaxConnPool(final int defaultMaxPerRoute) {
115 this(defaultMaxPerRoute, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, null, null);
116 }
117
118 public boolean isShutdown() {
119 return isShutDown.get();
120 }
121
122 @Override
123 public void close(final CloseMode closeMode) {
124 if (isShutDown.compareAndSet(false, true)) {
125 for (final Iterator<PerRoutePool<T, C>> it = routeToPool.values().iterator(); it.hasNext(); ) {
126 final PerRoutePool<T, C> routePool = it.next();
127 routePool.shutdown(closeMode);
128 }
129 routeToPool.clear();
130 }
131 }
132
133 @Override
134 public void close() {
135 close(CloseMode.GRACEFUL);
136 }
137
138 private PerRoutePool<T, C> getPool(final T route) {
139 PerRoutePool<T, C> routePool = routeToPool.get(route);
140 if (routePool == null) {
141 final PerRoutePool<T, C> newRoutePool = new PerRoutePool<>(
142 route,
143 defaultMaxPerRoute,
144 timeToLive,
145 policy,
146 this,
147 disposalCallback,
148 connPoolListener);
149 routePool = routeToPool.putIfAbsent(route, newRoutePool);
150 if (routePool == null) {
151 routePool = newRoutePool;
152 }
153 }
154 return routePool;
155 }
156
157 @Override
158 public Future<PoolEntry<T, C>> lease(
159 final T route, final Object state,
160 final Timeout requestTimeout,
161 final FutureCallback<PoolEntry<T, C>> callback) {
162 Args.notNull(route, "Route");
163 Asserts.check(!isShutDown.get(), "Connection pool shut down");
164 final PerRoutePool<T, C> routePool = getPool(route);
165 return routePool.lease(state, requestTimeout, callback);
166 }
167
168 public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
169 return lease(route, state, Timeout.DISABLED, null);
170 }
171
172 @Override
173 public void release(final PoolEntry<T, C> entry, final boolean reusable) {
174 if (entry == null) {
175 return;
176 }
177 if (isShutDown.get()) {
178 return;
179 }
180 final PerRoutePool<T, C> routePool = getPool(entry.getRoute());
181 routePool.release(entry, reusable);
182 }
183
184 public void validatePendingRequests() {
185 for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
186 routePool.validatePendingRequests();
187 }
188 }
189
190 @Override
191 public void setMaxTotal(final int max) {
192 }
193
194 @Override
195 public int getMaxTotal() {
196 return 0;
197 }
198
199 @Override
200 public void setDefaultMaxPerRoute(final int max) {
201 Args.positive(max, "Max value");
202 defaultMaxPerRoute = max;
203 }
204
205 @Override
206 public int getDefaultMaxPerRoute() {
207 return defaultMaxPerRoute;
208 }
209
210 @Override
211 public void setMaxPerRoute(final T route, final int max) {
212 Args.notNull(route, "Route");
213 final PerRoutePool<T, C> routePool = getPool(route);
214 routePool.setMax(max > -1 ? max : defaultMaxPerRoute);
215 }
216
217 @Override
218 public int getMaxPerRoute(final T route) {
219 Args.notNull(route, "Route");
220 final PerRoutePool<T, C> routePool = getPool(route);
221 return routePool.getMax();
222 }
223
224 @Override
225 public PoolStats getTotalStats() {
226 int leasedTotal = 0;
227 int pendingTotal = 0;
228 int availableTotal = 0;
229 int maxTotal = 0;
230 for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
231 leasedTotal += routePool.getLeasedCount();
232 pendingTotal += routePool.getPendingCount();
233 availableTotal += routePool.getAvailableCount();
234 maxTotal += routePool.getMax();
235 }
236 return new PoolStats(leasedTotal, pendingTotal, availableTotal, maxTotal);
237 }
238
239 @Override
240 public PoolStats getStats(final T route) {
241 Args.notNull(route, "Route");
242 final PerRoutePool<T, C> routePool = getPool(route);
243 return new PoolStats(
244 routePool.getLeasedCount(),
245 routePool.getPendingCount(),
246 routePool.getAvailableCount(),
247 routePool.getMax());
248 }
249
250 @Override
251 public Set<T> getRoutes() {
252 return new HashSet<>(routeToPool.keySet());
253 }
254
255 public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
256 for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
257 routePool.enumAvailable(callback);
258 }
259 }
260
261 public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
262 for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
263 routePool.enumLeased(callback);
264 }
265 }
266
267 @Override
268 public void closeIdle(final TimeValue idleTime) {
269 final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
270 enumAvailable(new Callback<PoolEntry<T, C>>() {
271
272 @Override
273 public void execute(final PoolEntry<T, C> entry) {
274 if (entry.getUpdated() <= deadline) {
275 entry.discardConnection(CloseMode.GRACEFUL);
276 }
277 }
278
279 });
280 }
281
282 @Override
283 public void closeExpired() {
284 final long now = System.currentTimeMillis();
285 enumAvailable(new Callback<PoolEntry<T, C>>() {
286
287 @Override
288 public void execute(final PoolEntry<T, C> entry) {
289 if (entry.getExpiryDeadline().isBefore(now)) {
290 entry.discardConnection(CloseMode.GRACEFUL);
291 }
292 }
293
294 });
295 }
296
297 @Override
298 public String toString() {
299 final PoolStats totalStats = getTotalStats();
300 final StringBuilder buffer = new StringBuilder();
301 buffer.append("[leased: ");
302 buffer.append(totalStats.getLeased());
303 buffer.append("][available: ");
304 buffer.append(totalStats.getAvailable());
305 buffer.append("][pending: ");
306 buffer.append(totalStats.getPending());
307 buffer.append("]");
308 return buffer.toString();
309 }
310
311 static class LeaseRequest<T, C extends ModalCloseable> implements Cancellable {
312
313 private final Object state;
314 private final Deadline deadline;
315 private final BasicFuture<PoolEntry<T, C>> future;
316
317 LeaseRequest(
318 final Object state,
319 final Timeout requestTimeout,
320 final BasicFuture<PoolEntry<T, C>> future) {
321 super();
322 this.state = state;
323 this.deadline = Deadline.calculate(requestTimeout);
324 this.future = future;
325 }
326
327 BasicFuture<PoolEntry<T, C>> getFuture() {
328 return this.future;
329 }
330
331 public Object getState() {
332 return this.state;
333 }
334
335 public Deadline getDeadline() {
336 return this.deadline;
337 }
338
339 public boolean isDone() {
340 return this.future.isDone();
341 }
342
343 public boolean completed(final PoolEntry<T, C> result) {
344 return future.completed(result);
345 }
346
347 public boolean failed(final Exception ex) {
348 return future.failed(ex);
349 }
350
351 @Override
352 public boolean cancel() {
353 return future.cancel();
354 }
355
356 }
357
358 static class PerRoutePool<T, C extends ModalCloseable> {
359
360 private enum RequestServiceStrategy { FIRST_SUCCESSFUL, ALL }
361
362 private final T route;
363 private final TimeValue timeToLive;
364 private final PoolReusePolicy policy;
365 private final DisposalCallback<C> disposalCallback;
366 private final ConnPoolListener<T> connPoolListener;
367 private final ConnPoolStats<T> connPoolStats;
368 private final ConcurrentMap<PoolEntry<T, C>, Boolean> leased;
369 private final Deque<AtomicMarkableReference<PoolEntry<T, C>>> available;
370 private final Deque<LeaseRequest<T, C>> pending;
371 private final AtomicBoolean terminated;
372 private final AtomicInteger allocated;
373 private final AtomicLong releaseSeqNum;
374
375 private volatile int max;
376
377 PerRoutePool(
378 final T route,
379 final int max,
380 final TimeValue timeToLive,
381 final PoolReusePolicy policy,
382 final ConnPoolStats<T> connPoolStats,
383 final DisposalCallback<C> disposalCallback,
384 final ConnPoolListener<T> connPoolListener) {
385 super();
386 this.route = route;
387 this.timeToLive = timeToLive;
388 this.policy = policy;
389 this.connPoolStats = connPoolStats;
390 this.disposalCallback = disposalCallback;
391 this.connPoolListener = connPoolListener;
392 this.leased = new ConcurrentHashMap<>();
393 this.available = new ConcurrentLinkedDeque<>();
394 this.pending = new ConcurrentLinkedDeque<>();
395 this.terminated = new AtomicBoolean(false);
396 this.allocated = new AtomicInteger(0);
397 this.releaseSeqNum = new AtomicLong(0);
398 this.max = max;
399 }
400
401 public void shutdown(final CloseMode closeMode) {
402 if (terminated.compareAndSet(false, true)) {
403 AtomicMarkableReference<PoolEntry<T, C>> entryRef;
404 while ((entryRef = available.poll()) != null) {
405 entryRef.getReference().discardConnection(closeMode);
406 }
407 for (final PoolEntry<T, C> entry : leased.keySet()) {
408 entry.discardConnection(closeMode);
409 }
410 leased.clear();
411 LeaseRequest<T, C> leaseRequest;
412 while ((leaseRequest = pending.poll()) != null) {
413 leaseRequest.cancel();
414 }
415 }
416 }
417
418 private PoolEntry<T, C> createPoolEntry() {
419 final int poolmax = max;
420 int prev, next;
421 do {
422 prev = allocated.get();
423 next = (prev<poolmax)? prev+1 : prev;
424 } while (!allocated.compareAndSet(prev, next));
425 return (prev < next)? new PoolEntry<>(route, timeToLive, disposalCallback) : null;
426 }
427
428 private void deallocatePoolEntry() {
429 allocated.decrementAndGet();
430 }
431
432 private void addLeased(final PoolEntry<T, C> entry) {
433 if (leased.putIfAbsent(entry, Boolean.TRUE) != null) {
434 throw new IllegalStateException("Pool entry already present in the set of leased entries");
435 } else if (connPoolListener != null) {
436 connPoolListener.onLease(route, connPoolStats);
437 }
438 }
439
440 private void removeLeased(final PoolEntry<T, C> entry) {
441 if (connPoolListener != null) {
442 connPoolListener.onRelease(route, connPoolStats);
443 }
444 if (!leased.remove(entry, Boolean.TRUE)) {
445 throw new IllegalStateException("Pool entry is not present in the set of leased entries");
446 }
447 }
448
449 private PoolEntry<T, C> getAvailableEntry(final Object state) {
450 for (final Iterator<AtomicMarkableReference<PoolEntry<T, C>>> it = available.iterator(); it.hasNext(); ) {
451 final AtomicMarkableReference<PoolEntry<T, C>> ref = it.next();
452 final PoolEntry<T, C> entry = ref.getReference();
453 if (ref.compareAndSet(entry, entry, false, true)) {
454 it.remove();
455 if (entry.getExpiryDeadline().isExpired()) {
456 entry.discardConnection(CloseMode.GRACEFUL);
457 }
458 if (!LangUtils.equals(entry.getState(), state)) {
459 entry.discardConnection(CloseMode.GRACEFUL);
460 }
461 return entry;
462 }
463 }
464 return null;
465 }
466
467 public Future<PoolEntry<T, C>> lease(
468 final Object state,
469 final Timeout requestTimeout,
470 final FutureCallback<PoolEntry<T, C>> callback) {
471 Asserts.check(!terminated.get(), "Connection pool shut down");
472 final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<PoolEntry<T, C>>(callback) {
473
474 @Override
475 public synchronized PoolEntry<T, C> get(
476 final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
477 try {
478 return super.get(timeout, unit);
479 } catch (final TimeoutException ex) {
480 cancel();
481 throw ex;
482 }
483 }
484
485 };
486 final long releaseState = releaseSeqNum.get();
487 PoolEntry<T, C> entry = null;
488 if (pending.isEmpty()) {
489 entry = getAvailableEntry(state);
490 if (entry == null) {
491 entry = createPoolEntry();
492 }
493 }
494 if (entry != null) {
495 addLeased(entry);
496 future.completed(entry);
497 } else {
498 pending.add(new LeaseRequest<>(state, requestTimeout, future));
499 if (releaseState != releaseSeqNum.get()) {
500 servicePendingRequest();
501 }
502 }
503 return future;
504 }
505
506 public void release(final PoolEntry<T, C> releasedEntry, final boolean reusable) {
507 removeLeased(releasedEntry);
508 if (!reusable || releasedEntry.getExpiryDeadline().isExpired()) {
509 releasedEntry.discardConnection(CloseMode.GRACEFUL);
510 }
511 if (releasedEntry.hasConnection()) {
512 switch (policy) {
513 case LIFO:
514 available.addFirst(new AtomicMarkableReference<>(releasedEntry, false));
515 break;
516 case FIFO:
517 available.addLast(new AtomicMarkableReference<>(releasedEntry, false));
518 break;
519 default:
520 throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
521 }
522 }
523 else {
524 deallocatePoolEntry();
525 }
526 releaseSeqNum.incrementAndGet();
527 servicePendingRequest();
528 }
529
530
531 private void servicePendingRequest() {
532 servicePendingRequests(RequestServiceStrategy.FIRST_SUCCESSFUL);
533 }
534
535 private void servicePendingRequests(final RequestServiceStrategy serviceStrategy) {
536 LeaseRequest<T, C> leaseRequest;
537 while ((leaseRequest = pending.poll()) != null) {
538 if (leaseRequest.isDone()) {
539 continue;
540 }
541 final Object state = leaseRequest.getState();
542 final Deadline deadline = leaseRequest.getDeadline();
543
544 if (deadline.isExpired()) {
545 leaseRequest.failed(DeadlineTimeoutException.from(deadline));
546 } else {
547 final long releaseState = releaseSeqNum.get();
548 PoolEntry<T, C> entry = getAvailableEntry(state);
549 if (entry == null) {
550 entry = createPoolEntry();
551 }
552 if (entry != null) {
553 addLeased(entry);
554 if (!leaseRequest.completed(entry)) {
555 release(entry, true);
556 }
557 if (serviceStrategy == RequestServiceStrategy.FIRST_SUCCESSFUL) {
558 break;
559 }
560 }
561 else {
562 pending.addFirst(leaseRequest);
563 if (releaseState == releaseSeqNum.get()) {
564 break;
565 }
566 }
567 }
568 }
569 }
570
571 public void validatePendingRequests() {
572 final Iterator<LeaseRequest<T, C>> it = pending.iterator();
573 while (it.hasNext()) {
574 final LeaseRequest<T, C> request = it.next();
575 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
576 if (future.isCancelled() && !request.isDone()) {
577 it.remove();
578 } else {
579 final Deadline deadline = request.getDeadline();
580 if (deadline.isExpired()) {
581 request.failed(DeadlineTimeoutException.from(deadline));
582 }
583 if (request.isDone()) {
584 it.remove();
585 }
586 }
587 }
588 }
589
590 public final T getRoute() {
591 return route;
592 }
593
594 public int getMax() {
595 return max;
596 }
597
598 public void setMax(final int max) {
599 this.max = max;
600 }
601
602 public int getPendingCount() {
603 return pending.size();
604 }
605
606 public int getLeasedCount() {
607 return leased.size();
608 }
609
610 public int getAvailableCount() {
611 return available.size();
612 }
613
614 public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
615 for (final Iterator<AtomicMarkableReference<PoolEntry<T, C>>> it = available.iterator(); it.hasNext(); ) {
616 final AtomicMarkableReference<PoolEntry<T, C>> ref = it.next();
617 final PoolEntry<T, C> entry = ref.getReference();
618 if (ref.compareAndSet(entry, entry, false, true)) {
619 callback.execute(entry);
620 if (!entry.hasConnection()) {
621 deallocatePoolEntry();
622 it.remove();
623 }
624 else {
625 ref.set(entry, false);
626 }
627 }
628 }
629 releaseSeqNum.incrementAndGet();
630 servicePendingRequests(RequestServiceStrategy.ALL);
631 }
632
633 public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
634 for (final Iterator<PoolEntry<T, C>> it = leased.keySet().iterator(); it.hasNext(); ) {
635 final PoolEntry<T, C> entry = it.next();
636 callback.execute(entry);
637 if (!entry.hasConnection()) {
638 deallocatePoolEntry();
639 it.remove();
640 }
641 }
642 }
643
644 @Override
645 public String toString() {
646 final StringBuilder buffer = new StringBuilder();
647 buffer.append("[route: ");
648 buffer.append(route);
649 buffer.append("][leased: ");
650 buffer.append(leased.size());
651 buffer.append("][available: ");
652 buffer.append(available.size());
653 buffer.append("][pending: ");
654 buffer.append(pending.size());
655 buffer.append("]");
656 return buffer.toString();
657 }
658
659 }
660
661 }