View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http2.impl.nio.bootstrap;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.nio.ByteBuffer;
33  import java.util.List;
34  import java.util.Set;
35  import java.util.concurrent.Future;
36  
37  import org.apache.hc.core5.annotation.Internal;
38  import org.apache.hc.core5.concurrent.Cancellable;
39  import org.apache.hc.core5.concurrent.CancellableDependency;
40  import org.apache.hc.core5.concurrent.ComplexFuture;
41  import org.apache.hc.core5.concurrent.FutureCallback;
42  import org.apache.hc.core5.concurrent.FutureContribution;
43  import org.apache.hc.core5.function.Callback;
44  import org.apache.hc.core5.function.Decorator;
45  import org.apache.hc.core5.function.Resolver;
46  import org.apache.hc.core5.http.ConnectionClosedException;
47  import org.apache.hc.core5.http.EntityDetails;
48  import org.apache.hc.core5.http.Header;
49  import org.apache.hc.core5.http.HttpException;
50  import org.apache.hc.core5.http.HttpHost;
51  import org.apache.hc.core5.http.HttpResponse;
52  import org.apache.hc.core5.http.impl.DefaultAddressResolver;
53  import org.apache.hc.core5.http.impl.bootstrap.AsyncRequester;
54  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
55  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
56  import org.apache.hc.core5.http.nio.AsyncRequestProducer;
57  import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
58  import org.apache.hc.core5.http.nio.CapacityChannel;
59  import org.apache.hc.core5.http.nio.DataStreamChannel;
60  import org.apache.hc.core5.http.nio.HandlerFactory;
61  import org.apache.hc.core5.http.nio.RequestChannel;
62  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
63  import org.apache.hc.core5.http.nio.command.ShutdownCommand;
64  import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
65  import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
66  import org.apache.hc.core5.http.protocol.HttpContext;
67  import org.apache.hc.core5.http.protocol.HttpCoreContext;
68  import org.apache.hc.core5.http2.nio.pool.H2ConnPool;
69  import org.apache.hc.core5.net.URIAuthority;
70  import org.apache.hc.core5.reactor.Command;
71  import org.apache.hc.core5.reactor.IOEventHandlerFactory;
72  import org.apache.hc.core5.reactor.IOReactorConfig;
73  import org.apache.hc.core5.reactor.IOSession;
74  import org.apache.hc.core5.reactor.IOSessionListener;
75  import org.apache.hc.core5.util.Args;
76  import org.apache.hc.core5.util.TimeValue;
77  import org.apache.hc.core5.util.Timeout;
78  
79  /**
80   * HTTP/2 multiplexing client side message exchange initiator.
81   *
82   * @since 5.0
83   */
84  public class H2MultiplexingRequester extends AsyncRequester{
85  
86      private final H2ConnPool connPool;
87  
88      /**
89       * Use {@link H2MultiplexingRequesterBootstrap} to create instances of this class.
90       */
91      @Internal
92      public H2MultiplexingRequester(
93              final IOReactorConfig ioReactorConfig,
94              final IOEventHandlerFactory eventHandlerFactory,
95              final Decorator<IOSession> ioSessionDecorator,
96              final Callback<Exception> exceptionCallback,
97              final IOSessionListener sessionListener,
98              final Resolver<HttpHost, InetSocketAddress> addressResolver,
99              final TlsStrategy tlsStrategy) {
100         super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener,
101                         ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE);
102         this.connPool = new H2ConnPool(this, addressResolver, tlsStrategy);
103     }
104 
105     public void closeIdle(final TimeValue idleTime) {
106         connPool.closeIdle(idleTime);
107     }
108 
109     public Set<HttpHost> getRoutes() {
110         return connPool.getRoutes();
111     }
112 
113     public TimeValue getValidateAfterInactivity() {
114         return connPool.getValidateAfterInactivity();
115     }
116 
117     public void setValidateAfterInactivity(final TimeValue timeValue) {
118         connPool.setValidateAfterInactivity(timeValue);
119     }
120 
121     /**
122      * @since 5.3
123      */
124     public Cancellable execute(
125             final HttpHost target,
126             final AsyncClientExchangeHandler exchangeHandler,
127             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
128             final Timeout timeout,
129             final HttpContext context) {
130         Args.notNull(exchangeHandler, "Exchange handler");
131         Args.notNull(timeout, "Timeout");
132         Args.notNull(context, "Context");
133         final CancellableExecution cancellableExecution = new CancellableExecution();
134         execute(target, exchangeHandler, pushHandlerFactory, cancellableExecution, timeout, context);
135         return cancellableExecution;
136     }
137 
138     public Cancellable execute(
139             final AsyncClientExchangeHandler exchangeHandler,
140             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
141             final Timeout timeout,
142             final HttpContext context) {
143         return execute(null, exchangeHandler, pushHandlerFactory, timeout, context);
144     }
145 
146     /**
147      * @since 5.3
148      */
149     public Cancellable execute(
150             final HttpHost target,
151             final AsyncClientExchangeHandler exchangeHandler,
152             final Timeout timeout,
153             final HttpContext context) {
154         return execute(target, exchangeHandler, null, timeout, context);
155     }
156 
157     public Cancellable execute(
158             final AsyncClientExchangeHandler exchangeHandler,
159             final Timeout timeout,
160             final HttpContext context) {
161         return execute(null, exchangeHandler, null, timeout, context);
162     }
163 
164     private void execute(
165             final HttpHost target,
166             final AsyncClientExchangeHandler exchangeHandler,
167             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
168             final CancellableDependency cancellableDependency,
169             final Timeout timeout,
170             final HttpContext context) {
171         Args.notNull(exchangeHandler, "Exchange handler");
172         Args.notNull(timeout, "Timeout");
173         Args.notNull(context, "Context");
174         try {
175             exchangeHandler.produceRequest((request, entityDetails, httpContext) -> {
176                 final HttpHost host = target != null ? target : defaultTarget(request);
177                 if (request.getAuthority() == null) {
178                     request.setAuthority(new URIAuthority(host.getHostName(), host.getPort()));
179                 }
180                 connPool.getSession(host, timeout, new FutureCallback<IOSession>() {
181 
182                     @Override
183                     public void completed(final IOSession ioSession) {
184                         ioSession.enqueue(new RequestExecutionCommand(new AsyncClientExchangeHandler() {
185 
186                             @Override
187                             public void releaseResources() {
188                                 exchangeHandler.releaseResources();
189                             }
190 
191                             @Override
192                             public void produceRequest(final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
193                                 channel.sendRequest(request, entityDetails, httpContext);
194                             }
195 
196                             @Override
197                             public int available() {
198                                 return exchangeHandler.available();
199                             }
200 
201                             @Override
202                             public void produce(final DataStreamChannel channel) throws IOException {
203                                 exchangeHandler.produce(channel);
204                             }
205 
206                             @Override
207                             public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
208                                 exchangeHandler.consumeInformation(response, httpContext);
209                             }
210 
211                             @Override
212                             public void consumeResponse(
213                                     final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
214                                 exchangeHandler.consumeResponse(response, entityDetails, httpContext);
215                             }
216 
217                             @Override
218                             public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
219                                 exchangeHandler.updateCapacity(capacityChannel);
220                             }
221 
222                             @Override
223                             public void consume(final ByteBuffer src) throws IOException {
224                                 exchangeHandler.consume(src);
225                             }
226 
227                             @Override
228                             public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
229                                 exchangeHandler.streamEnd(trailers);
230                             }
231 
232                             @Override
233                             public void cancel() {
234                                 exchangeHandler.cancel();
235                             }
236 
237                             @Override
238                             public void failed(final Exception cause) {
239                                 exchangeHandler.failed(cause);
240                             }
241 
242                         }, pushHandlerFactory, cancellableDependency, context), Command.Priority.NORMAL);
243                         if (!ioSession.isOpen()) {
244                             exchangeHandler.failed(new ConnectionClosedException());
245                         }
246                     }
247 
248                     @Override
249                     public void failed(final Exception ex) {
250                         exchangeHandler.failed(ex);
251                     }
252 
253                     @Override
254                     public void cancelled() {
255                         exchangeHandler.cancel();
256                     }
257 
258                 });
259 
260             }, context);
261         } catch (final IOException | HttpException ex) {
262             exchangeHandler.failed(ex);
263         }
264     }
265 
266     /**
267      * @since 5.3
268      */
269     public final <T> Future<T> execute(
270             final HttpHost target,
271             final AsyncRequestProducer requestProducer,
272             final AsyncResponseConsumer<T> responseConsumer,
273             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
274             final Timeout timeout,
275             final HttpContext context,
276             final FutureCallback<T> callback) {
277         Args.notNull(requestProducer, "Request producer");
278         Args.notNull(responseConsumer, "Response consumer");
279         Args.notNull(timeout, "Timeout");
280         final ComplexFuture<T> future = new ComplexFuture<>(callback);
281         final AsyncClientExchangeHandler exchangeHandler = new BasicClientExchangeHandler<>(
282                 requestProducer,
283                 responseConsumer,
284                 new FutureContribution<T>(future) {
285 
286                     @Override
287                     public void completed(final T result) {
288                         future.completed(result);
289                     }
290 
291                 });
292         execute(target, exchangeHandler, pushHandlerFactory, future, timeout, context != null ? context : HttpCoreContext.create());
293         return future;
294     }
295 
296     public final <T> Future<T> execute(
297             final AsyncRequestProducer requestProducer,
298             final AsyncResponseConsumer<T> responseConsumer,
299             final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
300             final Timeout timeout,
301             final HttpContext context,
302             final FutureCallback<T> callback) {
303         return execute(null, requestProducer, responseConsumer, pushHandlerFactory, timeout, context, callback);
304     }
305 
306     /**
307      * @since 5.3
308      */
309     public final <T> Future<T> execute(
310             final HttpHost target,
311             final AsyncRequestProducer requestProducer,
312             final AsyncResponseConsumer<T> responseConsumer,
313             final Timeout timeout,
314             final HttpContext context,
315             final FutureCallback<T> callback) {
316         return execute(target, requestProducer, responseConsumer, null, timeout, context, callback);
317     }
318 
319     public final <T> Future<T> execute(
320             final AsyncRequestProducer requestProducer,
321             final AsyncResponseConsumer<T> responseConsumer,
322             final Timeout timeout,
323             final HttpContext context,
324             final FutureCallback<T> callback) {
325         return execute(null, requestProducer, responseConsumer, null, timeout, context, callback);
326     }
327 
328     public final <T> Future<T> execute(
329             final AsyncRequestProducer requestProducer,
330             final AsyncResponseConsumer<T> responseConsumer,
331             final Timeout timeout,
332             final FutureCallback<T> callback) {
333         return execute(null, requestProducer, responseConsumer, null, timeout, null, callback);
334     }
335 
336     @Internal
337     public H2ConnPool getConnPool() {
338         return connPool;
339     }
340 
341 }