1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.io;
29
30 import java.io.IOException;
31 import java.util.Date;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.TimeoutException;
34 import java.util.concurrent.atomic.AtomicBoolean;
35 import java.util.concurrent.atomic.AtomicReference;
36
37 import org.apache.hc.client5.http.DnsResolver;
38 import org.apache.hc.client5.http.HttpRoute;
39 import org.apache.hc.client5.http.SchemePortResolver;
40 import org.apache.hc.client5.http.impl.ConnectionShutdownException;
41 import org.apache.hc.client5.http.io.ConnectionEndpoint;
42 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
43 import org.apache.hc.client5.http.io.HttpClientConnectionOperator;
44 import org.apache.hc.client5.http.io.LeaseRequest;
45 import org.apache.hc.client5.http.io.ManagedHttpClientConnection;
46 import org.apache.hc.client5.http.socket.ConnectionSocketFactory;
47 import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory;
48 import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
49 import org.apache.hc.core5.annotation.Contract;
50 import org.apache.hc.core5.annotation.ThreadingBehavior;
51 import org.apache.hc.core5.http.ClassicHttpRequest;
52 import org.apache.hc.core5.http.ClassicHttpResponse;
53 import org.apache.hc.core5.http.HttpException;
54 import org.apache.hc.core5.http.HttpHost;
55 import org.apache.hc.core5.http.URIScheme;
56 import org.apache.hc.core5.http.config.Lookup;
57 import org.apache.hc.core5.http.config.Registry;
58 import org.apache.hc.core5.http.config.RegistryBuilder;
59 import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
60 import org.apache.hc.core5.http.io.HttpConnectionFactory;
61 import org.apache.hc.core5.http.io.SocketConfig;
62 import org.apache.hc.core5.http.protocol.HttpContext;
63 import org.apache.hc.core5.io.CloseMode;
64 import org.apache.hc.core5.util.Args;
65 import org.apache.hc.core5.util.Asserts;
66 import org.apache.hc.core5.util.LangUtils;
67 import org.apache.hc.core5.util.TimeValue;
68 import org.apache.hc.core5.util.Timeout;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90 @Contract(threading = ThreadingBehavior.SAFE)
91 public class BasicHttpClientConnectionManager implements HttpClientConnectionManager {
92
93 private static final Logger LOG = LoggerFactory.getLogger(BasicHttpClientConnectionManager.class);
94
95 private final HttpClientConnectionOperator connectionOperator;
96 private final HttpConnectionFactory<ManagedHttpClientConnection> connFactory;
97
98 private ManagedHttpClientConnection conn;
99 private HttpRoute route;
100 private Object state;
101 private long updated;
102 private long expiry;
103 private boolean leased;
104 private SocketConfig socketConfig;
105
106 private final AtomicBoolean closed;
107
108 private static Registry<ConnectionSocketFactory> getDefaultRegistry() {
109 return RegistryBuilder.<ConnectionSocketFactory>create()
110 .register(URIScheme.HTTP.id, PlainConnectionSocketFactory.getSocketFactory())
111 .register(URIScheme.HTTPS.id, SSLConnectionSocketFactory.getSocketFactory())
112 .build();
113 }
114
115 public BasicHttpClientConnectionManager(
116 final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
117 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory,
118 final SchemePortResolver schemePortResolver,
119 final DnsResolver dnsResolver) {
120 this(new DefaultHttpClientConnectionOperator(
121 socketFactoryRegistry, schemePortResolver, dnsResolver), connFactory);
122 }
123
124
125
126
127 public BasicHttpClientConnectionManager(
128 final HttpClientConnectionOperator httpClientConnectionOperator,
129 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
130 super();
131 this.connectionOperator = Args.notNull(httpClientConnectionOperator, "Connection operator");
132 this.connFactory = connFactory != null ? connFactory : ManagedHttpClientConnectionFactory.INSTANCE;
133 this.expiry = Long.MAX_VALUE;
134 this.socketConfig = SocketConfig.DEFAULT;
135 this.closed = new AtomicBoolean(false);
136 }
137
138 public BasicHttpClientConnectionManager(
139 final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
140 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
141 this(socketFactoryRegistry, connFactory, null, null);
142 }
143
144 public BasicHttpClientConnectionManager(
145 final Lookup<ConnectionSocketFactory> socketFactoryRegistry) {
146 this(socketFactoryRegistry, null, null, null);
147 }
148
149 public BasicHttpClientConnectionManager() {
150 this(getDefaultRegistry(), null, null, null);
151 }
152
153 @Override
154 public void close() {
155 close(CloseMode.GRACEFUL);
156 }
157
158 @Override
159 public void close(final CloseMode closeMode) {
160 if (this.closed.compareAndSet(false, true)) {
161 closeConnection(closeMode);
162 }
163 }
164
165 HttpRoute getRoute() {
166 return route;
167 }
168
169 Object getState() {
170 return state;
171 }
172
173 public synchronized SocketConfig getSocketConfig() {
174 return socketConfig;
175 }
176
177 public synchronized void setSocketConfig(final SocketConfig socketConfig) {
178 this.socketConfig = socketConfig != null ? socketConfig : SocketConfig.DEFAULT;
179 }
180
181 public LeaseRequest lease(final String id, final HttpRoute route, final Object state) {
182 return lease(id, route, Timeout.DISABLED, state);
183 }
184
185 @Override
186 public LeaseRequest lease(final String id, final HttpRoute route, final Timeout requestTimeout, final Object state) {
187 return new LeaseRequest() {
188
189 @Override
190 public ConnectionEndpoint get(
191 final Timeout timeout) throws InterruptedException, ExecutionException, TimeoutException {
192 try {
193 return new InternalConnectionEndpoint(route, getConnection(route, state));
194 } catch (final IOException ex) {
195 throw new ExecutionException(ex.getMessage(), ex);
196 }
197 }
198
199 @Override
200 public boolean cancel() {
201 return false;
202 }
203
204 };
205 }
206
207 private synchronized void closeConnection(final CloseMode closeMode) {
208 if (this.conn != null) {
209 LOG.debug("Closing connection {}", closeMode);
210 this.conn.close(closeMode);
211 this.conn = null;
212 }
213 }
214
215 private void checkExpiry() {
216 if (this.conn != null && System.currentTimeMillis() >= this.expiry) {
217 if (LOG.isDebugEnabled()) {
218 LOG.debug("Connection expired @ {}", new Date(this.expiry));
219 }
220 closeConnection(CloseMode.GRACEFUL);
221 }
222 }
223
224 synchronized ManagedHttpClientConnection getConnection(final HttpRoute route, final Object state) throws IOException {
225 Asserts.check(!this.closed.get(), "Connection manager has been shut down");
226 if (LOG.isDebugEnabled()) {
227 LOG.debug("Get connection for route {}", route);
228 }
229 Asserts.check(!this.leased, "Connection is still allocated");
230 if (!LangUtils.equals(this.route, route) || !LangUtils.equals(this.state, state)) {
231 closeConnection(CloseMode.GRACEFUL);
232 }
233 this.route = route;
234 this.state = state;
235 checkExpiry();
236 if (this.conn == null) {
237 this.conn = this.connFactory.createConnection(null);
238 } else {
239 this.conn.activate();
240 }
241 this.leased = true;
242 return this.conn;
243 }
244
245 private InternalConnectionEndpoint cast(final ConnectionEndpoint endpoint) {
246 if (endpoint instanceof InternalConnectionEndpoint) {
247 return (InternalConnectionEndpoint) endpoint;
248 }
249 throw new IllegalStateException("Unexpected endpoint class: " + endpoint.getClass());
250 }
251
252 @Override
253 public synchronized void release(final ConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
254 Args.notNull(endpoint, "Managed endpoint");
255 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
256 final ManagedHttpClientConnection conn = internalEndpoint.detach();
257 if (conn != null && LOG.isDebugEnabled()) {
258 LOG.debug("Releasing connection {}", conn);
259 }
260 if (this.closed.get()) {
261 return;
262 }
263 try {
264 if (keepAlive == null) {
265 this.conn.close(CloseMode.GRACEFUL);
266 }
267 this.updated = System.currentTimeMillis();
268 if (!this.conn.isOpen() && !this.conn.isConsistent()) {
269 this.conn = null;
270 this.route = null;
271 this.conn = null;
272 this.expiry = Long.MAX_VALUE;
273 if (LOG.isDebugEnabled()) {
274 LOG.debug("Connection is not kept alive");
275 }
276 } else {
277 this.state = state;
278 if (conn != null) {
279 conn.passivate();
280 }
281 if (TimeValue.isPositive(keepAlive)) {
282 if (LOG.isDebugEnabled()) {
283 LOG.debug("Connection can be kept alive for {}", keepAlive);
284 }
285 this.expiry = this.updated + keepAlive.toMilliseconds();
286 } else {
287 if (LOG.isDebugEnabled()) {
288 LOG.debug("Connection can be kept alive indefinitely");
289 }
290 this.expiry = Long.MAX_VALUE;
291 }
292 }
293 } finally {
294 this.leased = false;
295 }
296 }
297
298 @Override
299 public void connect(final ConnectionEndpoint endpoint, final TimeValue connectTimeout, final HttpContext context) throws IOException {
300 Args.notNull(endpoint, "Endpoint");
301
302 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
303 if (internalEndpoint.isConnected()) {
304 return;
305 }
306 final HttpRoute route = internalEndpoint.getRoute();
307 final HttpHost host;
308 if (route.getProxyHost() != null) {
309 host = route.getProxyHost();
310 } else {
311 host = route.getTargetHost();
312 }
313 this.connectionOperator.connect(
314 internalEndpoint.getConnection(),
315 host,
316 route.getLocalSocketAddress(),
317 connectTimeout,
318 this.socketConfig,
319 context);
320 }
321
322 @Override
323 public void upgrade(
324 final ConnectionEndpoint endpoint,
325 final HttpContext context) throws IOException {
326 Args.notNull(endpoint, "Endpoint");
327 Args.notNull(route, "HTTP route");
328 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
329 this.connectionOperator.upgrade(
330 internalEndpoint.getConnection(),
331 internalEndpoint.getRoute().getTargetHost(),
332 context);
333 }
334
335 public synchronized void closeExpired() {
336 if (this.closed.get()) {
337 return;
338 }
339 if (!this.leased) {
340 checkExpiry();
341 }
342 }
343
344 public synchronized void closeIdle(final TimeValue idleTime) {
345 Args.notNull(idleTime, "Idle time");
346 if (this.closed.get()) {
347 return;
348 }
349 if (!this.leased) {
350 long time = idleTime.toMilliseconds();
351 if (time < 0) {
352 time = 0;
353 }
354 final long deadline = System.currentTimeMillis() - time;
355 if (this.updated <= deadline) {
356 closeConnection(CloseMode.GRACEFUL);
357 }
358 }
359 }
360
361 class InternalConnectionEndpoint extends ConnectionEndpoint {
362
363 private final HttpRoute route;
364 private final AtomicReference<ManagedHttpClientConnection> connRef;
365
366 public InternalConnectionEndpoint(final HttpRoute route, final ManagedHttpClientConnection conn) {
367 this.route = route;
368 this.connRef = new AtomicReference<>(conn);
369 }
370
371 HttpRoute getRoute() {
372 return route;
373 }
374
375 ManagedHttpClientConnection getConnection() {
376 final ManagedHttpClientConnection conn = this.connRef.get();
377 if (conn == null) {
378 throw new ConnectionShutdownException();
379 }
380 return conn;
381 }
382
383 ManagedHttpClientConnection getValidatedConnection() {
384 final ManagedHttpClientConnection conn = getConnection();
385 Asserts.check(conn.isOpen(), "Endpoint is not connected");
386 return conn;
387 }
388
389 ManagedHttpClientConnection detach() {
390 return this.connRef.getAndSet(null);
391 }
392
393 @Override
394 public boolean isConnected() {
395 final ManagedHttpClientConnection conn = getConnection();
396 return conn != null && conn.isOpen();
397 }
398
399 @Override
400 public void close(final CloseMode closeMode) {
401 final ManagedHttpClientConnection conn = detach();
402 if (conn != null) {
403 conn.close(closeMode);
404 }
405 }
406
407 @Override
408 public void close() throws IOException {
409 final ManagedHttpClientConnection conn = detach();
410 if (conn != null) {
411 conn.close();
412 }
413 }
414
415 @Override
416 public void setSocketTimeout(final Timeout timeout) {
417 getValidatedConnection().setSocketTimeout(timeout);
418 }
419
420 @Override
421 public ClassicHttpResponse execute(
422 final String id,
423 final ClassicHttpRequest request,
424 final HttpRequestExecutor requestExecutor,
425 final HttpContext context) throws IOException, HttpException {
426 Args.notNull(request, "HTTP request");
427 Args.notNull(requestExecutor, "Request executor");
428 return requestExecutor.execute(request, getValidatedConnection(), context);
429 }
430
431 }
432
433 }