1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.io;
29
30 import java.io.IOException;
31 import java.time.Instant;
32 import java.util.Objects;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.TimeoutException;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.concurrent.atomic.AtomicLong;
37 import java.util.concurrent.atomic.AtomicReference;
38
39 import org.apache.hc.client5.http.DnsResolver;
40 import org.apache.hc.client5.http.HttpRoute;
41 import org.apache.hc.client5.http.SchemePortResolver;
42 import org.apache.hc.client5.http.config.ConnectionConfig;
43 import org.apache.hc.client5.http.config.TlsConfig;
44 import org.apache.hc.client5.http.impl.ConnPoolSupport;
45 import org.apache.hc.client5.http.impl.ConnectionShutdownException;
46 import org.apache.hc.client5.http.io.ConnectionEndpoint;
47 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
48 import org.apache.hc.client5.http.io.HttpClientConnectionOperator;
49 import org.apache.hc.client5.http.io.LeaseRequest;
50 import org.apache.hc.client5.http.io.ManagedHttpClientConnection;
51 import org.apache.hc.client5.http.socket.ConnectionSocketFactory;
52 import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory;
53 import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
54 import org.apache.hc.core5.annotation.Contract;
55 import org.apache.hc.core5.annotation.ThreadingBehavior;
56 import org.apache.hc.core5.http.ClassicHttpRequest;
57 import org.apache.hc.core5.http.ClassicHttpResponse;
58 import org.apache.hc.core5.http.HttpException;
59 import org.apache.hc.core5.http.HttpHost;
60 import org.apache.hc.core5.http.URIScheme;
61 import org.apache.hc.core5.http.config.Lookup;
62 import org.apache.hc.core5.http.config.Registry;
63 import org.apache.hc.core5.http.config.RegistryBuilder;
64 import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
65 import org.apache.hc.core5.http.io.HttpConnectionFactory;
66 import org.apache.hc.core5.http.io.SocketConfig;
67 import org.apache.hc.core5.http.protocol.HttpContext;
68 import org.apache.hc.core5.io.CloseMode;
69 import org.apache.hc.core5.util.Args;
70 import org.apache.hc.core5.util.Asserts;
71 import org.apache.hc.core5.util.Deadline;
72 import org.apache.hc.core5.util.TimeValue;
73 import org.apache.hc.core5.util.Timeout;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95 @Contract(threading = ThreadingBehavior.SAFE)
96 public class BasicHttpClientConnectionManager implements HttpClientConnectionManager {
97
98 private static final Logger LOG = LoggerFactory.getLogger(BasicHttpClientConnectionManager.class);
99
100 private static final AtomicLong COUNT = new AtomicLong(0);
101
102 private final HttpClientConnectionOperator connectionOperator;
103 private final HttpConnectionFactory<ManagedHttpClientConnection> connFactory;
104 private final String id;
105
106 private ManagedHttpClientConnection conn;
107 private HttpRoute route;
108 private Object state;
109 private long created;
110 private long updated;
111 private long expiry;
112 private boolean leased;
113 private SocketConfig socketConfig;
114 private ConnectionConfig connectionConfig;
115 private TlsConfig tlsConfig;
116
117 private final AtomicBoolean closed;
118
119 private static Registry<ConnectionSocketFactory> getDefaultRegistry() {
120 return RegistryBuilder.<ConnectionSocketFactory>create()
121 .register(URIScheme.HTTP.id, PlainConnectionSocketFactory.getSocketFactory())
122 .register(URIScheme.HTTPS.id, SSLConnectionSocketFactory.getSocketFactory())
123 .build();
124 }
125
/**
 * Creates a manager whose connections are established by a
 * {@link DefaultHttpClientConnectionOperator} built from the given socket
 * factory registry, scheme port resolver and DNS resolver.
 *
 * @param socketFactoryRegistry socket factories keyed by scheme.
 * @param connFactory factory for managed connections; {@code null} selects
 *                    {@code ManagedHttpClientConnectionFactory.INSTANCE}.
 * @param schemePortResolver passed through to the connection operator.
 * @param dnsResolver passed through to the connection operator.
 */
public BasicHttpClientConnectionManager(
        final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
        final HttpConnectionFactory<ManagedHttpClientConnection> connFactory,
        final SchemePortResolver schemePortResolver,
        final DnsResolver dnsResolver) {
    this(
            new DefaultHttpClientConnectionOperator(socketFactoryRegistry, schemePortResolver, dnsResolver),
            connFactory);
}
134
135
136
137
/**
 * Creates a manager around an explicit connection operator. All other
 * constructors end up here.
 *
 * @param httpClientConnectionOperator operator used to connect and upgrade
 *                                     the managed connection; must not be {@code null}.
 * @param connFactory factory for managed connections; {@code null} selects
 *                    {@code ManagedHttpClientConnectionFactory.INSTANCE}.
 */
public BasicHttpClientConnectionManager(
        final HttpClientConnectionOperator httpClientConnectionOperator,
        final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
    this.connectionOperator = Args.notNull(httpClientConnectionOperator, "Connection operator");
    if (connFactory != null) {
        this.connFactory = connFactory;
    } else {
        this.connFactory = ManagedHttpClientConnectionFactory.INSTANCE;
    }
    this.id = String.format("ep-%010d", COUNT.getAndIncrement());
    this.closed = new AtomicBoolean(false);
    // No keep-alive deadline until a connection has been released with one.
    this.expiry = Long.MAX_VALUE;
    this.socketConfig = SocketConfig.DEFAULT;
    this.connectionConfig = ConnectionConfig.DEFAULT;
    this.tlsConfig = TlsConfig.DEFAULT;
}
151
/**
 * Creates a manager from a socket factory registry and a connection
 * factory, passing {@code null} for both resolvers.
 *
 * @param socketFactoryRegistry socket factories keyed by scheme.
 * @param connFactory factory for managed connections; may be {@code null}.
 */
public BasicHttpClientConnectionManager(
        final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
        final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
    this(
            socketFactoryRegistry,
            connFactory,
            null,
            null);
}
157
/**
 * Creates a manager from a socket factory registry only, passing
 * {@code null} for the connection factory and both resolvers.
 *
 * @param socketFactoryRegistry socket factories keyed by scheme.
 */
public BasicHttpClientConnectionManager(final Lookup<ConnectionSocketFactory> socketFactoryRegistry) {
    this(
            socketFactoryRegistry,
            null,
            null,
            null);
}
162
/**
 * Creates a manager with the default {@code http} / {@code https} socket
 * factory registry and {@code null} for every other collaborator.
 */
public BasicHttpClientConnectionManager() {
    this(
            getDefaultRegistry(),
            null,
            null,
            null);
}
166
167 @Override
168 public void close() {
169 close(CloseMode.GRACEFUL);
170 }
171
172 @Override
173 public void close(final CloseMode closeMode) {
174 if (this.closed.compareAndSet(false, true)) {
175 closeConnection(closeMode);
176 }
177 }
178
179 HttpRoute getRoute() {
180 return route;
181 }
182
183 Object getState() {
184 return state;
185 }
186
187 public synchronized SocketConfig getSocketConfig() {
188 return socketConfig;
189 }
190
191 public synchronized void setSocketConfig(final SocketConfig socketConfig) {
192 this.socketConfig = socketConfig != null ? socketConfig : SocketConfig.DEFAULT;
193 }
194
195
196
197
198 public synchronized ConnectionConfig getConnectionConfig() {
199 return connectionConfig;
200 }
201
202
203
204
205 public synchronized void setConnectionConfig(final ConnectionConfig connectionConfig) {
206 this.connectionConfig = connectionConfig != null ? connectionConfig : ConnectionConfig.DEFAULT;
207 }
208
209
210
211
212 public synchronized TlsConfig getTlsConfig() {
213 return tlsConfig;
214 }
215
216
217
218
219 public synchronized void setTlsConfig(final TlsConfig tlsConfig) {
220 this.tlsConfig = tlsConfig != null ? tlsConfig : TlsConfig.DEFAULT;
221 }
222
223 public LeaseRequest lease(final String id, final HttpRoute route, final Object state) {
224 return lease(id, route, Timeout.DISABLED, state);
225 }
226
227 @Override
228 public LeaseRequest lease(final String id, final HttpRoute route, final Timeout requestTimeout, final Object state) {
229 return new LeaseRequest() {
230
231 @Override
232 public ConnectionEndpoint get(
233 final Timeout timeout) throws InterruptedException, ExecutionException, TimeoutException {
234 try {
235 return new InternalConnectionEndpoint(route, getConnection(route, state));
236 } catch (final IOException ex) {
237 throw new ExecutionException(ex.getMessage(), ex);
238 }
239 }
240
241 @Override
242 public boolean cancel() {
243 return false;
244 }
245
246 };
247 }
248
249 private synchronized void closeConnection(final CloseMode closeMode) {
250 if (this.conn != null) {
251 if (LOG.isDebugEnabled()) {
252 LOG.debug("{} Closing connection {}", id, closeMode);
253 }
254 this.conn.close(closeMode);
255 this.conn = null;
256 }
257 }
258
259 private void checkExpiry() {
260 if (this.conn != null && System.currentTimeMillis() >= this.expiry) {
261 if (LOG.isDebugEnabled()) {
262 LOG.debug("{} Connection expired @ {}", id, Instant.ofEpochMilli(this.expiry));
263 }
264 closeConnection(CloseMode.GRACEFUL);
265 }
266 }
267
268 private void validate() {
269 if (this.conn != null) {
270 final TimeValue timeToLive = connectionConfig.getTimeToLive();
271 if (TimeValue.isNonNegative(timeToLive)) {
272 final Deadline deadline = Deadline.calculate(created, timeToLive);
273 if (deadline.isExpired()) {
274 closeConnection(CloseMode.GRACEFUL);
275 }
276 }
277 }
278 if (this.conn != null) {
279 final TimeValue timeValue = connectionConfig.getValidateAfterInactivity() != null ?
280 connectionConfig.getValidateAfterInactivity() : TimeValue.ofSeconds(2);
281 if (TimeValue.isNonNegative(timeValue)) {
282 final Deadline deadline = Deadline.calculate(updated, timeValue);
283 if (deadline.isExpired()) {
284 boolean stale;
285 try {
286 stale = conn.isStale();
287 } catch (final IOException ignore) {
288 stale = true;
289 }
290 if (stale) {
291 if (LOG.isDebugEnabled()) {
292 LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(conn));
293 }
294 closeConnection(CloseMode.GRACEFUL);
295 }
296 }
297 }
298 }
299 }
300
301 synchronized ManagedHttpClientConnection getConnection(final HttpRoute route, final Object state) throws IOException {
302 Asserts.check(!isClosed(), "Connection manager has been shut down");
303 if (LOG.isDebugEnabled()) {
304 LOG.debug("{} Get connection for route {}", id, route);
305 }
306 Asserts.check(!this.leased, "Connection %s is still allocated", conn);
307 if (!Objects.equals(this.route, route) || !Objects.equals(this.state, state)) {
308 closeConnection(CloseMode.GRACEFUL);
309 }
310 this.route = route;
311 this.state = state;
312 checkExpiry();
313 validate();
314 if (this.conn == null) {
315 this.conn = this.connFactory.createConnection(null);
316 this.created = System.currentTimeMillis();
317 } else {
318 this.conn.activate();
319 }
320 this.leased = true;
321 if (LOG.isDebugEnabled()) {
322 LOG.debug("{} Using connection {}", id, conn);
323 }
324 return this.conn;
325 }
326
327 private InternalConnectionEndpoint cast(final ConnectionEndpoint endpoint) {
328 if (endpoint instanceof InternalConnectionEndpoint) {
329 return (InternalConnectionEndpoint) endpoint;
330 }
331 throw new IllegalStateException("Unexpected endpoint class: " + endpoint.getClass());
332 }
333
334 @Override
335 public synchronized void release(final ConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
336 Args.notNull(endpoint, "Managed endpoint");
337 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
338 final ManagedHttpClientConnection conn = internalEndpoint.detach();
339 if (LOG.isDebugEnabled()) {
340 LOG.debug("{} Releasing connection {}", id, conn);
341 }
342 if (isClosed()) {
343 return;
344 }
345 try {
346 if (keepAlive == null) {
347 this.conn.close(CloseMode.GRACEFUL);
348 }
349 this.updated = System.currentTimeMillis();
350 if (!this.conn.isOpen() && !this.conn.isConsistent()) {
351 this.route = null;
352 this.conn = null;
353 this.expiry = Long.MAX_VALUE;
354 if (LOG.isDebugEnabled()) {
355 LOG.debug("{} Connection is not kept alive", id);
356 }
357 } else {
358 this.state = state;
359 if (conn != null) {
360 conn.passivate();
361 }
362 if (TimeValue.isPositive(keepAlive)) {
363 if (LOG.isDebugEnabled()) {
364 LOG.debug("{} Connection can be kept alive for {}", id, keepAlive);
365 }
366 this.expiry = this.updated + keepAlive.toMilliseconds();
367 } else {
368 if (LOG.isDebugEnabled()) {
369 LOG.debug("{} Connection can be kept alive indefinitely", id);
370 }
371 this.expiry = Long.MAX_VALUE;
372 }
373 }
374 } finally {
375 this.leased = false;
376 }
377 }
378
379 @Override
380 public synchronized void connect(final ConnectionEndpoint endpoint, final TimeValue timeout, final HttpContext context) throws IOException {
381 Args.notNull(endpoint, "Endpoint");
382
383 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
384 if (internalEndpoint.isConnected()) {
385 return;
386 }
387 final HttpRoute route = internalEndpoint.getRoute();
388 final HttpHost host;
389 if (route.getProxyHost() != null) {
390 host = route.getProxyHost();
391 } else {
392 host = route.getTargetHost();
393 }
394 final Timeout connectTimeout = timeout != null ? Timeout.of(timeout.getDuration(), timeout.getTimeUnit()) : connectionConfig.getConnectTimeout();
395 final ManagedHttpClientConnection connection = internalEndpoint.getConnection();
396 if (LOG.isDebugEnabled()) {
397 LOG.debug("{} connecting endpoint to {} ({})", ConnPoolSupport.getId(endpoint), host, connectTimeout);
398 }
399 this.connectionOperator.connect(
400 connection,
401 host,
402 route.getLocalSocketAddress(),
403 connectTimeout,
404 socketConfig,
405 tlsConfig,
406 context);
407 if (LOG.isDebugEnabled()) {
408 LOG.debug("{} connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(conn));
409 }
410 final Timeout socketTimeout = connectionConfig.getSocketTimeout();
411 if (socketTimeout != null) {
412 connection.setSocketTimeout(socketTimeout);
413 }
414 }
415
416 @Override
417 public synchronized void upgrade(
418 final ConnectionEndpoint endpoint,
419 final HttpContext context) throws IOException {
420 Args.notNull(endpoint, "Endpoint");
421 Args.notNull(route, "HTTP route");
422 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
423 this.connectionOperator.upgrade(
424 internalEndpoint.getConnection(),
425 internalEndpoint.getRoute().getTargetHost(),
426 tlsConfig,
427 context);
428 }
429
430 public synchronized void closeExpired() {
431 if (isClosed()) {
432 return;
433 }
434 if (!this.leased) {
435 checkExpiry();
436 }
437 }
438
439 public synchronized void closeIdle(final TimeValue idleTime) {
440 Args.notNull(idleTime, "Idle time");
441 if (isClosed()) {
442 return;
443 }
444 if (!this.leased) {
445 long time = idleTime.toMilliseconds();
446 if (time < 0) {
447 time = 0;
448 }
449 final long deadline = System.currentTimeMillis() - time;
450 if (this.updated <= deadline) {
451 closeConnection(CloseMode.GRACEFUL);
452 }
453 }
454 }
455
456
457
458
459
460
461
462
463 @Deprecated
464 public TimeValue getValidateAfterInactivity() {
465 return connectionConfig.getValidateAfterInactivity();
466 }
467
468
469
470
471
472
473
474
475
476
477
478 @Deprecated
479 public void setValidateAfterInactivity(final TimeValue validateAfterInactivity) {
480 this.connectionConfig = ConnectionConfig.custom()
481 .setValidateAfterInactivity(validateAfterInactivity)
482 .build();
483 }
484
485 class InternalConnectionEndpoint extends ConnectionEndpoint {
486
487 private final HttpRoute route;
488 private final AtomicReference<ManagedHttpClientConnection> connRef;
489
490 public InternalConnectionEndpoint(final HttpRoute route, final ManagedHttpClientConnection conn) {
491 this.route = route;
492 this.connRef = new AtomicReference<>(conn);
493 }
494
495 HttpRoute getRoute() {
496 return route;
497 }
498
499 ManagedHttpClientConnection getConnection() {
500 final ManagedHttpClientConnection conn = this.connRef.get();
501 if (conn == null) {
502 throw new ConnectionShutdownException();
503 }
504 return conn;
505 }
506
507 ManagedHttpClientConnection getValidatedConnection() {
508 final ManagedHttpClientConnection conn = getConnection();
509 Asserts.check(conn.isOpen(), "Endpoint is not connected");
510 return conn;
511 }
512
513 ManagedHttpClientConnection detach() {
514 return this.connRef.getAndSet(null);
515 }
516
517 @Override
518 public boolean isConnected() {
519 final ManagedHttpClientConnection conn = getConnection();
520 return conn != null && conn.isOpen();
521 }
522
523 @Override
524 public void close(final CloseMode closeMode) {
525 final ManagedHttpClientConnection conn = detach();
526 if (conn != null) {
527 conn.close(closeMode);
528 }
529 }
530
531 @Override
532 public void close() throws IOException {
533 final ManagedHttpClientConnection conn = detach();
534 if (conn != null) {
535 conn.close();
536 }
537 }
538
539 @Override
540 public void setSocketTimeout(final Timeout timeout) {
541 getValidatedConnection().setSocketTimeout(timeout);
542 }
543
544 @Override
545 public ClassicHttpResponse execute(
546 final String exchangeId,
547 final ClassicHttpRequest request,
548 final HttpRequestExecutor requestExecutor,
549 final HttpContext context) throws IOException, HttpException {
550 Args.notNull(request, "HTTP request");
551 Args.notNull(requestExecutor, "Request executor");
552 if (LOG.isDebugEnabled()) {
553 LOG.debug("{} Executing exchange {}", id, exchangeId);
554 }
555 return requestExecutor.execute(request, getValidatedConnection(), context);
556 }
557
558 }
559
560
561
562
563
564
565
566
567 boolean isClosed() {
568 return this.closed.get();
569 }
570
571 }