1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.reactor;
28
29 import java.util.ArrayDeque;
30 import java.util.HashSet;
31 import java.util.Queue;
32 import java.util.Set;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentMap;
35 import java.util.concurrent.Future;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.util.concurrent.locks.ReentrantLock;
38
39 import org.apache.hc.core5.annotation.Contract;
40 import org.apache.hc.core5.annotation.ThreadingBehavior;
41 import org.apache.hc.core5.concurrent.ComplexFuture;
42 import org.apache.hc.core5.concurrent.FutureCallback;
43 import org.apache.hc.core5.concurrent.FutureContribution;
44 import org.apache.hc.core5.function.Callback;
45 import org.apache.hc.core5.http.ConnectionClosedException;
46 import org.apache.hc.core5.io.CloseMode;
47 import org.apache.hc.core5.io.ModalCloseable;
48 import org.apache.hc.core5.util.Args;
49 import org.apache.hc.core5.util.Asserts;
50 import org.apache.hc.core5.util.TimeValue;
51 import org.apache.hc.core5.util.Timeout;
52
53
54
55
56 @Contract(threading = ThreadingBehavior.SAFE)
57 public abstract class AbstractIOSessionPool<T> implements ModalCloseable {
58
59 private final ConcurrentMap<T, PoolEntry> sessionPool;
60 private final AtomicBoolean closed;
61
62 private final ReentrantLock lock;
63
64 public AbstractIOSessionPool() {
65 super();
66 this.sessionPool = new ConcurrentHashMap<>();
67 this.closed = new AtomicBoolean(false);
68 this.lock = new ReentrantLock();
69 }
70
71 protected abstract Future<IOSession> connectSession(
72 T namedEndpoint,
73 Timeout connectTimeout,
74 FutureCallback<IOSession> callback);
75
76 protected abstract void validateSession(
77 IOSession ioSession,
78 Callback<Boolean> callback);
79
80 protected abstract void closeSession(
81 IOSession ioSession,
82 CloseMode closeMode);
83
84 @Override
85 public final void close(final CloseMode closeMode) {
86 if (closed.compareAndSet(false, true)) {
87 for (final PoolEntry poolEntry : sessionPool.values()) {
88 lock.lock();
89 try {
90 if (poolEntry.session != null) {
91 closeSession(poolEntry.session, closeMode);
92 poolEntry.session = null;
93 }
94 if (poolEntry.sessionFuture != null) {
95 poolEntry.sessionFuture.cancel(true);
96 poolEntry.sessionFuture = null;
97 }
98 for (;;) {
99 final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
100 if (callback != null) {
101 callback.cancelled();
102 } else {
103 break;
104 }
105 }
106 } finally {
107 lock.unlock();
108 }
109 }
110 sessionPool.clear();
111 }
112 }
113
114 @Override
115 public final void close() {
116 close(CloseMode.GRACEFUL);
117 }
118
119 PoolEntry getPoolEntry(final T endpoint) {
120 PoolEntry poolEntry = sessionPool.get(endpoint);
121 if (poolEntry == null) {
122 final PoolEntry newPoolEntry = new PoolEntry();
123 poolEntry = sessionPool.putIfAbsent(endpoint, newPoolEntry);
124 if (poolEntry == null) {
125 poolEntry = newPoolEntry;
126 }
127 }
128 return poolEntry;
129 }
130
131 public final Future<IOSession> getSession(
132 final T endpoint,
133 final Timeout connectTimeout,
134 final FutureCallback<IOSession> callback) {
135 Args.notNull(endpoint, "Endpoint");
136 Asserts.check(!closed.get(), "Connection pool shut down");
137 final ComplexFuture<IOSession> future = new ComplexFuture<>(callback);
138 final PoolEntry poolEntry = getPoolEntry(endpoint);
139 getSessionInternal(poolEntry, false, endpoint, connectTimeout, new FutureCallback<IOSession>() {
140
141 @Override
142 public void completed(final IOSession ioSession) {
143 validateSession(ioSession, result -> {
144 if (result) {
145 future.completed(ioSession);
146 } else {
147 getSessionInternal(poolEntry, true, endpoint, connectTimeout,
148 new FutureContribution<IOSession>(future) {
149
150 @Override
151 public void completed(final IOSession ioSession1) {
152 future.completed(ioSession1);
153 }
154
155 });
156 }
157 });
158 }
159
160 @Override
161 public void failed(final Exception ex) {
162 future.failed(ex);
163 }
164
165 @Override
166 public void cancelled() {
167 future.cancel();
168 }
169
170 });
171 return future;
172 }
173
174 private void getSessionInternal(
175 final PoolEntry poolEntry,
176 final boolean requestNew,
177 final T namedEndpoint,
178 final Timeout connectTimeout,
179 final FutureCallback<IOSession> callback) {
180 poolEntry.lock.lock();
181 try {
182 if (poolEntry.session != null && requestNew) {
183 closeSession(poolEntry.session, CloseMode.GRACEFUL);
184 poolEntry.session = null;
185 }
186 if (poolEntry.session != null && !poolEntry.session.isOpen()) {
187 poolEntry.session = null;
188 }
189 if (poolEntry.session != null) {
190 callback.completed(poolEntry.session);
191 } else {
192 poolEntry.requestQueue.add(callback);
193 if (poolEntry.sessionFuture != null && poolEntry.completed) {
194 poolEntry.sessionFuture = null;
195 }
196 if (poolEntry.sessionFuture == null) {
197 poolEntry.completed = false;
198 poolEntry.sessionFuture = connectSession(
199 namedEndpoint,
200 connectTimeout,
201 new FutureCallback<IOSession>() {
202
203 @Override
204 public void completed(final IOSession result) {
205 poolEntry.lock.lock();
206 try {
207 poolEntry.completed = true;
208 if (poolEntry.session == null) {
209 poolEntry.session = result;
210 } else {
211 closeSession(result,CloseMode.GRACEFUL);
212 }
213 for (;;) {
214 final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
215 if (callback != null) {
216 callback.completed(result);
217 } else {
218 break;
219 }
220 }
221 } finally {
222 poolEntry.lock.unlock();
223 }
224 }
225
226 @Override
227 public void failed(final Exception ex) {
228 poolEntry.lock.lock();
229 try {
230 poolEntry.completed = true;
231 poolEntry.session = null;
232 for (;;) {
233 final FutureCallback<IOSession> callback = poolEntry.requestQueue.poll();
234 if (callback != null) {
235 callback.failed(ex);
236 } else {
237 break;
238 }
239 }
240 } finally {
241 poolEntry.lock.unlock();
242 }
243 }
244
245 @Override
246 public void cancelled() {
247 failed(new ConnectionClosedException("Connection request cancelled"));
248 }
249
250 });
251 }
252 }
253 } finally {
254 poolEntry.lock.unlock();
255 }
256 }
257
258 public final void enumAvailable(final Callback<IOSession> callback) {
259 for (final PoolEntry poolEntry: sessionPool.values()) {
260 if (poolEntry.session != null) {
261 lock.lock();
262 try {
263 if (poolEntry.session != null) {
264 callback.execute(poolEntry.session);
265 if (!poolEntry.session.isOpen()) {
266 poolEntry.session = null;
267 }
268 }
269 } finally {
270 lock.unlock();
271 }
272 }
273 }
274 }
275
276 public final void closeIdle(final TimeValue idleTime) {
277 final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0);
278 for (final PoolEntry poolEntry: sessionPool.values()) {
279 if (poolEntry.session != null) {
280 lock.lock();
281 try {
282 if (poolEntry.session != null && poolEntry.session.getLastReadTime() <= deadline) {
283 closeSession(poolEntry.session, CloseMode.GRACEFUL);
284 poolEntry.session = null;
285 }
286 } finally {
287 lock.unlock();
288 }
289 }
290 }
291 }
292
293 public final Set<T> getRoutes() {
294 return new HashSet<>(sessionPool.keySet());
295 }
296
297 @Override
298 public String toString() {
299 final StringBuilder buffer = new StringBuilder();
300 buffer.append("I/O sessions: ");
301 buffer.append(sessionPool.size());
302 return buffer.toString();
303 }
304
305 static class PoolEntry {
306
307 final Queue<FutureCallback<IOSession>> requestQueue;
308 volatile boolean completed;
309 volatile Future<IOSession> sessionFuture;
310 volatile IOSession session;
311 final ReentrantLock lock;
312
313 PoolEntry() {
314 this.requestQueue = new ArrayDeque<>();
315 this.lock = new ReentrantLock();
316 }
317
318 }
319
320 }