1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.io;
29
30 import java.io.IOException;
31 import java.util.Date;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.TimeoutException;
34 import java.util.concurrent.atomic.AtomicBoolean;
35 import java.util.concurrent.atomic.AtomicReference;
36
37 import org.apache.hc.client5.http.DnsResolver;
38 import org.apache.hc.client5.http.HttpRoute;
39 import org.apache.hc.client5.http.SchemePortResolver;
40 import org.apache.hc.client5.http.impl.ConnectionShutdownException;
41 import org.apache.hc.client5.http.io.ConnectionEndpoint;
42 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
43 import org.apache.hc.client5.http.io.HttpClientConnectionOperator;
44 import org.apache.hc.client5.http.io.LeaseRequest;
45 import org.apache.hc.client5.http.io.ManagedHttpClientConnection;
46 import org.apache.hc.client5.http.socket.ConnectionSocketFactory;
47 import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory;
48 import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
49 import org.apache.hc.core5.annotation.Contract;
50 import org.apache.hc.core5.annotation.ThreadingBehavior;
51 import org.apache.hc.core5.http.ClassicHttpRequest;
52 import org.apache.hc.core5.http.ClassicHttpResponse;
53 import org.apache.hc.core5.http.HttpException;
54 import org.apache.hc.core5.http.HttpHost;
55 import org.apache.hc.core5.http.URIScheme;
56 import org.apache.hc.core5.http.config.Lookup;
57 import org.apache.hc.core5.http.config.Registry;
58 import org.apache.hc.core5.http.config.RegistryBuilder;
59 import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
60 import org.apache.hc.core5.http.io.HttpConnectionFactory;
61 import org.apache.hc.core5.http.io.SocketConfig;
62 import org.apache.hc.core5.http.protocol.HttpContext;
63 import org.apache.hc.core5.io.CloseMode;
64 import org.apache.hc.core5.util.Args;
65 import org.apache.hc.core5.util.Asserts;
66 import org.apache.hc.core5.util.LangUtils;
67 import org.apache.hc.core5.util.TimeValue;
68 import org.apache.hc.core5.util.Timeout;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90 @Contract(threading = ThreadingBehavior.SAFE)
91 public class BasicHttpClientConnectionManager implements HttpClientConnectionManager {
92
93 private static final Logger LOG = LoggerFactory.getLogger(BasicHttpClientConnectionManager.class);
94
95 private final HttpClientConnectionOperator connectionOperator;
96 private final HttpConnectionFactory<ManagedHttpClientConnection> connFactory;
97
98 private ManagedHttpClientConnection conn;
99 private HttpRoute route;
100 private Object state;
101 private long updated;
102 private long expiry;
103 private boolean leased;
104 private SocketConfig socketConfig;
105
106 private final AtomicBoolean closed;
107
108 private static Registry<ConnectionSocketFactory> getDefaultRegistry() {
109 return RegistryBuilder.<ConnectionSocketFactory>create()
110 .register(URIScheme.HTTP.id, PlainConnectionSocketFactory.getSocketFactory())
111 .register(URIScheme.HTTPS.id, SSLConnectionSocketFactory.getSocketFactory())
112 .build();
113 }
114
115 public BasicHttpClientConnectionManager(
116 final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
117 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory,
118 final SchemePortResolver schemePortResolver,
119 final DnsResolver dnsResolver) {
120 this(new DefaultHttpClientConnectionOperator(
121 socketFactoryRegistry, schemePortResolver, dnsResolver), connFactory);
122 }
123
124
125
126
127 public BasicHttpClientConnectionManager(
128 final HttpClientConnectionOperator httpClientConnectionOperator,
129 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
130 super();
131 this.connectionOperator = Args.notNull(httpClientConnectionOperator, "Connection operator");
132 this.connFactory = connFactory != null ? connFactory : ManagedHttpClientConnectionFactory.INSTANCE;
133 this.expiry = Long.MAX_VALUE;
134 this.socketConfig = SocketConfig.DEFAULT;
135 this.closed = new AtomicBoolean(false);
136 }
137
138 public BasicHttpClientConnectionManager(
139 final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
140 final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
141 this(socketFactoryRegistry, connFactory, null, null);
142 }
143
144 public BasicHttpClientConnectionManager(
145 final Lookup<ConnectionSocketFactory> socketFactoryRegistry) {
146 this(socketFactoryRegistry, null, null, null);
147 }
148
149 public BasicHttpClientConnectionManager() {
150 this(getDefaultRegistry(), null, null, null);
151 }
152
153 @Override
154 public void close() {
155 close(CloseMode.GRACEFUL);
156 }
157
158 @Override
159 public void close(final CloseMode closeMode) {
160 if (this.closed.compareAndSet(false, true)) {
161 closeConnection(closeMode);
162 }
163 }
164
165 HttpRoute getRoute() {
166 return route;
167 }
168
169 Object getState() {
170 return state;
171 }
172
173 public synchronized SocketConfig getSocketConfig() {
174 return socketConfig;
175 }
176
177 public synchronized void setSocketConfig(final SocketConfig socketConfig) {
178 this.socketConfig = socketConfig != null ? socketConfig : SocketConfig.DEFAULT;
179 }
180
181 public LeaseRequest lease(final String id, final HttpRoute route, final Object state) {
182 return lease(id, route, Timeout.DISABLED, state);
183 }
184
185 @Override
186 public LeaseRequest lease(final String id, final HttpRoute route, final Timeout requestTimeout, final Object state) {
187 return new LeaseRequest() {
188
189 @Override
190 public ConnectionEndpoint get(
191 final Timeout timeout) throws InterruptedException, ExecutionException, TimeoutException {
192 try {
193 return new InternalConnectionEndpoint(route, getConnection(route, state));
194 } catch (final IOException ex) {
195 throw new ExecutionException(ex.getMessage(), ex);
196 }
197 }
198
199 @Override
200 public boolean cancel() {
201 return false;
202 }
203
204 };
205 }
206
207 private synchronized void closeConnection(final CloseMode closeMode) {
208 if (this.conn != null) {
209 LOG.debug("Closing connection {}", closeMode);
210 this.conn.close(closeMode);
211 this.conn = null;
212 }
213 }
214
215 private void checkExpiry() {
216 if (this.conn != null && System.currentTimeMillis() >= this.expiry) {
217 if (LOG.isDebugEnabled()) {
218 LOG.debug("Connection expired @ {}", new Date(this.expiry));
219 }
220 closeConnection(CloseMode.GRACEFUL);
221 }
222 }
223
224 synchronized ManagedHttpClientConnection getConnection(final HttpRoute route, final Object state) throws IOException {
225 Asserts.check(!this.closed.get(), "Connection manager has been shut down");
226 if (LOG.isDebugEnabled()) {
227 LOG.debug("Get connection for route {}", route);
228 }
229 Asserts.check(!this.leased, "Connection is still allocated");
230 if (!LangUtils.equals(this.route, route) || !LangUtils.equals(this.state, state)) {
231 closeConnection(CloseMode.GRACEFUL);
232 }
233 this.route = route;
234 this.state = state;
235 checkExpiry();
236 if (this.conn == null) {
237 this.conn = this.connFactory.createConnection(null);
238 } else {
239 this.conn.activate();
240 }
241 this.leased = true;
242 return this.conn;
243 }
244
245 private InternalConnectionEndpoint cast(final ConnectionEndpoint endpoint) {
246 if (endpoint instanceof InternalConnectionEndpoint) {
247 return (InternalConnectionEndpoint) endpoint;
248 }
249 throw new IllegalStateException("Unexpected endpoint class: " + endpoint.getClass());
250 }
251
252 @Override
253 public synchronized void release(final ConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
254 Args.notNull(endpoint, "Managed endpoint");
255 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
256 final ManagedHttpClientConnection conn = internalEndpoint.detach();
257 if (conn != null && LOG.isDebugEnabled()) {
258 LOG.debug("Releasing connection {}", conn);
259 }
260 if (this.closed.get()) {
261 return;
262 }
263 try {
264 if (keepAlive == null) {
265 this.conn.close(CloseMode.GRACEFUL);
266 }
267 this.updated = System.currentTimeMillis();
268 if (!this.conn.isOpen() && !this.conn.isConsistent()) {
269 this.conn = null;
270 this.route = null;
271 this.conn = null;
272 this.expiry = Long.MAX_VALUE;
273 if (LOG.isDebugEnabled()) {
274 LOG.debug("Connection is not kept alive");
275 }
276 } else {
277 this.state = state;
278 conn.passivate();
279 if (TimeValue.isPositive(keepAlive)) {
280 if (LOG.isDebugEnabled()) {
281 LOG.debug("Connection can be kept alive for {}", keepAlive);
282 }
283 this.expiry = this.updated + keepAlive.toMilliseconds();
284 } else {
285 if (LOG.isDebugEnabled()) {
286 LOG.debug("Connection can be kept alive indefinitely");
287 }
288 this.expiry = Long.MAX_VALUE;
289 }
290 }
291 } finally {
292 this.leased = false;
293 }
294 }
295
296 @Override
297 public void connect(final ConnectionEndpoint endpoint, final TimeValue connectTimeout, final HttpContext context) throws IOException {
298 Args.notNull(endpoint, "Endpoint");
299
300 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
301 if (internalEndpoint.isConnected()) {
302 return;
303 }
304 final HttpRoute route = internalEndpoint.getRoute();
305 final HttpHost host;
306 if (route.getProxyHost() != null) {
307 host = route.getProxyHost();
308 } else {
309 host = route.getTargetHost();
310 }
311 this.connectionOperator.connect(
312 internalEndpoint.getConnection(),
313 host,
314 route.getLocalSocketAddress(),
315 connectTimeout,
316 this.socketConfig,
317 context);
318 }
319
320 @Override
321 public void upgrade(
322 final ConnectionEndpoint endpoint,
323 final HttpContext context) throws IOException {
324 Args.notNull(endpoint, "Endpoint");
325 Args.notNull(route, "HTTP route");
326 final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
327 this.connectionOperator.upgrade(
328 internalEndpoint.getConnection(),
329 internalEndpoint.getRoute().getTargetHost(),
330 context);
331 }
332
333 public synchronized void closeExpired() {
334 if (this.closed.get()) {
335 return;
336 }
337 if (!this.leased) {
338 checkExpiry();
339 }
340 }
341
342 public synchronized void closeIdle(final TimeValue idleTime) {
343 Args.notNull(idleTime, "Idle time");
344 if (this.closed.get()) {
345 return;
346 }
347 if (!this.leased) {
348 long time = idleTime.toMilliseconds();
349 if (time < 0) {
350 time = 0;
351 }
352 final long deadline = System.currentTimeMillis() - time;
353 if (this.updated <= deadline) {
354 closeConnection(CloseMode.GRACEFUL);
355 }
356 }
357 }
358
359 class InternalConnectionEndpoint extends ConnectionEndpoint {
360
361 private final HttpRoute route;
362 private final AtomicReference<ManagedHttpClientConnection> connRef;
363
364 public InternalConnectionEndpoint(final HttpRoute route, final ManagedHttpClientConnection conn) {
365 this.route = route;
366 this.connRef = new AtomicReference<>(conn);
367 }
368
369 HttpRoute getRoute() {
370 return route;
371 }
372
373 ManagedHttpClientConnection getConnection() {
374 final ManagedHttpClientConnection conn = this.connRef.get();
375 if (conn == null) {
376 throw new ConnectionShutdownException();
377 }
378 return conn;
379 }
380
381 ManagedHttpClientConnection getValidatedConnection() {
382 final ManagedHttpClientConnection conn = getConnection();
383 Asserts.check(conn.isOpen(), "Endpoint is not connected");
384 return conn;
385 }
386
387 ManagedHttpClientConnection detach() {
388 return this.connRef.getAndSet(null);
389 }
390
391 @Override
392 public boolean isConnected() {
393 final ManagedHttpClientConnection conn = getConnection();
394 return conn != null && conn.isOpen();
395 }
396
397 @Override
398 public void close(final CloseMode closeMode) {
399 final ManagedHttpClientConnection conn = detach();
400 if (conn != null) {
401 conn.close(closeMode);
402 }
403 }
404
405 @Override
406 public void close() throws IOException {
407 final ManagedHttpClientConnection conn = detach();
408 if (conn != null) {
409 conn.close();
410 }
411 }
412
413 @Override
414 public void setSocketTimeout(final Timeout timeout) {
415 getValidatedConnection().setSocketTimeout(timeout);
416 }
417
418 @Override
419 public ClassicHttpResponse execute(
420 final String id,
421 final ClassicHttpRequest request,
422 final HttpRequestExecutor requestExecutor,
423 final HttpContext context) throws IOException, HttpException {
424 Args.notNull(request, "HTTP request");
425 Args.notNull(requestExecutor, "Request executor");
426 return requestExecutor.execute(request, getValidatedConnection(), context);
427 }
428
429 }
430
431 }