1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.pool;
28
29 import java.util.Deque;
30 import java.util.HashSet;
31 import java.util.Iterator;
32 import java.util.Set;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentLinkedDeque;
35 import java.util.concurrent.ConcurrentMap;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.concurrent.atomic.AtomicLong;
40 import java.util.concurrent.atomic.AtomicMarkableReference;
41
42 import org.apache.hc.core5.annotation.Contract;
43 import org.apache.hc.core5.annotation.Experimental;
44 import org.apache.hc.core5.annotation.ThreadingBehavior;
45 import org.apache.hc.core5.concurrent.BasicFuture;
46 import org.apache.hc.core5.concurrent.Cancellable;
47 import org.apache.hc.core5.concurrent.FutureCallback;
48 import org.apache.hc.core5.function.Callback;
49 import org.apache.hc.core5.io.CloseMode;
50 import org.apache.hc.core5.io.ModalCloseable;
51 import org.apache.hc.core5.util.Args;
52 import org.apache.hc.core5.util.Asserts;
53 import org.apache.hc.core5.util.Deadline;
54 import org.apache.hc.core5.util.DeadlineTimeoutException;
55 import org.apache.hc.core5.util.LangUtils;
56 import org.apache.hc.core5.util.TimeValue;
57 import org.apache.hc.core5.util.Timeout;
58
59
60
61
62
63
64
65
66
67 @Contract(threading = ThreadingBehavior.SAFE)
68 @Experimental
69 public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool<T, C> {
70
71 private final TimeValue timeToLive;
72 private final PoolReusePolicy policy;
73 private final DisposalCallback<C> disposalCallback;
74 private final ConnPoolListener<T> connPoolListener;
75 private final ConcurrentMap<T, PerRoutePool<T, C>> routeToPool;
76 private final AtomicBoolean isShutDown;
77
78 private volatile int defaultMaxPerRoute;
79
80
81
82
/**
 * Creates a pool with the given per-route limit, entry time-to-live, reuse policy and callbacks.
 *
 * @param defaultMaxPerRoute limit given to per-route pools created by this pool; checked with
 *        {@code Args.positive}.
 * @param timeToLive time-to-live for pool entries; normalized through
 *        {@code TimeValue.defaultsToNegativeOneMillisecond}.
 * @param policy queueing order for released entries; {@code null} selects
 *        {@code PoolReusePolicy.LIFO}.
 * @param disposalCallback passed on to every pool entry; may be {@code null}.
 * @param connPoolListener notified on lease and release; may be {@code null}.
 */
public LaxConnPool(
        final int defaultMaxPerRoute,
        final TimeValue timeToLive,
        final PoolReusePolicy policy,
        final DisposalCallback<C> disposalCallback,
        final ConnPoolListener<T> connPoolListener) {
    super();
    Args.positive(defaultMaxPerRoute, "Max per route value");
    this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
    this.policy = policy == null ? PoolReusePolicy.LIFO : policy;
    this.disposalCallback = disposalCallback;
    this.connPoolListener = connPoolListener;
    this.routeToPool = new ConcurrentHashMap<>();
    this.isShutDown = new AtomicBoolean(false);
    this.defaultMaxPerRoute = defaultMaxPerRoute;
}
99
100
101
102
/**
 * Creates a pool without a disposal callback; delegates to the five-argument constructor
 * with {@code null} for the callback.
 *
 * @param defaultMaxPerRoute limit given to per-route pools created by this pool.
 * @param timeToLive time-to-live for pool entries.
 * @param policy queueing order for released entries.
 * @param connPoolListener notified on lease and release; may be {@code null}.
 */
public LaxConnPool(
        final int defaultMaxPerRoute,
        final TimeValue timeToLive,
        final PoolReusePolicy policy,
        final ConnPoolListener<T> connPoolListener) {
    this(
            defaultMaxPerRoute,
            timeToLive,
            policy,
            null,
            connPoolListener);
}
110
/**
 * Creates a pool with only a per-route limit: time-to-live of
 * {@code TimeValue.NEG_ONE_MILLISECOND}, LIFO reuse, no disposal callback and no listener.
 *
 * @param defaultMaxPerRoute limit given to per-route pools created by this pool.
 */
public LaxConnPool(final int defaultMaxPerRoute) {
    this(
            defaultMaxPerRoute,
            TimeValue.NEG_ONE_MILLISECOND,
            PoolReusePolicy.LIFO,
            null,
            null);
}
114
115 public boolean isShutdown() {
116 return isShutDown.get();
117 }
118
119 @Override
120 public void close(final CloseMode closeMode) {
121 if (isShutDown.compareAndSet(false, true)) {
122 for (final Iterator<PerRoutePool<T, C>> it = routeToPool.values().iterator(); it.hasNext(); ) {
123 final PerRoutePool<T, C> routePool = it.next();
124 routePool.shutdown(closeMode);
125 }
126 routeToPool.clear();
127 }
128 }
129
130 @Override
131 public void close() {
132 close(CloseMode.GRACEFUL);
133 }
134
135 private PerRoutePool<T, C> getPool(final T route) {
136 PerRoutePool<T, C> routePool = routeToPool.get(route);
137 if (routePool == null) {
138 final PerRoutePool<T, C> newRoutePool = new PerRoutePool<>(
139 route,
140 defaultMaxPerRoute,
141 timeToLive,
142 policy,
143 this,
144 disposalCallback,
145 connPoolListener);
146 routePool = routeToPool.putIfAbsent(route, newRoutePool);
147 if (routePool == null) {
148 routePool = newRoutePool;
149 }
150 }
151 return routePool;
152 }
153
154 @Override
155 public Future<PoolEntry<T, C>> lease(
156 final T route, final Object state,
157 final Timeout requestTimeout,
158 final FutureCallback<PoolEntry<T, C>> callback) {
159 Args.notNull(route, "Route");
160 Asserts.check(!isShutDown.get(), "Connection pool shut down");
161 final PerRoutePool<T, C> routePool = getPool(route);
162 return routePool.lease(state, requestTimeout, callback);
163 }
164
165 public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
166 return lease(route, state, Timeout.DISABLED, null);
167 }
168
169 @Override
170 public void release(final PoolEntry<T, C> entry, final boolean reusable) {
171 if (entry == null) {
172 return;
173 }
174 if (isShutDown.get()) {
175 return;
176 }
177 final PerRoutePool<T, C> routePool = getPool(entry.getRoute());
178 routePool.release(entry, reusable);
179 }
180
181 public void validatePendingRequests() {
182 for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
183 routePool.validatePendingRequests();
184 }
185 }
186
187 @Override
188 public void setMaxTotal(final int max) {
189 }
190
191 @Override
192 public int getMaxTotal() {
193 return 0;
194 }
195
196 @Override
197 public void setDefaultMaxPerRoute(final int max) {
198 Args.positive(max, "Max value");
199 defaultMaxPerRoute = max;
200 }
201
202 @Override
203 public int getDefaultMaxPerRoute() {
204 return defaultMaxPerRoute;
205 }
206
207 @Override
208 public void setMaxPerRoute(final T route, final int max) {
209 Args.notNull(route, "Route");
210 final PerRoutePool<T, C> routePool = getPool(route);
211 routePool.setMax(max > -1 ? max : defaultMaxPerRoute);
212 }
213
214 @Override
215 public int getMaxPerRoute(final T route) {
216 Args.notNull(route, "Route");
217 final PerRoutePool<T, C> routePool = getPool(route);
218 return routePool.getMax();
219 }
220
221 @Override
222 public PoolStats getTotalStats() {
223 int leasedTotal = 0;
224 int pendingTotal = 0;
225 int availableTotal = 0;
226 int maxTotal = 0;
227 for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
228 leasedTotal += routePool.getLeasedCount();
229 pendingTotal += routePool.getPendingCount();
230 availableTotal += routePool.getAvailableCount();
231 maxTotal += routePool.getMax();
232 }
233 return new PoolStats(leasedTotal, pendingTotal, availableTotal, maxTotal);
234 }
235
236 @Override
237 public PoolStats getStats(final T route) {
238 Args.notNull(route, "Route");
239 final PerRoutePool<T, C> routePool = getPool(route);
240 return new PoolStats(
241 routePool.getLeasedCount(),
242 routePool.getPendingCount(),
243 routePool.getAvailableCount(),
244 routePool.getMax());
245 }
246
247 @Override
248 public Set<T> getRoutes() {
249 return new HashSet<>(routeToPool.keySet());
250 }
251
252 public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
253 for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
254 routePool.enumAvailable(callback);
255 }
256 }
257
258 public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
259 for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
260 routePool.enumLeased(callback);
261 }
262 }
263
264 @Override
265 public void closeIdle(final TimeValue idleTime) {
266 final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
267 enumAvailable(new Callback<PoolEntry<T, C>>() {
268
269 @Override
270 public void execute(final PoolEntry<T, C> entry) {
271 if (entry.getUpdated() <= deadline) {
272 entry.discardConnection(CloseMode.GRACEFUL);
273 }
274 }
275
276 });
277 }
278
279 @Override
280 public void closeExpired() {
281 final long now = System.currentTimeMillis();
282 enumAvailable(new Callback<PoolEntry<T, C>>() {
283
284 @Override
285 public void execute(final PoolEntry<T, C> entry) {
286 if (entry.getExpiryDeadline().isBefore(now)) {
287 entry.discardConnection(CloseMode.GRACEFUL);
288 }
289 }
290
291 });
292 }
293
294 @Override
295 public String toString() {
296 final PoolStats totalStats = getTotalStats();
297 final StringBuilder buffer = new StringBuilder();
298 buffer.append("[leased: ");
299 buffer.append(totalStats.getLeased());
300 buffer.append("][available: ");
301 buffer.append(totalStats.getAvailable());
302 buffer.append("][pending: ");
303 buffer.append(totalStats.getPending());
304 buffer.append("]");
305 return buffer.toString();
306 }
307
308 static class LeaseRequest<T, C extends ModalCloseable> implements Cancellable {
309
310 private final Object state;
311 private final Deadline deadline;
312 private final BasicFuture<PoolEntry<T, C>> future;
313
314 LeaseRequest(
315 final Object state,
316 final Timeout requestTimeout,
317 final BasicFuture<PoolEntry<T, C>> future) {
318 super();
319 this.state = state;
320 this.deadline = Deadline.calculate(requestTimeout);
321 this.future = future;
322 }
323
324 BasicFuture<PoolEntry<T, C>> getFuture() {
325 return this.future;
326 }
327
328 public Object getState() {
329 return this.state;
330 }
331
332 public Deadline getDeadline() {
333 return this.deadline;
334 }
335
336 public boolean isDone() {
337 return this.future.isDone();
338 }
339
340 public boolean completed(final PoolEntry<T, C> result) {
341 return future.completed(result);
342 }
343
344 public boolean failed(final Exception ex) {
345 return future.failed(ex);
346 }
347
348 @Override
349 public boolean cancel() {
350 return future.cancel();
351 }
352
353 }
354
355 static class PerRoutePool<T, C extends ModalCloseable> {
356
357 private enum RequestServiceStrategy { FIRST_SUCCESSFUL, ALL }
358
359 private final T route;
360 private final TimeValue timeToLive;
361 private final PoolReusePolicy policy;
362 private final DisposalCallback<C> disposalCallback;
363 private final ConnPoolListener<T> connPoolListener;
364 private final ConnPoolStats<T> connPoolStats;
365 private final ConcurrentMap<PoolEntry<T, C>, Boolean> leased;
366 private final Deque<AtomicMarkableReference<PoolEntry<T, C>>> available;
367 private final Deque<LeaseRequest<T, C>> pending;
368 private final AtomicBoolean terminated;
369 private final AtomicInteger allocated;
370 private final AtomicLong releaseSeqNum;
371
372 private volatile int max;
373
/**
 * Creates an empty pool for one route.
 *
 * @param route route served by this pool.
 * @param max initial limit on allocated entries.
 * @param timeToLive time-to-live passed to created entries.
 * @param policy queueing order for released entries.
 * @param connPoolStats stats source handed to the listener.
 * @param disposalCallback passed to created entries.
 * @param connPoolListener notified on lease and release; may be {@code null}.
 */
PerRoutePool(
        final T route,
        final int max,
        final TimeValue timeToLive,
        final PoolReusePolicy policy,
        final ConnPoolStats<T> connPoolStats,
        final DisposalCallback<C> disposalCallback,
        final ConnPoolListener<T> connPoolListener) {
    super();
    // Collaborators supplied by the owning pool.
    this.route = route;
    this.timeToLive = timeToLive;
    this.policy = policy;
    this.connPoolStats = connPoolStats;
    this.disposalCallback = disposalCallback;
    this.connPoolListener = connPoolListener;
    // Internal bookkeeping, all starting out empty or zero.
    this.leased = new ConcurrentHashMap<>();
    this.available = new ConcurrentLinkedDeque<>();
    this.pending = new ConcurrentLinkedDeque<>();
    this.terminated = new AtomicBoolean(false);
    this.allocated = new AtomicInteger(0);
    this.releaseSeqNum = new AtomicLong(0);
    this.max = max;
}
397
398 public void shutdown(final CloseMode closeMode) {
399 if (terminated.compareAndSet(false, true)) {
400 AtomicMarkableReference<PoolEntry<T, C>> entryRef;
401 while ((entryRef = available.poll()) != null) {
402 entryRef.getReference().discardConnection(closeMode);
403 }
404 for (final PoolEntry<T, C> entry : leased.keySet()) {
405 entry.discardConnection(closeMode);
406 }
407 leased.clear();
408 LeaseRequest<T, C> leaseRequest;
409 while ((leaseRequest = pending.poll()) != null) {
410 leaseRequest.cancel();
411 }
412 }
413 }
414
415 private PoolEntry<T, C> createPoolEntry() {
416 final int poolmax = max;
417 int prev, next;
418 do {
419 prev = allocated.get();
420 next = (prev<poolmax)? prev+1 : prev;
421 } while (!allocated.compareAndSet(prev, next));
422 return (prev < next)? new PoolEntry<>(route, timeToLive, disposalCallback) : null;
423 }
424
425 private void deallocatePoolEntry() {
426 allocated.decrementAndGet();
427 }
428
429 private void addLeased(final PoolEntry<T, C> entry) {
430 if (leased.putIfAbsent(entry, Boolean.TRUE) != null) {
431 throw new IllegalStateException("Pool entry already present in the set of leased entries");
432 } else if (connPoolListener != null) {
433 connPoolListener.onLease(route, connPoolStats);
434 }
435 }
436
437 private void removeLeased(final PoolEntry<T, C> entry) {
438 if (connPoolListener != null) {
439 connPoolListener.onRelease(route, connPoolStats);
440 }
441 if (!leased.remove(entry, Boolean.TRUE)) {
442 throw new IllegalStateException("Pool entry is not present in the set of leased entries");
443 }
444 }
445
446 private PoolEntry<T, C> getAvailableEntry(final Object state) {
447 for (final Iterator<AtomicMarkableReference<PoolEntry<T, C>>> it = available.iterator(); it.hasNext(); ) {
448 final AtomicMarkableReference<PoolEntry<T, C>> ref = it.next();
449 final PoolEntry<T, C> entry = ref.getReference();
450 if (ref.compareAndSet(entry, entry, false, true)) {
451 it.remove();
452 if (entry.getExpiryDeadline().isExpired()) {
453 entry.discardConnection(CloseMode.GRACEFUL);
454 }
455 if (!LangUtils.equals(entry.getState(), state)) {
456 entry.discardConnection(CloseMode.GRACEFUL);
457 }
458 return entry;
459 }
460 }
461 return null;
462 }
463
464 public Future<PoolEntry<T, C>> lease(
465 final Object state,
466 final Timeout requestTimeout,
467 final FutureCallback<PoolEntry<T, C>> callback) {
468 Asserts.check(!terminated.get(), "Connection pool shut down");
469 final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<>(callback);
470 final long releaseState = releaseSeqNum.get();
471 PoolEntry<T, C> entry = null;
472 if (pending.isEmpty()) {
473 entry = getAvailableEntry(state);
474 if (entry == null) {
475 entry = createPoolEntry();
476 }
477 }
478 if (entry != null) {
479 addLeased(entry);
480 future.completed(entry);
481 } else {
482 pending.add(new LeaseRequest<>(state, requestTimeout, future));
483 if (releaseState != releaseSeqNum.get()) {
484 servicePendingRequest();
485 }
486 }
487 return future;
488 }
489
490 public void release(final PoolEntry<T, C> releasedEntry, final boolean reusable) {
491 removeLeased(releasedEntry);
492 if (!reusable || releasedEntry.getExpiryDeadline().isExpired()) {
493 releasedEntry.discardConnection(CloseMode.GRACEFUL);
494 }
495 if (releasedEntry.hasConnection()) {
496 switch (policy) {
497 case LIFO:
498 available.addFirst(new AtomicMarkableReference<>(releasedEntry, false));
499 break;
500 case FIFO:
501 available.addLast(new AtomicMarkableReference<>(releasedEntry, false));
502 break;
503 default:
504 throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
505 }
506 }
507 else {
508 deallocatePoolEntry();
509 }
510 releaseSeqNum.incrementAndGet();
511 servicePendingRequest();
512 }
513
514
515 private void servicePendingRequest() {
516 servicePendingRequests(RequestServiceStrategy.FIRST_SUCCESSFUL);
517 }
518
519 private void servicePendingRequests(final RequestServiceStrategy serviceStrategy) {
520 LeaseRequest<T, C> leaseRequest;
521 while ((leaseRequest = pending.poll()) != null) {
522 if (leaseRequest.isDone()) {
523 continue;
524 }
525 final Object state = leaseRequest.getState();
526 final Deadline deadline = leaseRequest.getDeadline();
527
528 if (deadline.isExpired()) {
529 leaseRequest.failed(DeadlineTimeoutException.from(deadline));
530 } else {
531 final long releaseState = releaseSeqNum.get();
532 PoolEntry<T, C> entry = getAvailableEntry(state);
533 if (entry == null) {
534 entry = createPoolEntry();
535 }
536 if (entry != null) {
537 addLeased(entry);
538 if (!leaseRequest.completed(entry)) {
539 release(entry, true);
540 }
541 if (serviceStrategy == RequestServiceStrategy.FIRST_SUCCESSFUL) {
542 break;
543 }
544 }
545 else {
546 pending.addFirst(leaseRequest);
547 if (releaseState == releaseSeqNum.get()) {
548 break;
549 }
550 }
551 }
552 }
553 }
554
555 public void validatePendingRequests() {
556 final Iterator<LeaseRequest<T, C>> it = pending.iterator();
557 while (it.hasNext()) {
558 final LeaseRequest<T, C> request = it.next();
559 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
560 if (future.isCancelled() && !request.isDone()) {
561 it.remove();
562 } else {
563 final Deadline deadline = request.getDeadline();
564 if (deadline.isExpired()) {
565 request.failed(DeadlineTimeoutException.from(deadline));
566 }
567 if (request.isDone()) {
568 it.remove();
569 }
570 }
571 }
572 }
573
574 public final T getRoute() {
575 return route;
576 }
577
578 public int getMax() {
579 return max;
580 }
581
582 public void setMax(final int max) {
583 this.max = max;
584 }
585
586 public int getPendingCount() {
587 return pending.size();
588 }
589
590 public int getLeasedCount() {
591 return leased.size();
592 }
593
594 public int getAvailableCount() {
595 return available.size();
596 }
597
598 public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
599 for (final Iterator<AtomicMarkableReference<PoolEntry<T, C>>> it = available.iterator(); it.hasNext(); ) {
600 final AtomicMarkableReference<PoolEntry<T, C>> ref = it.next();
601 final PoolEntry<T, C> entry = ref.getReference();
602 if (ref.compareAndSet(entry, entry, false, true)) {
603 callback.execute(entry);
604 if (!entry.hasConnection()) {
605 deallocatePoolEntry();
606 it.remove();
607 }
608 else {
609 ref.set(entry, false);
610 }
611 }
612 }
613 releaseSeqNum.incrementAndGet();
614 servicePendingRequests(RequestServiceStrategy.ALL);
615 }
616
617 public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
618 for (final Iterator<PoolEntry<T, C>> it = leased.keySet().iterator(); it.hasNext(); ) {
619 final PoolEntry<T, C> entry = it.next();
620 callback.execute(entry);
621 if (!entry.hasConnection()) {
622 deallocatePoolEntry();
623 it.remove();
624 }
625 }
626 }
627
628 @Override
629 public String toString() {
630 final StringBuilder buffer = new StringBuilder();
631 buffer.append("[route: ");
632 buffer.append(route);
633 buffer.append("][leased: ");
634 buffer.append(leased.size());
635 buffer.append("][available: ");
636 buffer.append(available.size());
637 buffer.append("][pending: ");
638 buffer.append(pending.size());
639 buffer.append("]");
640 return buffer.toString();
641 }
642
643 }
644
645 }