View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http2.impl.nio;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.List;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  
34  import org.apache.hc.core5.http.EntityDetails;
35  import org.apache.hc.core5.http.Header;
36  import org.apache.hc.core5.http.HeaderElements;
37  import org.apache.hc.core5.http.HttpException;
38  import org.apache.hc.core5.http.HttpHeaders;
39  import org.apache.hc.core5.http.HttpRequest;
40  import org.apache.hc.core5.http.HttpResponse;
41  import org.apache.hc.core5.http.HttpStatus;
42  import org.apache.hc.core5.http.HttpVersion;
43  import org.apache.hc.core5.http.ProtocolException;
44  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
45  import org.apache.hc.core5.http.impl.IncomingEntityDetails;
46  import org.apache.hc.core5.http.impl.nio.MessageState;
47  import org.apache.hc.core5.http.message.StatusLine;
48  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
49  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
50  import org.apache.hc.core5.http.nio.DataStreamChannel;
51  import org.apache.hc.core5.http.nio.HandlerFactory;
52  import org.apache.hc.core5.http.protocol.HttpCoreContext;
53  import org.apache.hc.core5.http.protocol.HttpProcessor;
54  import org.apache.hc.core5.http2.H2ConnectionException;
55  import org.apache.hc.core5.http2.H2Error;
56  import org.apache.hc.core5.http2.impl.DefaultH2RequestConverter;
57  import org.apache.hc.core5.http2.impl.DefaultH2ResponseConverter;
58  
59  class ClientH2StreamHandler implements H2StreamHandler {
60  
61      private final H2StreamChannel outputChannel;
62      private final DataStreamChannel dataChannel;
63      private final HttpProcessor httpProcessor;
64      private final BasicHttpConnectionMetrics connMetrics;
65      private final AsyncClientExchangeHandler exchangeHandler;
66      private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
67      private final HttpCoreContext context;
68      private final AtomicBoolean requestCommitted;
69      private final AtomicBoolean failed;
70      private final AtomicBoolean done;
71  
72      private volatile MessageState requestState;
73      private volatile MessageState responseState;
74  
75      ClientH2StreamHandler(
76              final H2StreamChannel outputChannel,
77              final HttpProcessor httpProcessor,
78              final BasicHttpConnectionMetrics connMetrics,
79              final AsyncClientExchangeHandler exchangeHandler,
80              final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
81              final HttpCoreContext context) {
82          this.outputChannel = outputChannel;
83          this.dataChannel = new DataStreamChannel() {
84  
85              @Override
86              public void requestOutput() {
87                  outputChannel.requestOutput();
88              }
89  
90              @Override
91              public int write(final ByteBuffer src) throws IOException {
92                  return outputChannel.write(src);
93              }
94  
95              @Override
96              public void endStream(final List<? extends Header> trailers) throws IOException {
97                  outputChannel.endStream(trailers);
98                  requestState = MessageState.COMPLETE;
99              }
100 
101             @Override
102             public void endStream() throws IOException {
103                 outputChannel.endStream();
104                 requestState = MessageState.COMPLETE;
105             }
106 
107         };
108         this.httpProcessor = httpProcessor;
109         this.connMetrics = connMetrics;
110         this.exchangeHandler = exchangeHandler;
111         this.pushHandlerFactory = pushHandlerFactory;
112         this.context = context;
113         this.requestCommitted = new AtomicBoolean(false);
114         this.failed = new AtomicBoolean(false);
115         this.done = new AtomicBoolean(false);
116         this.requestState = MessageState.HEADERS;
117         this.responseState = MessageState.HEADERS;
118     }
119 
120     @Override
121     public HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
122         return pushHandlerFactory;
123     }
124 
125     @Override
126     public boolean isOutputReady() {
127         switch (requestState) {
128             case HEADERS:
129                 return true;
130             case BODY:
131                 return exchangeHandler.available() > 0;
132             default:
133                 return false;
134         }
135     }
136 
137     private void commitRequest(final HttpRequest request, final EntityDetails entityDetails) throws HttpException, IOException {
138         if (requestCommitted.compareAndSet(false, true)) {
139             context.setProtocolVersion(HttpVersion.HTTP_2);
140             context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
141 
142             httpProcessor.process(request, entityDetails, context);
143 
144             final List<Header> headers = DefaultH2RequestConverter.INSTANCE.convert(request);
145             outputChannel.submit(headers, entityDetails == null);
146             connMetrics.incrementRequestCount();
147 
148             if (entityDetails == null) {
149                 requestState = MessageState.COMPLETE;
150             } else {
151                 final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
152                 final boolean expectContinue = h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue());
153                 if (expectContinue) {
154                     requestState = MessageState.ACK;
155                 } else {
156                     requestState = MessageState.BODY;
157                     exchangeHandler.produce(dataChannel);
158                 }
159             }
160         } else {
161             throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Request already committed");
162         }
163     }
164 
165     @Override
166     public void produceOutput() throws HttpException, IOException {
167         switch (requestState) {
168             case HEADERS:
169                 exchangeHandler.produceRequest((request, entityDetails, httpContext) -> commitRequest(request, entityDetails), context);
170                 break;
171             case BODY:
172                 exchangeHandler.produce(dataChannel);
173                 break;
174         }
175     }
176 
177     @Override
178     public void consumePromise(final List<Header> headers) throws HttpException, IOException {
179         throw new ProtocolException("Unexpected message promise");
180     }
181 
182     @Override
183     public void consumeHeader(final List<Header> headers, final boolean endStream) throws HttpException, IOException {
184         if (done.get()) {
185             throw new ProtocolException("Unexpected message headers");
186         }
187         switch (responseState) {
188             case HEADERS:
189                 final HttpResponse response = DefaultH2ResponseConverter.INSTANCE.convert(headers);
190                 final int status = response.getCode();
191                 if (status < HttpStatus.SC_INFORMATIONAL) {
192                     throw new ProtocolException("Invalid response: " + new StatusLine(response));
193                 }
194                 if (status > HttpStatus.SC_CONTINUE && status < HttpStatus.SC_SUCCESS) {
195                     exchangeHandler.consumeInformation(response, context);
196                 }
197                 if (requestState == MessageState.ACK) {
198                     if (status == HttpStatus.SC_CONTINUE || status >= HttpStatus.SC_SUCCESS) {
199                         requestState = MessageState.BODY;
200                         exchangeHandler.produce(dataChannel);
201                     }
202                 }
203                 if (status < HttpStatus.SC_SUCCESS) {
204                     return;
205                 }
206 
207                 final EntityDetails entityDetails = endStream ? null : new IncomingEntityDetails(response, -1);
208                 context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
209                 httpProcessor.process(response, entityDetails, context);
210                 connMetrics.incrementResponseCount();
211 
212                 exchangeHandler.consumeResponse(response, entityDetails, context);
213                 responseState = endStream ? MessageState.COMPLETE : MessageState.BODY;
214                 break;
215             case BODY:
216                 responseState = MessageState.COMPLETE;
217                 exchangeHandler.streamEnd(headers);
218                 break;
219             default:
220                 throw new ProtocolException("Unexpected message headers");
221         }
222     }
223 
224     @Override
225     public void updateInputCapacity() throws IOException {
226         exchangeHandler.updateCapacity(outputChannel);
227     }
228 
229     @Override
230     public void consumeData(final ByteBuffer src, final boolean endStream) throws HttpException, IOException {
231         if (done.get() || responseState != MessageState.BODY) {
232             throw new ProtocolException("Unexpected message data");
233         }
234         if (src != null) {
235             exchangeHandler.consume(src);
236         }
237         if (endStream) {
238             responseState = MessageState.COMPLETE;
239             exchangeHandler.streamEnd(null);
240         }
241     }
242 
243     @Override
244     public void handle(final HttpException ex, final boolean endStream) throws HttpException, IOException {
245         throw ex;
246     }
247 
248     @Override
249     public void failed(final Exception cause) {
250         try {
251             if (failed.compareAndSet(false, true)) {
252                 if (exchangeHandler != null) {
253                     exchangeHandler.failed(cause);
254                 }
255             }
256         } finally {
257             releaseResources();
258         }
259     }
260 
261     @Override
262     public void releaseResources() {
263         if (done.compareAndSet(false, true)) {
264             responseState = MessageState.COMPLETE;
265             requestState = MessageState.COMPLETE;
266             exchangeHandler.releaseResources();
267         }
268     }
269 
270     @Override
271     public String toString() {
272         return "[" +
273                 "requestState=" + requestState +
274                 ", responseState=" + responseState +
275                 ']';
276     }
277 
278 }
279