1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.impl.nio;
28
29 import java.io.IOException;
30 import java.nio.ByteBuffer;
31 import java.util.List;
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34 import org.apache.hc.core5.http.ConnectionReuseStrategy;
35 import org.apache.hc.core5.http.EntityDetails;
36 import org.apache.hc.core5.http.Header;
37 import org.apache.hc.core5.http.HeaderElements;
38 import org.apache.hc.core5.http.HttpException;
39 import org.apache.hc.core5.http.HttpHeaders;
40 import org.apache.hc.core5.http.HttpRequest;
41 import org.apache.hc.core5.http.HttpResponse;
42 import org.apache.hc.core5.http.HttpStatus;
43 import org.apache.hc.core5.http.HttpVersion;
44 import org.apache.hc.core5.http.ProtocolException;
45 import org.apache.hc.core5.http.ProtocolVersion;
46 import org.apache.hc.core5.http.UnsupportedHttpVersionException;
47 import org.apache.hc.core5.http.config.Http1Config;
48 import org.apache.hc.core5.http.message.StatusLine;
49 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
50 import org.apache.hc.core5.http.nio.CapacityChannel;
51 import org.apache.hc.core5.http.nio.DataStreamChannel;
52 import org.apache.hc.core5.http.nio.ResourceHolder;
53 import org.apache.hc.core5.http.protocol.HttpCoreContext;
54 import org.apache.hc.core5.http.protocol.HttpProcessor;
55 import org.apache.hc.core5.util.Timeout;
56
57 class ClientHttp1StreamHandler implements ResourceHolder {
58
59 public static final Timeout DEFAULT_WAIT_FOR_CONTINUE = Timeout.ofSeconds(3);
60
61 private final Http1StreamChannel<HttpRequest> outputChannel;
62 private final DataStreamChannel internalDataChannel;
63 private final HttpProcessor httpProcessor;
64 private final Http1Config http1Config;
65 private final ConnectionReuseStrategy connectionReuseStrategy;
66 private final AsyncClientExchangeHandler exchangeHandler;
67 private final HttpCoreContext context;
68 private final AtomicBoolean requestCommitted;
69 private final AtomicBoolean done;
70
71 private volatile boolean keepAlive;
72 private volatile Timeout timeout;
73 private volatile HttpRequest committedRequest;
74 private volatile MessageState requestState;
75 private volatile MessageState responseState;
76
77 ClientHttp1StreamHandler(
78 final Http1StreamChannel<HttpRequest> outputChannel,
79 final HttpProcessor httpProcessor,
80 final Http1Config http1Config,
81 final ConnectionReuseStrategy connectionReuseStrategy,
82 final AsyncClientExchangeHandler exchangeHandler,
83 final HttpCoreContext context) {
84 this.outputChannel = outputChannel;
85 this.internalDataChannel = new DataStreamChannel() {
86
87 @Override
88 public void requestOutput() {
89 outputChannel.requestOutput();
90 }
91
92 @Override
93 public void endStream(final List<? extends Header> trailers) throws IOException {
94 outputChannel.complete(trailers);
95 requestState = MessageState.COMPLETE;
96 }
97
98 @Override
99 public int write(final ByteBuffer src) throws IOException {
100 return outputChannel.write(src);
101 }
102
103 @Override
104 public void endStream() throws IOException {
105 endStream(null);
106 }
107
108 };
109
110 this.httpProcessor = httpProcessor;
111 this.http1Config = http1Config;
112 this.connectionReuseStrategy = connectionReuseStrategy;
113 this.exchangeHandler = exchangeHandler;
114 this.context = context;
115 this.requestCommitted = new AtomicBoolean(false);
116 this.done = new AtomicBoolean(false);
117 this.keepAlive = true;
118 this.requestState = MessageState.IDLE;
119 this.responseState = MessageState.HEADERS;
120 }
121
122 boolean isResponseFinal() {
123 return responseState == MessageState.COMPLETE;
124 }
125
126 boolean isCompleted() {
127 return requestState == MessageState.COMPLETE && responseState == MessageState.COMPLETE;
128 }
129
130 String getRequestMethod() {
131 return committedRequest != null ? committedRequest.getMethod() : null;
132 }
133
134 boolean isOutputReady() {
135 switch (requestState) {
136 case IDLE:
137 case ACK:
138 return true;
139 case BODY:
140 return exchangeHandler.available() > 0;
141 default:
142 return false;
143 }
144 }
145
146 private void commitRequest(final HttpRequest request, final EntityDetails entityDetails) throws IOException, HttpException {
147 if (requestCommitted.compareAndSet(false, true)) {
148 final ProtocolVersion transportVersion = request.getVersion();
149 if (transportVersion != null && !transportVersion.lessEquals(http1Config.getVersion())) {
150 throw new UnsupportedHttpVersionException(transportVersion);
151 }
152 context.setProtocolVersion(transportVersion != null ? transportVersion : http1Config.getVersion());
153 context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
154
155 httpProcessor.process(request, entityDetails, context);
156
157 final boolean endStream = entityDetails == null;
158 if (endStream) {
159 outputChannel.submit(request, true, FlushMode.IMMEDIATE);
160 committedRequest = request;
161 requestState = MessageState.COMPLETE;
162 } else {
163 final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
164 final boolean expectContinue = h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue());
165 outputChannel.submit(request, false, expectContinue ? FlushMode.IMMEDIATE : FlushMode.BUFFER);
166 committedRequest = request;
167 if (expectContinue) {
168 requestState = MessageState.ACK;
169 timeout = outputChannel.getSocketTimeout();
170 final Timeout timeout = http1Config.getWaitForContinueTimeout() != null ? http1Config.getWaitForContinueTimeout() : DEFAULT_WAIT_FOR_CONTINUE;
171 outputChannel.setSocketTimeout(timeout);
172 } else {
173 requestState = MessageState.BODY;
174 exchangeHandler.produce(internalDataChannel);
175 }
176 }
177 } else {
178 throw new HttpException("Request already committed");
179 }
180 }
181
182 void produceOutput() throws HttpException, IOException {
183 switch (requestState) {
184 case IDLE:
185 requestState = MessageState.HEADERS;
186 exchangeHandler.produceRequest((request, entityDetails, httpContext) -> commitRequest(request, entityDetails), context);
187 break;
188 case ACK:
189 outputChannel.suspendOutput();
190 break;
191 case BODY:
192 exchangeHandler.produce(internalDataChannel);
193 break;
194 }
195 }
196
197 void consumeHeader(final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
198 if (done.get() || responseState != MessageState.HEADERS) {
199 throw new ProtocolException("Unexpected message head");
200 }
201 final ProtocolVersion transportVersion = response.getVersion();
202 if (transportVersion != null) {
203 if (transportVersion.greaterEquals(HttpVersion.HTTP_2)) {
204 throw new UnsupportedHttpVersionException(transportVersion);
205 }
206 context.setProtocolVersion(transportVersion);
207 }
208
209 final int status = response.getCode();
210 if (status < HttpStatus.SC_INFORMATIONAL) {
211 throw new ProtocolException("Invalid response: " + new StatusLine(response));
212 }
213 if (status > HttpStatus.SC_CONTINUE && status < HttpStatus.SC_SUCCESS) {
214 exchangeHandler.consumeInformation(response, context);
215 } else {
216 if (!connectionReuseStrategy.keepAlive(committedRequest, response, context)) {
217 keepAlive = false;
218 }
219 }
220 if (requestState == MessageState.ACK) {
221 if (status == HttpStatus.SC_CONTINUE || status >= HttpStatus.SC_SUCCESS) {
222 outputChannel.setSocketTimeout(timeout);
223 requestState = MessageState.BODY;
224 if (status < HttpStatus.SC_CLIENT_ERROR) {
225 exchangeHandler.produce(internalDataChannel);
226 }
227 }
228 }
229 if (status < HttpStatus.SC_SUCCESS) {
230 return;
231 }
232 if (requestState == MessageState.BODY) {
233 if (status >= HttpStatus.SC_CLIENT_ERROR) {
234 requestState = MessageState.COMPLETE;
235 if (!outputChannel.abortGracefully()) {
236 keepAlive = false;
237 }
238 }
239 }
240
241 context.setProtocolVersion(transportVersion != null ? transportVersion : HttpVersion.HTTP_1_1);
242 context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
243 httpProcessor.process(response, entityDetails, context);
244
245 if (entityDetails == null && !keepAlive) {
246 outputChannel.close();
247 }
248
249 exchangeHandler.consumeResponse(response, entityDetails, context);
250 if (entityDetails == null) {
251 responseState = MessageState.COMPLETE;
252 } else {
253 responseState = MessageState.BODY;
254 }
255 }
256
257 void consumeData(final ByteBuffer src) throws HttpException, IOException {
258 if (done.get() || responseState != MessageState.BODY) {
259 throw new ProtocolException("Unexpected message data");
260 }
261 exchangeHandler.consume(src);
262 }
263
264 void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
265 exchangeHandler.updateCapacity(capacityChannel);
266 }
267
268 void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
269 if (done.get() || responseState != MessageState.BODY) {
270 throw new ProtocolException("Unexpected message data");
271 }
272 if (!keepAlive) {
273 outputChannel.close();
274 }
275 responseState = MessageState.COMPLETE;
276 exchangeHandler.streamEnd(trailers);
277 }
278
279 boolean handleTimeout() {
280 if (requestState == MessageState.ACK) {
281 requestState = MessageState.BODY;
282 outputChannel.setSocketTimeout(timeout);
283 outputChannel.requestOutput();
284 return true;
285 }
286 return false;
287 }
288
289 void failed(final Exception cause) {
290 if (!done.get()) {
291 exchangeHandler.failed(cause);
292 }
293 }
294
295 @Override
296 public void releaseResources() {
297 if (done.compareAndSet(false, true)) {
298 responseState = MessageState.COMPLETE;
299 requestState = MessageState.COMPLETE;
300 exchangeHandler.releaseResources();
301 }
302 }
303
304 void appendState(final StringBuilder buf) {
305 buf.append("requestState=").append(requestState)
306 .append(", responseState=").append(responseState)
307 .append(", responseCommitted=").append(requestCommitted)
308 .append(", keepAlive=").append(keepAlive)
309 .append(", done=").append(done);
310 }
311
312 @Override
313 public String toString() {
314 final StringBuilder buf = new StringBuilder();
315 buf.append("[");
316 appendState(buf);
317 buf.append("]");
318 return buf.toString();
319 }
320
321 }
322