1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.reactor;
28
29 import java.util.ArrayDeque;
30 import java.util.HashSet;
31 import java.util.Queue;
32 import java.util.Set;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentMap;
35 import java.util.concurrent.Future;
36 import java.util.concurrent.atomic.AtomicBoolean;
37
38 import org.apache.hc.core5.annotation.Contract;
39 import org.apache.hc.core5.annotation.ThreadingBehavior;
40 import org.apache.hc.core5.concurrent.ComplexFuture;
41 import org.apache.hc.core5.concurrent.FutureCallback;
42 import org.apache.hc.core5.function.Callback;
43 import org.apache.hc.core5.http.ConnectionClosedException;
44 import org.apache.hc.core5.io.CloseMode;
45 import org.apache.hc.core5.io.ModalCloseable;
46 import org.apache.hc.core5.util.Args;
47 import org.apache.hc.core5.util.Asserts;
48 import org.apache.hc.core5.util.TimeValue;
49 import org.apache.hc.core5.util.Timeout;
50
51
52
53
54 @Contract(threading = ThreadingBehavior.SAFE)
55 public abstract class AbstractIOSessionPool<T> implements ModalCloseable {
56
57 private final ConcurrentMap<T, PoolEntry> sessionPool;
58 private final AtomicBoolean closed;
59
60 public AbstractIOSessionPool() {
61 super();
62 this.sessionPool = new ConcurrentHashMap<>();
63 this.closed = new AtomicBoolean(false);
64 }
65
66 protected abstract Future<IOSession> connectSession(
67 T namedEndpoint,
68 Timeout connectTimeout,
69 FutureCallback<IOSession> callback);
70
71 protected abstract void validateSession(
72 IOSession ioSession,
73 Callback<Boolean> callback);
74
75 protected abstract void closeSession(
76 IOSession ioSession,
77 CloseMode closeMode);
78
79 @Override
80 public final void close(final CloseMode closeMode) {
81 if (closed.compareAndSet(false, true)) {
82 for (final PoolEntry poolEntry : sessionPool.values()) {
83 synchronized (poolEntry) {
84 if (poolEntry.session != null) {
85 closeSession(poolEntry.session, closeMode);
86 poolEntry.session = null;
87 }
88 if (poolEntry.sessionFuture != null) {
89 poolEntry.sessionFuture.cancel(true);
90 poolEntry.sessionFuture = null;
91 }
92 for (;;) {
93 final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
94 if (callback != null) {
95 callback.cancelled();
96 } else {
97 break;
98 }
99 }
100 }
101 }
102 sessionPool.clear();
103 }
104 }
105
106 @Override
107 public final void close() {
108 close(CloseMode.GRACEFUL);
109 }
110
111 PoolEntry getPoolEntry(final T endpoint) {
112 PoolEntry poolEntry = sessionPool.get(endpoint);
113 if (poolEntry == null) {
114 final PoolEntry newPoolEntry = new PoolEntry();
115 poolEntry = sessionPool.putIfAbsent(endpoint, newPoolEntry);
116 if (poolEntry == null) {
117 poolEntry = newPoolEntry;
118 }
119 }
120 return poolEntry;
121 }
122
123 public final Future<IOSession> getSession(
124 final T endpoint,
125 final Timeout connectTimeout,
126 final FutureCallback<IOSession> callback) {
127 Args.notNull(endpoint, "Endpoint");
128 Asserts.check(!closed.get(), "Connection pool shut down");
129 final ComplexFuture<IOSession> future = new ComplexFuture<>(callback);
130 final PoolEntry poolEntry = getPoolEntry(endpoint);
131 getSessionInternal(poolEntry, false, endpoint, connectTimeout, new FutureCallback<IOSession>() {
132
133 @Override
134 public void completed(final IOSession ioSession) {
135 validateSession(ioSession, new Callback<Boolean>() {
136
137 @Override
138 public void execute(final Boolean result) {
139 if (result) {
140 future.completed(ioSession);
141 } else {
142 getSessionInternal(poolEntry, true, endpoint, connectTimeout,
143 new FutureCallback<IOSession>() {
144
145 @Override
146 public void completed(final IOSession ioSession) {
147 future.completed(ioSession);
148 }
149
150 @Override
151 public void failed(final Exception ex) {
152 future.failed(ex);
153 }
154
155 @Override
156 public void cancelled() {
157 future.cancel();
158 }
159
160 });
161 }
162 }
163
164 });
165 }
166
167 @Override
168 public void failed(final Exception ex) {
169 future.failed(ex);
170 }
171
172 @Override
173 public void cancelled() {
174 future.cancel();
175 }
176
177 });
178 return future;
179 }
180
181 private void getSessionInternal(
182 final PoolEntry poolEntry,
183 final boolean requestNew,
184 final T namedEndpoint,
185 final Timeout connectTimeout,
186 final FutureCallback<IOSession> callback) {
187 synchronized (poolEntry) {
188 if (poolEntry.session != null && requestNew) {
189 closeSession(poolEntry.session, CloseMode.GRACEFUL);
190 poolEntry.session = null;
191 }
192 if (poolEntry.session != null && !poolEntry.session.isOpen()) {
193 poolEntry.session = null;
194 }
195 if (poolEntry.session != null) {
196 callback.completed(poolEntry.session);
197 } else {
198 poolEntry.requestQueue.add(callback);
199 if (poolEntry.sessionFuture == null) {
200 poolEntry.sessionFuture = connectSession(
201 namedEndpoint,
202 connectTimeout,
203 new FutureCallback<IOSession>() {
204
205 @Override
206 public void completed(final IOSession result) {
207 synchronized (poolEntry) {
208 poolEntry.session = result;
209 poolEntry.sessionFuture = null;
210 for (;;) {
211 final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
212 if (callback != null) {
213 callback.completed(result);
214 } else {
215 break;
216 }
217 }
218 }
219 }
220
221 @Override
222 public void failed(final Exception ex) {
223 synchronized (poolEntry) {
224 poolEntry.session = null;
225 poolEntry.sessionFuture = null;
226 for (;;) {
227 final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
228 if (callback != null) {
229 callback.failed(ex);
230 } else {
231 break;
232 }
233 }
234 }
235 }
236
237 @Override
238 public void cancelled() {
239 failed(new ConnectionClosedException("Connection request cancelled"));
240 }
241
242 });
243 }
244 }
245 }
246 }
247
248 public final void enumAvailable(final Callback<IOSession> callback) {
249 for (final PoolEntry poolEntry: sessionPool.values()) {
250 if (poolEntry.session != null) {
251 synchronized (poolEntry) {
252 if (poolEntry.session != null) {
253 callback.execute(poolEntry.session);
254 if (!poolEntry.session.isOpen()) {
255 poolEntry.session = null;
256 }
257 }
258 }
259 }
260 }
261 }
262
263 public final void closeIdle(final TimeValue idleTime) {
264 final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
265 for (final PoolEntry poolEntry: sessionPool.values()) {
266 if (poolEntry.session != null) {
267 synchronized (poolEntry) {
268 if (poolEntry.session != null && poolEntry.session.getLastReadTime() <= deadline) {
269 closeSession(poolEntry.session, CloseMode.GRACEFUL);
270 poolEntry.session = null;
271 }
272 }
273 }
274 }
275 }
276
277 public final Set<T> getRoutes() {
278 return new HashSet<>(sessionPool.keySet());
279 }
280
281 @Override
282 public String toString() {
283 final StringBuilder buffer = new StringBuilder();
284 buffer.append("I/O sessions: ");
285 buffer.append(sessionPool.size());
286 return buffer.toString();
287 }
288
289 static class PoolEntry {
290
291 final Queue<FutureCallback<IOSession>> requestQueue;
292 volatile Future<IOSession> sessionFuture;
293 volatile IOSession session;
294
295 PoolEntry() {
296 this.requestQueue = new ArrayDeque<>();
297 }
298
299 }
300
301 }