1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.reactor;
28
29 import java.util.ArrayDeque;
30 import java.util.HashSet;
31 import java.util.Queue;
32 import java.util.Set;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentMap;
35 import java.util.concurrent.Future;
36 import java.util.concurrent.atomic.AtomicBoolean;
37
38 import org.apache.hc.core5.annotation.Contract;
39 import org.apache.hc.core5.annotation.ThreadingBehavior;
40 import org.apache.hc.core5.concurrent.ComplexFuture;
41 import org.apache.hc.core5.concurrent.FutureCallback;
42 import org.apache.hc.core5.concurrent.FutureContribution;
43 import org.apache.hc.core5.function.Callback;
44 import org.apache.hc.core5.http.ConnectionClosedException;
45 import org.apache.hc.core5.io.CloseMode;
46 import org.apache.hc.core5.io.ModalCloseable;
47 import org.apache.hc.core5.util.Args;
48 import org.apache.hc.core5.util.Asserts;
49 import org.apache.hc.core5.util.TimeValue;
50 import org.apache.hc.core5.util.Timeout;
51
52
53
54
55 @Contract(threading = ThreadingBehavior.SAFE)
56 public abstract class AbstractIOSessionPool<T> implements ModalCloseable {
57
58 private final ConcurrentMap<T, PoolEntry> sessionPool;
59 private final AtomicBoolean closed;
60
61 public AbstractIOSessionPool() {
62 super();
63 this.sessionPool = new ConcurrentHashMap<>();
64 this.closed = new AtomicBoolean(false);
65 }
66
67 protected abstract Future<IOSession> connectSession(
68 T namedEndpoint,
69 Timeout connectTimeout,
70 FutureCallback<IOSession> callback);
71
72 protected abstract void validateSession(
73 IOSession ioSession,
74 Callback<Boolean> callback);
75
76 protected abstract void closeSession(
77 IOSession ioSession,
78 CloseMode closeMode);
79
80 @Override
81 public final void close(final CloseMode closeMode) {
82 if (closed.compareAndSet(false, true)) {
83 for (final PoolEntry poolEntry : sessionPool.values()) {
84 synchronized (poolEntry) {
85 if (poolEntry.session != null) {
86 closeSession(poolEntry.session, closeMode);
87 poolEntry.session = null;
88 }
89 if (poolEntry.sessionFuture != null) {
90 poolEntry.sessionFuture.cancel(true);
91 poolEntry.sessionFuture = null;
92 }
93 for (;;) {
94 final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
95 if (callback != null) {
96 callback.cancelled();
97 } else {
98 break;
99 }
100 }
101 }
102 }
103 sessionPool.clear();
104 }
105 }
106
107 @Override
108 public final void close() {
109 close(CloseMode.GRACEFUL);
110 }
111
112 PoolEntry getPoolEntry(final T endpoint) {
113 PoolEntry poolEntry = sessionPool.get(endpoint);
114 if (poolEntry == null) {
115 final PoolEntry newPoolEntry = new PoolEntry();
116 poolEntry = sessionPool.putIfAbsent(endpoint, newPoolEntry);
117 if (poolEntry == null) {
118 poolEntry = newPoolEntry;
119 }
120 }
121 return poolEntry;
122 }
123
124 public final Future<IOSession> getSession(
125 final T endpoint,
126 final Timeout connectTimeout,
127 final FutureCallback<IOSession> callback) {
128 Args.notNull(endpoint, "Endpoint");
129 Asserts.check(!closed.get(), "Connection pool shut down");
130 final ComplexFuture<IOSession> future = new ComplexFuture<>(callback);
131 final PoolEntry poolEntry = getPoolEntry(endpoint);
132 getSessionInternal(poolEntry, false, endpoint, connectTimeout, new FutureCallback<IOSession>() {
133
134 @Override
135 public void completed(final IOSession ioSession) {
136 validateSession(ioSession, result -> {
137 if (result) {
138 future.completed(ioSession);
139 } else {
140 getSessionInternal(poolEntry, true, endpoint, connectTimeout,
141 new FutureContribution<IOSession>(future) {
142
143 @Override
144 public void completed(final IOSession ioSession1) {
145 future.completed(ioSession1);
146 }
147
148 });
149 }
150 });
151 }
152
153 @Override
154 public void failed(final Exception ex) {
155 future.failed(ex);
156 }
157
158 @Override
159 public void cancelled() {
160 future.cancel();
161 }
162
163 });
164 return future;
165 }
166
167 private void getSessionInternal(
168 final PoolEntry poolEntry,
169 final boolean requestNew,
170 final T namedEndpoint,
171 final Timeout connectTimeout,
172 final FutureCallback<IOSession> callback) {
173 synchronized (poolEntry) {
174 if (poolEntry.session != null && requestNew) {
175 closeSession(poolEntry.session, CloseMode.GRACEFUL);
176 poolEntry.session = null;
177 }
178 if (poolEntry.session != null && !poolEntry.session.isOpen()) {
179 poolEntry.session = null;
180 }
181 if (poolEntry.session != null) {
182 callback.completed(poolEntry.session);
183 } else {
184 poolEntry.requestQueue.add(callback);
185 if (poolEntry.sessionFuture != null && poolEntry.sessionFuture.isDone()) {
186 poolEntry.sessionFuture = null;
187 }
188 if (poolEntry.sessionFuture == null) {
189 poolEntry.sessionFuture = connectSession(
190 namedEndpoint,
191 connectTimeout,
192 new FutureCallback<IOSession>() {
193
194 @Override
195 public void completed(final IOSession result) {
196 synchronized (poolEntry) {
197 poolEntry.session = result;
198 for (;;) {
199 final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
200 if (callback != null) {
201 callback.completed(result);
202 } else {
203 break;
204 }
205 }
206 }
207 }
208
209 @Override
210 public void failed(final Exception ex) {
211 synchronized (poolEntry) {
212 poolEntry.session = null;
213 for (;;) {
214 final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
215 if (callback != null) {
216 callback.failed(ex);
217 } else {
218 break;
219 }
220 }
221 }
222 }
223
224 @Override
225 public void cancelled() {
226 failed(new ConnectionClosedException("Connection request cancelled"));
227 }
228
229 });
230 }
231 }
232 }
233 }
234
235 public final void enumAvailable(final Callback<IOSession> callback) {
236 for (final PoolEntry poolEntry: sessionPool.values()) {
237 if (poolEntry.session != null) {
238 synchronized (poolEntry) {
239 if (poolEntry.session != null) {
240 callback.execute(poolEntry.session);
241 if (!poolEntry.session.isOpen()) {
242 poolEntry.session = null;
243 }
244 }
245 }
246 }
247 }
248 }
249
250 public final void closeIdle(final TimeValue idleTime) {
251 final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
252 for (final PoolEntry poolEntry: sessionPool.values()) {
253 if (poolEntry.session != null) {
254 synchronized (poolEntry) {
255 if (poolEntry.session != null && poolEntry.session.getLastReadTime() <= deadline) {
256 closeSession(poolEntry.session, CloseMode.GRACEFUL);
257 poolEntry.session = null;
258 }
259 }
260 }
261 }
262 }
263
264 public final Set<T> getRoutes() {
265 return new HashSet<>(sessionPool.keySet());
266 }
267
268 @Override
269 public String toString() {
270 final StringBuilder buffer = new StringBuilder();
271 buffer.append("I/O sessions: ");
272 buffer.append(sessionPool.size());
273 return buffer.toString();
274 }
275
276 static class PoolEntry {
277
278 final Queue<FutureCallback<IOSession>> requestQueue;
279 volatile Future<IOSession> sessionFuture;
280 volatile IOSession session;
281
282 PoolEntry() {
283 this.requestQueue = new ArrayDeque<>();
284 }
285
286 }
287
288 }