View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.reactor;
28  
29  import java.util.ArrayDeque;
30  import java.util.HashSet;
31  import java.util.Queue;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentMap;
35  import java.util.concurrent.Future;
36  import java.util.concurrent.atomic.AtomicBoolean;
37  
38  import org.apache.hc.core5.annotation.Contract;
39  import org.apache.hc.core5.annotation.ThreadingBehavior;
40  import org.apache.hc.core5.concurrent.ComplexFuture;
41  import org.apache.hc.core5.concurrent.FutureCallback;
42  import org.apache.hc.core5.function.Callback;
43  import org.apache.hc.core5.http.ConnectionClosedException;
44  import org.apache.hc.core5.io.CloseMode;
45  import org.apache.hc.core5.io.ModalCloseable;
46  import org.apache.hc.core5.util.Args;
47  import org.apache.hc.core5.util.Asserts;
48  import org.apache.hc.core5.util.TimeValue;
49  import org.apache.hc.core5.util.Timeout;
50  
51  /**
52   * @since 5.0
53   */
54  @Contract(threading = ThreadingBehavior.SAFE)
55  public abstract class AbstractIOSessionPool<T> implements ModalCloseable {
56  
57      private final ConcurrentMap<T, PoolEntry> sessionPool;
58      private final AtomicBoolean closed;
59  
60      public AbstractIOSessionPool() {
61          super();
62          this.sessionPool = new ConcurrentHashMap<>();
63          this.closed = new AtomicBoolean(false);
64      }
65  
66      protected abstract Future<IOSession> connectSession(
67              T namedEndpoint,
68              Timeout connectTimeout,
69              FutureCallback<IOSession> callback);
70  
71      protected abstract void validateSession(
72              IOSession ioSession,
73              Callback<Boolean> callback);
74  
75      protected abstract void closeSession(
76              IOSession ioSession,
77              CloseMode closeMode);
78  
79      @Override
80      public final void close(final CloseMode closeMode) {
81          if (closed.compareAndSet(false, true)) {
82              for (final PoolEntry poolEntry : sessionPool.values()) {
83                  synchronized (poolEntry) {
84                      if (poolEntry.session != null) {
85                          closeSession(poolEntry.session, closeMode);
86                          poolEntry.session = null;
87                      }
88                      if (poolEntry.sessionFuture != null) {
89                          poolEntry.sessionFuture.cancel(true);
90                          poolEntry.sessionFuture = null;
91                      }
92                      for (;;) {
93                          final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
94                          if (callback != null) {
95                              callback.cancelled();
96                          } else {
97                              break;
98                          }
99                      }
100                 }
101             }
102             sessionPool.clear();
103         }
104     }
105 
106     @Override
107     public final void close() {
108         close(CloseMode.GRACEFUL);
109     }
110 
111     PoolEntry getPoolEntry(final T endpoint) {
112         PoolEntry poolEntry = sessionPool.get(endpoint);
113         if (poolEntry == null) {
114             final PoolEntry newPoolEntry = new PoolEntry();
115             poolEntry = sessionPool.putIfAbsent(endpoint, newPoolEntry);
116             if (poolEntry == null) {
117                 poolEntry = newPoolEntry;
118             }
119         }
120         return poolEntry;
121     }
122 
123     public final Future<IOSession> getSession(
124             final T endpoint,
125             final Timeout connectTimeout,
126             final FutureCallback<IOSession> callback) {
127         Args.notNull(endpoint, "Endpoint");
128         Asserts.check(!closed.get(), "Connection pool shut down");
129         final ComplexFuture<IOSession> future = new ComplexFuture<>(callback);
130         final PoolEntry poolEntry = getPoolEntry(endpoint);
131         getSessionInternal(poolEntry, false, endpoint, connectTimeout, new FutureCallback<IOSession>() {
132 
133             @Override
134             public void completed(final IOSession ioSession) {
135                 validateSession(ioSession, new Callback<Boolean>() {
136 
137                     @Override
138                     public void execute(final Boolean result) {
139                         if (result) {
140                             future.completed(ioSession);
141                         } else {
142                             getSessionInternal(poolEntry, true, endpoint, connectTimeout,
143                                 new FutureCallback<IOSession>() {
144 
145                                 @Override
146                                 public void completed(final IOSession ioSession) {
147                                     future.completed(ioSession);
148                                 }
149 
150                                 @Override
151                                 public void failed(final Exception ex) {
152                                     future.failed(ex);
153                                 }
154 
155                                 @Override
156                                 public void cancelled() {
157                                     future.cancel();
158                                 }
159 
160                             });
161                         }
162                     }
163 
164                 });
165             }
166 
167             @Override
168             public void failed(final Exception ex) {
169                 future.failed(ex);
170             }
171 
172             @Override
173             public void cancelled() {
174                 future.cancel();
175             }
176 
177         });
178         return future;
179     }
180 
181     private void getSessionInternal(
182             final PoolEntry poolEntry,
183             final boolean requestNew,
184             final T namedEndpoint,
185             final Timeout connectTimeout,
186             final FutureCallback<IOSession> callback) {
187         synchronized (poolEntry) {
188             if (poolEntry.session != null && requestNew) {
189                 closeSession(poolEntry.session, CloseMode.GRACEFUL);
190                 poolEntry.session = null;
191             }
192             if (poolEntry.session != null && !poolEntry.session.isOpen()) {
193                 poolEntry.session = null;
194             }
195             if (poolEntry.session != null) {
196                 callback.completed(poolEntry.session);
197             } else {
198                 poolEntry.requestQueue.add(callback);
199                 if (poolEntry.sessionFuture == null) {
200                     poolEntry.sessionFuture = connectSession(
201                             namedEndpoint,
202                             connectTimeout,
203                             new FutureCallback<IOSession>() {
204 
205                                 @Override
206                                 public void completed(final IOSession result) {
207                                     synchronized (poolEntry) {
208                                         poolEntry.session = result;
209                                         poolEntry.sessionFuture = null;
210                                         for (;;) {
211                                             final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
212                                             if (callback != null) {
213                                                 callback.completed(result);
214                                             } else {
215                                                 break;
216                                             }
217                                         }
218                                     }
219                                 }
220 
221                                 @Override
222                                 public void failed(final Exception ex) {
223                                     synchronized (poolEntry) {
224                                         poolEntry.session = null;
225                                         poolEntry.sessionFuture = null;
226                                         for (;;) {
227                                             final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
228                                             if (callback != null) {
229                                                 callback.failed(ex);
230                                             } else {
231                                                 break;
232                                             }
233                                         }
234                                     }
235                                 }
236 
237                                 @Override
238                                 public void cancelled() {
239                                     failed(new ConnectionClosedException("Connection request cancelled"));
240                                 }
241 
242                             });
243                 }
244             }
245         }
246     }
247 
248     public final void enumAvailable(final Callback<IOSession> callback) {
249         for (final PoolEntry poolEntry: sessionPool.values()) {
250             if (poolEntry.session != null) {
251                 synchronized (poolEntry) {
252                     if (poolEntry.session != null) {
253                         callback.execute(poolEntry.session);
254                         if (!poolEntry.session.isOpen()) {
255                             poolEntry.session = null;
256                         }
257                     }
258                 }
259             }
260         }
261     }
262 
263     public final void closeIdle(final TimeValue idleTime) {
264         final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
265         for (final PoolEntry poolEntry: sessionPool.values()) {
266             if (poolEntry.session != null) {
267                 synchronized (poolEntry) {
268                     if (poolEntry.session != null && poolEntry.session.getLastReadTime() <= deadline) {
269                         closeSession(poolEntry.session, CloseMode.GRACEFUL);
270                         poolEntry.session = null;
271                     }
272                 }
273             }
274         }
275     }
276 
277     public final Set<T> getRoutes() {
278         return new HashSet<>(sessionPool.keySet());
279     }
280 
281     @Override
282     public String toString() {
283         final StringBuilder buffer = new StringBuilder();
284         buffer.append("I/O sessions: ");
285         buffer.append(sessionPool.size());
286         return buffer.toString();
287     }
288 
289     static class PoolEntry {
290 
291         final Queue<FutureCallback<IOSession>> requestQueue;
292         volatile Future<IOSession> sessionFuture;
293         volatile IOSession session;
294 
295         PoolEntry() {
296             this.requestQueue = new ArrayDeque<>();
297         }
298 
299     }
300 
301 }