1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.client5.http.impl.async;
29
30 import java.io.InterruptedIOException;
31 import java.util.concurrent.atomic.AtomicReference;
32
33 import org.apache.hc.client5.http.HttpRoute;
34 import org.apache.hc.client5.http.async.AsyncExecRuntime;
35 import org.apache.hc.client5.http.config.RequestConfig;
36 import org.apache.hc.client5.http.impl.ConnPoolSupport;
37 import org.apache.hc.client5.http.impl.Operations;
38 import org.apache.hc.client5.http.protocol.HttpClientContext;
39 import org.apache.hc.core5.concurrent.Cancellable;
40 import org.apache.hc.core5.concurrent.ComplexCancellable;
41 import org.apache.hc.core5.concurrent.FutureCallback;
42 import org.apache.hc.core5.http.HttpHost;
43 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
44 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
45 import org.apache.hc.core5.http.nio.HandlerFactory;
46 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
47 import org.apache.hc.core5.http2.nio.pool.H2ConnPool;
48 import org.apache.hc.core5.io.CloseMode;
49 import org.apache.hc.core5.reactor.Command;
50 import org.apache.hc.core5.reactor.IOSession;
51 import org.apache.hc.core5.util.Identifiable;
52 import org.apache.hc.core5.util.TimeValue;
53 import org.apache.hc.core5.util.Timeout;
54 import org.slf4j.Logger;
55
56 class InternalH2AsyncExecRuntime implements AsyncExecRuntime {
57
58 private final Logger log;
59 private final H2ConnPool connPool;
60 private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
61 private final AtomicReference<Endpoint> sessionRef;
62 private volatile boolean reusable;
63
64 InternalH2AsyncExecRuntime(
65 final Logger log,
66 final H2ConnPool connPool,
67 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
68 super();
69 this.log = log;
70 this.connPool = connPool;
71 this.pushHandlerFactory = pushHandlerFactory;
72 this.sessionRef = new AtomicReference<>();
73 }
74
75 @Override
76 public boolean isEndpointAcquired() {
77 return sessionRef.get() != null;
78 }
79
80 @Override
81 public Cancellable acquireEndpoint(
82 final String id,
83 final HttpRoute route,
84 final Object object,
85 final HttpClientContext context,
86 final FutureCallback<AsyncExecRuntime> callback) {
87 if (sessionRef.get() == null) {
88 final HttpHost target = route.getTargetHost();
89 final RequestConfig requestConfig = context.getRequestConfig();
90 final Timeout connectTimeout = requestConfig.getConnectTimeout();
91 if (log.isDebugEnabled()) {
92 log.debug("{} acquiring endpoint ({})", id, connectTimeout);
93 }
94 return Operations.cancellable(connPool.getSession(
95 target,
96 connectTimeout,
97 new FutureCallback<IOSession>() {
98
99 @Override
100 public void completed(final IOSession ioSession) {
101 sessionRef.set(new Endpoint(target, ioSession));
102 reusable = true;
103 if (log.isDebugEnabled()) {
104 log.debug("{} acquired endpoint", id);
105 }
106 callback.completed(InternalH2AsyncExecRuntime.this);
107 }
108
109 @Override
110 public void failed(final Exception ex) {
111 callback.failed(ex);
112 }
113
114 @Override
115 public void cancelled() {
116 callback.cancelled();
117 }
118
119 }));
120 }
121 callback.completed(this);
122 return Operations.nonCancellable();
123 }
124
125 private void closeEndpoint(final Endpoint endpoint) {
126 endpoint.session.close(CloseMode.GRACEFUL);
127 if (log.isDebugEnabled()) {
128 log.debug("{} endpoint closed", ConnPoolSupport.getId(endpoint));
129 }
130 }
131
132 @Override
133 public void releaseEndpoint() {
134 final Endpoint endpoint = sessionRef.getAndSet(null);
135 if (endpoint != null && !reusable) {
136 closeEndpoint(endpoint);
137 }
138 }
139
140 @Override
141 public void discardEndpoint() {
142 final Endpoint endpoint = sessionRef.getAndSet(null);
143 if (endpoint != null) {
144 closeEndpoint(endpoint);
145 }
146 }
147
148 @Override
149 public boolean validateConnection() {
150 if (reusable) {
151 final Endpoint endpoint = sessionRef.get();
152 return endpoint != null && endpoint.session.isOpen();
153 }
154 final Endpoint endpoint = sessionRef.getAndSet(null);
155 if (endpoint != null) {
156 closeEndpoint(endpoint);
157 }
158 return false;
159 }
160
161 @Override
162 public boolean isEndpointConnected() {
163 final Endpoint endpoint = sessionRef.get();
164 return endpoint != null && endpoint.session.isOpen();
165 }
166
167
168 Endpoint ensureValid() {
169 final Endpoint endpoint = sessionRef.get();
170 if (endpoint == null) {
171 throw new IllegalStateException("I/O session not acquired / already released");
172 }
173 return endpoint;
174 }
175
176 @Override
177 public Cancellable connectEndpoint(
178 final HttpClientContext context,
179 final FutureCallback<AsyncExecRuntime> callback) {
180 final Endpoint endpoint = ensureValid();
181 if (endpoint.session.isOpen()) {
182 callback.completed(this);
183 return Operations.nonCancellable();
184 }
185 final HttpHost target = endpoint.target;
186 final RequestConfig requestConfig = context.getRequestConfig();
187 final Timeout connectTimeout = requestConfig.getConnectTimeout();
188 if (log.isDebugEnabled()) {
189 log.debug("{} connecting endpoint ({})", ConnPoolSupport.getId(endpoint), connectTimeout);
190 }
191 return Operations.cancellable(connPool.getSession(target, connectTimeout,
192 new FutureCallback<IOSession>() {
193
194 @Override
195 public void completed(final IOSession ioSession) {
196 sessionRef.set(new Endpoint(target, ioSession));
197 reusable = true;
198 if (log.isDebugEnabled()) {
199 log.debug("{} endpoint connected", ConnPoolSupport.getId(endpoint));
200 }
201 callback.completed(InternalH2AsyncExecRuntime.this);
202 }
203
204 @Override
205 public void failed(final Exception ex) {
206 callback.failed(ex);
207 }
208
209 @Override
210 public void cancelled() {
211 callback.cancelled();
212 }
213
214 }));
215
216 }
217
218 @Override
219 public void upgradeTls(final HttpClientContext context) {
220 throw new UnsupportedOperationException();
221 }
222
223 @Override
224 public Cancellable execute(
225 final String id,
226 final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
227 final ComplexCancellable complexCancellable = new ComplexCancellable();
228 final Endpoint endpoint = ensureValid();
229 final IOSession session = endpoint.session;
230 if (session.isOpen()) {
231 if (log.isDebugEnabled()) {
232 log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
233 }
234 session.enqueue(
235 new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
236 Command.Priority.NORMAL);
237 } else {
238 final HttpHost target = endpoint.target;
239 final RequestConfig requestConfig = context.getRequestConfig();
240 final Timeout connectTimeout = requestConfig.getConnectTimeout();
241 connPool.getSession(target, connectTimeout, new FutureCallback<IOSession>() {
242
243 @Override
244 public void completed(final IOSession ioSession) {
245 sessionRef.set(new Endpoint(target, ioSession));
246 reusable = true;
247 if (log.isDebugEnabled()) {
248 log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
249 }
250 session.enqueue(
251 new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
252 Command.Priority.NORMAL);
253 }
254
255 @Override
256 public void failed(final Exception ex) {
257 exchangeHandler.failed(ex);
258 }
259
260 @Override
261 public void cancelled() {
262 exchangeHandler.failed(new InterruptedIOException());
263 }
264
265 });
266 }
267 return complexCancellable;
268 }
269
270 @Override
271 public void markConnectionReusable(final Object newState, final TimeValue newValidDuration) {
272 throw new UnsupportedOperationException();
273 }
274
275 @Override
276 public void markConnectionNonReusable() {
277 reusable = false;
278 }
279
280 static class Endpoint implements Identifiable {
281
282 final HttpHost target;
283 final IOSession session;
284
285 Endpoint(final HttpHost target, final IOSession session) {
286 this.target = target;
287 this.session = session;
288 }
289
290 @Override
291 public String getId() {
292 return session.getId();
293 }
294
295 }
296
297 @Override
298 public AsyncExecRuntime fork() {
299 return new InternalH2AsyncExecRuntime(log, connPool, pushHandlerFactory);
300 }
301
302 }