View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.pool;
28  
29  import java.util.Deque;
30  import java.util.HashSet;
31  import java.util.Iterator;
32  import java.util.Objects;
33  import java.util.Set;
34  import java.util.concurrent.ConcurrentHashMap;
35  import java.util.concurrent.ConcurrentLinkedDeque;
36  import java.util.concurrent.ConcurrentMap;
37  import java.util.concurrent.ExecutionException;
38  import java.util.concurrent.Future;
39  import java.util.concurrent.TimeUnit;
40  import java.util.concurrent.TimeoutException;
41  import java.util.concurrent.atomic.AtomicBoolean;
42  import java.util.concurrent.atomic.AtomicInteger;
43  import java.util.concurrent.atomic.AtomicLong;
44  import java.util.concurrent.atomic.AtomicMarkableReference;
45  import java.util.concurrent.locks.ReentrantLock;
46  
47  import org.apache.hc.core5.annotation.Contract;
48  import org.apache.hc.core5.annotation.Experimental;
49  import org.apache.hc.core5.annotation.ThreadingBehavior;
50  import org.apache.hc.core5.concurrent.BasicFuture;
51  import org.apache.hc.core5.concurrent.Cancellable;
52  import org.apache.hc.core5.concurrent.FutureCallback;
53  import org.apache.hc.core5.function.Callback;
54  import org.apache.hc.core5.io.CloseMode;
55  import org.apache.hc.core5.io.ModalCloseable;
56  import org.apache.hc.core5.util.Args;
57  import org.apache.hc.core5.util.Asserts;
58  import org.apache.hc.core5.util.Deadline;
59  import org.apache.hc.core5.util.DeadlineTimeoutException;
60  import org.apache.hc.core5.util.TimeValue;
61  import org.apache.hc.core5.util.Timeout;
62  
63  /**
64   * Connection pool with higher concurrency but with lax connection limit guarantees.
65   *
66   * @param <T> route
67   * @param <C> connection object
68   *
69   * @since 5.0
70   */
71  @Contract(threading = ThreadingBehavior.SAFE)
72  @Experimental
73  public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool<T, C> {
74  
75      private final TimeValue timeToLive;
76      private final PoolReusePolicy policy;
77      private final DisposalCallback<C> disposalCallback;
78      private final ConnPoolListener<T> connPoolListener;
79      private final ConcurrentMap<T, PerRoutePool<T, C>> routeToPool;
80      private final AtomicBoolean isShutDown;
81  
82      private volatile int defaultMaxPerRoute;
83  
84      /**
85       * @since 5.0
86       */
87      public LaxConnPool(
88              final int defaultMaxPerRoute,
89              final TimeValue timeToLive,
90              final PoolReusePolicy policy,
91              final DisposalCallback<C> disposalCallback,
92              final ConnPoolListener<T> connPoolListener) {
93          super();
94          Args.positive(defaultMaxPerRoute, "Max per route value");
95          this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
96          this.policy = policy != null ? policy : PoolReusePolicy.LIFO;
97          this.disposalCallback = disposalCallback;
98          this.connPoolListener = connPoolListener;
99          this.routeToPool = new ConcurrentHashMap<>();
100         this.isShutDown = new AtomicBoolean(false);
101         this.defaultMaxPerRoute = defaultMaxPerRoute;
102     }
103 
104     /**
105      * @since 5.0
106      */
107     public LaxConnPool(
108             final int defaultMaxPerRoute,
109             final TimeValue timeToLive,
110             final PoolReusePolicy policy,
111             final ConnPoolListener<T> connPoolListener) {
112         this(defaultMaxPerRoute, timeToLive, policy, null, connPoolListener);
113     }
114 
115     public LaxConnPool(final int defaultMaxPerRoute) {
116         this(defaultMaxPerRoute, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, null, null);
117     }
118 
119     public boolean isShutdown() {
120         return isShutDown.get();
121     }
122 
123     @Override
124     public void close(final CloseMode closeMode) {
125         if (isShutDown.compareAndSet(false, true)) {
126             for (final Iterator<PerRoutePool<T, C>> it = routeToPool.values().iterator(); it.hasNext(); ) {
127                 final PerRoutePool<T, C> routePool = it.next();
128                 routePool.shutdown(closeMode);
129             }
130             routeToPool.clear();
131         }
132     }
133 
134     @Override
135     public void close() {
136         close(CloseMode.GRACEFUL);
137     }
138 
139     private PerRoutePool<T, C> getPool(final T route) {
140         PerRoutePool<T, C> routePool = routeToPool.get(route);
141         if (routePool == null) {
142             final PerRoutePool<T, C> newRoutePool = new PerRoutePool<>(
143                     route,
144                     defaultMaxPerRoute,
145                     timeToLive,
146                     policy,
147                     this,
148                     disposalCallback,
149                     connPoolListener);
150             routePool = routeToPool.putIfAbsent(route, newRoutePool);
151             if (routePool == null) {
152                 routePool = newRoutePool;
153             }
154         }
155         return routePool;
156     }
157 
158     @Override
159     public Future<PoolEntry<T, C>> lease(
160             final T route, final Object state,
161             final Timeout requestTimeout,
162             final FutureCallback<PoolEntry<T, C>> callback) {
163         Args.notNull(route, "Route");
164         Asserts.check(!isShutDown.get(), "Connection pool shut down");
165         final PerRoutePool<T, C> routePool = getPool(route);
166         return routePool.lease(state, requestTimeout, callback);
167     }
168 
169     public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
170         return lease(route, state, Timeout.DISABLED, null);
171     }
172 
173     @Override
174     public void release(final PoolEntry<T, C> entry, final boolean reusable) {
175         if (entry == null) {
176             return;
177         }
178         if (isShutDown.get()) {
179             return;
180         }
181         final PerRoutePool<T, C> routePool = getPool(entry.getRoute());
182         routePool.release(entry, reusable);
183     }
184 
185     public void validatePendingRequests() {
186         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
187             routePool.validatePendingRequests();
188         }
189     }
190 
191     @Override
192     public void setMaxTotal(final int max) {
193     }
194 
195     @Override
196     public int getMaxTotal() {
197         return 0;
198     }
199 
200     @Override
201     public void setDefaultMaxPerRoute(final int max) {
202         Args.positive(max, "Max value");
203         defaultMaxPerRoute = max;
204     }
205 
206     @Override
207     public int getDefaultMaxPerRoute() {
208         return defaultMaxPerRoute;
209     }
210 
211     @Override
212     public void setMaxPerRoute(final T route, final int max) {
213         Args.notNull(route, "Route");
214         final PerRoutePool<T, C> routePool = getPool(route);
215         routePool.setMax(max > -1 ? max : defaultMaxPerRoute);
216     }
217 
218     @Override
219     public int getMaxPerRoute(final T route) {
220         Args.notNull(route, "Route");
221         final PerRoutePool<T, C> routePool = getPool(route);
222         return routePool.getMax();
223     }
224 
225     @Override
226     public PoolStats getTotalStats() {
227         int leasedTotal = 0;
228         int pendingTotal = 0;
229         int availableTotal = 0;
230         int maxTotal = 0;
231         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
232             leasedTotal += routePool.getLeasedCount();
233             pendingTotal += routePool.getPendingCount();
234             availableTotal += routePool.getAvailableCount();
235             maxTotal += routePool.getMax();
236         }
237         return new PoolStats(leasedTotal, pendingTotal, availableTotal, maxTotal);
238     }
239 
240     @Override
241     public PoolStats getStats(final T route) {
242         Args.notNull(route, "Route");
243         final PerRoutePool<T, C> routePool = getPool(route);
244         return new PoolStats(
245                 routePool.getLeasedCount(),
246                 routePool.getPendingCount(),
247                 routePool.getAvailableCount(),
248                 routePool.getMax());
249     }
250 
251     @Override
252     public Set<T> getRoutes() {
253         return new HashSet<>(routeToPool.keySet());
254     }
255 
256     public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
257         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
258             routePool.enumAvailable(callback);
259         }
260     }
261 
262     public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
263         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
264             routePool.enumLeased(callback);
265         }
266     }
267 
268     @Override
269     public void closeIdle(final TimeValue idleTime) {
270         final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
271         enumAvailable(entry -> {
272             if (entry.getUpdated() <= deadline) {
273                 entry.discardConnection(CloseMode.GRACEFUL);
274             }
275         });
276     }
277 
278     @Override
279     public void closeExpired() {
280         final long now = System.currentTimeMillis();
281         enumAvailable(entry -> {
282             if (entry.getExpiryDeadline().isBefore(now)) {
283                 entry.discardConnection(CloseMode.GRACEFUL);
284             }
285         });
286     }
287 
288     @Override
289     public String toString() {
290         final PoolStats totalStats = getTotalStats();
291         final StringBuilder buffer = new StringBuilder();
292         buffer.append("[leased: ");
293         buffer.append(totalStats.getLeased());
294         buffer.append("][available: ");
295         buffer.append(totalStats.getAvailable());
296         buffer.append("][pending: ");
297         buffer.append(totalStats.getPending());
298         buffer.append("]");
299         return buffer.toString();
300     }
301 
302     static class LeaseRequest<T, C extends ModalCloseable> implements Cancellable {
303 
304         private final Object state;
305         private final Deadline deadline;
306         private final BasicFuture<PoolEntry<T, C>> future;
307 
308         LeaseRequest(
309                 final Object state,
310                 final Timeout requestTimeout,
311                 final BasicFuture<PoolEntry<T, C>> future) {
312             super();
313             this.state = state;
314             this.deadline = Deadline.calculate(requestTimeout);
315             this.future = future;
316         }
317 
318         BasicFuture<PoolEntry<T, C>> getFuture() {
319             return this.future;
320         }
321 
322         public Object getState() {
323             return this.state;
324         }
325 
326         public Deadline getDeadline() {
327             return this.deadline;
328         }
329 
330         public boolean isDone() {
331             return this.future.isDone();
332         }
333 
334         public boolean completed(final PoolEntry<T, C> result) {
335             return future.completed(result);
336         }
337 
338         public boolean failed(final Exception ex) {
339             return future.failed(ex);
340         }
341 
342         @Override
343         public boolean cancel() {
344             return future.cancel();
345         }
346 
347     }
348 
349     static class PerRoutePool<T, C extends ModalCloseable> {
350 
351         private enum RequestServiceStrategy { FIRST_SUCCESSFUL, ALL }
352 
353         private final T route;
354         private final TimeValue timeToLive;
355         private final PoolReusePolicy policy;
356         private final DisposalCallback<C> disposalCallback;
357         private final ConnPoolListener<T> connPoolListener;
358         private final ConnPoolStats<T> connPoolStats;
359         private final ConcurrentMap<PoolEntry<T, C>, Boolean> leased;
360         private final Deque<AtomicMarkableReference<PoolEntry<T, C>>> available;
361         private final Deque<LeaseRequest<T, C>> pending;
362         private final AtomicBoolean terminated;
363         private final AtomicInteger allocated;
364         private final AtomicLong releaseSeqNum;
365 
366         private final ReentrantLock lock;
367 
368         private volatile int max;
369 
370         PerRoutePool(
371                 final T route,
372                 final int max,
373                 final TimeValue timeToLive,
374                 final PoolReusePolicy policy,
375                 final ConnPoolStats<T> connPoolStats,
376                 final DisposalCallback<C> disposalCallback,
377                 final ConnPoolListener<T> connPoolListener) {
378             super();
379             this.route = route;
380             this.timeToLive = timeToLive;
381             this.policy = policy;
382             this.connPoolStats = connPoolStats;
383             this.disposalCallback = disposalCallback;
384             this.connPoolListener = connPoolListener;
385             this.leased = new ConcurrentHashMap<>();
386             this.available = new ConcurrentLinkedDeque<>();
387             this.pending = new ConcurrentLinkedDeque<>();
388             this.terminated = new AtomicBoolean(false);
389             this.allocated = new AtomicInteger(0);
390             this.releaseSeqNum = new AtomicLong(0);
391             this.max = max;
392             this.lock = new ReentrantLock();
393         }
394 
395         public void shutdown(final CloseMode closeMode) {
396             if (terminated.compareAndSet(false, true)) {
397                 AtomicMarkableReference<PoolEntry<T, C>> entryRef;
398                 while ((entryRef = available.poll()) != null) {
399                     entryRef.getReference().discardConnection(closeMode);
400                 }
401                 for (final PoolEntry<T, C> entry : leased.keySet()) {
402                     entry.discardConnection(closeMode);
403                 }
404                 leased.clear();
405                 LeaseRequest<T, C> leaseRequest;
406                 while ((leaseRequest = pending.poll()) != null) {
407                     leaseRequest.cancel();
408                 }
409             }
410         }
411 
412         private PoolEntry<T, C> createPoolEntry() {
413             final int poolMax = max;
414             int prev, next;
415             do {
416                 prev = allocated.get();
417                 next = (prev<poolMax)? prev+1 : prev;
418             } while (!allocated.compareAndSet(prev, next));
419             return (prev < next)? new PoolEntry<>(route, timeToLive, disposalCallback) : null;
420         }
421 
422         private void deallocatePoolEntry() {
423             allocated.decrementAndGet();
424         }
425 
426         private void addLeased(final PoolEntry<T, C> entry) {
427             if (leased.putIfAbsent(entry, Boolean.TRUE) != null) {
428                 throw new IllegalStateException("Pool entry already present in the set of leased entries");
429             } else if (connPoolListener != null) {
430                 connPoolListener.onLease(route, connPoolStats);
431             }
432         }
433 
434         private void removeLeased(final PoolEntry<T, C> entry) {
435             if (connPoolListener != null) {
436                 connPoolListener.onRelease(route, connPoolStats);
437             }
438             if (!leased.remove(entry, Boolean.TRUE)) {
439                 throw new IllegalStateException("Pool entry is not present in the set of leased entries");
440             }
441         }
442 
443         private PoolEntry<T, C> getAvailableEntry(final Object state) {
444             for (final Iterator<AtomicMarkableReference<PoolEntry<T, C>>> it = available.iterator(); it.hasNext(); ) {
445                 final AtomicMarkableReference<PoolEntry<T, C>> ref = it.next();
446                 final PoolEntry<T, C> entry = ref.getReference();
447                 if (ref.compareAndSet(entry, entry, false, true)) {
448                     it.remove();
449                     if (entry.getExpiryDeadline().isExpired()) {
450                         entry.discardConnection(CloseMode.GRACEFUL);
451                     }
452                     if (!Objects.equals(entry.getState(), state)) {
453                         entry.discardConnection(CloseMode.GRACEFUL);
454                     }
455                     return entry;
456                 }
457             }
458             return null;
459         }
460 
461         public Future<PoolEntry<T, C>> lease(
462                 final Object state,
463                 final Timeout requestTimeout,
464                 final FutureCallback<PoolEntry<T, C>> callback) {
465             Asserts.check(!terminated.get(), "Connection pool shut down");
466             final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<PoolEntry<T, C>>(callback) {
467 
468                 @Override
469                 public PoolEntry<T, C> get(
470                         final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
471                     try {
472                         return super.get(timeout, unit);
473                     } catch (final TimeoutException ex) {
474                         cancel();
475                         throw ex;
476                     }
477                 }
478 
479             };
480             final long releaseState = releaseSeqNum.get();
481             PoolEntry<T, C> entry = null;
482             if (pending.isEmpty()) {
483                 entry = getAvailableEntry(state);
484                 if (entry == null) {
485                     entry = createPoolEntry();
486                 }
487             }
488             if (entry != null) {
489                 addLeased(entry);
490                 future.completed(entry);
491             } else {
492                 pending.add(new LeaseRequest<>(state, requestTimeout, future));
493                 if (releaseState != releaseSeqNum.get()) {
494                     servicePendingRequest();
495                 }
496             }
497             return future;
498         }
499 
500         public void release(final PoolEntry<T, C> releasedEntry, final boolean reusable) {
501             removeLeased(releasedEntry);
502             if (!reusable || releasedEntry.getExpiryDeadline().isExpired()) {
503                 releasedEntry.discardConnection(CloseMode.GRACEFUL);
504             }
505             if (releasedEntry.hasConnection()) {
506                 switch (policy) {
507                     case LIFO:
508                         available.addFirst(new AtomicMarkableReference<>(releasedEntry, false));
509                         break;
510                     case FIFO:
511                         available.addLast(new AtomicMarkableReference<>(releasedEntry, false));
512                         break;
513                     default:
514                         throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
515                 }
516             }
517             else {
518                 deallocatePoolEntry();
519             }
520             releaseSeqNum.incrementAndGet();
521             servicePendingRequest();
522         }
523 
524 
525         private void servicePendingRequest() {
526             servicePendingRequests(RequestServiceStrategy.FIRST_SUCCESSFUL);
527         }
528 
529         private void servicePendingRequests(final RequestServiceStrategy serviceStrategy) {
530             LeaseRequest<T, C> leaseRequest;
531             while ((leaseRequest = pending.poll()) != null) {
532                 if (leaseRequest.isDone()) {
533                     continue;
534                 }
535                 final Object state = leaseRequest.getState();
536                 final Deadline deadline = leaseRequest.getDeadline();
537 
538                 if (deadline.isExpired()) {
539                     leaseRequest.failed(DeadlineTimeoutException.from(deadline));
540                 } else {
541                     final long releaseState = releaseSeqNum.get();
542                     PoolEntry<T, C> entry = getAvailableEntry(state);
543                     if (entry == null) {
544                         entry = createPoolEntry();
545                     }
546                     if (entry != null) {
547                         addLeased(entry);
548                         if (!leaseRequest.completed(entry)) {
549                             release(entry, true);
550                         }
551                         if (serviceStrategy == RequestServiceStrategy.FIRST_SUCCESSFUL) {
552                             break;
553                         }
554                     }
555                     else {
556                         pending.addFirst(leaseRequest);
557                         if (releaseState == releaseSeqNum.get()) {
558                             break;
559                         }
560                     }
561                 }
562             }
563         }
564 
565         public void validatePendingRequests() {
566             final Iterator<LeaseRequest<T, C>> it = pending.iterator();
567             while (it.hasNext()) {
568                 final LeaseRequest<T, C> request = it.next();
569                 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
570                 if (future.isCancelled() && !request.isDone()) {
571                     it.remove();
572                 } else {
573                     final Deadline deadline = request.getDeadline();
574                     if (deadline.isExpired()) {
575                         request.failed(DeadlineTimeoutException.from(deadline));
576                     }
577                     if (request.isDone()) {
578                         it.remove();
579                     }
580                 }
581             }
582         }
583 
584         public final T getRoute() {
585             return route;
586         }
587 
588         public int getMax() {
589             return max;
590         }
591 
592         public void setMax(final int max) {
593             this.max = max;
594         }
595 
596         public int getPendingCount() {
597             return pending.size();
598         }
599 
600         public int getLeasedCount() {
601             return leased.size();
602         }
603 
604         public int getAvailableCount() {
605             return available.size();
606         }
607 
608         public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
609             for (final Iterator<AtomicMarkableReference<PoolEntry<T, C>>> it = available.iterator(); it.hasNext(); ) {
610                 final AtomicMarkableReference<PoolEntry<T, C>> ref = it.next();
611                 final PoolEntry<T, C> entry = ref.getReference();
612                 if (ref.compareAndSet(entry, entry, false, true)) {
613                     callback.execute(entry);
614                     if (!entry.hasConnection()) {
615                         deallocatePoolEntry();
616                         it.remove();
617                     }
618                     else {
619                         ref.set(entry, false);
620                     }
621                 }
622             }
623             releaseSeqNum.incrementAndGet();
624             servicePendingRequests(RequestServiceStrategy.ALL);
625         }
626 
627         public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
628             for (final Iterator<PoolEntry<T, C>> it = leased.keySet().iterator(); it.hasNext(); ) {
629                 final PoolEntry<T, C> entry = it.next();
630                 callback.execute(entry);
631                 if (!entry.hasConnection()) {
632                     deallocatePoolEntry();
633                     it.remove();
634                 }
635             }
636         }
637 
638         @Override
639         public String toString() {
640             final StringBuilder buffer = new StringBuilder();
641             buffer.append("[route: ");
642             buffer.append(route);
643             buffer.append("][leased: ");
644             buffer.append(leased.size());
645             buffer.append("][available: ");
646             buffer.append(available.size());
647             buffer.append("][pending: ");
648             buffer.append(pending.size());
649             buffer.append("]");
650             return buffer.toString();
651         }
652 
653     }
654 
655 }