1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.io;
29
30 import java.io.IOException;
31 import java.util.Date;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.TimeoutException;
34 import java.util.concurrent.atomic.AtomicBoolean;
35 import java.util.concurrent.atomic.AtomicLong;
36 import java.util.concurrent.atomic.AtomicReference;
37
38 import org.apache.hc.client5.http.DnsResolver;
39 import org.apache.hc.client5.http.HttpRoute;
40 import org.apache.hc.client5.http.SchemePortResolver;
41 import org.apache.hc.client5.http.impl.ConnPoolSupport;
42 import org.apache.hc.client5.http.impl.ConnectionShutdownException;
43 import org.apache.hc.client5.http.io.ConnectionEndpoint;
44 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
45 import org.apache.hc.client5.http.io.HttpClientConnectionOperator;
46 import org.apache.hc.client5.http.io.LeaseRequest;
47 import org.apache.hc.client5.http.io.ManagedHttpClientConnection;
48 import org.apache.hc.client5.http.socket.ConnectionSocketFactory;
49 import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory;
50 import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
51 import org.apache.hc.core5.annotation.Contract;
52 import org.apache.hc.core5.annotation.ThreadingBehavior;
53 import org.apache.hc.core5.http.ClassicHttpRequest;
54 import org.apache.hc.core5.http.ClassicHttpResponse;
55 import org.apache.hc.core5.http.HttpException;
56 import org.apache.hc.core5.http.HttpHost;
57 import org.apache.hc.core5.http.URIScheme;
58 import org.apache.hc.core5.http.config.Lookup;
59 import org.apache.hc.core5.http.config.Registry;
60 import org.apache.hc.core5.http.config.RegistryBuilder;
61 import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
62 import org.apache.hc.core5.http.io.HttpConnectionFactory;
63 import org.apache.hc.core5.http.io.SocketConfig;
64 import org.apache.hc.core5.http.protocol.HttpContext;
65 import org.apache.hc.core5.io.CloseMode;
66 import org.apache.hc.core5.util.Args;
67 import org.apache.hc.core5.util.Asserts;
68 import org.apache.hc.core5.util.LangUtils;
69 import org.apache.hc.core5.util.TimeValue;
70 import org.apache.hc.core5.util.Timeout;
71 import org.slf4j.Logger;
72 import org.slf4j.LoggerFactory;
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92 @Contract(threading = ThreadingBehavior.SAFE)
93 public class BasicHttpClientConnectionManager implements HttpClientConnectionManager {
94
95 private static final Logger LOG = LoggerFactory.getLogger(BasicHttpClientConnectionManager.class);
96
97 private static final AtomicLong COUNT = new AtomicLong(0);
98
99 private final HttpClientConnectionOperator connectionOperator;
100 private final HttpConnectionFactory<ManagedHttpClientConnection> connFactory;
101 private final String id;
102
103 private ManagedHttpClientConnection conn;
104 private HttpRoute route;
105 private Object state;
106 private long updated;
107 private long expiry;
108 private boolean leased;
109 private SocketConfig socketConfig;
110
111 private final AtomicBoolean closed;
112
113 private volatile TimeValue validateAfterInactivity;
114
115 private static Registry<ConnectionSocketFactory> getDefaultRegistry() {
116 return RegistryBuilder.<ConnectionSocketFactory>create()
117 .register(URIScheme.HTTP.id, PlainConnectionSocketFactory.getSocketFactory())
118 .register(URIScheme.HTTPS.id, SSLConnectionSocketFactory.getSocketFactory())
119 .build();
120 }
121
122 public BasicHttpClientConnectionManager(
123 final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
124 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory,
125 final SchemePortResolver schemePortResolver,
126 final DnsResolver dnsResolver) {
127 this(new DefaultHttpClientConnectionOperator(
128 socketFactoryRegistry, schemePortResolver, dnsResolver), connFactory);
129 }
130
131
132
133
134 public BasicHttpClientConnectionManager(
135 final HttpClientConnectionOperator httpClientConnectionOperator,
136 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
137 super();
138 this.connectionOperator = Args.notNull(httpClientConnectionOperator, "Connection operator");
139 this.connFactory = connFactory != null ? connFactory : ManagedHttpClientConnectionFactory.INSTANCE;
140 this.id = String.format("ep-%010d", COUNT.getAndIncrement());
141 this.expiry = Long.MAX_VALUE;
142 this.socketConfig = SocketConfig.DEFAULT;
143 this.closed = new AtomicBoolean(false);
144 this.validateAfterInactivity = TimeValue.ofSeconds(2L);
145 }
146
147 public BasicHttpClientConnectionManager(
148 final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
149 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
150 this(socketFactoryRegistry, connFactory, null, null);
151 }
152
153 public BasicHttpClientConnectionManager(
154 final Lookup<ConnectionSocketFactory> socketFactoryRegistry) {
155 this(socketFactoryRegistry, null, null, null);
156 }
157
158 public BasicHttpClientConnectionManager() {
159 this(getDefaultRegistry(), null, null, null);
160 }
161
162 @Override
163 public void close() {
164 close(CloseMode.GRACEFUL);
165 }
166
167 @Override
168 public void close(final CloseMode closeMode) {
169 if (this.closed.compareAndSet(false, true)) {
170 closeConnection(closeMode);
171 }
172 }
173
174 HttpRoute getRoute() {
175 return route;
176 }
177
178 Object getState() {
179 return state;
180 }
181
182 public synchronized SocketConfig getSocketConfig() {
183 return socketConfig;
184 }
185
186 public synchronized void setSocketConfig(final SocketConfig socketConfig) {
187 this.socketConfig = socketConfig != null ? socketConfig : SocketConfig.DEFAULT;
188 }
189
190 public LeaseRequest lease(final String id, final HttpRoute route, final Object state) {
191 return lease(id, route, Timeout.DISABLED, state);
192 }
193
194 @Override
195 public LeaseRequest lease(final String id, final HttpRoute route, final Timeout requestTimeout, final Object state) {
196 return new LeaseRequest() {
197
198 @Override
199 public ConnectionEndpoint get(
200 final Timeout timeout) throws InterruptedException, ExecutionException, TimeoutException {
201 try {
202 return new InternalConnectionEndpoint(route, getConnection(route, state));
203 } catch (final IOException ex) {
204 throw new ExecutionException(ex.getMessage(), ex);
205 }
206 }
207
208 @Override
209 public boolean cancel() {
210 return false;
211 }
212
213 };
214 }
215
216 private synchronized void closeConnection(final CloseMode closeMode) {
217 if (this.conn != null) {
218 if (LOG.isDebugEnabled()) {
219 LOG.debug("{} Closing connection {}", id, closeMode);
220 }
221 this.conn.close(closeMode);
222 this.conn = null;
223 }
224 }
225
226 private void checkExpiry() {
227 if (this.conn != null && System.currentTimeMillis() >= this.expiry) {
228 if (LOG.isDebugEnabled()) {
229 LOG.debug("{} Connection expired @ {}", id, new Date(this.expiry));
230 }
231 closeConnection(CloseMode.GRACEFUL);
232 }
233 }
234
235 private void validate() {
236 final TimeValue validateAfterInactivitySnapshot = validateAfterInactivity;
237 if (this.conn != null
238 && TimeValue.isNonNegative(validateAfterInactivitySnapshot)
239 && updated + validateAfterInactivitySnapshot.toMilliseconds() <= System.currentTimeMillis()) {
240 boolean stale;
241 try {
242 stale = conn.isStale();
243 } catch (final IOException ignore) {
244 stale = true;
245 }
246 if (stale) {
247 if (LOG.isDebugEnabled()) {
248 LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(conn));
249 }
250 closeConnection(CloseMode.GRACEFUL);
251 }
252 }
253 }
254
255 synchronized ManagedHttpClientConnection getConnection(final HttpRoute route, final Object state) throws IOException {
256 Asserts.check(!this.closed.get(), "Connection manager has been shut down");
257 if (LOG.isDebugEnabled()) {
258 LOG.debug("{} Get connection for route {}", id, route);
259 }
260 Asserts.check(!this.leased, "Connection is still allocated");
261 if (!LangUtils.equals(this.route, route) || !LangUtils.equals(this.state, state)) {
262 closeConnection(CloseMode.GRACEFUL);
263 }
264 this.route = route;
265 this.state = state;
266 checkExpiry();
267 validate();
268 if (this.conn == null) {
269 this.conn = this.connFactory.createConnection(null);
270 } else {
271 this.conn.activate();
272 }
273 this.leased = true;
274 return this.conn;
275 }
276
277 private InternalConnectionEndpoint cast(final ConnectionEndpoint endpoint) {
278 if (endpoint instanceof InternalConnectionEndpoint) {
279 return (InternalConnectionEndpoint) endpoint;
280 }
281 throw new IllegalStateException("Unexpected endpoint class: " + endpoint.getClass());
282 }
283
284 @Override
285 public synchronized void release(final ConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
286 Args.notNull(endpoint, "Managed endpoint");
287 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
288 final ManagedHttpClientConnection conn = internalEndpoint.detach();
289 if (LOG.isDebugEnabled()) {
290 LOG.debug("{} Releasing connection {}", id, conn);
291 }
292 if (this.closed.get()) {
293 return;
294 }
295 try {
296 if (keepAlive == null) {
297 this.conn.close(CloseMode.GRACEFUL);
298 }
299 this.updated = System.currentTimeMillis();
300 if (!this.conn.isOpen() && !this.conn.isConsistent()) {
301 this.conn = null;
302 this.route = null;
303 this.conn = null;
304 this.expiry = Long.MAX_VALUE;
305 if (LOG.isDebugEnabled()) {
306 LOG.debug("{} Connection is not kept alive", id);
307 }
308 } else {
309 this.state = state;
310 if (conn != null) {
311 conn.passivate();
312 }
313 if (TimeValue.isPositive(keepAlive)) {
314 if (LOG.isDebugEnabled()) {
315 LOG.debug("{} Connection can be kept alive for {}", id, keepAlive);
316 }
317 this.expiry = this.updated + keepAlive.toMilliseconds();
318 } else {
319 if (LOG.isDebugEnabled()) {
320 LOG.debug("{} Connection can be kept alive indefinitely", id);
321 }
322 this.expiry = Long.MAX_VALUE;
323 }
324 }
325 } finally {
326 this.leased = false;
327 }
328 }
329
330 @Override
331 public void connect(final ConnectionEndpoint endpoint, final TimeValue connectTimeout, final HttpContext context) throws IOException {
332 Args.notNull(endpoint, "Endpoint");
333
334 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
335 if (internalEndpoint.isConnected()) {
336 return;
337 }
338 final HttpRoute route = internalEndpoint.getRoute();
339 final HttpHost host;
340 if (route.getProxyHost() != null) {
341 host = route.getProxyHost();
342 } else {
343 host = route.getTargetHost();
344 }
345 this.connectionOperator.connect(
346 internalEndpoint.getConnection(),
347 host,
348 route.getLocalSocketAddress(),
349 connectTimeout,
350 this.socketConfig,
351 context);
352 }
353
354 @Override
355 public void upgrade(
356 final ConnectionEndpoint endpoint,
357 final HttpContext context) throws IOException {
358 Args.notNull(endpoint, "Endpoint");
359 Args.notNull(route, "HTTP route");
360 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
361 this.connectionOperator.upgrade(
362 internalEndpoint.getConnection(),
363 internalEndpoint.getRoute().getTargetHost(),
364 context);
365 }
366
367 public synchronized void closeExpired() {
368 if (this.closed.get()) {
369 return;
370 }
371 if (!this.leased) {
372 checkExpiry();
373 }
374 }
375
376 public synchronized void closeIdle(final TimeValue idleTime) {
377 Args.notNull(idleTime, "Idle time");
378 if (this.closed.get()) {
379 return;
380 }
381 if (!this.leased) {
382 long time = idleTime.toMilliseconds();
383 if (time < 0) {
384 time = 0;
385 }
386 final long deadline = System.currentTimeMillis() - time;
387 if (this.updated <= deadline) {
388 closeConnection(CloseMode.GRACEFUL);
389 }
390 }
391 }
392
393
394
395
396
397
398 public TimeValue getValidateAfterInactivity() {
399 return validateAfterInactivity;
400 }
401
402
403
404
405
406
407
408
409
410 public void setValidateAfterInactivity(final TimeValue validateAfterInactivity) {
411 this.validateAfterInactivity = validateAfterInactivity;
412 }
413
414 class InternalConnectionEndpoint extends ConnectionEndpoint {
415
416 private final HttpRoute route;
417 private final AtomicReference<ManagedHttpClientConnection> connRef;
418
419 public InternalConnectionEndpoint(final HttpRoute route, final ManagedHttpClientConnection conn) {
420 this.route = route;
421 this.connRef = new AtomicReference<>(conn);
422 }
423
424 HttpRoute getRoute() {
425 return route;
426 }
427
428 ManagedHttpClientConnection getConnection() {
429 final ManagedHttpClientConnection conn = this.connRef.get();
430 if (conn == null) {
431 throw new ConnectionShutdownException();
432 }
433 return conn;
434 }
435
436 ManagedHttpClientConnection getValidatedConnection() {
437 final ManagedHttpClientConnection conn = getConnection();
438 Asserts.check(conn.isOpen(), "Endpoint is not connected");
439 return conn;
440 }
441
442 ManagedHttpClientConnection detach() {
443 return this.connRef.getAndSet(null);
444 }
445
446 @Override
447 public boolean isConnected() {
448 final ManagedHttpClientConnection conn = getConnection();
449 return conn != null && conn.isOpen();
450 }
451
452 @Override
453 public void close(final CloseMode closeMode) {
454 final ManagedHttpClientConnection conn = detach();
455 if (conn != null) {
456 conn.close(closeMode);
457 }
458 }
459
460 @Override
461 public void close() throws IOException {
462 final ManagedHttpClientConnection conn = detach();
463 if (conn != null) {
464 conn.close();
465 }
466 }
467
468 @Override
469 public void setSocketTimeout(final Timeout timeout) {
470 getValidatedConnection().setSocketTimeout(timeout);
471 }
472
473 @Override
474 public ClassicHttpResponse execute(
475 final String exchangeId,
476 final ClassicHttpRequest request,
477 final HttpRequestExecutor requestExecutor,
478 final HttpContext context) throws IOException, HttpException {
479 Args.notNull(request, "HTTP request");
480 Args.notNull(requestExecutor, "Request executor");
481 if (LOG.isDebugEnabled()) {
482 LOG.debug("{} Executing exchange {}", id, exchangeId);
483 }
484 return requestExecutor.execute(request, getValidatedConnection(), context);
485 }
486
487 }
488
489 }