1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.pool;
28
29 import java.io.IOException;
30 import java.util.Date;
31 import java.util.HashMap;
32 import java.util.HashSet;
33 import java.util.Iterator;
34 import java.util.LinkedList;
35 import java.util.Map;
36 import java.util.Set;
37 import java.util.concurrent.CancellationException;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.Future;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.TimeoutException;
42 import java.util.concurrent.atomic.AtomicBoolean;
43 import java.util.concurrent.atomic.AtomicReference;
44 import java.util.concurrent.locks.Condition;
45 import java.util.concurrent.locks.Lock;
46 import java.util.concurrent.locks.ReentrantLock;
47
48 import org.apache.http.annotation.Contract;
49 import org.apache.http.annotation.ThreadingBehavior;
50 import org.apache.http.concurrent.FutureCallback;
51 import org.apache.http.util.Args;
52 import org.apache.http.util.Asserts;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
70 public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
71 implements ConnPool<T, E>, ConnPoolControl<T> {
72
73 private final Lock lock;
74 private final Condition condition;
75 private final ConnFactory<T, C> connFactory;
76 private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
77 private final Set<E> leased;
78 private final LinkedList<E> available;
79 private final LinkedList<Future<E>> pending;
80 private final Map<T, Integer> maxPerRoute;
81
82 private volatile boolean isShutDown;
83 private volatile int defaultMaxPerRoute;
84 private volatile int maxTotal;
85 private volatile int validateAfterInactivity;
86
87 public AbstractConnPool(
88 final ConnFactory<T, C> connFactory,
89 final int defaultMaxPerRoute,
90 final int maxTotal) {
91 super();
92 this.connFactory = Args.notNull(connFactory, "Connection factory");
93 this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");
94 this.maxTotal = Args.positive(maxTotal, "Max total value");
95 this.lock = new ReentrantLock();
96 this.condition = this.lock.newCondition();
97 this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
98 this.leased = new HashSet<E>();
99 this.available = new LinkedList<E>();
100 this.pending = new LinkedList<Future<E>>();
101 this.maxPerRoute = new HashMap<T, Integer>();
102 }
103
104
105
106
107 protected abstract E createEntry(T route, C conn);
108
109
110
111
112 protected void onLease(final E entry) {
113 }
114
115
116
117
118 protected void onRelease(final E entry) {
119 }
120
121
122
123
124 protected void onReuse(final E entry) {
125 }
126
127
128
129
130 protected boolean validate(final E entry) {
131 return true;
132 }
133
134 public boolean isShutdown() {
135 return this.isShutDown;
136 }
137
138
139
140
141 public void shutdown() throws IOException {
142 if (this.isShutDown) {
143 return ;
144 }
145 this.isShutDown = true;
146 this.lock.lock();
147 try {
148 for (final E entry: this.available) {
149 entry.close();
150 }
151 for (final E entry: this.leased) {
152 entry.close();
153 }
154 for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
155 pool.shutdown();
156 }
157 this.routeToPool.clear();
158 this.leased.clear();
159 this.available.clear();
160 } finally {
161 this.lock.unlock();
162 }
163 }
164
165 private RouteSpecificPool<T, C, E> getPool(final T route) {
166 RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
167 if (pool == null) {
168 pool = new RouteSpecificPool<T, C, E>(route) {
169
170 @Override
171 protected E createEntry(final C conn) {
172 return AbstractConnPool.this.createEntry(route, conn);
173 }
174
175 };
176 this.routeToPool.put(route, pool);
177 }
178 return pool;
179 }
180
181 private static Exception operationAborted() {
182 return new CancellationException("Operation aborted");
183 }
184
185
186
187
188
189
190
191
192
193 @Override
194 public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
195 Args.notNull(route, "Route");
196 Asserts.check(!this.isShutDown, "Connection pool shut down");
197
198 return new Future<E>() {
199
200 private final AtomicBoolean cancelled = new AtomicBoolean(false);
201 private final AtomicBoolean done = new AtomicBoolean(false);
202 private final AtomicReference<E> entryRef = new AtomicReference<E>(null);
203
204 @Override
205 public boolean cancel(final boolean mayInterruptIfRunning) {
206 if (done.compareAndSet(false, true)) {
207 cancelled.set(true);
208 lock.lock();
209 try {
210 condition.signalAll();
211 } finally {
212 lock.unlock();
213 }
214 if (callback != null) {
215 callback.cancelled();
216 }
217 return true;
218 }
219 return false;
220 }
221
222 @Override
223 public boolean isCancelled() {
224 return cancelled.get();
225 }
226
227 @Override
228 public boolean isDone() {
229 return done.get();
230 }
231
232 @Override
233 public E get() throws InterruptedException, ExecutionException {
234 try {
235 return get(0L, TimeUnit.MILLISECONDS);
236 } catch (final TimeoutException ex) {
237 throw new ExecutionException(ex);
238 }
239 }
240
241 @Override
242 public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
243 for (;;) {
244 synchronized (this) {
245 try {
246 final E entry = entryRef.get();
247 if (entry != null) {
248 return entry;
249 }
250 if (done.get()) {
251 throw new ExecutionException(operationAborted());
252 }
253 final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
254 if (validateAfterInactivity > 0) {
255 if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
256 if (!validate(leasedEntry)) {
257 leasedEntry.close();
258 release(leasedEntry, false);
259 continue;
260 }
261 }
262 }
263 if (done.compareAndSet(false, true)) {
264 entryRef.set(leasedEntry);
265 done.set(true);
266 onLease(leasedEntry);
267 if (callback != null) {
268 callback.completed(leasedEntry);
269 }
270 return leasedEntry;
271 } else {
272 release(leasedEntry, true);
273 throw new ExecutionException(operationAborted());
274 }
275 } catch (final IOException ex) {
276 if (done.compareAndSet(false, true)) {
277 if (callback != null) {
278 callback.failed(ex);
279 }
280 }
281 throw new ExecutionException(ex);
282 }
283 }
284 }
285 }
286
287 };
288 }
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306 public Future<E> lease(final T route, final Object state) {
307 return lease(route, state, null);
308 }
309
310 private E getPoolEntryBlocking(
311 final T route, final Object state,
312 final long timeout, final TimeUnit timeUnit,
313 final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException {
314
315 Date deadline = null;
316 if (timeout > 0) {
317 deadline = new Date (System.currentTimeMillis() + timeUnit.toMillis(timeout));
318 }
319 this.lock.lock();
320 try {
321 E entry;
322 for (;;) {
323 Asserts.check(!this.isShutDown, "Connection pool shut down");
324 if (future.isCancelled()) {
325 throw new ExecutionException(operationAborted());
326 }
327 final RouteSpecificPool<T, C, E> pool = getPool(route);
328 for (;;) {
329 entry = pool.getFree(state);
330 if (entry == null) {
331 break;
332 }
333 if (entry.isExpired(System.currentTimeMillis())) {
334 entry.close();
335 }
336 if (entry.isClosed()) {
337 this.available.remove(entry);
338 pool.free(entry, false);
339 } else {
340 break;
341 }
342 }
343 if (entry != null) {
344 this.available.remove(entry);
345 this.leased.add(entry);
346 onReuse(entry);
347 return entry;
348 }
349
350
351 final int maxPerRoute = getMax(route);
352
353 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
354 if (excess > 0) {
355 for (int i = 0; i < excess; i++) {
356 final E lastUsed = pool.getLastUsed();
357 if (lastUsed == null) {
358 break;
359 }
360 lastUsed.close();
361 this.available.remove(lastUsed);
362 pool.remove(lastUsed);
363 }
364 }
365
366 if (pool.getAllocatedCount() < maxPerRoute) {
367 final int totalUsed = this.leased.size();
368 final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
369 if (freeCapacity > 0) {
370 final int totalAvailable = this.available.size();
371 if (totalAvailable > freeCapacity - 1) {
372 final E lastUsed = this.available.removeLast();
373 lastUsed.close();
374 final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
375 otherpool.remove(lastUsed);
376 }
377 final C conn = this.connFactory.create(route);
378 entry = pool.add(conn);
379 this.leased.add(entry);
380 return entry;
381 }
382 }
383
384 boolean success = false;
385 try {
386 pool.queue(future);
387 this.pending.add(future);
388 if (deadline != null) {
389 success = this.condition.awaitUntil(deadline);
390 } else {
391 this.condition.await();
392 success = true;
393 }
394 if (future.isCancelled()) {
395 throw new ExecutionException(operationAborted());
396 }
397 } finally {
398
399
400
401
402 pool.unqueue(future);
403 this.pending.remove(future);
404 }
405
406 if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
407 break;
408 }
409 }
410 throw new TimeoutException("Timeout waiting for connection");
411 } finally {
412 this.lock.unlock();
413 }
414 }
415
416 @Override
417 public void release(final E entry, final boolean reusable) {
418 this.lock.lock();
419 try {
420 if (this.leased.remove(entry)) {
421 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
422 pool.free(entry, reusable);
423 if (reusable && !this.isShutDown) {
424 this.available.addFirst(entry);
425 } else {
426 entry.close();
427 }
428 onRelease(entry);
429 Future<E> future = pool.nextPending();
430 if (future != null) {
431 this.pending.remove(future);
432 } else {
433 future = this.pending.poll();
434 }
435 if (future != null) {
436 this.condition.signalAll();
437 }
438 }
439 } finally {
440 this.lock.unlock();
441 }
442 }
443
444 private int getMax(final T route) {
445 final Integer v = this.maxPerRoute.get(route);
446 return v != null ? v.intValue() : this.defaultMaxPerRoute;
447 }
448
449 @Override
450 public void setMaxTotal(final int max) {
451 Args.positive(max, "Max value");
452 this.lock.lock();
453 try {
454 this.maxTotal = max;
455 } finally {
456 this.lock.unlock();
457 }
458 }
459
460 @Override
461 public int getMaxTotal() {
462 this.lock.lock();
463 try {
464 return this.maxTotal;
465 } finally {
466 this.lock.unlock();
467 }
468 }
469
470 @Override
471 public void setDefaultMaxPerRoute(final int max) {
472 Args.positive(max, "Max per route value");
473 this.lock.lock();
474 try {
475 this.defaultMaxPerRoute = max;
476 } finally {
477 this.lock.unlock();
478 }
479 }
480
481 @Override
482 public int getDefaultMaxPerRoute() {
483 this.lock.lock();
484 try {
485 return this.defaultMaxPerRoute;
486 } finally {
487 this.lock.unlock();
488 }
489 }
490
491 @Override
492 public void setMaxPerRoute(final T route, final int max) {
493 Args.notNull(route, "Route");
494 this.lock.lock();
495 try {
496 if (max > -1) {
497 this.maxPerRoute.put(route, Integer.valueOf(max));
498 } else {
499 this.maxPerRoute.remove(route);
500 }
501 } finally {
502 this.lock.unlock();
503 }
504 }
505
506 @Override
507 public int getMaxPerRoute(final T route) {
508 Args.notNull(route, "Route");
509 this.lock.lock();
510 try {
511 return getMax(route);
512 } finally {
513 this.lock.unlock();
514 }
515 }
516
517 @Override
518 public PoolStats getTotalStats() {
519 this.lock.lock();
520 try {
521 return new PoolStats(
522 this.leased.size(),
523 this.pending.size(),
524 this.available.size(),
525 this.maxTotal);
526 } finally {
527 this.lock.unlock();
528 }
529 }
530
531 @Override
532 public PoolStats getStats(final T route) {
533 Args.notNull(route, "Route");
534 this.lock.lock();
535 try {
536 final RouteSpecificPool<T, C, E> pool = getPool(route);
537 return new PoolStats(
538 pool.getLeasedCount(),
539 pool.getPendingCount(),
540 pool.getAvailableCount(),
541 getMax(route));
542 } finally {
543 this.lock.unlock();
544 }
545 }
546
547
548
549
550
551
552
553 public Set<T> getRoutes() {
554 this.lock.lock();
555 try {
556 return new HashSet<T>(routeToPool.keySet());
557 } finally {
558 this.lock.unlock();
559 }
560 }
561
562
563
564
565
566
567 protected void enumAvailable(final PoolEntryCallback<T, C> callback) {
568 this.lock.lock();
569 try {
570 final Iterator<E> it = this.available.iterator();
571 while (it.hasNext()) {
572 final E entry = it.next();
573 callback.process(entry);
574 if (entry.isClosed()) {
575 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
576 pool.remove(entry);
577 it.remove();
578 }
579 }
580 purgePoolMap();
581 } finally {
582 this.lock.unlock();
583 }
584 }
585
586
587
588
589
590
591 protected void enumLeased(final PoolEntryCallback<T, C> callback) {
592 this.lock.lock();
593 try {
594 final Iterator<E> it = this.leased.iterator();
595 while (it.hasNext()) {
596 final E entry = it.next();
597 callback.process(entry);
598 }
599 } finally {
600 this.lock.unlock();
601 }
602 }
603
604 private void purgePoolMap() {
605 final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator();
606 while (it.hasNext()) {
607 final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next();
608 final RouteSpecificPool<T, C, E> pool = entry.getValue();
609 if (pool.getPendingCount() + pool.getAllocatedCount() == 0) {
610 it.remove();
611 }
612 }
613 }
614
615
616
617
618
619
620
621
622 public void closeIdle(final long idletime, final TimeUnit timeUnit) {
623 Args.notNull(timeUnit, "Time unit");
624 long time = timeUnit.toMillis(idletime);
625 if (time < 0) {
626 time = 0;
627 }
628 final long deadline = System.currentTimeMillis() - time;
629 enumAvailable(new PoolEntryCallback<T, C>() {
630
631 @Override
632 public void process(final PoolEntry<T, C> entry) {
633 if (entry.getUpdated() <= deadline) {
634 entry.close();
635 }
636 }
637
638 });
639 }
640
641
642
643
644 public void closeExpired() {
645 final long now = System.currentTimeMillis();
646 enumAvailable(new PoolEntryCallback<T, C>() {
647
648 @Override
649 public void process(final PoolEntry<T, C> entry) {
650 if (entry.isExpired(now)) {
651 entry.close();
652 }
653 }
654
655 });
656 }
657
658
659
660
661
662 public int getValidateAfterInactivity() {
663 return this.validateAfterInactivity;
664 }
665
666
667
668
669
670 public void setValidateAfterInactivity(final int ms) {
671 this.validateAfterInactivity = ms;
672 }
673
674 @Override
675 public String toString() {
676 this.lock.lock();
677 try {
678 final StringBuilder buffer = new StringBuilder();
679 buffer.append("[leased: ");
680 buffer.append(this.leased);
681 buffer.append("][available: ");
682 buffer.append(this.available);
683 buffer.append("][pending: ");
684 buffer.append(this.pending);
685 buffer.append("]");
686 return buffer.toString();
687 } finally {
688 this.lock.unlock();
689 }
690 }
691
692 }