View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.client5.http.impl.async;
29  
30  import java.io.InterruptedIOException;
31  import java.util.concurrent.atomic.AtomicReference;
32  
33  import org.apache.hc.client5.http.HttpRoute;
34  import org.apache.hc.client5.http.async.AsyncExecRuntime;
35  import org.apache.hc.client5.http.config.RequestConfig;
36  import org.apache.hc.client5.http.impl.ConnPoolSupport;
37  import org.apache.hc.client5.http.impl.Operations;
38  import org.apache.hc.client5.http.protocol.HttpClientContext;
39  import org.apache.hc.core5.concurrent.Cancellable;
40  import org.apache.hc.core5.concurrent.ComplexCancellable;
41  import org.apache.hc.core5.concurrent.FutureCallback;
42  import org.apache.hc.core5.http.HttpHost;
43  import org.apache.hc.core5.http.HttpVersion;
44  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
45  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
46  import org.apache.hc.core5.http.nio.HandlerFactory;
47  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
48  import org.apache.hc.core5.io.CloseMode;
49  import org.apache.hc.core5.reactor.Command;
50  import org.apache.hc.core5.reactor.IOSession;
51  import org.apache.hc.core5.util.Identifiable;
52  import org.apache.hc.core5.util.TimeValue;
53  import org.apache.hc.core5.util.Timeout;
54  import org.slf4j.Logger;
55  
56  class InternalH2AsyncExecRuntime implements AsyncExecRuntime {
57  
58      private final Logger log;
59      private final InternalH2ConnPool connPool;
60      private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
61      private final AtomicReference<Endpoint> sessionRef;
62      private volatile boolean reusable;
63  
64      InternalH2AsyncExecRuntime(
65              final Logger log,
66              final InternalH2ConnPool connPool,
67              final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
68          super();
69          this.log = log;
70          this.connPool = connPool;
71          this.pushHandlerFactory = pushHandlerFactory;
72          this.sessionRef = new AtomicReference<>();
73      }
74  
75      @Override
76      public boolean isEndpointAcquired() {
77          return sessionRef.get() != null;
78      }
79  
80      @Override
81      public Cancellable acquireEndpoint(
82              final String id,
83              final HttpRoute route,
84              final Object object,
85              final HttpClientContext context,
86              final FutureCallback<AsyncExecRuntime> callback) {
87          if (sessionRef.get() == null) {
88              final HttpHost target = route.getTargetHost();
89              final RequestConfig requestConfig = context.getRequestConfig();
90              @SuppressWarnings("deprecation")
91              final Timeout connectTimeout = requestConfig.getConnectTimeout();
92              if (log.isDebugEnabled()) {
93                  log.debug("{} acquiring endpoint ({})", id, connectTimeout);
94              }
95              return Operations.cancellable(connPool.getSession(target, connectTimeout,
96                      new FutureCallback<IOSession>() {
97  
98                          @Override
99                          public void completed(final IOSession ioSession) {
100                             sessionRef.set(new Endpoint(target, ioSession));
101                             reusable = true;
102                             if (log.isDebugEnabled()) {
103                                 log.debug("{} acquired endpoint", id);
104                             }
105                             callback.completed(InternalH2AsyncExecRuntime.this);
106                         }
107 
108                         @Override
109                         public void failed(final Exception ex) {
110                             callback.failed(ex);
111                         }
112 
113                         @Override
114                         public void cancelled() {
115                             callback.cancelled();
116                         }
117 
118                     }));
119         }
120         callback.completed(this);
121         return Operations.nonCancellable();
122     }
123 
124     private void closeEndpoint(final Endpoint endpoint) {
125         endpoint.session.close(CloseMode.GRACEFUL);
126         if (log.isDebugEnabled()) {
127             log.debug("{} endpoint closed", ConnPoolSupport.getId(endpoint));
128         }
129     }
130 
131     @Override
132     public void releaseEndpoint() {
133         final Endpoint endpoint = sessionRef.getAndSet(null);
134         if (endpoint != null && !reusable) {
135             closeEndpoint(endpoint);
136         }
137     }
138 
139     @Override
140     public void discardEndpoint() {
141         final Endpoint endpoint = sessionRef.getAndSet(null);
142         if (endpoint != null) {
143             closeEndpoint(endpoint);
144         }
145     }
146 
147     @Override
148     public boolean validateConnection() {
149         if (reusable) {
150             final Endpoint endpoint = sessionRef.get();
151             return endpoint != null && endpoint.session.isOpen();
152         }
153         final Endpoint endpoint = sessionRef.getAndSet(null);
154         if (endpoint != null) {
155             closeEndpoint(endpoint);
156         }
157         return false;
158     }
159 
160     @Override
161     public boolean isEndpointConnected() {
162         final Endpoint endpoint = sessionRef.get();
163         return endpoint != null && endpoint.session.isOpen();
164     }
165 
166 
167     Endpoint ensureValid() {
168         final Endpoint endpoint = sessionRef.get();
169         if (endpoint == null) {
170             throw new IllegalStateException("I/O session not acquired / already released");
171         }
172         return endpoint;
173     }
174 
175     @Override
176     public Cancellable connectEndpoint(
177             final HttpClientContext context,
178             final FutureCallback<AsyncExecRuntime> callback) {
179         final Endpoint endpoint = ensureValid();
180         if (endpoint.session.isOpen()) {
181             callback.completed(this);
182             return Operations.nonCancellable();
183         }
184         final HttpHost target = endpoint.target;
185         final RequestConfig requestConfig = context.getRequestConfig();
186         @SuppressWarnings("deprecation")
187         final Timeout connectTimeout = requestConfig.getConnectTimeout();
188         if (log.isDebugEnabled()) {
189             log.debug("{} connecting endpoint ({})", ConnPoolSupport.getId(endpoint), connectTimeout);
190         }
191         return Operations.cancellable(connPool.getSession(target, connectTimeout,
192             new FutureCallback<IOSession>() {
193 
194             @Override
195             public void completed(final IOSession ioSession) {
196                 sessionRef.set(new Endpoint(target, ioSession));
197                 reusable = true;
198                 if (log.isDebugEnabled()) {
199                     log.debug("{} endpoint connected", ConnPoolSupport.getId(endpoint));
200                 }
201                 callback.completed(InternalH2AsyncExecRuntime.this);
202             }
203 
204             @Override
205             public void failed(final Exception ex) {
206                 callback.failed(ex);
207             }
208 
209             @Override
210             public void cancelled() {
211                 callback.cancelled();
212             }
213 
214         }));
215 
216     }
217 
218     @Override
219     public void upgradeTls(final HttpClientContext context) {
220         throw new UnsupportedOperationException();
221     }
222 
223     @Override
224     public void upgradeTls(final HttpClientContext context, final FutureCallback<AsyncExecRuntime> callback) {
225         throw new UnsupportedOperationException();
226     }
227 
228     @Override
229     public Cancellable execute(
230             final String id,
231             final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
232         final ComplexCancellable complexCancellable = new ComplexCancellable();
233         final Endpoint endpoint = ensureValid();
234         final IOSession session = endpoint.session;
235         if (session.isOpen()) {
236             if (log.isDebugEnabled()) {
237                 log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
238             }
239             context.setProtocolVersion(HttpVersion.HTTP_2);
240             session.enqueue(
241                     new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
242                     Command.Priority.NORMAL);
243         } else {
244             final HttpHost target = endpoint.target;
245             final RequestConfig requestConfig = context.getRequestConfig();
246             @SuppressWarnings("deprecation")
247             final Timeout connectTimeout = requestConfig.getConnectTimeout();
248             connPool.getSession(target, connectTimeout, new FutureCallback<IOSession>() {
249 
250                 @Override
251                 public void completed(final IOSession ioSession) {
252                     sessionRef.set(new Endpoint(target, ioSession));
253                     reusable = true;
254                     if (log.isDebugEnabled()) {
255                         log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
256                     }
257                     context.setProtocolVersion(HttpVersion.HTTP_2);
258                     session.enqueue(
259                             new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
260                             Command.Priority.NORMAL);
261                 }
262 
263                 @Override
264                 public void failed(final Exception ex) {
265                     exchangeHandler.failed(ex);
266                 }
267 
268                 @Override
269                 public void cancelled() {
270                     exchangeHandler.failed(new InterruptedIOException());
271                 }
272 
273             });
274         }
275         return complexCancellable;
276     }
277 
278     @Override
279     public void markConnectionReusable(final Object newState, final TimeValue newValidDuration) {
280         throw new UnsupportedOperationException();
281     }
282 
283     @Override
284     public void markConnectionNonReusable() {
285         reusable = false;
286     }
287 
288     static class Endpoint implements Identifiable {
289 
290         final HttpHost target;
291         final IOSession session;
292 
293         Endpoint(final HttpHost target, final IOSession session) {
294             this.target = target;
295             this.session = session;
296         }
297 
298         @Override
299         public String getId() {
300             return session.getId();
301         }
302 
303     }
304 
305     @Override
306     public AsyncExecRuntime fork() {
307         return new InternalH2AsyncExecRuntime(log, connPool, pushHandlerFactory);
308     }
309 
310 }