1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http2.impl.nio;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34 import org.apache.hc.core5.http.EntityDetails;
35 import org.apache.hc.core5.http.Header;
36 import org.apache.hc.core5.http.HeaderElements;
37 import org.apache.hc.core5.http.HttpException;
38 import org.apache.hc.core5.http.HttpHeaders;
39 import org.apache.hc.core5.http.HttpRequest;
40 import org.apache.hc.core5.http.HttpResponse;
41 import org.apache.hc.core5.http.HttpStatus;
42 import org.apache.hc.core5.http.HttpVersion;
43 import org.apache.hc.core5.http.ProtocolException;
44 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
45 import org.apache.hc.core5.http.impl.IncomingEntityDetails;
46 import org.apache.hc.core5.http.impl.nio.MessageState;
47 import org.apache.hc.core5.http.message.StatusLine;
48 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
49 import org.apache.hc.core5.http.nio.AsyncPushConsumer;
50 import org.apache.hc.core5.http.nio.DataStreamChannel;
51 import org.apache.hc.core5.http.nio.HandlerFactory;
52 import org.apache.hc.core5.http.protocol.HttpCoreContext;
53 import org.apache.hc.core5.http.protocol.HttpProcessor;
54 import org.apache.hc.core5.http2.H2ConnectionException;
55 import org.apache.hc.core5.http2.H2Error;
56 import org.apache.hc.core5.http2.impl.DefaultH2RequestConverter;
57 import org.apache.hc.core5.http2.impl.DefaultH2ResponseConverter;
58
59 class ClientH2StreamHandler implements H2StreamHandler {
60
61 private final H2StreamChannel outputChannel;
62 private final DataStreamChannel dataChannel;
63 private final HttpProcessor httpProcessor;
64 private final BasicHttpConnectionMetrics connMetrics;
65 private final AsyncClientExchangeHandler exchangeHandler;
66 private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
67 private final HttpCoreContext context;
68 private final AtomicBoolean requestCommitted;
69 private final AtomicBoolean failed;
70 private final AtomicBoolean done;
71
72 private volatile MessageState requestState;
73 private volatile MessageState responseState;
74
75 ClientH2StreamHandler(
76 final H2StreamChannel outputChannel,
77 final HttpProcessor httpProcessor,
78 final BasicHttpConnectionMetrics connMetrics,
79 final AsyncClientExchangeHandler exchangeHandler,
80 final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
81 final HttpCoreContext context) {
82 this.outputChannel = outputChannel;
83 this.dataChannel = new DataStreamChannel() {
84
85 @Override
86 public void requestOutput() {
87 outputChannel.requestOutput();
88 }
89
90 @Override
91 public int write(final ByteBuffer src) throws IOException {
92 return outputChannel.write(src);
93 }
94
95 @Override
96 public void endStream(final List<? extends Header> trailers) throws IOException {
97 outputChannel.endStream(trailers);
98 requestState = MessageState.COMPLETE;
99 }
100
101 @Override
102 public void endStream() throws IOException {
103 outputChannel.endStream();
104 requestState = MessageState.COMPLETE;
105 }
106
107 };
108 this.httpProcessor = httpProcessor;
109 this.connMetrics = connMetrics;
110 this.exchangeHandler = exchangeHandler;
111 this.pushHandlerFactory = pushHandlerFactory;
112 this.context = context;
113 this.requestCommitted = new AtomicBoolean(false);
114 this.failed = new AtomicBoolean(false);
115 this.done = new AtomicBoolean(false);
116 this.requestState = MessageState.HEADERS;
117 this.responseState = MessageState.HEADERS;
118 }
119
120 @Override
121 public HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
122 return pushHandlerFactory;
123 }
124
125 @Override
126 public boolean isOutputReady() {
127 switch (requestState) {
128 case HEADERS:
129 return true;
130 case BODY:
131 return exchangeHandler.available() > 0;
132 default:
133 return false;
134 }
135 }
136
137 private void commitRequest(final HttpRequest request, final EntityDetails entityDetails) throws HttpException, IOException {
138 if (requestCommitted.compareAndSet(false, true)) {
139 context.setProtocolVersion(HttpVersion.HTTP_2);
140 context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
141
142 httpProcessor.process(request, entityDetails, context);
143
144 final List<Header> headers = DefaultH2RequestConverter.INSTANCE.convert(request);
145 outputChannel.submit(headers, entityDetails == null);
146 connMetrics.incrementRequestCount();
147
148 if (entityDetails == null) {
149 requestState = MessageState.COMPLETE;
150 } else {
151 final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
152 final boolean expectContinue = h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue());
153 if (expectContinue) {
154 requestState = MessageState.ACK;
155 } else {
156 requestState = MessageState.BODY;
157 exchangeHandler.produce(dataChannel);
158 }
159 }
160 } else {
161 throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Request already committed");
162 }
163 }
164
165 @Override
166 public void produceOutput() throws HttpException, IOException {
167 switch (requestState) {
168 case HEADERS:
169 exchangeHandler.produceRequest((request, entityDetails, httpContext) -> commitRequest(request, entityDetails), context);
170 break;
171 case BODY:
172 exchangeHandler.produce(dataChannel);
173 break;
174 }
175 }
176
177 @Override
178 public void consumePromise(final List<Header> headers) throws HttpException, IOException {
179 throw new ProtocolException("Unexpected message promise");
180 }
181
182 @Override
183 public void consumeHeader(final List<Header> headers, final boolean endStream) throws HttpException, IOException {
184 if (done.get()) {
185 throw new ProtocolException("Unexpected message headers");
186 }
187 switch (responseState) {
188 case HEADERS:
189 final HttpResponse response = DefaultH2ResponseConverter.INSTANCE.convert(headers);
190 final int status = response.getCode();
191 if (status < HttpStatus.SC_INFORMATIONAL) {
192 throw new ProtocolException("Invalid response: " + new StatusLine(response));
193 }
194 if (status > HttpStatus.SC_CONTINUE && status < HttpStatus.SC_SUCCESS) {
195 exchangeHandler.consumeInformation(response, context);
196 }
197 if (requestState == MessageState.ACK) {
198 if (status == HttpStatus.SC_CONTINUE || status >= HttpStatus.SC_SUCCESS) {
199 requestState = MessageState.BODY;
200 exchangeHandler.produce(dataChannel);
201 }
202 }
203 if (status < HttpStatus.SC_SUCCESS) {
204 return;
205 }
206
207 final EntityDetails entityDetails = endStream ? null : new IncomingEntityDetails(response, -1);
208 context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
209 httpProcessor.process(response, entityDetails, context);
210 connMetrics.incrementResponseCount();
211
212 exchangeHandler.consumeResponse(response, entityDetails, context);
213 responseState = endStream ? MessageState.COMPLETE : MessageState.BODY;
214 break;
215 case BODY:
216 responseState = MessageState.COMPLETE;
217 exchangeHandler.streamEnd(headers);
218 break;
219 default:
220 throw new ProtocolException("Unexpected message headers");
221 }
222 }
223
224 @Override
225 public void updateInputCapacity() throws IOException {
226 exchangeHandler.updateCapacity(outputChannel);
227 }
228
229 @Override
230 public void consumeData(final ByteBuffer src, final boolean endStream) throws HttpException, IOException {
231 if (done.get() || responseState != MessageState.BODY) {
232 throw new ProtocolException("Unexpected message data");
233 }
234 if (src != null) {
235 exchangeHandler.consume(src);
236 }
237 if (endStream) {
238 responseState = MessageState.COMPLETE;
239 exchangeHandler.streamEnd(null);
240 }
241 }
242
243 @Override
244 public void handle(final HttpException ex, final boolean endStream) throws HttpException, IOException {
245 throw ex;
246 }
247
248 @Override
249 public void failed(final Exception cause) {
250 try {
251 if (failed.compareAndSet(false, true)) {
252 if (exchangeHandler != null) {
253 exchangeHandler.failed(cause);
254 }
255 }
256 } finally {
257 releaseResources();
258 }
259 }
260
261 @Override
262 public void releaseResources() {
263 if (done.compareAndSet(false, true)) {
264 responseState = MessageState.COMPLETE;
265 requestState = MessageState.COMPLETE;
266 exchangeHandler.releaseResources();
267 }
268 }
269
270 @Override
271 public String toString() {
272 return "[" +
273 "requestState=" + requestState +
274 ", responseState=" + responseState +
275 ']';
276 }
277
278 }
279