1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.nio.protocol;
29
30 import java.io.IOException;
31 import java.net.SocketException;
32 import java.net.SocketTimeoutException;
33 import java.util.Queue;
34 import java.util.concurrent.ConcurrentLinkedQueue;
35 import java.util.concurrent.atomic.AtomicBoolean;
36
37 import org.apache.http.ConnectionReuseStrategy;
38 import org.apache.http.ExceptionLogger;
39 import org.apache.http.HttpEntity;
40 import org.apache.http.HttpEntityEnclosingRequest;
41 import org.apache.http.HttpException;
42 import org.apache.http.HttpRequest;
43 import org.apache.http.HttpResponse;
44 import org.apache.http.HttpResponseFactory;
45 import org.apache.http.HttpStatus;
46 import org.apache.http.HttpVersion;
47 import org.apache.http.MethodNotSupportedException;
48 import org.apache.http.ProtocolException;
49 import org.apache.http.UnsupportedHttpVersionException;
50 import org.apache.http.annotation.Contract;
51 import org.apache.http.annotation.ThreadingBehavior;
52 import org.apache.http.concurrent.Cancellable;
53 import org.apache.http.entity.ContentType;
54 import org.apache.http.impl.DefaultConnectionReuseStrategy;
55 import org.apache.http.impl.DefaultHttpResponseFactory;
56 import org.apache.http.nio.ContentDecoder;
57 import org.apache.http.nio.ContentEncoder;
58 import org.apache.http.nio.NHttpConnection;
59 import org.apache.http.nio.NHttpServerConnection;
60 import org.apache.http.nio.NHttpServerEventHandler;
61 import org.apache.http.nio.entity.NStringEntity;
62 import org.apache.http.nio.reactor.SessionBufferStatus;
63 import org.apache.http.params.HttpParams;
64 import org.apache.http.protocol.BasicHttpContext;
65 import org.apache.http.protocol.HttpContext;
66 import org.apache.http.protocol.HttpCoreContext;
67 import org.apache.http.protocol.HttpProcessor;
68 import org.apache.http.util.Args;
69 import org.apache.http.util.Asserts;
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102 @SuppressWarnings("deprecation")
103 @Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
104 public class HttpAsyncService implements NHttpServerEventHandler {
105
106 static final String HTTP_EXCHANGE_STATE = "http.nio.http-exchange-state";
107
108 private final HttpProcessor httpProcessor;
109 private final ConnectionReuseStrategy connectionStrategy;
110 private final HttpResponseFactory responseFactory;
111 private final HttpAsyncRequestHandlerMapper handlerMapper;
112 private final HttpAsyncExpectationVerifier expectationVerifier;
113 private final ExceptionLogger exceptionLogger;
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
/**
 * Creates a service from a legacy {@link HttpAsyncRequestHandlerResolver}.
 * The resolver is wrapped in an adapter so it can be used as a handler mapper.
 *
 * @param httpProcessor HTTP protocol processor; must not be {@code null}.
 * @param connStrategy connection re-use strategy; {@code null} selects the default.
 * @param responseFactory HTTP response factory; {@code null} selects the default.
 * @param handlerResolver legacy request handler resolver.
 * @param expectationVerifier request expectation verifier; may be {@code null}.
 * @param params accepted for signature compatibility only; it is not read.
 *
 * @deprecated use the constructor taking a {@link HttpAsyncRequestHandlerMapper}.
 */
@Deprecated
public HttpAsyncService(
        final HttpProcessor httpProcessor,
        final ConnectionReuseStrategy connStrategy,
        final HttpResponseFactory responseFactory,
        final HttpAsyncRequestHandlerResolver handlerResolver,
        final HttpAsyncExpectationVerifier expectationVerifier,
        final HttpParams params) {
    // No exception logger is supplied, exactly as when chaining through the
    // five-argument constructor.
    this(httpProcessor, connStrategy, responseFactory,
            new HttpAsyncRequestHandlerResolverAdapter(handlerResolver),
            expectationVerifier,
            (ExceptionLogger) null);
}
143
144
145
146
147
148
149
150
151
152
153
154
155
/**
 * Creates a service from a legacy {@link HttpAsyncRequestHandlerResolver},
 * using {@link DefaultHttpResponseFactory#INSTANCE} and no expectation verifier.
 *
 * @param httpProcessor HTTP protocol processor; must not be {@code null}.
 * @param connStrategy connection re-use strategy; {@code null} selects the default.
 * @param handlerResolver legacy request handler resolver.
 * @param params accepted for signature compatibility only; it is not read.
 *
 * @deprecated use the constructor taking a {@link HttpAsyncRequestHandlerMapper}.
 */
@Deprecated
public HttpAsyncService(
        final HttpProcessor httpProcessor,
        final ConnectionReuseStrategy connStrategy,
        final HttpAsyncRequestHandlerResolver handlerResolver,
        final HttpParams params) {
    this(httpProcessor, connStrategy, DefaultHttpResponseFactory.INSTANCE,
            new HttpAsyncRequestHandlerResolverAdapter(handlerResolver),
            (HttpAsyncExpectationVerifier) null,
            (ExceptionLogger) null);
}
168
169
170
171
172
173
174
175
176
177
178
179
180
181
/**
 * Creates a service without an explicit exception logger.
 *
 * @param httpProcessor HTTP protocol processor; must not be {@code null}.
 * @param connStrategy connection re-use strategy; {@code null} selects the default.
 * @param responseFactory HTTP response factory; {@code null} selects the default.
 * @param handlerMapper request handler mapper; may be {@code null}.
 * @param expectationVerifier request expectation verifier; may be {@code null}.
 */
public HttpAsyncService(
        final HttpProcessor httpProcessor,
        final ConnectionReuseStrategy connStrategy,
        final HttpResponseFactory responseFactory,
        final HttpAsyncRequestHandlerMapper handlerMapper,
        final HttpAsyncExpectationVerifier expectationVerifier) {
    this(httpProcessor,
            connStrategy,
            responseFactory,
            handlerMapper,
            expectationVerifier,
            (ExceptionLogger) null);
}
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
/**
 * Canonical constructor; every other constructor delegates here.
 *
 * @param httpProcessor HTTP protocol processor; must not be {@code null}.
 * @param connStrategy connection re-use strategy; {@code null} selects
 *   {@link DefaultConnectionReuseStrategy#INSTANCE}.
 * @param responseFactory HTTP response factory; {@code null} selects
 *   {@link DefaultHttpResponseFactory#INSTANCE}.
 * @param handlerMapper request handler mapper; stored as given, may be {@code null}.
 * @param expectationVerifier request expectation verifier; stored as given,
 *   may be {@code null}.
 * @param exceptionLogger exception logger; {@code null} selects
 *   {@link ExceptionLogger#NO_OP}.
 */
public HttpAsyncService(
        final HttpProcessor httpProcessor,
        final ConnectionReuseStrategy connStrategy,
        final HttpResponseFactory responseFactory,
        final HttpAsyncRequestHandlerMapper handlerMapper,
        final HttpAsyncExpectationVerifier expectationVerifier,
        final ExceptionLogger exceptionLogger) {
    Args.notNull(httpProcessor, "HTTP processor");
    this.httpProcessor = httpProcessor;

    if (connStrategy == null) {
        this.connectionStrategy = DefaultConnectionReuseStrategy.INSTANCE;
    } else {
        this.connectionStrategy = connStrategy;
    }

    if (responseFactory == null) {
        this.responseFactory = DefaultHttpResponseFactory.INSTANCE;
    } else {
        this.responseFactory = responseFactory;
    }

    this.handlerMapper = handlerMapper;
    this.expectationVerifier = expectationVerifier;

    if (exceptionLogger == null) {
        this.exceptionLogger = ExceptionLogger.NO_OP;
    } else {
        this.exceptionLogger = exceptionLogger;
    }
}
225
226
227
228
229
230
231
232
233
/**
 * Creates a service with default connection re-use strategy and response
 * factory, no expectation verifier and no explicit exception logger.
 *
 * @param httpProcessor HTTP protocol processor; must not be {@code null}.
 * @param handlerMapper request handler mapper; may be {@code null}.
 */
public HttpAsyncService(
        final HttpProcessor httpProcessor,
        final HttpAsyncRequestHandlerMapper handlerMapper) {
    this(httpProcessor,
            (ConnectionReuseStrategy) null,
            (HttpResponseFactory) null,
            handlerMapper,
            (HttpAsyncExpectationVerifier) null,
            (ExceptionLogger) null);
}
239
240
241
242
243
244
245
246
247
248
249
250
251
/**
 * Creates a service with default connection re-use strategy and response
 * factory and no expectation verifier.
 *
 * @param httpProcessor HTTP protocol processor; must not be {@code null}.
 * @param handlerMapper request handler mapper; may be {@code null}.
 * @param exceptionLogger exception logger; {@code null} selects
 *   {@link ExceptionLogger#NO_OP}.
 */
public HttpAsyncService(
        final HttpProcessor httpProcessor,
        final HttpAsyncRequestHandlerMapper handlerMapper,
        final ExceptionLogger exceptionLogger) {
    this(httpProcessor,
            (ConnectionReuseStrategy) null,
            (HttpResponseFactory) null,
            handlerMapper,
            (HttpAsyncExpectationVerifier) null,
            exceptionLogger);
}
258
259 @Override
260 public void connected(final NHttpServerConnection conn) {
261 final State state = new State();
262 conn.getContext().setAttribute(HTTP_EXCHANGE_STATE, state);
263 }
264
265 @Override
266 public void closed(final NHttpServerConnection conn) {
267 final State state = (State) conn.getContext().removeAttribute(HTTP_EXCHANGE_STATE);
268 if (state != null) {
269 state.setTerminated();
270 closeHandlers(state);
271 final Cancellable cancellable = state.getCancellable();
272 if (cancellable != null) {
273 cancellable.cancel();
274 }
275 }
276 }
277
278 @Override
279 public void exception(
280 final NHttpServerConnection conn, final Exception cause) {
281 log(cause);
282 final State state = getState(conn);
283 if (state == null) {
284 shutdownConnection(conn);
285 return;
286 }
287 state.setTerminated();
288 closeHandlers(state, cause);
289 try {
290 final Cancellable cancellable = state.getCancellable();
291 if (cancellable != null) {
292 cancellable.cancel();
293 }
294 if (cause instanceof SocketException || cause.getClass() == IOException.class) {
295
296 conn.shutdown();
297 return;
298 }
299 if (cause instanceof SocketTimeoutException) {
300
301 conn.close();
302 return;
303 }
304
305 if (conn.isResponseSubmitted() || state.getResponseState().compareTo(MessageState.INIT) > 0) {
306
307 conn.close();
308 return;
309 }
310 HttpRequest request = conn.getHttpRequest();
311 if (request == null) {
312 final Incoming incoming = state.getIncoming();
313 if (incoming != null) {
314 request = incoming.getRequest();
315 }
316 }
317 if (request == null) {
318 final Queue<PipelineEntry> pipeline = state.getPipeline();
319 final PipelineEntry pipelineEntry = pipeline.poll();
320 if (pipelineEntry != null) {
321 request = pipelineEntry.getRequest();
322 }
323 }
324 if (request != null) {
325 conn.resetInput();
326 final HttpCoreContext context = HttpCoreContext.create();
327 final HttpAsyncResponseProducer responseProducer = handleException(cause, context);
328 final HttpResponse response = responseProducer.generateResponse();
329 final Outgoing outgoing = new Outgoing(request, response, responseProducer, context);
330 state.setResponseState(MessageState.INIT);
331 state.setOutgoing(outgoing);
332 commitFinalResponse(conn, state);
333 return;
334 }
335 conn.close();
336 } catch (final Exception ex) {
337 shutdownConnection(conn);
338 closeHandlers(state);
339 if (ex instanceof RuntimeException) {
340 throw (RuntimeException) ex;
341 }
342 log(ex);
343 }
344 }
345
346 protected HttpResponse createHttpResponse(final int status, final HttpContext context) {
347 return this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_1, status, context);
348 }
349
350 @Override
351 public void requestReceived(
352 final NHttpServerConnection conn) throws IOException, HttpException {
353 final State state = getState(conn);
354 Asserts.notNull(state, "Connection state");
355 Asserts.check(state.getRequestState() == MessageState.READY,
356 "Unexpected request state %s", state.getRequestState());
357
358 final HttpRequest request = conn.getHttpRequest();
359 final HttpContext context = new BasicHttpContext();
360
361 context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
362 context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
363 this.httpProcessor.process(request, context);
364
365 final HttpAsyncRequestHandler<Object> requestHandler = getRequestHandler(request);
366 final HttpAsyncRequestConsumer<Object> consumer = requestHandler.processRequest(request, context);
367 consumer.requestReceived(request);
368
369 final Incoming incoming = new Incoming(request, requestHandler, consumer, context);
370 state.setIncoming(incoming);
371
372 if (request instanceof HttpEntityEnclosingRequest) {
373
374
375
376 if (((HttpEntityEnclosingRequest) request).expectContinue()
377 && state.getResponseState() == MessageState.READY
378 && state.getPipeline().isEmpty()
379 && !(conn instanceof SessionBufferStatusrg/apache/http/nio/reactor/SessionBufferStatus.html#SessionBufferStatus">SessionBufferStatus && ((SessionBufferStatus) conn).hasBufferedInput())) {
380
381 state.setRequestState(MessageState.ACK_EXPECTED);
382 final HttpResponse ack = createHttpResponse(HttpStatus.SC_CONTINUE, context);
383 if (this.expectationVerifier != null) {
384 conn.suspendInput();
385 conn.suspendOutput();
386 final HttpAsyncExchange httpAsyncExchange = new HttpAsyncExchangeImpl(
387 request, ack, state, conn, context);
388 this.expectationVerifier.verify(httpAsyncExchange, context);
389 } else {
390 conn.submitResponse(ack);
391 state.setRequestState(MessageState.BODY_STREAM);
392 }
393 } else {
394 state.setRequestState(MessageState.BODY_STREAM);
395 }
396 } else {
397
398 completeRequest(incoming, conn, state);
399 }
400 }
401
402 @Override
403 public void inputReady(
404 final NHttpServerConnection conn,
405 final ContentDecoder decoder) throws IOException, HttpException {
406 final State state = getState(conn);
407 Asserts.notNull(state, "Connection state");
408 Asserts.check(state.getRequestState() == MessageState.BODY_STREAM,
409 "Unexpected request state %s", state.getRequestState());
410
411 final Incoming incoming = state.getIncoming();
412 Asserts.notNull(incoming, "Incoming request");
413 final HttpAsyncRequestConsumer<?> consumer = incoming.getConsumer();
414 consumer.consumeContent(decoder, conn);
415 if (decoder.isCompleted()) {
416 completeRequest(incoming, conn, state);
417 }
418 }
419
420 @Override
421 public void responseReady(
422 final NHttpServerConnection conn) throws IOException, HttpException {
423 final State state = getState(conn);
424 Asserts.notNull(state, "Connection state");
425 Asserts.check(state.getResponseState() == MessageState.READY ||
426 state.getResponseState() == MessageState.INIT,
427 "Unexpected response state %s", state.getResponseState());
428
429 if (state.getRequestState() == MessageState.ACK_EXPECTED) {
430 final Outgoing outgoing;
431 synchronized (state) {
432 outgoing = state.getOutgoing();
433 if (outgoing == null) {
434 conn.suspendOutput();
435 return;
436 }
437 }
438 final HttpResponse response = outgoing.getResponse();
439 final int status = response.getStatusLine().getStatusCode();
440 if (status == 100) {
441 final HttpContext context = outgoing.getContext();
442 final HttpAsyncResponseProducer responseProducer = outgoing.getProducer();
443 try {
444
445 response.setEntity(null);
446 conn.requestInput();
447 state.setRequestState(MessageState.BODY_STREAM);
448 state.setOutgoing(null);
449 conn.submitResponse(response);
450 responseProducer.responseCompleted(context);
451 } finally {
452 responseProducer.close();
453 }
454 } else if (status >= 400) {
455 conn.resetInput();
456 state.setRequestState(MessageState.READY);
457 commitFinalResponse(conn, state);
458 } else {
459 throw new HttpException("Invalid response: " + response.getStatusLine());
460 }
461 } else {
462 if (state.getResponseState() == MessageState.READY) {
463 final Queue<PipelineEntry> pipeline = state.getPipeline();
464 final PipelineEntry pipelineEntry = pipeline.poll();
465 if (pipelineEntry == null) {
466 conn.suspendOutput();
467 return;
468 }
469 state.setResponseState(MessageState.INIT);
470 final Object result = pipelineEntry.getResult();
471 final HttpRequest request = pipelineEntry.getRequest();
472 final HttpContext context = pipelineEntry.getContext();
473 final HttpResponse response = createHttpResponse(HttpStatus.SC_OK, context);
474 final HttpAsyncExchangeImpl httpExchange = new HttpAsyncExchangeImpl(
475 request, response, state, conn, context);
476 if (result != null) {
477 final HttpAsyncRequestHandler<Object> handler = pipelineEntry.getHandler();
478 conn.suspendOutput();
479 try {
480 handler.handle(result, httpExchange, context);
481 } catch (final RuntimeException ex) {
482 throw ex;
483 } catch (final Exception ex) {
484 if (!httpExchange.isCompleted()) {
485 httpExchange.submitResponse(handleException(ex, context));
486 } else {
487 log(ex);
488 conn.close();
489 }
490 return;
491 }
492 } else {
493 final Exception exception = pipelineEntry.getException();
494 final HttpAsyncResponseProducer responseProducer = handleException(
495 exception != null ? exception : new HttpException("Internal error processing request"),
496 context);
497 httpExchange.submitResponse(responseProducer);
498 }
499 }
500 if (state.getResponseState() == MessageState.INIT) {
501 final Outgoing outgoing;
502 synchronized (state) {
503 outgoing = state.getOutgoing();
504 if (outgoing == null) {
505 conn.suspendOutput();
506 return;
507 }
508 }
509 final HttpResponse response = outgoing.getResponse();
510 final int status = response.getStatusLine().getStatusCode();
511 if (status >= 200) {
512 commitFinalResponse(conn, state);
513 } else {
514 throw new HttpException("Invalid response: " + response.getStatusLine());
515 }
516 }
517 }
518 }
519
520 @Override
521 public void outputReady(
522 final NHttpServerConnection conn,
523 final ContentEncoder encoder) throws HttpException, IOException {
524 final State state = getState(conn);
525 Asserts.notNull(state, "Connection state");
526 Asserts.check(state.getResponseState() == MessageState.BODY_STREAM,
527 "Unexpected response state %s", state.getResponseState());
528
529 final Outgoing outgoing = state.getOutgoing();
530 Asserts.notNull(outgoing, "Outgoing response");
531 final HttpAsyncResponseProducer responseProducer = outgoing.getProducer();
532
533 responseProducer.produceContent(encoder, conn);
534
535 if (encoder.isCompleted()) {
536 completeResponse(outgoing, conn, state);
537 }
538 }
539
540 @Override
541 public void endOfInput(final NHttpServerConnection conn) throws IOException {
542
543
544
545
546 if (conn.getSocketTimeout() <= 0) {
547 conn.setSocketTimeout(1000);
548 }
549 conn.close();
550 }
551
552 @Override
553 public void timeout(final NHttpServerConnection conn) throws IOException {
554 final State state = getState(conn);
555 if (state != null) {
556 closeHandlers(state, new SocketTimeoutException(
557 String.format("%,d milliseconds timeout on connection %s", conn.getSocketTimeout(), conn)));
558 }
559 if (conn.getStatus() == NHttpConnection.ACTIVE) {
560 conn.close();
561 if (conn.getStatus() == NHttpConnection.CLOSING) {
562
563
564 conn.setSocketTimeout(250);
565 }
566 } else {
567 conn.shutdown();
568 }
569 }
570
571 private State getState(final NHttpConnection conn) {
572 return (State) conn.getContext().getAttribute(HTTP_EXCHANGE_STATE);
573 }
574
575
576
577
578
579
580
581
582 protected void log(final Exception ex) {
583 this.exceptionLogger.log(ex);
584 }
585
586 private void shutdownConnection(final NHttpConnection conn) {
587 try {
588 conn.shutdown();
589 } catch (final IOException ex) {
590 log(ex);
591 }
592 }
593
594 private void closeHandlers(final State state, final Exception ex) {
595 final HttpAsyncRequestConsumer<Object> consumer =
596 state.getIncoming() != null ? state.getIncoming().getConsumer() : null;
597 if (consumer != null) {
598 try {
599 consumer.failed(ex);
600 } finally {
601 try {
602 consumer.close();
603 } catch (final IOException ioex) {
604 log(ioex);
605 }
606 }
607 }
608 final HttpAsyncResponseProducer producer =
609 state.getOutgoing() != null ? state.getOutgoing().getProducer() : null;
610 if (producer != null) {
611 try {
612 producer.failed(ex);
613 } finally {
614 try {
615 producer.close();
616 } catch (final IOException ioex) {
617 log(ioex);
618 }
619 }
620 }
621 }
622
623 private void closeHandlers(final State state) {
624 final HttpAsyncRequestConsumer<Object> consumer =
625 state.getIncoming() != null ? state.getIncoming().getConsumer() : null;
626 if (consumer != null) {
627 try {
628 consumer.close();
629 } catch (final IOException ioex) {
630 log(ioex);
631 }
632 }
633 final HttpAsyncResponseProducer producer =
634 state.getOutgoing() != null ? state.getOutgoing().getProducer() : null;
635 if (producer != null) {
636 try {
637 producer.close();
638 } catch (final IOException ioex) {
639 log(ioex);
640 }
641 }
642 }
643
644 protected HttpAsyncResponseProducer handleException(
645 final Exception ex, final HttpContext context) {
646 String message = ex.getMessage();
647 if (message == null) {
648 message = ex.toString();
649 }
650 final HttpResponse response = createHttpResponse(toStatusCode(ex, context), context);
651 return new ErrorResponseProducer(response, new NStringEntity(message, ContentType.DEFAULT_TEXT), false);
652 }
653
654 protected int toStatusCode(final Exception ex, final HttpContext context) {
655 final int code;
656 if (ex instanceof MethodNotSupportedException) {
657 code = HttpStatus.SC_NOT_IMPLEMENTED;
658 } else if (ex instanceof UnsupportedHttpVersionException) {
659 code = HttpStatus.SC_HTTP_VERSION_NOT_SUPPORTED;
660 } else if (ex instanceof ProtocolException) {
661 code = HttpStatus.SC_BAD_REQUEST;
662 } else {
663 code = HttpStatus.SC_INTERNAL_SERVER_ERROR;
664 }
665 return code;
666 }
667
668
669
670
671
672
673
674
675
676
677 protected void handleAlreadySubmittedResponse(
678 final Cancellable cancellable, final HttpContext context) {
679 throw new IllegalStateException("Response already submitted");
680 }
681
682
683
684
685
686
687
688
689
690 protected void handleAlreadySubmittedResponse(
691 final HttpAsyncResponseProducer responseProducer,
692 final HttpContext context) {
693 throw new IllegalStateException("Response already submitted");
694 }
695
696 private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
697 if (request != null && "HEAD".equalsIgnoreCase(request.getRequestLine().getMethod())) {
698 return false;
699 }
700 final int status = response.getStatusLine().getStatusCode();
701 return status >= HttpStatus.SC_OK
702 && status != HttpStatus.SC_NO_CONTENT
703 && status != HttpStatus.SC_NOT_MODIFIED
704 && status != HttpStatus.SC_RESET_CONTENT;
705 }
706
707 private void completeRequest(
708 final Incoming incoming,
709 final NHttpServerConnection conn,
710 final State state) throws IOException {
711 state.setRequestState(MessageState.READY);
712 state.setIncoming(null);
713
714 final PipelineEntry pipelineEntry;
715 final HttpAsyncRequestConsumer<?> consumer = incoming.getConsumer();
716 try {
717 final HttpContext context = incoming.getContext();
718 consumer.requestCompleted(context);
719 pipelineEntry = new PipelineEntry(
720 incoming.getRequest(),
721 consumer.getResult(),
722 consumer.getException(),
723 incoming.getHandler(),
724 context);
725 } finally {
726 consumer.close();
727 }
728 final Queue<PipelineEntry> pipeline = state.getPipeline();
729 pipeline.add(pipelineEntry);
730 if (state.getResponseState() == MessageState.READY) {
731 conn.requestOutput();
732 }
733 }
734
735 private void commitFinalResponse(
736 final NHttpServerConnection conn,
737 final State state) throws IOException, HttpException {
738 final Outgoing outgoing = state.getOutgoing();
739 Asserts.notNull(outgoing, "Outgoing response");
740 final HttpRequest request = outgoing.getRequest();
741 final HttpResponse response = outgoing.getResponse();
742 final HttpContext context = outgoing.getContext();
743
744 context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
745 this.httpProcessor.process(response, context);
746
747 HttpEntity entity = response.getEntity();
748 if (entity != null && !canResponseHaveBody(request, response)) {
749 response.setEntity(null);
750 entity = null;
751 }
752
753 conn.submitResponse(response);
754
755 if (entity == null) {
756 completeResponse(outgoing, conn, state);
757 } else {
758 state.setResponseState(MessageState.BODY_STREAM);
759 }
760 }
761
762 private void completeResponse(
763 final Outgoing outgoing,
764 final NHttpServerConnection conn,
765 final State state) throws IOException {
766 final HttpContext context = outgoing.getContext();
767 final HttpResponse response = outgoing.getResponse();
768 final HttpAsyncResponseProducer responseProducer = outgoing.getProducer();
769 try {
770 responseProducer.responseCompleted(context);
771 state.setOutgoing(null);
772 state.setCancellable(null);
773 state.setResponseState(MessageState.READY);
774 } finally {
775 responseProducer.close();
776 }
777 if (!this.connectionStrategy.keepAlive(response, context)) {
778 conn.close();
779 } else {
780 conn.requestInput();
781 }
782 }
783
784 @SuppressWarnings("unchecked")
785 private HttpAsyncRequestHandler<Object> getRequestHandler(final HttpRequest request) {
786 HttpAsyncRequestHandler<Object> handler = null;
787 if (this.handlerMapper != null) {
788 handler = (HttpAsyncRequestHandler<Object>) this.handlerMapper.lookup(request);
789 }
790 if (handler == null) {
791 handler = NullRequestHandler.INSTANCE;
792 }
793 return handler;
794 }
795
796 static class Incoming {
797
798 private final HttpRequest request;
799 private final HttpAsyncRequestHandler<Object> handler;
800 private final HttpAsyncRequestConsumer<Object> consumer;
801 private final HttpContext context;
802
803 Incoming(
804 final HttpRequest request,
805 final HttpAsyncRequestHandler<Object> handler,
806 final HttpAsyncRequestConsumer<Object> consumer,
807 final HttpContext context) {
808 this.request = request;
809 this.handler = handler;
810 this.consumer = consumer;
811 this.context = context;
812 }
813
814 public HttpRequest getRequest() {
815 return this.request;
816 }
817
818 public HttpAsyncRequestHandler<Object> getHandler() {
819 return this.handler;
820 }
821
822 public HttpAsyncRequestConsumer<Object> getConsumer() {
823 return this.consumer;
824 }
825
826 public HttpContext getContext() {
827 return this.context;
828 }
829 }
830
831 static class Outgoing {
832
833 private final HttpRequest request;
834 private final HttpResponse response;
835 private final HttpAsyncResponseProducer producer;
836 private final HttpContext context;
837
838 Outgoing(
839 final HttpRequest request,
840 final HttpResponse response,
841 final HttpAsyncResponseProducer producer,
842 final HttpContext context) {
843 this.request = request;
844 this.response = response;
845 this.producer = producer;
846 this.context = context;
847 }
848
849 public HttpRequest getRequest() {
850 return this.request;
851 }
852
853 public HttpResponse getResponse() {
854 return this.response;
855 }
856
857 public HttpAsyncResponseProducer getProducer() {
858 return this.producer;
859 }
860
861 public HttpContext getContext() {
862 return this.context;
863 }
864 }
865
866 static class PipelineEntry {
867
868 private final HttpRequest request;
869 private final Object result;
870 private final Exception exception;
871 private final HttpAsyncRequestHandler<Object> handler;
872 private final HttpContext context;
873
874 PipelineEntry(
875 final HttpRequest request,
876 final Object result,
877 final Exception exception,
878 final HttpAsyncRequestHandler<Object> handler,
879 final HttpContext context) {
880 this.request = request;
881 this.result = result;
882 this.exception = exception;
883 this.handler = handler;
884 this.context = context;
885 }
886
887 public HttpRequest getRequest() {
888 return this.request;
889 }
890
891 public Object getResult() {
892 return this.result;
893 }
894
895 public Exception getException() {
896 return this.exception;
897 }
898
899 public HttpAsyncRequestHandler<Object> getHandler() {
900 return this.handler;
901 }
902
903 public HttpContext getContext() {
904 return this.context;
905 }
906
907 }
908
909 static class State {
910
911 private final Queue<PipelineEntry> pipeline;
912 private volatile boolean terminated;
913 private volatile MessageState requestState;
914 private volatile MessageState responseState;
915 private volatile Incoming incoming;
916 private volatile Outgoing outgoing;
917 private volatile Cancellable cancellable;
918
919 State() {
920 super();
921 this.pipeline = new ConcurrentLinkedQueue<PipelineEntry>();
922 this.requestState = MessageState.READY;
923 this.responseState = MessageState.READY;
924 }
925
926 public boolean isTerminated() {
927 return this.terminated;
928 }
929
930 public void setTerminated() {
931 this.terminated = true;
932 }
933
934 public MessageState getRequestState() {
935 return this.requestState;
936 }
937
938 public void setRequestState(final MessageState state) {
939 this.requestState = state;
940 }
941
942 public MessageState getResponseState() {
943 return this.responseState;
944 }
945
946 public void setResponseState(final MessageState state) {
947 this.responseState = state;
948 }
949
950 public Incoming getIncoming() {
951 return this.incoming;
952 }
953
954 public void setIncoming(final Incoming incoming) {
955 this.incoming = incoming;
956 }
957
958 public Outgoing getOutgoing() {
959 return this.outgoing;
960 }
961
962 public void setOutgoing(final Outgoing outgoing) {
963 this.outgoing = outgoing;
964 }
965
966 public Cancellable getCancellable() {
967 return this.cancellable;
968 }
969
970 public void setCancellable(final Cancellable cancellable) {
971 this.cancellable = cancellable;
972 }
973
974 public Queue<PipelineEntry> getPipeline() {
975 return this.pipeline;
976 }
977
978 @Override
979 public String toString() {
980 final StringBuilder buf = new StringBuilder();
981 buf.append("[incoming ");
982 buf.append(this.requestState);
983 if (this.incoming != null) {
984 buf.append(" ");
985 buf.append(this.incoming.getRequest().getRequestLine());
986 }
987 buf.append("; outgoing ");
988 buf.append(this.responseState);
989 if (this.outgoing != null) {
990 buf.append(" ");
991 buf.append(this.outgoing.getResponse().getStatusLine());
992 }
993 buf.append("]");
994 return buf.toString();
995 }
996
997 }
998
999 class HttpAsyncExchangeImpl implements HttpAsyncExchange {
1000
1001 private final AtomicBoolean completed = new AtomicBoolean();
1002 private final HttpRequest request;
1003 private final HttpResponse response;
1004 private final State state;
1005 private final NHttpServerConnection conn;
1006 private final HttpContext context;
1007
1008 public HttpAsyncExchangeImpl(
1009 final HttpRequest request,
1010 final HttpResponse response,
1011 final State state,
1012 final NHttpServerConnection conn,
1013 final HttpContext context) {
1014 super();
1015 this.request = request;
1016 this.response = response;
1017 this.state = state;
1018 this.conn = conn;
1019 this.context = context;
1020 }
1021
1022 @Override
1023 public HttpRequest getRequest() {
1024 return this.request;
1025 }
1026
1027 @Override
1028 public HttpResponse getResponse() {
1029 return this.response;
1030 }
1031
1032 @Override
1033 public void setCallback(final Cancellable cancellable) {
1034 if (this.completed.get()) {
1035 handleAlreadySubmittedResponse(cancellable, context);
1036 } else if (this.state.isTerminated() && cancellable != null) {
1037 cancellable.cancel();
1038 } else {
1039 this.state.setCancellable(cancellable);
1040 }
1041 }
1042
1043 @Override
1044 public void submitResponse(final HttpAsyncResponseProducer responseProducer) {
1045 Args.notNull(responseProducer, "Response producer");
1046 if (this.completed.getAndSet(true)) {
1047 handleAlreadySubmittedResponse(responseProducer, context);
1048 } else if (!this.state.isTerminated()) {
1049 final HttpResponse response = responseProducer.generateResponse();
1050 final Outgoing outgoing = new Outgoing(
1051 this.request, response, responseProducer, this.context);
1052
1053 synchronized (this.state) {
1054 this.state.setOutgoing(outgoing);
1055 this.state.setCancellable(null);
1056 this.conn.requestOutput();
1057 }
1058
1059 } else {
1060 try {
1061 responseProducer.close();
1062 } catch (final IOException ex) {
1063 log(ex);
1064 }
1065 }
1066 }
1067
1068 @Override
1069 public void submitResponse() {
1070 submitResponse(new BasicAsyncResponseProducer(this.response));
1071 }
1072
1073 @Override
1074 public boolean isCompleted() {
1075 return this.completed.get();
1076 }
1077
1078 @Override
1079 public void setTimeout(final int timeout) {
1080 this.conn.setSocketTimeout(timeout);
1081 }
1082
1083 @Override
1084 public int getTimeout() {
1085 return this.conn.getSocketTimeout();
1086 }
1087
1088 }
1089
1090
1091
1092
1093
1094
1095 @Deprecated
1096 private static class HttpAsyncRequestHandlerResolverAdapter implements HttpAsyncRequestHandlerMapper {
1097
1098 private final HttpAsyncRequestHandlerResolver resolver;
1099
1100 public HttpAsyncRequestHandlerResolverAdapter(final HttpAsyncRequestHandlerResolver resolver) {
1101 this.resolver = resolver;
1102 }
1103
1104 @Override
1105 public HttpAsyncRequestHandler<?> lookup(final HttpRequest request) {
1106 return resolver.lookup(request.getRequestLine().getUri());
1107 }
1108
1109 }
1110
1111
1112
1113
1114
1115
1116
1117 public HttpResponseFactory getResponseFactory() {
1118 return responseFactory;
1119 }
1120
1121
1122
1123
1124
1125
1126
1127 public HttpProcessor getHttpProcessor() {
1128 return httpProcessor;
1129 }
1130
1131
1132
1133
1134
1135
1136
1137 public ConnectionReuseStrategy getConnectionStrategy() {
1138 return connectionStrategy;
1139 }
1140
1141
1142
1143
1144
1145
1146
1147 public HttpAsyncRequestHandlerMapper getHandlerMapper() {
1148 return handlerMapper;
1149 }
1150
1151
1152
1153
1154
1155
1156
1157 public HttpAsyncExpectationVerifier getExpectationVerifier() {
1158 return expectationVerifier;
1159 }
1160
1161
1162
1163
1164
1165
1166
1167 public ExceptionLogger getExceptionLogger() {
1168 return exceptionLogger;
1169 }
1170
1171 }