View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.http.pool;
28  
29  import java.io.IOException;
30  import java.util.Date;
31  import java.util.HashMap;
32  import java.util.HashSet;
33  import java.util.Iterator;
34  import java.util.LinkedList;
35  import java.util.Map;
36  import java.util.Set;
37  import java.util.concurrent.CancellationException;
38  import java.util.concurrent.ExecutionException;
39  import java.util.concurrent.Future;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.TimeoutException;
42  import java.util.concurrent.atomic.AtomicBoolean;
43  import java.util.concurrent.atomic.AtomicReference;
44  import java.util.concurrent.locks.Condition;
45  import java.util.concurrent.locks.Lock;
46  import java.util.concurrent.locks.ReentrantLock;
47  
48  import org.apache.http.annotation.Contract;
49  import org.apache.http.annotation.ThreadingBehavior;
50  import org.apache.http.concurrent.FutureCallback;
51  import org.apache.http.util.Args;
52  import org.apache.http.util.Asserts;
53  
54  /**
55   * Abstract synchronous (blocking) pool of connections.
56   * <p>
57   * Please note that this class does not maintain its own pool of execution {@link Thread}s.
58   * Therefore, one <b>must</b> call {@link Future#get()} or {@link Future#get(long, TimeUnit)}
59   * method on the {@link Future} object returned by the
60   * {@link #lease(Object, Object, FutureCallback)} method in order for the lease operation
61   * to complete.
62   *
63   * @param <T> the route type that represents the opposite endpoint of a pooled
64   *   connection.
65   * @param <C> the connection type.
66   * @param <E> the type of the pool entry containing a pooled connection.
67   * @since 4.2
68   */
69  @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
70  public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
71                                                 implements ConnPool<T, E>, ConnPoolControl<T> {
72  
73      private final Lock lock;
74      private final Condition condition;
75      private final ConnFactory<T, C> connFactory;
76      private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
77      private final Set<E> leased;
78      private final LinkedList<E> available;
79      private final LinkedList<Future<E>> pending;
80      private final Map<T, Integer> maxPerRoute;
81  
82      private volatile boolean isShutDown;
83      private volatile int defaultMaxPerRoute;
84      private volatile int maxTotal;
85      private volatile int validateAfterInactivity;
86  
87      public AbstractConnPool(
88              final ConnFactory<T, C> connFactory,
89              final int defaultMaxPerRoute,
90              final int maxTotal) {
91          super();
92          this.connFactory = Args.notNull(connFactory, "Connection factory");
93          this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");
94          this.maxTotal = Args.positive(maxTotal, "Max total value");
95          this.lock = new ReentrantLock();
96          this.condition = this.lock.newCondition();
97          this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
98          this.leased = new HashSet<E>();
99          this.available = new LinkedList<E>();
100         this.pending = new LinkedList<Future<E>>();
101         this.maxPerRoute = new HashMap<T, Integer>();
102     }
103 
104     /**
105      * Creates a new entry for the given connection with the given route.
106      */
107     protected abstract E createEntry(T route, C conn);
108 
109     /**
110      * @since 4.3
111      */
112     protected void onLease(final E entry) {
113     }
114 
115     /**
116      * @since 4.3
117      */
118     protected void onRelease(final E entry) {
119     }
120 
121     /**
122      * @since 4.4
123      */
124     protected void onReuse(final E entry) {
125     }
126 
127     /**
128      * @since 4.4
129      */
130     protected boolean validate(final E entry) {
131         return true;
132     }
133 
134     public boolean isShutdown() {
135         return this.isShutDown;
136     }
137 
138     /**
139      * Shuts down the pool.
140      */
141     public void shutdown() throws IOException {
142         if (this.isShutDown) {
143             return ;
144         }
145         this.isShutDown = true;
146         this.lock.lock();
147         try {
148             for (final E entry: this.available) {
149                 entry.close();
150             }
151             for (final E entry: this.leased) {
152                 entry.close();
153             }
154             for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
155                 pool.shutdown();
156             }
157             this.routeToPool.clear();
158             this.leased.clear();
159             this.available.clear();
160         } finally {
161             this.lock.unlock();
162         }
163     }
164 
165     private RouteSpecificPool<T, C, E> getPool(final T route) {
166         RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
167         if (pool == null) {
168             pool = new RouteSpecificPool<T, C, E>(route) {
169 
170                 @Override
171                 protected E createEntry(final C conn) {
172                     return AbstractConnPool.this.createEntry(route, conn);
173                 }
174 
175             };
176             this.routeToPool.put(route, pool);
177         }
178         return pool;
179     }
180 
181     private static Exception operationAborted() {
182         return new CancellationException("Operation aborted");
183     }
184 
185     /**
186      * {@inheritDoc}
187      * <p>
188      * Please note that this class does not maintain its own pool of execution
189      * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
190      * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
191      * returned by this method in order for the lease operation to complete.
192      */
193     @Override
194     public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
195         Args.notNull(route, "Route");
196         Asserts.check(!this.isShutDown, "Connection pool shut down");
197 
198         return new Future<E>() {
199 
200             private final AtomicBoolean cancelled = new AtomicBoolean(false);
201             private final AtomicBoolean done = new AtomicBoolean(false);
202             private final AtomicReference<E> entryRef = new AtomicReference<E>(null);
203 
204             @Override
205             public boolean cancel(final boolean mayInterruptIfRunning) {
206                 if (done.compareAndSet(false, true)) {
207                     cancelled.set(true);
208                     lock.lock();
209                     try {
210                         condition.signalAll();
211                     } finally {
212                         lock.unlock();
213                     }
214                     if (callback != null) {
215                         callback.cancelled();
216                     }
217                     return true;
218                 }
219                 return false;
220             }
221 
222             @Override
223             public boolean isCancelled() {
224                 return cancelled.get();
225             }
226 
227             @Override
228             public boolean isDone() {
229                 return done.get();
230             }
231 
232             @Override
233             public E get() throws InterruptedException, ExecutionException {
234                 try {
235                     return get(0L, TimeUnit.MILLISECONDS);
236                 } catch (final TimeoutException ex) {
237                     throw new ExecutionException(ex);
238                 }
239             }
240 
241             @Override
242             public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
243                 for (;;) {
244                     synchronized (this) {
245                         try {
246                             final E entry = entryRef.get();
247                             if (entry != null) {
248                                 return entry;
249                             }
250                             if (done.get()) {
251                                 throw new ExecutionException(operationAborted());
252                             }
253                             final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
254                             if (validateAfterInactivity > 0)  {
255                                 if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
256                                     if (!validate(leasedEntry)) {
257                                         leasedEntry.close();
258                                         release(leasedEntry, false);
259                                         continue;
260                                     }
261                                 }
262                             }
263                             if (done.compareAndSet(false, true)) {
264                                 entryRef.set(leasedEntry);
265                                 done.set(true);
266                                 onLease(leasedEntry);
267                                 if (callback != null) {
268                                     callback.completed(leasedEntry);
269                                 }
270                                 return leasedEntry;
271                             } else {
272                                 release(leasedEntry, true);
273                                 throw new ExecutionException(operationAborted());
274                             }
275                         } catch (final IOException ex) {
276                             if (done.compareAndSet(false, true)) {
277                                 if (callback != null) {
278                                     callback.failed(ex);
279                                 }
280                             }
281                             throw new ExecutionException(ex);
282                         }
283                     }
284                 }
285             }
286 
287         };
288     }
289 
290     /**
291      * Attempts to lease a connection for the given route and with the given
292      * state from the pool.
293      * <p>
294      * Please note that this class does not maintain its own pool of execution
295      * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
296      * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
297      * returned by this method in order for the lease operation to complete.
298      *
299      * @param route route of the connection.
300      * @param state arbitrary object that represents a particular state
301      *  (usually a security principal or a unique token identifying
302      *  the user whose credentials have been used while establishing the connection).
303      *  May be {@code null}.
304      * @return future for a leased pool entry.
305      */
306     public Future<E> lease(final T route, final Object state) {
307         return lease(route, state, null);
308     }
309 
310     private E getPoolEntryBlocking(
311             final T route, final Object state,
312             final long timeout, final TimeUnit timeUnit,
313             final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException {
314 
315         Date deadline = null;
316         if (timeout > 0) {
317             deadline = new Date (System.currentTimeMillis() + timeUnit.toMillis(timeout));
318         }
319         this.lock.lock();
320         try {
321             E entry;
322             for (;;) {
323                 Asserts.check(!this.isShutDown, "Connection pool shut down");
324                 if (future.isCancelled()) {
325                     throw new ExecutionException(operationAborted());
326                 }
327                 final RouteSpecificPool<T, C, E> pool = getPool(route);
328                 for (;;) {
329                     entry = pool.getFree(state);
330                     if (entry == null) {
331                         break;
332                     }
333                     if (entry.isExpired(System.currentTimeMillis())) {
334                         entry.close();
335                     }
336                     if (entry.isClosed()) {
337                         this.available.remove(entry);
338                         pool.free(entry, false);
339                     } else {
340                         break;
341                     }
342                 }
343                 if (entry != null) {
344                     this.available.remove(entry);
345                     this.leased.add(entry);
346                     onReuse(entry);
347                     return entry;
348                 }
349 
350                 // New connection is needed
351                 final int maxPerRoute = getMax(route);
352                 // Shrink the pool prior to allocating a new connection
353                 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
354                 if (excess > 0) {
355                     for (int i = 0; i < excess; i++) {
356                         final E lastUsed = pool.getLastUsed();
357                         if (lastUsed == null) {
358                             break;
359                         }
360                         lastUsed.close();
361                         this.available.remove(lastUsed);
362                         pool.remove(lastUsed);
363                     }
364                 }
365 
366                 if (pool.getAllocatedCount() < maxPerRoute) {
367                     final int totalUsed = this.leased.size();
368                     final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
369                     if (freeCapacity > 0) {
370                         final int totalAvailable = this.available.size();
371                         if (totalAvailable > freeCapacity - 1) {
372                             final E lastUsed = this.available.removeLast();
373                             lastUsed.close();
374                             final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
375                             otherpool.remove(lastUsed);
376                         }
377                         final C conn = this.connFactory.create(route);
378                         entry = pool.add(conn);
379                         this.leased.add(entry);
380                         return entry;
381                     }
382                 }
383 
384                 boolean success = false;
385                 try {
386                     pool.queue(future);
387                     this.pending.add(future);
388                     if (deadline != null) {
389                         success = this.condition.awaitUntil(deadline);
390                     } else {
391                         this.condition.await();
392                         success = true;
393                     }
394                     if (future.isCancelled()) {
395                         throw new ExecutionException(operationAborted());
396                     }
397                 } finally {
398                     // In case of 'success', we were woken up by the
399                     // connection pool and should now have a connection
400                     // waiting for us, or else we're shutting down.
401                     // Just continue in the loop, both cases are checked.
402                     pool.unqueue(future);
403                     this.pending.remove(future);
404                 }
405                 // check for spurious wakeup vs. timeout
406                 if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
407                     break;
408                 }
409             }
410             throw new TimeoutException("Timeout waiting for connection");
411         } finally {
412             this.lock.unlock();
413         }
414     }
415 
416     @Override
417     public void release(final E entry, final boolean reusable) {
418         this.lock.lock();
419         try {
420             if (this.leased.remove(entry)) {
421                 final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
422                 pool.free(entry, reusable);
423                 if (reusable && !this.isShutDown) {
424                     this.available.addFirst(entry);
425                 } else {
426                     entry.close();
427                 }
428                 onRelease(entry);
429                 Future<E> future = pool.nextPending();
430                 if (future != null) {
431                     this.pending.remove(future);
432                 } else {
433                     future = this.pending.poll();
434                 }
435                 if (future != null) {
436                     this.condition.signalAll();
437                 }
438             }
439         } finally {
440             this.lock.unlock();
441         }
442     }
443 
444     private int getMax(final T route) {
445         final Integer v = this.maxPerRoute.get(route);
446         return v != null ? v.intValue() : this.defaultMaxPerRoute;
447     }
448 
449     @Override
450     public void setMaxTotal(final int max) {
451         Args.positive(max, "Max value");
452         this.lock.lock();
453         try {
454             this.maxTotal = max;
455         } finally {
456             this.lock.unlock();
457         }
458     }
459 
460     @Override
461     public int getMaxTotal() {
462         this.lock.lock();
463         try {
464             return this.maxTotal;
465         } finally {
466             this.lock.unlock();
467         }
468     }
469 
470     @Override
471     public void setDefaultMaxPerRoute(final int max) {
472         Args.positive(max, "Max per route value");
473         this.lock.lock();
474         try {
475             this.defaultMaxPerRoute = max;
476         } finally {
477             this.lock.unlock();
478         }
479     }
480 
481     @Override
482     public int getDefaultMaxPerRoute() {
483         this.lock.lock();
484         try {
485             return this.defaultMaxPerRoute;
486         } finally {
487             this.lock.unlock();
488         }
489     }
490 
491     @Override
492     public void setMaxPerRoute(final T route, final int max) {
493         Args.notNull(route, "Route");
494         this.lock.lock();
495         try {
496             if (max > -1) {
497                 this.maxPerRoute.put(route, Integer.valueOf(max));
498             } else {
499                 this.maxPerRoute.remove(route);
500             }
501         } finally {
502             this.lock.unlock();
503         }
504     }
505 
506     @Override
507     public int getMaxPerRoute(final T route) {
508         Args.notNull(route, "Route");
509         this.lock.lock();
510         try {
511             return getMax(route);
512         } finally {
513             this.lock.unlock();
514         }
515     }
516 
517     @Override
518     public PoolStats getTotalStats() {
519         this.lock.lock();
520         try {
521             return new PoolStats(
522                     this.leased.size(),
523                     this.pending.size(),
524                     this.available.size(),
525                     this.maxTotal);
526         } finally {
527             this.lock.unlock();
528         }
529     }
530 
531     @Override
532     public PoolStats getStats(final T route) {
533         Args.notNull(route, "Route");
534         this.lock.lock();
535         try {
536             final RouteSpecificPool<T, C, E> pool = getPool(route);
537             return new PoolStats(
538                     pool.getLeasedCount(),
539                     pool.getPendingCount(),
540                     pool.getAvailableCount(),
541                     getMax(route));
542         } finally {
543             this.lock.unlock();
544         }
545     }
546 
547     /**
548      * Returns snapshot of all knows routes
549      * @return the set of routes
550      *
551      * @since 4.4
552      */
553     public Set<T> getRoutes() {
554         this.lock.lock();
555         try {
556             return new HashSet<T>(routeToPool.keySet());
557         } finally {
558             this.lock.unlock();
559         }
560     }
561 
562     /**
563      * Enumerates all available connections.
564      *
565      * @since 4.3
566      */
567     protected void enumAvailable(final PoolEntryCallback<T, C> callback) {
568         this.lock.lock();
569         try {
570             final Iterator<E> it = this.available.iterator();
571             while (it.hasNext()) {
572                 final E entry = it.next();
573                 callback.process(entry);
574                 if (entry.isClosed()) {
575                     final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
576                     pool.remove(entry);
577                     it.remove();
578                 }
579             }
580             purgePoolMap();
581         } finally {
582             this.lock.unlock();
583         }
584     }
585 
586     /**
587      * Enumerates all leased connections.
588      *
589      * @since 4.3
590      */
591     protected void enumLeased(final PoolEntryCallback<T, C> callback) {
592         this.lock.lock();
593         try {
594             final Iterator<E> it = this.leased.iterator();
595             while (it.hasNext()) {
596                 final E entry = it.next();
597                 callback.process(entry);
598             }
599         } finally {
600             this.lock.unlock();
601         }
602     }
603 
604     private void purgePoolMap() {
605         final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator();
606         while (it.hasNext()) {
607             final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next();
608             final RouteSpecificPool<T, C, E> pool = entry.getValue();
609             if (pool.getPendingCount() + pool.getAllocatedCount() == 0) {
610                 it.remove();
611             }
612         }
613     }
614 
615     /**
616      * Closes connections that have been idle longer than the given period
617      * of time and evicts them from the pool.
618      *
619      * @param idletime maximum idle time.
620      * @param timeUnit time unit.
621      */
622     public void closeIdle(final long idletime, final TimeUnit timeUnit) {
623         Args.notNull(timeUnit, "Time unit");
624         long time = timeUnit.toMillis(idletime);
625         if (time < 0) {
626             time = 0;
627         }
628         final long deadline = System.currentTimeMillis() - time;
629         enumAvailable(new PoolEntryCallback<T, C>() {
630 
631             @Override
632             public void process(final PoolEntry<T, C> entry) {
633                 if (entry.getUpdated() <= deadline) {
634                     entry.close();
635                 }
636             }
637 
638         });
639     }
640 
641     /**
642      * Closes expired connections and evicts them from the pool.
643      */
644     public void closeExpired() {
645         final long now = System.currentTimeMillis();
646         enumAvailable(new PoolEntryCallback<T, C>() {
647 
648             @Override
649             public void process(final PoolEntry<T, C> entry) {
650                 if (entry.isExpired(now)) {
651                     entry.close();
652                 }
653             }
654 
655         });
656     }
657 
658     /**
659      * @return the number of milliseconds
660      * @since 4.4
661      */
662     public int getValidateAfterInactivity() {
663         return this.validateAfterInactivity;
664     }
665 
666     /**
667      * @param ms the number of milliseconds
668      * @since 4.4
669      */
670     public void setValidateAfterInactivity(final int ms) {
671         this.validateAfterInactivity = ms;
672     }
673 
674     @Override
675     public String toString() {
676         this.lock.lock();
677         try {
678             final StringBuilder buffer = new StringBuilder();
679             buffer.append("[leased: ");
680             buffer.append(this.leased);
681             buffer.append("][available: ");
682             buffer.append(this.available);
683             buffer.append("][pending: ");
684             buffer.append(this.pending);
685             buffer.append("]");
686             return buffer.toString();
687         } finally {
688             this.lock.unlock();
689         }
690     }
691 
692 }