1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.http.impl.conn.tsccm;
28
29 import java.io.IOException;
30 import java.util.Date;
31 import java.util.HashMap;
32 import java.util.Iterator;
33 import java.util.LinkedList;
34 import java.util.Map;
35 import java.util.Queue;
36 import java.util.Set;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.locks.Condition;
39 import java.util.concurrent.locks.Lock;
40
41 import org.apache.commons.logging.Log;
42 import org.apache.commons.logging.LogFactory;
43 import org.apache.http.conn.ClientConnectionOperator;
44 import org.apache.http.conn.ConnectionPoolTimeoutException;
45 import org.apache.http.conn.OperatedClientConnection;
46 import org.apache.http.conn.params.ConnManagerParams;
47 import org.apache.http.conn.params.ConnPerRoute;
48 import org.apache.http.conn.routing.HttpRoute;
49 import org.apache.http.params.HttpParams;
50 import org.apache.http.util.Args;
51 import org.apache.http.util.Asserts;
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 @Deprecated
71 public class ConnPoolByRoute extends AbstractConnPool {
72
73 private final Log log = LogFactory.getLog(getClass());
74
75 private final Lock poolLock;
76
77
78 protected final ClientConnectionOperator operator;
79
80
81 protected final ConnPerRoute connPerRoute;
82
83
84 protected final Set<BasicPoolEntry> leasedConnections;
85
86
87 protected final Queue<BasicPoolEntry> freeConnections;
88
89
90 protected final Queue<WaitingThread> waitingThreads;
91
92
93 protected final Map<HttpRoute, RouteSpecificPool> routeToPool;
94
95 private final long connTTL;
96
97 private final TimeUnit connTTLTimeUnit;
98
99 protected volatile boolean shutdown;
100
101 protected volatile int maxTotalConnections;
102
103 protected volatile int numConnections;
104
105
106
107
108
109
110 public ConnPoolByRoute(
111 final ClientConnectionOperator operator,
112 final ConnPerRoute connPerRoute,
113 final int maxTotalConnections) {
114 this(operator, connPerRoute, maxTotalConnections, -1, TimeUnit.MILLISECONDS);
115 }
116
117
118
119
120 public ConnPoolByRoute(
121 final ClientConnectionOperator operator,
122 final ConnPerRoute connPerRoute,
123 final int maxTotalConnections,
124 final long connTTL,
125 final TimeUnit connTTLTimeUnit) {
126 super();
127 Args.notNull(operator, "Connection operator");
128 Args.notNull(connPerRoute, "Connections per route");
129 this.poolLock = super.poolLock;
130 this.leasedConnections = super.leasedConnections;
131 this.operator = operator;
132 this.connPerRoute = connPerRoute;
133 this.maxTotalConnections = maxTotalConnections;
134 this.freeConnections = createFreeConnQueue();
135 this.waitingThreads = createWaitingThreadQueue();
136 this.routeToPool = createRouteToPoolMap();
137 this.connTTL = connTTL;
138 this.connTTLTimeUnit = connTTLTimeUnit;
139 }
140
141 protected Lock getLock() {
142 return this.poolLock;
143 }
144
145
146
147
148
149
150 @Deprecated
151 public ConnPoolByRoute(final ClientConnectionOperator operator, final HttpParams params) {
152 this(operator,
153 ConnManagerParams.getMaxConnectionsPerRoute(params),
154 ConnManagerParams.getMaxTotalConnections(params));
155 }
156
157
158
159
160
161
162
163 protected Queue<BasicPoolEntry> createFreeConnQueue() {
164 return new LinkedList<BasicPoolEntry>();
165 }
166
167
168
169
170
171
172
173 protected Queue<WaitingThread> createWaitingThreadQueue() {
174 return new LinkedList<WaitingThread>();
175 }
176
177
178
179
180
181
182
183 protected Map<HttpRoute, RouteSpecificPool> createRouteToPoolMap() {
184 return new HashMap<HttpRoute, RouteSpecificPool>();
185 }
186
187
188
189
190
191
192
193
194
195
196 protected RouteSpecificPool newRouteSpecificPool(final HttpRoute route) {
197 return new RouteSpecificPool(route, this.connPerRoute);
198 }
199
200
201
202
203
204
205
206
207
208
209
210 protected WaitingThread newWaitingThread(final Condition cond,
211 final RouteSpecificPool rospl) {
212 return new WaitingThread(cond, rospl);
213 }
214
215 private void closeConnection(final BasicPoolEntry entry) {
216 final OperatedClientConnection conn = entry.getConnection();
217 if (conn != null) {
218 try {
219 conn.close();
220 } catch (final IOException ex) {
221 log.debug("I/O error closing connection", ex);
222 }
223 }
224 }
225
226
227
228
229
230
231
232
233
234
235 protected RouteSpecificPool getRoutePool(final HttpRoute route,
236 final boolean create) {
237 RouteSpecificPool rospl = null;
238 poolLock.lock();
239 try {
240
241 rospl = routeToPool.get(route);
242 if ((rospl == null) && create) {
243
244 rospl = newRouteSpecificPool(route);
245 routeToPool.put(route, rospl);
246 }
247
248 } finally {
249 poolLock.unlock();
250 }
251
252 return rospl;
253 }
254
255 public int getConnectionsInPool(final HttpRoute route) {
256 poolLock.lock();
257 try {
258
259 final RouteSpecificPool rospl = getRoutePool(route, false);
260 return (rospl != null) ? rospl.getEntryCount() : 0;
261
262 } finally {
263 poolLock.unlock();
264 }
265 }
266
267 public int getConnectionsInPool() {
268 poolLock.lock();
269 try {
270 return numConnections;
271 } finally {
272 poolLock.unlock();
273 }
274 }
275
276 @Override
277 public PoolEntryRequest requestPoolEntry(
278 final HttpRoute route,
279 final Object state) {
280
281 final WaitingThreadAborterngThreadAborter.html#WaitingThreadAborter">WaitingThreadAborter aborter = new WaitingThreadAborter();
282
283 return new PoolEntryRequest() {
284
285 @Override
286 public void abortRequest() {
287 poolLock.lock();
288 try {
289 aborter.abort();
290 } finally {
291 poolLock.unlock();
292 }
293 }
294
295 @Override
296 public BasicPoolEntry getPoolEntry(
297 final long timeout,
298 final TimeUnit timeUnit)
299 throws InterruptedException, ConnectionPoolTimeoutException {
300 return getEntryBlocking(route, state, timeout, timeUnit, aborter);
301 }
302
303 };
304 }
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324 protected BasicPoolEntry getEntryBlocking(
325 final HttpRoute route, final Object state,
326 final long timeout, final TimeUnit timeUnit,
327 final WaitingThreadAborter aborter)
328 throws ConnectionPoolTimeoutException, InterruptedException {
329
330 Date deadline = null;
331 if (timeout > 0) {
332 deadline = new Date
333 (System.currentTimeMillis() + timeUnit.toMillis(timeout));
334 }
335
336 BasicPoolEntry entry = null;
337 poolLock.lock();
338 try {
339
340 RouteSpecificPool rospl = getRoutePool(route, true);
341 WaitingThread waitingThread = null;
342
343 while (entry == null) {
344 Asserts.check(!shutdown, "Connection pool shut down");
345
346 if (log.isDebugEnabled()) {
347 log.debug("[" + route + "] total kept alive: " + freeConnections.size() +
348 ", total issued: " + leasedConnections.size() +
349 ", total allocated: " + numConnections + " out of " + maxTotalConnections);
350 }
351
352
353
354
355
356
357
358 entry = getFreeEntry(rospl, state);
359 if (entry != null) {
360 break;
361 }
362
363 final boolean hasCapacity = rospl.getCapacity() > 0;
364
365 if (log.isDebugEnabled()) {
366 log.debug("Available capacity: " + rospl.getCapacity()
367 + " out of " + rospl.getMaxEntries()
368 + " [" + route + "][" + state + "]");
369 }
370
371 if (hasCapacity && numConnections < maxTotalConnections) {
372
373 entry = createEntry(rospl, operator);
374
375 } else if (hasCapacity && !freeConnections.isEmpty()) {
376
377 deleteLeastUsedEntry();
378
379
380 rospl = getRoutePool(route, true);
381 entry = createEntry(rospl, operator);
382
383 } else {
384
385 if (log.isDebugEnabled()) {
386 log.debug("Need to wait for connection" +
387 " [" + route + "][" + state + "]");
388 }
389
390 if (waitingThread == null) {
391 waitingThread =
392 newWaitingThread(poolLock.newCondition(), rospl);
393 aborter.setWaitingThread(waitingThread);
394 }
395
396 boolean success = false;
397 try {
398 rospl.queueThread(waitingThread);
399 waitingThreads.add(waitingThread);
400 success = waitingThread.await(deadline);
401
402 } finally {
403
404
405
406
407 rospl.removeThread(waitingThread);
408 waitingThreads.remove(waitingThread);
409 }
410
411
412 if (!success && (deadline != null) &&
413 (deadline.getTime() <= System.currentTimeMillis())) {
414 throw new ConnectionPoolTimeoutException
415 ("Timeout waiting for connection from pool");
416 }
417 }
418 }
419
420 } finally {
421 poolLock.unlock();
422 }
423 return entry;
424 }
425
426 @Override
427 public void freeEntry(final BasicPoolEntry entry, final boolean reusable, final long validDuration, final TimeUnit timeUnit) {
428
429 final HttpRoute route = entry.getPlannedRoute();
430 if (log.isDebugEnabled()) {
431 log.debug("Releasing connection" +
432 " [" + route + "][" + entry.getState() + "]");
433 }
434
435 poolLock.lock();
436 try {
437 if (shutdown) {
438
439
440 closeConnection(entry);
441 return;
442 }
443
444
445 leasedConnections.remove(entry);
446
447 final RouteSpecificPool rospl = getRoutePool(route, true);
448
449 if (reusable && rospl.getCapacity() >= 0) {
450 if (log.isDebugEnabled()) {
451 final String s;
452 if (validDuration > 0) {
453 s = "for " + validDuration + " " + timeUnit;
454 } else {
455 s = "indefinitely";
456 }
457 log.debug("Pooling connection" +
458 " [" + route + "][" + entry.getState() + "]; keep alive " + s);
459 }
460 rospl.freeEntry(entry);
461 entry.updateExpiry(validDuration, timeUnit);
462 freeConnections.add(entry);
463 } else {
464 closeConnection(entry);
465 rospl.dropEntry();
466 numConnections--;
467 }
468
469 notifyWaitingThread(rospl);
470
471 } finally {
472 poolLock.unlock();
473 }
474 }
475
476
477
478
479
480
481
482
483
484 protected BasicPoolEntry getFreeEntry(final RouteSpecificPool rospl, final Object state) {
485
486 BasicPoolEntry entry = null;
487 poolLock.lock();
488 try {
489 boolean done = false;
490 while(!done) {
491
492 entry = rospl.allocEntry(state);
493
494 if (entry != null) {
495 if (log.isDebugEnabled()) {
496 log.debug("Getting free connection"
497 + " [" + rospl.getRoute() + "][" + state + "]");
498
499 }
500 freeConnections.remove(entry);
501 if (entry.isExpired(System.currentTimeMillis())) {
502
503
504 if (log.isDebugEnabled()) {
505 log.debug("Closing expired free connection"
506 + " [" + rospl.getRoute() + "][" + state + "]");
507 }
508 closeConnection(entry);
509
510
511
512 rospl.dropEntry();
513 numConnections--;
514 } else {
515 leasedConnections.add(entry);
516 done = true;
517 }
518
519 } else {
520 done = true;
521 if (log.isDebugEnabled()) {
522 log.debug("No free connections"
523 + " [" + rospl.getRoute() + "][" + state + "]");
524 }
525 }
526 }
527 } finally {
528 poolLock.unlock();
529 }
530 return entry;
531 }
532
533
534
535
536
537
538
539
540
541
542
543
544 protected BasicPoolEntry createEntry(final RouteSpecificPool rospl,
545 final ClientConnectionOperator op) {
546
547 if (log.isDebugEnabled()) {
548 log.debug("Creating new connection [" + rospl.getRoute() + "]");
549 }
550
551
552 final BasicPoolEntrycm/BasicPoolEntry.html#BasicPoolEntry">BasicPoolEntry entry = new BasicPoolEntry(op, rospl.getRoute(), connTTL, connTTLTimeUnit);
553
554 poolLock.lock();
555 try {
556 rospl.createdEntry(entry);
557 numConnections++;
558 leasedConnections.add(entry);
559 } finally {
560 poolLock.unlock();
561 }
562
563 return entry;
564 }
565
566
567
568
569
570
571
572
573
574
575
576
577
578 protected void deleteEntry(final BasicPoolEntry entry) {
579
580 final HttpRoute route = entry.getPlannedRoute();
581
582 if (log.isDebugEnabled()) {
583 log.debug("Deleting connection"
584 + " [" + route + "][" + entry.getState() + "]");
585 }
586
587 poolLock.lock();
588 try {
589
590 closeConnection(entry);
591
592 final RouteSpecificPool rospl = getRoutePool(route, true);
593 rospl.deleteEntry(entry);
594 numConnections--;
595 if (rospl.isUnused()) {
596 routeToPool.remove(route);
597 }
598
599 } finally {
600 poolLock.unlock();
601 }
602 }
603
604
605
606
607
608
609 protected void deleteLeastUsedEntry() {
610 poolLock.lock();
611 try {
612
613 final BasicPoolEntry entry = freeConnections.remove();
614
615 if (entry != null) {
616 deleteEntry(entry);
617 } else if (log.isDebugEnabled()) {
618 log.debug("No free connection to delete");
619 }
620
621 } finally {
622 poolLock.unlock();
623 }
624 }
625
626 @Override
627 protected void handleLostEntry(final HttpRoute route) {
628
629 poolLock.lock();
630 try {
631
632 final RouteSpecificPool rospl = getRoutePool(route, true);
633 rospl.dropEntry();
634 if (rospl.isUnused()) {
635 routeToPool.remove(route);
636 }
637
638 numConnections--;
639 notifyWaitingThread(rospl);
640
641 } finally {
642 poolLock.unlock();
643 }
644 }
645
646
647
648
649
650
651
652
653
654 protected void notifyWaitingThread(final RouteSpecificPool rospl) {
655
656
657
658
659
660
661 WaitingThread waitingThread = null;
662
663 poolLock.lock();
664 try {
665
666 if ((rospl != null) && rospl.hasThread()) {
667 if (log.isDebugEnabled()) {
668 log.debug("Notifying thread waiting on pool" +
669 " [" + rospl.getRoute() + "]");
670 }
671 waitingThread = rospl.nextThread();
672 } else if (!waitingThreads.isEmpty()) {
673 if (log.isDebugEnabled()) {
674 log.debug("Notifying thread waiting on any pool");
675 }
676 waitingThread = waitingThreads.remove();
677 } else if (log.isDebugEnabled()) {
678 log.debug("Notifying no-one, there are no waiting threads");
679 }
680
681 if (waitingThread != null) {
682 waitingThread.wakeup();
683 }
684
685 } finally {
686 poolLock.unlock();
687 }
688 }
689
690
691 @Override
692 public void deleteClosedConnections() {
693 poolLock.lock();
694 try {
695 final Iterator<BasicPoolEntry> iter = freeConnections.iterator();
696 while (iter.hasNext()) {
697 final BasicPoolEntry entry = iter.next();
698 if (!entry.getConnection().isOpen()) {
699 iter.remove();
700 deleteEntry(entry);
701 }
702 }
703 } finally {
704 poolLock.unlock();
705 }
706 }
707
708
709
710
711
712
713
714
715 @Override
716 public void closeIdleConnections(final long idletime, final TimeUnit timeUnit) {
717 Args.notNull(timeUnit, "Time unit");
718 final long t = idletime > 0 ? idletime : 0;
719 if (log.isDebugEnabled()) {
720 log.debug("Closing connections idle longer than " + t + " " + timeUnit);
721 }
722
723 final long deadline = System.currentTimeMillis() - timeUnit.toMillis(t);
724 poolLock.lock();
725 try {
726 final Iterator<BasicPoolEntry> iter = freeConnections.iterator();
727 while (iter.hasNext()) {
728 final BasicPoolEntry entry = iter.next();
729 if (entry.getUpdated() <= deadline) {
730 if (log.isDebugEnabled()) {
731 log.debug("Closing connection last used @ " + new Date(entry.getUpdated()));
732 }
733 iter.remove();
734 deleteEntry(entry);
735 }
736 }
737 } finally {
738 poolLock.unlock();
739 }
740 }
741
742 @Override
743 public void closeExpiredConnections() {
744 log.debug("Closing expired connections");
745 final long now = System.currentTimeMillis();
746
747 poolLock.lock();
748 try {
749 final Iterator<BasicPoolEntry> iter = freeConnections.iterator();
750 while (iter.hasNext()) {
751 final BasicPoolEntry entry = iter.next();
752 if (entry.isExpired(now)) {
753 if (log.isDebugEnabled()) {
754 log.debug("Closing connection expired @ " + new Date(entry.getExpiry()));
755 }
756 iter.remove();
757 deleteEntry(entry);
758 }
759 }
760 } finally {
761 poolLock.unlock();
762 }
763 }
764
765 @Override
766 public void shutdown() {
767 poolLock.lock();
768 try {
769 if (shutdown) {
770 return;
771 }
772 shutdown = true;
773
774
775 final Iterator<BasicPoolEntry> iter1 = leasedConnections.iterator();
776 while (iter1.hasNext()) {
777 final BasicPoolEntry entry = iter1.next();
778 iter1.remove();
779 closeConnection(entry);
780 }
781
782
783 final Iterator<BasicPoolEntry> iter2 = freeConnections.iterator();
784 while (iter2.hasNext()) {
785 final BasicPoolEntry entry = iter2.next();
786 iter2.remove();
787
788 if (log.isDebugEnabled()) {
789 log.debug("Closing connection"
790 + " [" + entry.getPlannedRoute() + "][" + entry.getState() + "]");
791 }
792 closeConnection(entry);
793 }
794
795
796 final Iterator<WaitingThread> iwth = waitingThreads.iterator();
797 while (iwth.hasNext()) {
798 final WaitingThread waiter = iwth.next();
799 iwth.remove();
800 waiter.wakeup();
801 }
802
803 routeToPool.clear();
804
805 } finally {
806 poolLock.unlock();
807 }
808 }
809
810
811
812
813 public void setMaxTotalConnections(final int max) {
814 poolLock.lock();
815 try {
816 maxTotalConnections = max;
817 } finally {
818 poolLock.unlock();
819 }
820 }
821
822
823
824
825
826 public int getMaxTotalConnections() {
827 return maxTotalConnections;
828 }
829
830 }
831