1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.nio.protocol;
29
30 import java.io.IOException;
31 import java.net.SocketTimeoutException;
32 import java.util.Queue;
33 import java.util.concurrent.ConcurrentLinkedQueue;
34
35 import org.apache.http.ConnectionClosedException;
36 import org.apache.http.ExceptionLogger;
37 import org.apache.http.HttpEntity;
38 import org.apache.http.HttpEntityEnclosingRequest;
39 import org.apache.http.HttpException;
40 import org.apache.http.HttpRequest;
41 import org.apache.http.HttpResponse;
42 import org.apache.http.HttpStatus;
43 import org.apache.http.HttpVersion;
44 import org.apache.http.ProtocolException;
45 import org.apache.http.ProtocolVersion;
46 import org.apache.http.annotation.Contract;
47 import org.apache.http.annotation.ThreadingBehavior;
48 import org.apache.http.nio.ContentDecoder;
49 import org.apache.http.nio.ContentEncoder;
50 import org.apache.http.nio.NHttpClientConnection;
51 import org.apache.http.nio.NHttpClientEventHandler;
52 import org.apache.http.nio.NHttpConnection;
53 import org.apache.http.protocol.HttpContext;
54 import org.apache.http.util.Args;
55 import org.apache.http.util.Asserts;
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83 @Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
84 public class HttpAsyncRequestExecutor implements NHttpClientEventHandler {
85
86 public static final int DEFAULT_WAIT_FOR_CONTINUE = 3000;
87 public static final String HTTP_HANDLER = "http.nio.exchange-handler";
88
89 private final int waitForContinue;
90 private final ExceptionLogger exceptionLogger;
91
92
93
94
95
96
97
98
99
100
101
102 public HttpAsyncRequestExecutor(
103 final int waitForContinue,
104 final ExceptionLogger exceptionLogger) {
105 super();
106 this.waitForContinue = Args.positive(waitForContinue, "Wait for continue time");
107 this.exceptionLogger = exceptionLogger != null ? exceptionLogger : ExceptionLogger.NO_OP;
108 }
109
110
111
112
113
114
115 public HttpAsyncRequestExecutor(final int waitForContinue) {
116 this(waitForContinue, null);
117 }
118
119 public HttpAsyncRequestExecutor() {
120 this(DEFAULT_WAIT_FOR_CONTINUE, null);
121 }
122
123 private static boolean pipelining(final HttpAsyncClientExchangeHandler handler) {
124 return handler.getClass().getAnnotation(Pipelined.class) != null;
125 }
126
127 @Override
128 public void connected(
129 final NHttpClientConnection conn,
130 final Object attachment) throws IOException, HttpException {
131 final State state = new State();
132 final HttpContext context = conn.getContext();
133 context.setAttribute(HTTP_EXCHANGE_STATE, state);
134 requestReady(conn);
135 }
136
137 @Override
138 public void closed(final NHttpClientConnection conn) {
139 final HttpAsyncClientExchangeHandler handler = getHandler(conn);
140 if (handler == null) {
141 return;
142 }
143 final State state = getState(conn);
144 if (state != null) {
145 if (state.getRequestState() != MessageState.READY || state.getResponseState() != MessageState.READY) {
146 handler.failed(new ConnectionClosedException("Connection closed unexpectedly"));
147 }
148 }
149 if (!handler.isDone() && pipelining(handler)) {
150 handler.failed(new ConnectionClosedException("Connection closed unexpectedly"));
151 }
152 if (state == null || handler.isDone()) {
153 closeHandler(handler);
154 }
155 }
156
157 @Override
158 public void exception(
159 final NHttpClientConnection conn, final Exception cause) {
160 shutdownConnection(conn);
161 final HttpAsyncClientExchangeHandler handler = getHandler(conn);
162 if (handler != null) {
163 handler.failed(cause);
164 } else {
165 log(cause);
166 }
167 }
168
169 @Override
170 public void requestReady(
171 final NHttpClientConnection conn) throws IOException, HttpException {
172 final State state = getState(conn);
173 Asserts.notNull(state, "Connection state");
174 Asserts.check(state.getRequestState() == MessageState.READY ||
175 state.getRequestState() == MessageState.COMPLETED,
176 "Unexpected request state %s", state.getRequestState());
177
178 if (state.getRequestState() == MessageState.COMPLETED) {
179 conn.suspendOutput();
180 return;
181 }
182 final HttpContext context = conn.getContext();
183 final HttpAsyncClientExchangeHandler handler;
184 synchronized (context) {
185 handler = getHandler(conn);
186 if (handler == null || handler.isDone()) {
187 conn.suspendOutput();
188 return;
189 }
190 }
191 final boolean pipelined = pipelining(handler);
192
193 final HttpRequest request = handler.generateRequest();
194 if (request == null) {
195 conn.suspendOutput();
196 return;
197 }
198 final ProtocolVersion version = request.getRequestLine().getProtocolVersion();
199 if (pipelined && version.lessEquals(HttpVersion.HTTP_1_0)) {
200 throw new ProtocolException(version + " cannot be used with request pipelining");
201 }
202 state.setRequest(request);
203 if (pipelined) {
204 state.getRequestQueue().add(request);
205 }
206 if (request instanceof HttpEntityEnclosingRequest) {
207 final boolean expectContinue = ((HttpEntityEnclosingRequest) request).expectContinue();
208 if (expectContinue && pipelined) {
209 throw new ProtocolException("Expect-continue handshake cannot be used with request pipelining");
210 }
211 conn.submitRequest(request);
212 if (expectContinue) {
213 final int timeout = conn.getSocketTimeout();
214 state.setTimeout(timeout);
215 conn.setSocketTimeout(this.waitForContinue);
216 state.setRequestState(MessageState.ACK_EXPECTED);
217 } else {
218 final HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();
219 if (entity != null) {
220 state.setRequestState(MessageState.BODY_STREAM);
221 } else {
222 handler.requestCompleted();
223 state.setRequestState(pipelined ? MessageState.READY : MessageState.COMPLETED);
224 }
225 }
226 } else {
227 conn.submitRequest(request);
228 handler.requestCompleted();
229 state.setRequestState(pipelined ? MessageState.READY : MessageState.COMPLETED);
230 }
231 }
232
233 @Override
234 public void outputReady(
235 final NHttpClientConnection conn,
236 final ContentEncoder encoder) throws IOException, HttpException {
237 final State state = getState(conn);
238 Asserts.notNull(state, "Connection state");
239 Asserts.check(state.getRequestState() == MessageState.BODY_STREAM ||
240 state.getRequestState() == MessageState.ACK_EXPECTED,
241 "Unexpected request state %s", state.getRequestState());
242
243 final HttpAsyncClientExchangeHandler handler = getHandler(conn);
244 Asserts.notNull(handler, "Client exchange handler");
245 if (state.getRequestState() == MessageState.ACK_EXPECTED) {
246 conn.suspendOutput();
247 return;
248 }
249 handler.produceContent(encoder, conn);
250 if (encoder.isCompleted()) {
251 handler.requestCompleted();
252 state.setRequestState(pipelining(handler) ? MessageState.READY : MessageState.COMPLETED);
253 }
254 }
255
256 @Override
257 public void responseReceived(
258 final NHttpClientConnection conn) throws HttpException, IOException {
259 final State state = getState(conn);
260 Asserts.notNull(state, "Connection state");
261 Asserts.check(state.getResponseState() == MessageState.READY,
262 "Unexpected request state %s", state.getResponseState());
263
264 final HttpAsyncClientExchangeHandler handler = getHandler(conn);
265 Asserts.notNull(handler, "Client exchange handler");
266
267 final HttpRequest request;
268 if (pipelining(handler)) {
269 request = state.getRequestQueue().poll();
270 Asserts.notNull(request, "HTTP request");
271 } else {
272 request = state.getRequest();
273 if (request == null) {
274 throw new HttpException("Out of sequence response");
275 }
276 }
277
278 final HttpResponse response = conn.getHttpResponse();
279
280 final int statusCode = response.getStatusLine().getStatusCode();
281 if (statusCode < HttpStatus.SC_CONTINUE) {
282 throw new ProtocolException("Invalid response: " + response.getStatusLine());
283 }
284 if (statusCode < HttpStatus.SC_OK) {
285
286 if (statusCode != HttpStatus.SC_CONTINUE) {
287 throw new ProtocolException(
288 "Unexpected response: " + response.getStatusLine());
289 }
290 if (state.getRequestState() == MessageState.ACK_EXPECTED) {
291 final int timeout = state.getTimeout();
292 conn.setSocketTimeout(timeout);
293 conn.requestOutput();
294 state.setRequestState(MessageState.BODY_STREAM);
295 }
296 return;
297 }
298 state.setResponse(response);
299 if (state.getRequestState() == MessageState.ACK_EXPECTED) {
300 final int timeout = state.getTimeout();
301 conn.setSocketTimeout(timeout);
302 conn.resetOutput();
303 state.setRequestState(MessageState.COMPLETED);
304 } else if (state.getRequestState() == MessageState.BODY_STREAM) {
305
306 if (statusCode >= 400) {
307 conn.resetOutput();
308 conn.suspendOutput();
309 state.setRequestState(MessageState.COMPLETED);
310 state.invalidate();
311 }
312 }
313
314 if (canResponseHaveBody(request, response)) {
315 handler.responseReceived(response);
316 state.setResponseState(MessageState.BODY_STREAM);
317 } else {
318 response.setEntity(null);
319 handler.responseReceived(response);
320 conn.resetInput();
321 processResponse(conn, state, handler);
322 }
323 }
324
325 @Override
326 public void inputReady(
327 final NHttpClientConnection conn,
328 final ContentDecoder decoder) throws IOException, HttpException {
329 final State state = getState(conn);
330 Asserts.notNull(state, "Connection state");
331 Asserts.check(state.getResponseState() == MessageState.BODY_STREAM,
332 "Unexpected request state %s", state.getResponseState());
333
334 final HttpAsyncClientExchangeHandler handler = getHandler(conn);
335 Asserts.notNull(handler, "Client exchange handler");
336 handler.consumeContent(decoder, conn);
337 if (decoder.isCompleted()) {
338 processResponse(conn, state, handler);
339 }
340 }
341
342 @Override
343 public void endOfInput(final NHttpClientConnection conn) throws IOException {
344 final State state = getState(conn);
345 final HttpContext context = conn.getContext();
346 synchronized (context) {
347 if (state != null) {
348 if (state.getRequestState().compareTo(MessageState.READY) != 0) {
349 state.invalidate();
350 }
351 final HttpAsyncClientExchangeHandler handler = getHandler(conn);
352 if (handler != null) {
353 if (state.isValid()) {
354 handler.inputTerminated();
355 } else {
356 handler.failed(new ConnectionClosedException());
357 }
358 }
359 }
360
361
362
363
364 if (conn.getSocketTimeout() <= 0) {
365 conn.setSocketTimeout(1000);
366 }
367 conn.close();
368 }
369 }
370
371 @Override
372 public void timeout(
373 final NHttpClientConnection conn) throws IOException {
374 final State state = getState(conn);
375 if (state != null) {
376 if (state.getRequestState() == MessageState.ACK_EXPECTED) {
377 final int timeout = state.getTimeout();
378 conn.setSocketTimeout(timeout);
379 conn.requestOutput();
380 state.setRequestState(MessageState.BODY_STREAM);
381 state.setTimeout(0);
382 return;
383 }
384 state.invalidate();
385 final HttpAsyncClientExchangeHandler handler = getHandler(conn);
386 if (handler != null) {
387 handler.failed(new SocketTimeoutException(
388 String.format("%,d milliseconds timeout on connection %s", conn.getSocketTimeout(), conn)));
389 handler.close();
390 }
391 }
392 if (conn.getStatus() == NHttpConnection.ACTIVE) {
393 conn.close();
394 if (conn.getStatus() == NHttpConnection.CLOSING) {
395
396
397 conn.setSocketTimeout(250);
398 }
399 } else {
400 conn.shutdown();
401 }
402 }
403
404
405
406
407
408
409
410
411 protected void log(final Exception ex) {
412 this.exceptionLogger.log(ex);
413 }
414
415 private static State getState(final NHttpConnection conn) {
416 return (State) conn.getContext().getAttribute(HTTP_EXCHANGE_STATE);
417 }
418
419 private static HttpAsyncClientExchangeHandler getHandler(final NHttpConnection conn) {
420 return (HttpAsyncClientExchangeHandler) conn.getContext().getAttribute(HTTP_HANDLER);
421 }
422
423 private void shutdownConnection(final NHttpConnection conn) {
424 try {
425 conn.shutdown();
426 } catch (final IOException ex) {
427 log(ex);
428 }
429 }
430
431 private void closeHandler(final HttpAsyncClientExchangeHandler handler) {
432 if (handler != null) {
433 try {
434 handler.close();
435 } catch (final IOException ioex) {
436 log(ioex);
437 }
438 }
439 }
440
441 private void processResponse(
442 final NHttpClientConnection conn,
443 final State state,
444 final HttpAsyncClientExchangeHandler handler) throws IOException, HttpException {
445 if (!state.isValid()) {
446 conn.close();
447 }
448 handler.responseCompleted();
449
450 if (!pipelining(handler)) {
451 state.setRequestState(MessageState.READY);
452 state.setRequest(null);
453 }
454 state.setResponseState(MessageState.READY);
455 state.setResponse(null);
456 if (!handler.isDone() && conn.isOpen()) {
457 conn.requestOutput();
458 }
459 }
460
461 private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
462
463 final String method = request.getRequestLine().getMethod();
464 final int status = response.getStatusLine().getStatusCode();
465
466 if (method.equalsIgnoreCase("HEAD")) {
467 return false;
468 }
469 if (method.equalsIgnoreCase("CONNECT") && status < 300) {
470 return false;
471 }
472 return status >= HttpStatus.SC_OK
473 && status != HttpStatus.SC_NO_CONTENT
474 && status != HttpStatus.SC_NOT_MODIFIED
475 && status != HttpStatus.SC_RESET_CONTENT;
476 }
477
478 static final String HTTP_EXCHANGE_STATE = "http.nio.http-exchange-state";
479
480 static class State {
481
482 private final Queue<HttpRequest> requestQueue;
483 private volatile MessageState requestState;
484 private volatile MessageState responseState;
485 private volatile HttpRequest request;
486 private volatile HttpResponse response;
487 private volatile boolean valid;
488 private volatile int timeout;
489
490 State() {
491 super();
492 this.requestQueue = new ConcurrentLinkedQueue<HttpRequest>();
493 this.valid = true;
494 this.requestState = MessageState.READY;
495 this.responseState = MessageState.READY;
496 }
497
498 public MessageState getRequestState() {
499 return this.requestState;
500 }
501
502 public void setRequestState(final MessageState state) {
503 this.requestState = state;
504 }
505
506 public MessageState getResponseState() {
507 return this.responseState;
508 }
509
510 public void setResponseState(final MessageState state) {
511 this.responseState = state;
512 }
513
514 public HttpRequest getRequest() {
515 return this.request;
516 }
517
518 public void setRequest(final HttpRequest request) {
519 this.request = request;
520 }
521
522 public HttpResponse getResponse() {
523 return this.response;
524 }
525
526 public void setResponse(final HttpResponse response) {
527 this.response = response;
528 }
529
530 public Queue<HttpRequest> getRequestQueue() {
531 return this.requestQueue;
532 }
533
534 public int getTimeout() {
535 return this.timeout;
536 }
537
538 public void setTimeout(final int timeout) {
539 this.timeout = timeout;
540 }
541
542 public boolean isValid() {
543 return this.valid;
544 }
545
546 public void invalidate() {
547 this.valid = false;
548 }
549
550 @Override
551 public String toString() {
552 final StringBuilder buf = new StringBuilder();
553 buf.append("request state: ");
554 buf.append(this.requestState);
555 buf.append("; request: ");
556 if (this.request != null) {
557 buf.append(this.request.getRequestLine());
558 }
559 buf.append("; response state: ");
560 buf.append(this.responseState);
561 buf.append("; response: ");
562 if (this.response != null) {
563 buf.append(this.response.getStatusLine());
564 }
565 buf.append("; valid: ");
566 buf.append(this.valid);
567 buf.append(";");
568 return buf.toString();
569 }
570
571 }
572
573 }