View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.pool;
28  
29  import java.util.Deque;
30  import java.util.HashSet;
31  import java.util.Iterator;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentLinkedDeque;
35  import java.util.concurrent.ConcurrentMap;
36  import java.util.concurrent.Future;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  import java.util.concurrent.atomic.AtomicInteger;
39  import java.util.concurrent.atomic.AtomicLong;
40  import java.util.concurrent.atomic.AtomicMarkableReference;
41  
42  import org.apache.hc.core5.annotation.Contract;
43  import org.apache.hc.core5.annotation.Experimental;
44  import org.apache.hc.core5.annotation.ThreadingBehavior;
45  import org.apache.hc.core5.concurrent.BasicFuture;
46  import org.apache.hc.core5.concurrent.Cancellable;
47  import org.apache.hc.core5.concurrent.FutureCallback;
48  import org.apache.hc.core5.function.Callback;
49  import org.apache.hc.core5.io.CloseMode;
50  import org.apache.hc.core5.io.ModalCloseable;
51  import org.apache.hc.core5.util.Args;
52  import org.apache.hc.core5.util.Asserts;
53  import org.apache.hc.core5.util.Deadline;
54  import org.apache.hc.core5.util.DeadlineTimeoutException;
55  import org.apache.hc.core5.util.LangUtils;
56  import org.apache.hc.core5.util.TimeValue;
57  import org.apache.hc.core5.util.Timeout;
58  
59  /**
60   * Connection pool with higher concurrency but with lax connection limit guarantees.
61   *
62   * @param <T> route
63   * @param <C> connection object
64   *
65   * @since 5.0
66   */
67  @Contract(threading = ThreadingBehavior.SAFE)
68  @Experimental
69  public class LaxConnPool<T, C extends ModalCloseable> implements ManagedConnPool<T, C> {
70  
71      private final TimeValue timeToLive;
72      private final PoolReusePolicy policy;
73      private final DisposalCallback<C> disposalCallback;
74      private final ConnPoolListener<T> connPoolListener;
75      private final ConcurrentMap<T, PerRoutePool<T, C>> routeToPool;
76      private final AtomicBoolean isShutDown;
77  
78      private volatile int defaultMaxPerRoute;
79  
80      /**
81       * @since 5.0
82       */
83      public LaxConnPool(
84              final int defaultMaxPerRoute,
85              final TimeValue timeToLive,
86              final PoolReusePolicy policy,
87              final DisposalCallback<C> disposalCallback,
88              final ConnPoolListener<T> connPoolListener) {
89          super();
90          Args.positive(defaultMaxPerRoute, "Max per route value");
91          this.timeToLive = TimeValue.defaultsToNegativeOneMillisecond(timeToLive);
92          this.policy = policy != null ? policy : PoolReusePolicy.LIFO;
93          this.disposalCallback = disposalCallback;
94          this.connPoolListener = connPoolListener;
95          this.routeToPool = new ConcurrentHashMap<>();
96          this.isShutDown = new AtomicBoolean(false);
97          this.defaultMaxPerRoute = defaultMaxPerRoute;
98      }
99  
100     /**
101      * @since 5.0
102      */
103     public LaxConnPool(
104             final int defaultMaxPerRoute,
105             final TimeValue timeToLive,
106             final PoolReusePolicy policy,
107             final ConnPoolListener<T> connPoolListener) {
108         this(defaultMaxPerRoute, timeToLive, policy, null, connPoolListener);
109     }
110 
111     public LaxConnPool(final int defaultMaxPerRoute) {
112         this(defaultMaxPerRoute, TimeValue.NEG_ONE_MILLISECOND, PoolReusePolicy.LIFO, null, null);
113     }
114 
115     public boolean isShutdown() {
116         return isShutDown.get();
117     }
118 
119     @Override
120     public void close(final CloseMode closeMode) {
121         if (isShutDown.compareAndSet(false, true)) {
122             for (final Iterator<PerRoutePool<T, C>> it = routeToPool.values().iterator(); it.hasNext(); ) {
123                 final PerRoutePool<T, C> routePool = it.next();
124                 routePool.shutdown(closeMode);
125             }
126             routeToPool.clear();
127         }
128     }
129 
130     @Override
131     public void close() {
132         close(CloseMode.GRACEFUL);
133     }
134 
135     private PerRoutePool<T, C> getPool(final T route) {
136         PerRoutePool<T, C> routePool = routeToPool.get(route);
137         if (routePool == null) {
138             final PerRoutePool<T, C> newRoutePool = new PerRoutePool<>(
139                     route,
140                     defaultMaxPerRoute,
141                     timeToLive,
142                     policy,
143                     this,
144                     disposalCallback,
145                     connPoolListener);
146             routePool = routeToPool.putIfAbsent(route, newRoutePool);
147             if (routePool == null) {
148                 routePool = newRoutePool;
149             }
150         }
151         return routePool;
152     }
153 
154     @Override
155     public Future<PoolEntry<T, C>> lease(
156             final T route, final Object state,
157             final Timeout requestTimeout,
158             final FutureCallback<PoolEntry<T, C>> callback) {
159         Args.notNull(route, "Route");
160         Asserts.check(!isShutDown.get(), "Connection pool shut down");
161         final PerRoutePool<T, C> routePool = getPool(route);
162         return routePool.lease(state, requestTimeout, callback);
163     }
164 
165     public Future<PoolEntry<T, C>> lease(final T route, final Object state) {
166         return lease(route, state, Timeout.DISABLED, null);
167     }
168 
169     @Override
170     public void release(final PoolEntry<T, C> entry, final boolean reusable) {
171         if (entry == null) {
172             return;
173         }
174         if (isShutDown.get()) {
175             return;
176         }
177         final PerRoutePool<T, C> routePool = getPool(entry.getRoute());
178         routePool.release(entry, reusable);
179     }
180 
181     public void validatePendingRequests() {
182         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
183             routePool.validatePendingRequests();
184         }
185     }
186 
187     @Override
188     public void setMaxTotal(final int max) {
189     }
190 
191     @Override
192     public int getMaxTotal() {
193         return 0;
194     }
195 
196     @Override
197     public void setDefaultMaxPerRoute(final int max) {
198         Args.positive(max, "Max value");
199         defaultMaxPerRoute = max;
200     }
201 
202     @Override
203     public int getDefaultMaxPerRoute() {
204         return defaultMaxPerRoute;
205     }
206 
207     @Override
208     public void setMaxPerRoute(final T route, final int max) {
209         Args.notNull(route, "Route");
210         final PerRoutePool<T, C> routePool = getPool(route);
211         routePool.setMax(max > -1 ? max : defaultMaxPerRoute);
212     }
213 
214     @Override
215     public int getMaxPerRoute(final T route) {
216         Args.notNull(route, "Route");
217         final PerRoutePool<T, C> routePool = getPool(route);
218         return routePool.getMax();
219     }
220 
221     @Override
222     public PoolStats getTotalStats() {
223         int leasedTotal = 0;
224         int pendingTotal = 0;
225         int availableTotal = 0;
226         int maxTotal = 0;
227         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
228             leasedTotal += routePool.getLeasedCount();
229             pendingTotal += routePool.getPendingCount();
230             availableTotal += routePool.getAvailableCount();
231             maxTotal += routePool.getMax();
232         }
233         return new PoolStats(leasedTotal, pendingTotal, availableTotal, maxTotal);
234     }
235 
236     @Override
237     public PoolStats getStats(final T route) {
238         Args.notNull(route, "Route");
239         final PerRoutePool<T, C> routePool = getPool(route);
240         return new PoolStats(
241                 routePool.getLeasedCount(),
242                 routePool.getPendingCount(),
243                 routePool.getAvailableCount(),
244                 routePool.getMax());
245     }
246 
247     @Override
248     public Set<T> getRoutes() {
249         return new HashSet<>(routeToPool.keySet());
250     }
251 
252     public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
253         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
254             routePool.enumAvailable(callback);
255         }
256     }
257 
258     public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
259         for (final PerRoutePool<T, C> routePool : routeToPool.values()) {
260             routePool.enumLeased(callback);
261         }
262     }
263 
264     @Override
265     public void closeIdle(final TimeValue idleTime) {
266         final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
267         enumAvailable(new Callback<PoolEntry<T, C>>() {
268 
269             @Override
270             public void execute(final PoolEntry<T, C> entry) {
271                 if (entry.getUpdated() <= deadline) {
272                     entry.discardConnection(CloseMode.GRACEFUL);
273                 }
274             }
275 
276         });
277     }
278 
279     @Override
280     public void closeExpired() {
281         final long now = System.currentTimeMillis();
282         enumAvailable(new Callback<PoolEntry<T, C>>() {
283 
284             @Override
285             public void execute(final PoolEntry<T, C> entry) {
286                 if (entry.getExpiryDeadline().isBefore(now)) {
287                     entry.discardConnection(CloseMode.GRACEFUL);
288                 }
289             }
290 
291         });
292     }
293 
294     @Override
295     public String toString() {
296         final PoolStats totalStats = getTotalStats();
297         final StringBuilder buffer = new StringBuilder();
298         buffer.append("[leased: ");
299         buffer.append(totalStats.getLeased());
300         buffer.append("][available: ");
301         buffer.append(totalStats.getAvailable());
302         buffer.append("][pending: ");
303         buffer.append(totalStats.getPending());
304         buffer.append("]");
305         return buffer.toString();
306     }
307 
308     static class LeaseRequest<T, C extends ModalCloseable> implements Cancellable {
309 
310         private final Object state;
311         private final Deadline deadline;
312         private final BasicFuture<PoolEntry<T, C>> future;
313 
314         LeaseRequest(
315                 final Object state,
316                 final Timeout requestTimeout,
317                 final BasicFuture<PoolEntry<T, C>> future) {
318             super();
319             this.state = state;
320             this.deadline = Deadline.calculate(requestTimeout);
321             this.future = future;
322         }
323 
324         BasicFuture<PoolEntry<T, C>> getFuture() {
325             return this.future;
326         }
327 
328         public Object getState() {
329             return this.state;
330         }
331 
332         public Deadline getDeadline() {
333             return this.deadline;
334         }
335 
336         public boolean isDone() {
337             return this.future.isDone();
338         }
339 
340         public boolean completed(final PoolEntry<T, C> result) {
341             return future.completed(result);
342         }
343 
344         public boolean failed(final Exception ex) {
345             return future.failed(ex);
346         }
347 
348         @Override
349         public boolean cancel() {
350             return future.cancel();
351         }
352 
353     }
354 
355     static class PerRoutePool<T, C extends ModalCloseable> {
356 
357         private enum RequestServiceStrategy { FIRST_SUCCESSFUL, ALL }
358 
359         private final T route;
360         private final TimeValue timeToLive;
361         private final PoolReusePolicy policy;
362         private final DisposalCallback<C> disposalCallback;
363         private final ConnPoolListener<T> connPoolListener;
364         private final ConnPoolStats<T> connPoolStats;
365         private final ConcurrentMap<PoolEntry<T, C>, Boolean> leased;
366         private final Deque<AtomicMarkableReference<PoolEntry<T, C>>> available;
367         private final Deque<LeaseRequest<T, C>> pending;
368         private final AtomicBoolean terminated;
369         private final AtomicInteger allocated;
370         private final AtomicLong releaseSeqNum;
371 
372         private volatile int max;
373 
374         PerRoutePool(
375                 final T route,
376                 final int max,
377                 final TimeValue timeToLive,
378                 final PoolReusePolicy policy,
379                 final ConnPoolStats<T> connPoolStats,
380                 final DisposalCallback<C> disposalCallback,
381                 final ConnPoolListener<T> connPoolListener) {
382             super();
383             this.route = route;
384             this.timeToLive = timeToLive;
385             this.policy = policy;
386             this.connPoolStats = connPoolStats;
387             this.disposalCallback = disposalCallback;
388             this.connPoolListener = connPoolListener;
389             this.leased = new ConcurrentHashMap<>();
390             this.available = new ConcurrentLinkedDeque<>();
391             this.pending = new ConcurrentLinkedDeque<>();
392             this.terminated = new AtomicBoolean(false);
393             this.allocated = new AtomicInteger(0);
394             this.releaseSeqNum = new AtomicLong(0);
395             this.max = max;
396         }
397 
398         public void shutdown(final CloseMode closeMode) {
399             if (terminated.compareAndSet(false, true)) {
400                 AtomicMarkableReference<PoolEntry<T, C>> entryRef;
401                 while ((entryRef = available.poll()) != null) {
402                     entryRef.getReference().discardConnection(closeMode);
403                 }
404                 for (final PoolEntry<T, C> entry : leased.keySet()) {
405                     entry.discardConnection(closeMode);
406                 }
407                 leased.clear();
408                 LeaseRequest<T, C> leaseRequest;
409                 while ((leaseRequest = pending.poll()) != null) {
410                     leaseRequest.cancel();
411                 }
412             }
413         }
414 
415         private PoolEntry<T, C> createPoolEntry() {
416             final int poolmax = max;
417             int prev, next;
418             do {
419                 prev = allocated.get();
420                 next = (prev<poolmax)? prev+1 : prev;
421             } while (!allocated.compareAndSet(prev, next));
422             return (prev < next)? new PoolEntry<>(route, timeToLive, disposalCallback) : null;
423         }
424 
425         private void deallocatePoolEntry() {
426             allocated.decrementAndGet();
427         }
428 
429         private void addLeased(final PoolEntry<T, C> entry) {
430             if (leased.putIfAbsent(entry, Boolean.TRUE) != null) {
431                 throw new IllegalStateException("Pool entry already present in the set of leased entries");
432             } else if (connPoolListener != null) {
433                 connPoolListener.onLease(route, connPoolStats);
434             }
435         }
436 
437         private void removeLeased(final PoolEntry<T, C> entry) {
438             if (connPoolListener != null) {
439                 connPoolListener.onRelease(route, connPoolStats);
440             }
441             if (!leased.remove(entry, Boolean.TRUE)) {
442                 throw new IllegalStateException("Pool entry is not present in the set of leased entries");
443             }
444         }
445 
446         private PoolEntry<T, C> getAvailableEntry(final Object state) {
447             for (final Iterator<AtomicMarkableReference<PoolEntry<T, C>>> it = available.iterator(); it.hasNext(); ) {
448                 final AtomicMarkableReference<PoolEntry<T, C>> ref = it.next();
449                 final PoolEntry<T, C> entry = ref.getReference();
450                 if (ref.compareAndSet(entry, entry, false, true)) {
451                     it.remove();
452                     if (entry.getExpiryDeadline().isExpired()) {
453                         entry.discardConnection(CloseMode.GRACEFUL);
454                     }
455                     if (!LangUtils.equals(entry.getState(), state)) {
456                         entry.discardConnection(CloseMode.GRACEFUL);
457                     }
458                     return entry;
459                 }
460             }
461             return null;
462         }
463 
464         public Future<PoolEntry<T, C>> lease(
465                 final Object state,
466                 final Timeout requestTimeout,
467                 final FutureCallback<PoolEntry<T, C>> callback) {
468             Asserts.check(!terminated.get(), "Connection pool shut down");
469             final BasicFuture<PoolEntry<T, C>> future = new BasicFuture<>(callback);
470             final long releaseState = releaseSeqNum.get();
471             PoolEntry<T, C> entry = null;
472             if (pending.isEmpty()) {
473                 entry = getAvailableEntry(state);
474                 if (entry == null) {
475                     entry = createPoolEntry();
476                 }
477             }
478             if (entry != null) {
479                 addLeased(entry);
480                 future.completed(entry);
481             } else {
482                 pending.add(new LeaseRequest<>(state, requestTimeout, future));
483                 if (releaseState != releaseSeqNum.get()) {
484                     servicePendingRequest();
485                 }
486             }
487             return future;
488         }
489 
490         public void release(final PoolEntry<T, C> releasedEntry, final boolean reusable) {
491             removeLeased(releasedEntry);
492             if (!reusable || releasedEntry.getExpiryDeadline().isExpired()) {
493                 releasedEntry.discardConnection(CloseMode.GRACEFUL);
494             }
495             if (releasedEntry.hasConnection()) {
496                 switch (policy) {
497                     case LIFO:
498                         available.addFirst(new AtomicMarkableReference<>(releasedEntry, false));
499                         break;
500                     case FIFO:
501                         available.addLast(new AtomicMarkableReference<>(releasedEntry, false));
502                         break;
503                     default:
504                         throw new IllegalStateException("Unexpected ConnPoolPolicy value: " + policy);
505                 }
506             }
507             else {
508                 deallocatePoolEntry();
509             }
510             releaseSeqNum.incrementAndGet();
511             servicePendingRequest();
512         }
513 
514 
515         private void servicePendingRequest() {
516             servicePendingRequests(RequestServiceStrategy.FIRST_SUCCESSFUL);
517         }
518 
519         private void servicePendingRequests(final RequestServiceStrategy serviceStrategy) {
520             LeaseRequest<T, C> leaseRequest;
521             while ((leaseRequest = pending.poll()) != null) {
522                 if (leaseRequest.isDone()) {
523                     continue;
524                 }
525                 final Object state = leaseRequest.getState();
526                 final Deadline deadline = leaseRequest.getDeadline();
527 
528                 if (deadline.isExpired()) {
529                     leaseRequest.failed(DeadlineTimeoutException.from(deadline));
530                 } else {
531                     final long releaseState = releaseSeqNum.get();
532                     PoolEntry<T, C> entry = getAvailableEntry(state);
533                     if (entry == null) {
534                         entry = createPoolEntry();
535                     }
536                     if (entry != null) {
537                         addLeased(entry);
538                         if (!leaseRequest.completed(entry)) {
539                             release(entry, true);
540                         }
541                         if (serviceStrategy == RequestServiceStrategy.FIRST_SUCCESSFUL) {
542                             break;
543                         }
544                     }
545                     else {
546                         pending.addFirst(leaseRequest);
547                         if (releaseState == releaseSeqNum.get()) {
548                             break;
549                         }
550                     }
551                 }
552             }
553         }
554 
555         public void validatePendingRequests() {
556             final Iterator<LeaseRequest<T, C>> it = pending.iterator();
557             while (it.hasNext()) {
558                 final LeaseRequest<T, C> request = it.next();
559                 final BasicFuture<PoolEntry<T, C>> future = request.getFuture();
560                 if (future.isCancelled() && !request.isDone()) {
561                     it.remove();
562                 } else {
563                     final Deadline deadline = request.getDeadline();
564                     if (deadline.isExpired()) {
565                         request.failed(DeadlineTimeoutException.from(deadline));
566                     }
567                     if (request.isDone()) {
568                         it.remove();
569                     }
570                 }
571             }
572         }
573 
574         public final T getRoute() {
575             return route;
576         }
577 
578         public int getMax() {
579             return max;
580         }
581 
582         public void setMax(final int max) {
583             this.max = max;
584         }
585 
586         public int getPendingCount() {
587             return pending.size();
588         }
589 
590         public int getLeasedCount() {
591             return leased.size();
592         }
593 
594         public int getAvailableCount() {
595             return available.size();
596         }
597 
598         public void enumAvailable(final Callback<PoolEntry<T, C>> callback) {
599             for (final Iterator<AtomicMarkableReference<PoolEntry<T, C>>> it = available.iterator(); it.hasNext(); ) {
600                 final AtomicMarkableReference<PoolEntry<T, C>> ref = it.next();
601                 final PoolEntry<T, C> entry = ref.getReference();
602                 if (ref.compareAndSet(entry, entry, false, true)) {
603                     callback.execute(entry);
604                     if (!entry.hasConnection()) {
605                         deallocatePoolEntry();
606                         it.remove();
607                     }
608                     else {
609                         ref.set(entry, false);
610                     }
611                 }
612             }
613             releaseSeqNum.incrementAndGet();
614             servicePendingRequests(RequestServiceStrategy.ALL);
615         }
616 
617         public void enumLeased(final Callback<PoolEntry<T, C>> callback) {
618             for (final Iterator<PoolEntry<T, C>> it = leased.keySet().iterator(); it.hasNext(); ) {
619                 final PoolEntry<T, C> entry = it.next();
620                 callback.execute(entry);
621                 if (!entry.hasConnection()) {
622                     deallocatePoolEntry();
623                     it.remove();
624                 }
625             }
626         }
627 
628         @Override
629         public String toString() {
630             final StringBuilder buffer = new StringBuilder();
631             buffer.append("[route: ");
632             buffer.append(route);
633             buffer.append("][leased: ");
634             buffer.append(leased.size());
635             buffer.append("][available: ");
636             buffer.append(available.size());
637             buffer.append("][pending: ");
638             buffer.append(pending.size());
639             buffer.append("]");
640             return buffer.toString();
641         }
642 
643     }
644 
645 }