View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.reactor;
28  
29  import java.util.ArrayDeque;
30  import java.util.HashSet;
31  import java.util.Queue;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentMap;
35  import java.util.concurrent.Future;
36  import java.util.concurrent.atomic.AtomicBoolean;
37  import java.util.concurrent.locks.ReentrantLock;
38  
39  import org.apache.hc.core5.annotation.Contract;
40  import org.apache.hc.core5.annotation.ThreadingBehavior;
41  import org.apache.hc.core5.concurrent.ComplexFuture;
42  import org.apache.hc.core5.concurrent.FutureCallback;
43  import org.apache.hc.core5.concurrent.FutureContribution;
44  import org.apache.hc.core5.function.Callback;
45  import org.apache.hc.core5.http.ConnectionClosedException;
46  import org.apache.hc.core5.io.CloseMode;
47  import org.apache.hc.core5.io.ModalCloseable;
48  import org.apache.hc.core5.util.Args;
49  import org.apache.hc.core5.util.Asserts;
50  import org.apache.hc.core5.util.TimeValue;
51  import org.apache.hc.core5.util.Timeout;
52  
53  /**
54   * @since 5.0
55   */
56  @Contract(threading = ThreadingBehavior.SAFE)
57  public abstract class AbstractIOSessionPool<T> implements ModalCloseable {
58  
59      private final ConcurrentMap<T, PoolEntry> sessionPool;
60      private final AtomicBoolean closed;
61  
62      private final ReentrantLock lock;
63  
64      public AbstractIOSessionPool() {
65          super();
66          this.sessionPool = new ConcurrentHashMap<>();
67          this.closed = new AtomicBoolean(false);
68          this.lock = new ReentrantLock();
69      }
70  
71      protected abstract Future<IOSession> connectSession(
72              T namedEndpoint,
73              Timeout connectTimeout,
74              FutureCallback<IOSession> callback);
75  
76      protected abstract void validateSession(
77              IOSession ioSession,
78              Callback<Boolean> callback);
79  
80      protected abstract void closeSession(
81              IOSession ioSession,
82              CloseMode closeMode);
83  
84      @Override
85      public final void close(final CloseMode closeMode) {
86          if (closed.compareAndSet(false, true)) {
87              for (final PoolEntry poolEntry : sessionPool.values()) {
88                  lock.lock();
89                  try {
90                      if (poolEntry.session != null) {
91                          closeSession(poolEntry.session, closeMode);
92                          poolEntry.session = null;
93                      }
94                      if (poolEntry.sessionFuture != null) {
95                          poolEntry.sessionFuture.cancel(true);
96                          poolEntry.sessionFuture = null;
97                      }
98                      for (;;) {
99                          final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
100                         if (callback != null) {
101                             callback.cancelled();
102                         } else {
103                             break;
104                         }
105                     }
106                 } finally {
107                     lock.unlock();
108                 }
109             }
110             sessionPool.clear();
111         }
112     }
113 
114     @Override
115     public final void close() {
116         close(CloseMode.GRACEFUL);
117     }
118 
119     PoolEntry getPoolEntry(final T endpoint) {
120         PoolEntry poolEntry = sessionPool.get(endpoint);
121         if (poolEntry == null) {
122             final PoolEntry newPoolEntry = new PoolEntry();
123             poolEntry = sessionPool.putIfAbsent(endpoint, newPoolEntry);
124             if (poolEntry == null) {
125                 poolEntry = newPoolEntry;
126             }
127         }
128         return poolEntry;
129     }
130 
131     public final Future<IOSession> getSession(
132             final T endpoint,
133             final Timeout connectTimeout,
134             final FutureCallback<IOSession> callback) {
135         Args.notNull(endpoint, "Endpoint");
136         Asserts.check(!closed.get(), "Connection pool shut down");
137         final ComplexFuture<IOSession> future = new ComplexFuture<>(callback);
138         final PoolEntry poolEntry = getPoolEntry(endpoint);
139         getSessionInternal(poolEntry, false, endpoint, connectTimeout, new FutureCallback<IOSession>() {
140 
141             @Override
142             public void completed(final IOSession ioSession) {
143                 validateSession(ioSession, result -> {
144                     if (result) {
145                         future.completed(ioSession);
146                     } else {
147                         getSessionInternal(poolEntry, true, endpoint, connectTimeout,
148                             new FutureContribution<IOSession>(future) {
149 
150                             @Override
151                             public void completed(final IOSession ioSession1) {
152                                 future.completed(ioSession1);
153                             }
154 
155                         });
156                     }
157                 });
158             }
159 
160             @Override
161             public void failed(final Exception ex) {
162                 future.failed(ex);
163             }
164 
165             @Override
166             public void cancelled() {
167                 future.cancel();
168             }
169 
170         });
171         return future;
172     }
173 
174     private void getSessionInternal(
175             final PoolEntry poolEntry,
176             final boolean requestNew,
177             final T namedEndpoint,
178             final Timeout connectTimeout,
179             final FutureCallback<IOSession> callback) {
180         poolEntry.lock.lock();
181         try {
182             if (poolEntry.session != null && requestNew) {
183                 closeSession(poolEntry.session, CloseMode.GRACEFUL);
184                 poolEntry.session = null;
185             }
186             if (poolEntry.session != null && !poolEntry.session.isOpen()) {
187                 poolEntry.session = null;
188             }
189             if (poolEntry.session != null) {
190                 callback.completed(poolEntry.session);
191             } else {
192                 poolEntry.requestQueue.add(callback);
193                 if (poolEntry.sessionFuture != null && poolEntry.completed) {
194                     poolEntry.sessionFuture = null;
195                 }
196                 if (poolEntry.sessionFuture == null) {
197                     poolEntry.completed = false;
198                     poolEntry.sessionFuture = connectSession(
199                             namedEndpoint,
200                             connectTimeout,
201                             new FutureCallback<IOSession>() {
202 
203                                 @Override
204                                 public void completed(final IOSession result) {
205                                     poolEntry.lock.lock();
206                                     try {
207                                         poolEntry.completed = true;
208                                         if (poolEntry.session == null) {
209                                             poolEntry.session = result;
210                                         } else {
211                                             closeSession(result,CloseMode.GRACEFUL);
212                                         }
213                                         for (;;) {
214                                             final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
215                                             if (callback != null) {
216                                                 callback.completed(result);
217                                             } else {
218                                                 break;
219                                             }
220                                         }
221                                     } finally {
222                                         poolEntry.lock.unlock();
223                                     }
224                                 }
225 
226                                 @Override
227                                 public void failed(final Exception ex) {
228                                     poolEntry.lock.lock();
229                                     try {
230                                         poolEntry.completed = true;
231                                         poolEntry.session = null;
232                                         for (;;) {
233                                             final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
234                                             if (callback != null) {
235                                                 callback.failed(ex);
236                                             } else {
237                                                 break;
238                                             }
239                                         }
240                                     } finally {
241                                         poolEntry.lock.unlock();
242                                     }
243                                 }
244 
245                                 @Override
246                                 public void cancelled() {
247                                     failed(new ConnectionClosedException("Connection request cancelled"));
248                                 }
249 
250                             });
251                 }
252             }
253         } finally {
254             poolEntry.lock.unlock();
255         }
256     }
257 
258     public final void enumAvailable(final Callback<IOSession> callback) {
259         for (final PoolEntry poolEntry: sessionPool.values()) {
260             if (poolEntry.session != null) {
261                 lock.lock();
262                 try {
263                     if (poolEntry.session != null) {
264                         callback.execute(poolEntry.session);
265                         if (!poolEntry.session.isOpen()) {
266                             poolEntry.session = null;
267                         }
268                     }
269                 } finally {
270                     lock.unlock();
271                 }
272             }
273         }
274     }
275 
276     public final void closeIdle(final TimeValue idleTime) {
277         final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
278         for (final PoolEntry poolEntry: sessionPool.values()) {
279             if (poolEntry.session != null) {
280                 lock.lock();
281                 try {
282                     if (poolEntry.session != null && poolEntry.session.getLastReadTime() <= deadline) {
283                         closeSession(poolEntry.session, CloseMode.GRACEFUL);
284                         poolEntry.session = null;
285                     }
286                 } finally {
287                     lock.unlock();
288                 }
289             }
290         }
291     }
292 
293     public final Set<T> getRoutes() {
294         return new HashSet<>(sessionPool.keySet());
295     }
296 
297     @Override
298     public String toString() {
299         final StringBuilder buffer = new StringBuilder();
300         buffer.append("I/O sessions: ");
301         buffer.append(sessionPool.size());
302         return buffer.toString();
303     }
304 
305     static class PoolEntry {
306 
307         final Queue<FutureCallback<IOSession>> requestQueue;
308         volatile boolean completed;
309         volatile Future<IOSession> sessionFuture;
310         volatile IOSession session;
311         final ReentrantLock lock; // Added
312 
313         PoolEntry() {
314             this.requestQueue = new ArrayDeque<>();
315             this.lock = new ReentrantLock();
316         }
317 
318     }
319 
320 }