1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.nio.protocol;
29
30 import java.io.IOException;
31 import java.io.OutputStream;
32 import java.util.concurrent.Executor;
33
34 import org.apache.http.ConnectionReuseStrategy;
35 import org.apache.http.HttpEntity;
36 import org.apache.http.HttpEntityEnclosingRequest;
37 import org.apache.http.HttpException;
38 import org.apache.http.HttpRequest;
39 import org.apache.http.HttpResponse;
40 import org.apache.http.HttpResponseFactory;
41 import org.apache.http.HttpStatus;
42 import org.apache.http.HttpVersion;
43 import org.apache.http.MethodNotSupportedException;
44 import org.apache.http.ProtocolException;
45 import org.apache.http.ProtocolVersion;
46 import org.apache.http.UnsupportedHttpVersionException;
47 import org.apache.http.annotation.ThreadingBehavior;
48 import org.apache.http.annotation.Contract;
49 import org.apache.http.entity.ByteArrayEntity;
50 import org.apache.http.nio.ContentDecoder;
51 import org.apache.http.nio.ContentEncoder;
52 import org.apache.http.nio.IOControl;
53 import org.apache.http.nio.NHttpConnection;
54 import org.apache.http.nio.NHttpServerConnection;
55 import org.apache.http.nio.NHttpServiceHandler;
56 import org.apache.http.nio.entity.ContentBufferEntity;
57 import org.apache.http.nio.entity.ContentOutputStream;
58 import org.apache.http.nio.params.NIOReactorPNames;
59 import org.apache.http.nio.util.ByteBufferAllocator;
60 import org.apache.http.nio.util.ContentInputBuffer;
61 import org.apache.http.nio.util.ContentOutputBuffer;
62 import org.apache.http.nio.util.DirectByteBufferAllocator;
63 import org.apache.http.nio.util.SharedInputBuffer;
64 import org.apache.http.nio.util.SharedOutputBuffer;
65 import org.apache.http.params.DefaultedHttpParams;
66 import org.apache.http.params.HttpParams;
67 import org.apache.http.protocol.ExecutionContext;
68 import org.apache.http.protocol.HttpContext;
69 import org.apache.http.protocol.HttpExpectationVerifier;
70 import org.apache.http.protocol.HttpProcessor;
71 import org.apache.http.protocol.HttpRequestHandler;
72 import org.apache.http.protocol.HttpRequestHandlerResolver;
73 import org.apache.http.util.Args;
74 import org.apache.http.util.EncodingUtils;
75 import org.apache.http.util.EntityUtils;
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112 @Deprecated
113 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
114 public class ThrottlingHttpServiceHandler extends NHttpHandlerBase
115 implements NHttpServiceHandler {
116
/** Factory used to create every response object this handler produces. */
protected final HttpResponseFactory responseFactory;
/** Executor that runs the per-request task submitted from {@code requestReceived}. */
protected final Executor executor;

/** Optional resolver mapping a request URI to its handler; may be {@code null}. */
protected HttpRequestHandlerResolver handlerResolver;
/** Optional verifier consulted for requests that expect a 100-continue; may be {@code null}. */
protected HttpExpectationVerifier expectationVerifier;

/**
 * Size passed to the per-connection content buffers, read from
 * {@code NIOReactorPNames.CONTENT_BUFFER_SIZE} (20480 when the parameter is unset).
 */
private final int bufsize;
124
125 public ThrottlingHttpServiceHandler(
126 final HttpProcessor httpProcessor,
127 final HttpResponseFactory responseFactory,
128 final ConnectionReuseStrategy connStrategy,
129 final ByteBufferAllocator allocator,
130 final Executor executor,
131 final HttpParams params) {
132 super(httpProcessor, connStrategy, allocator, params);
133 Args.notNull(responseFactory, "Response factory");
134 Args.notNull(executor, "Executor");
135 this.responseFactory = responseFactory;
136 this.executor = executor;
137 this.bufsize = this.params.getIntParameter(NIOReactorPNames.CONTENT_BUFFER_SIZE, 20480);
138 }
139
140 public ThrottlingHttpServiceHandler(
141 final HttpProcessor httpProcessor,
142 final HttpResponseFactory responseFactory,
143 final ConnectionReuseStrategy connStrategy,
144 final Executor executor,
145 final HttpParams params) {
146 this(httpProcessor, responseFactory, connStrategy,
147 DirectByteBufferAllocator.INSTANCE, executor, params);
148 }
149
150 public void setHandlerResolver(final HttpRequestHandlerResolver handlerResolver) {
151 this.handlerResolver = handlerResolver;
152 }
153
154 public void setExpectationVerifier(final HttpExpectationVerifier expectationVerifier) {
155 this.expectationVerifier = expectationVerifier;
156 }
157
158 @Override
159 public void connected(final NHttpServerConnection conn) {
160 final HttpContext context = conn.getContext();
161
162 final ServerConnState connState = new ServerConnState(this.bufsize, conn, allocator);
163 context.setAttribute(CONN_STATE, connState);
164
165 if (this.eventListener != null) {
166 this.eventListener.connectionOpen(conn);
167 }
168 }
169
170 @Override
171 public void closed(final NHttpServerConnection conn) {
172 final HttpContext context = conn.getContext();
173 final ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE);
174
175 if (connState != null) {
176 synchronized (connState) {
177 connState.close();
178 connState.notifyAll();
179 }
180 }
181
182 if (this.eventListener != null) {
183 this.eventListener.connectionClosed(conn);
184 }
185 }
186
187 @Override
188 public void exception(final NHttpServerConnection conn, final HttpException httpex) {
189 if (conn.isResponseSubmitted()) {
190 if (eventListener != null) {
191 eventListener.fatalProtocolException(httpex, conn);
192 }
193 return;
194 }
195
196 final HttpContext context = conn.getContext();
197
198 final ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE);
199
200 try {
201
202 final HttpResponse response = this.responseFactory.newHttpResponse(
203 HttpVersion.HTTP_1_0,
204 HttpStatus.SC_INTERNAL_SERVER_ERROR,
205 context);
206 response.setParams(
207 new DefaultedHttpParams(response.getParams(), this.params));
208 handleException(httpex, response);
209 response.setEntity(null);
210
211 this.httpProcessor.process(response, context);
212
213 synchronized (connState) {
214 connState.setResponse(response);
215
216 conn.requestOutput();
217 }
218
219 } catch (final IOException ex) {
220 shutdownConnection(conn, ex);
221 if (eventListener != null) {
222 eventListener.fatalIOException(ex, conn);
223 }
224 } catch (final HttpException ex) {
225 closeConnection(conn, ex);
226 if (eventListener != null) {
227 eventListener.fatalProtocolException(ex, conn);
228 }
229 }
230 }
231
232 @Override
233 public void exception(final NHttpServerConnection conn, final IOException ex) {
234 shutdownConnection(conn, ex);
235
236 if (this.eventListener != null) {
237 this.eventListener.fatalIOException(ex, conn);
238 }
239 }
240
241 @Override
242 public void timeout(final NHttpServerConnection conn) {
243 handleTimeout(conn);
244 }
245
246 @Override
247 public void requestReceived(final NHttpServerConnection conn) {
248 final HttpContext context = conn.getContext();
249
250 final HttpRequest request = conn.getHttpRequest();
251 final ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE);
252
253 synchronized (connState) {
254 boolean contentExpected = false;
255 if (request instanceof HttpEntityEnclosingRequest) {
256 final HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();
257 if (entity != null) {
258 contentExpected = true;
259 }
260 }
261
262 if (!contentExpected) {
263 conn.suspendInput();
264 }
265
266 this.executor.execute(new Runnable() {
267
268 @Override
269 public void run() {
270 try {
271
272 handleRequest(request, connState, conn);
273
274 } catch (final IOException ex) {
275 shutdownConnection(conn, ex);
276 if (eventListener != null) {
277 eventListener.fatalIOException(ex, conn);
278 }
279 } catch (final HttpException ex) {
280 shutdownConnection(conn, ex);
281 if (eventListener != null) {
282 eventListener.fatalProtocolException(ex, conn);
283 }
284 }
285 }
286
287 });
288
289 connState.notifyAll();
290 }
291
292 }
293
294 @Override
295 public void inputReady(final NHttpServerConnection conn, final ContentDecoder decoder) {
296 final HttpContext context = conn.getContext();
297
298 final ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE);
299
300 try {
301
302 synchronized (connState) {
303 final ContentInputBuffer buffer = connState.getInbuffer();
304
305 buffer.consumeContent(decoder);
306 if (decoder.isCompleted()) {
307 connState.setInputState(ServerConnState.REQUEST_BODY_DONE);
308 } else {
309 connState.setInputState(ServerConnState.REQUEST_BODY_STREAM);
310 }
311
312 connState.notifyAll();
313 }
314
315 } catch (final IOException ex) {
316 shutdownConnection(conn, ex);
317 if (this.eventListener != null) {
318 this.eventListener.fatalIOException(ex, conn);
319 }
320 }
321
322 }
323
324 @Override
325 public void responseReady(final NHttpServerConnection conn) {
326 final HttpContext context = conn.getContext();
327
328 final ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE);
329
330 try {
331
332 synchronized (connState) {
333 if (connState.isExpectationFailed()) {
334
335
336
337 conn.resetInput();
338 connState.setExpectationFailed(false);
339 }
340
341 final HttpResponse response = connState.getResponse();
342 if (connState.getOutputState() == ServerConnState.READY
343 && response != null
344 && !conn.isResponseSubmitted()) {
345
346 conn.submitResponse(response);
347 final int statusCode = response.getStatusLine().getStatusCode();
348 final HttpEntity entity = response.getEntity();
349
350 if (statusCode >= 200 && entity == null) {
351 connState.setOutputState(ServerConnState.RESPONSE_DONE);
352
353 if (!this.connStrategy.keepAlive(response, context)) {
354 conn.close();
355 }
356 } else {
357 connState.setOutputState(ServerConnState.RESPONSE_SENT);
358 }
359 }
360
361 connState.notifyAll();
362 }
363
364 } catch (final IOException ex) {
365 shutdownConnection(conn, ex);
366 if (eventListener != null) {
367 eventListener.fatalIOException(ex, conn);
368 }
369 } catch (final HttpException ex) {
370 closeConnection(conn, ex);
371 if (eventListener != null) {
372 eventListener.fatalProtocolException(ex, conn);
373 }
374 }
375 }
376
377 @Override
378 public void outputReady(final NHttpServerConnection conn, final ContentEncoder encoder) {
379 final HttpContext context = conn.getContext();
380
381 final ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE);
382
383 try {
384
385 synchronized (connState) {
386 final HttpResponse response = connState.getResponse();
387 final ContentOutputBuffer buffer = connState.getOutbuffer();
388
389 buffer.produceContent(encoder);
390 if (encoder.isCompleted()) {
391 connState.setOutputState(ServerConnState.RESPONSE_BODY_DONE);
392
393 if (!this.connStrategy.keepAlive(response, context)) {
394 conn.close();
395 }
396 } else {
397 connState.setOutputState(ServerConnState.RESPONSE_BODY_STREAM);
398 }
399
400 connState.notifyAll();
401 }
402
403 } catch (final IOException ex) {
404 shutdownConnection(conn, ex);
405 if (this.eventListener != null) {
406 this.eventListener.fatalIOException(ex, conn);
407 }
408 }
409 }
410
411 private void handleException(final HttpException ex, final HttpResponse response) {
412 if (ex instanceof MethodNotSupportedException) {
413 response.setStatusCode(HttpStatus.SC_NOT_IMPLEMENTED);
414 } else if (ex instanceof UnsupportedHttpVersionException) {
415 response.setStatusCode(HttpStatus.SC_HTTP_VERSION_NOT_SUPPORTED);
416 } else if (ex instanceof ProtocolException) {
417 response.setStatusCode(HttpStatus.SC_BAD_REQUEST);
418 } else {
419 response.setStatusCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
420 }
421 final byte[] msg = EncodingUtils.getAsciiBytes(ex.getMessage());
422 final ByteArrayEntity entity = new ByteArrayEntity(msg);
423 entity.setContentType("text/plain; charset=US-ASCII");
424 response.setEntity(entity);
425 }
426
427 private void handleRequest(
428 final HttpRequest request,
429 final ServerConnState connState,
430 final NHttpServerConnection conn) throws HttpException, IOException {
431
432 final HttpContext context = conn.getContext();
433
434
435
436 synchronized (connState) {
437 try {
438 for (;;) {
439 final int currentState = connState.getOutputState();
440 if (currentState == ServerConnState.READY) {
441 break;
442 }
443 if (currentState == ServerConnState.SHUTDOWN) {
444 return;
445 }
446 connState.wait();
447 }
448 } catch (final InterruptedException ex) {
449 connState.shutdown();
450 return;
451 }
452 connState.setInputState(ServerConnState.REQUEST_RECEIVED);
453 connState.setRequest(request);
454 }
455
456 request.setParams(new DefaultedHttpParams(request.getParams(), this.params));
457
458 context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
459 context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
460
461 ProtocolVersion ver = request.getRequestLine().getProtocolVersion();
462
463 if (!ver.lessEquals(HttpVersion.HTTP_1_1)) {
464
465 ver = HttpVersion.HTTP_1_1;
466 }
467
468 HttpResponse response = null;
469
470 if (request instanceof HttpEntityEnclosingRequest) {
471 final HttpEntityEnclosingRequest eeRequest = (HttpEntityEnclosingRequest) request;
472
473 if (eeRequest.expectContinue()) {
474 response = this.responseFactory.newHttpResponse(
475 ver,
476 HttpStatus.SC_CONTINUE,
477 context);
478 response.setParams(
479 new DefaultedHttpParams(response.getParams(), this.params));
480 if (this.expectationVerifier != null) {
481 try {
482 this.expectationVerifier.verify(request, response, context);
483 } catch (final HttpException ex) {
484 response = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_0,
485 HttpStatus.SC_INTERNAL_SERVER_ERROR, context);
486 response.setParams(
487 new DefaultedHttpParams(response.getParams(), this.params));
488 handleException(ex, response);
489 }
490 }
491
492 synchronized (connState) {
493 if (response.getStatusLine().getStatusCode() < 200) {
494
495
496 connState.setResponse(response);
497 conn.requestOutput();
498
499
500 try {
501 for (;;) {
502 final int currentState = connState.getOutputState();
503 if (currentState == ServerConnState.RESPONSE_SENT) {
504 break;
505 }
506 if (currentState == ServerConnState.SHUTDOWN) {
507 return;
508 }
509 connState.wait();
510 }
511 } catch (final InterruptedException ex) {
512 connState.shutdown();
513 return;
514 }
515 connState.resetOutput();
516 response = null;
517 } else {
518
519 eeRequest.setEntity(null);
520 conn.suspendInput();
521 connState.setExpectationFailed(true);
522 }
523 }
524 }
525
526
527 if (eeRequest.getEntity() != null) {
528 eeRequest.setEntity(new ContentBufferEntity(
529 eeRequest.getEntity(),
530 connState.getInbuffer()));
531 }
532
533 }
534
535 if (response == null) {
536 response = this.responseFactory.newHttpResponse(
537 ver,
538 HttpStatus.SC_OK,
539 context);
540 response.setParams(
541 new DefaultedHttpParams(response.getParams(), this.params));
542
543 context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
544
545 try {
546
547 this.httpProcessor.process(request, context);
548
549 HttpRequestHandler handler = null;
550 if (this.handlerResolver != null) {
551 final String requestURI = request.getRequestLine().getUri();
552 handler = this.handlerResolver.lookup(requestURI);
553 }
554 if (handler != null) {
555 handler.handle(request, response, context);
556 } else {
557 response.setStatusCode(HttpStatus.SC_NOT_IMPLEMENTED);
558 }
559
560 } catch (final HttpException ex) {
561 response = this.responseFactory.newHttpResponse(HttpVersion.HTTP_1_0,
562 HttpStatus.SC_INTERNAL_SERVER_ERROR, context);
563 response.setParams(
564 new DefaultedHttpParams(response.getParams(), this.params));
565 handleException(ex, response);
566 }
567 }
568
569 if (request instanceof HttpEntityEnclosingRequest) {
570 final HttpEntityEnclosingRequest eeRequest = (HttpEntityEnclosingRequest) request;
571 final HttpEntity entity = eeRequest.getEntity();
572 EntityUtils.consume(entity);
573 }
574
575
576 connState.resetInput();
577
578 this.httpProcessor.process(response, context);
579
580 if (!canResponseHaveBody(request, response)) {
581 response.setEntity(null);
582 }
583
584 connState.setResponse(response);
585
586 conn.requestOutput();
587
588 if (response.getEntity() != null) {
589 final ContentOutputBuffer buffer = connState.getOutbuffer();
590 final OutputStream outStream = new ContentOutputStream(buffer);
591
592 final HttpEntity entity = response.getEntity();
593 entity.writeTo(outStream);
594 outStream.flush();
595 outStream.close();
596 }
597
598 synchronized (connState) {
599 try {
600 for (;;) {
601 final int currentState = connState.getOutputState();
602 if (currentState == ServerConnState.RESPONSE_DONE) {
603 break;
604 }
605 if (currentState == ServerConnState.SHUTDOWN) {
606 return;
607 }
608 connState.wait();
609 }
610 } catch (final InterruptedException ex) {
611 connState.shutdown();
612 return;
613 }
614 connState.resetOutput();
615 conn.requestInput();
616 connState.notifyAll();
617 }
618 }
619
620 @Override
621 protected void shutdownConnection(final NHttpConnection conn, final Throwable cause) {
622 final HttpContext context = conn.getContext();
623
624 final ServerConnState connState = (ServerConnState) context.getAttribute(CONN_STATE);
625
626 super.shutdownConnection(conn, cause);
627
628 if (connState != null) {
629 connState.shutdown();
630 }
631 }
632
633 static class ServerConnState {
634
635 public static final int SHUTDOWN = -1;
636 public static final int READY = 0;
637 public static final int REQUEST_RECEIVED = 1;
638 public static final int REQUEST_BODY_STREAM = 2;
639 public static final int REQUEST_BODY_DONE = 4;
640 public static final int RESPONSE_SENT = 8;
641 public static final int RESPONSE_BODY_STREAM = 16;
642 public static final int RESPONSE_BODY_DONE = 32;
643 public static final int RESPONSE_DONE = 32;
644
645 private final SharedInputBuffer inBuffer;
646 private final SharedOutputBuffer outbuffer;
647
648 private volatile int inputState;
649 private volatile int outputState;
650
651 private volatile HttpRequest request;
652 private volatile HttpResponse response;
653
654 private volatile boolean expectationFailure;
655
656 public ServerConnState(
657 final int bufsize,
658 final IOControl ioControl,
659 final ByteBufferAllocator allocator) {
660 super();
661 this.inBuffer = new SharedInputBuffer(bufsize, ioControl, allocator);
662 this.outbuffer = new SharedOutputBuffer(bufsize, ioControl, allocator);
663 this.inputState = READY;
664 this.outputState = READY;
665 }
666
667 public ContentInputBuffer getInbuffer() {
668 return this.inBuffer;
669 }
670
671 public ContentOutputBuffer getOutbuffer() {
672 return this.outbuffer;
673 }
674
675 public int getInputState() {
676 return this.inputState;
677 }
678
679 public void setInputState(final int inputState) {
680 this.inputState = inputState;
681 }
682
683 public int getOutputState() {
684 return this.outputState;
685 }
686
687 public void setOutputState(final int outputState) {
688 this.outputState = outputState;
689 }
690
691 public HttpRequest getRequest() {
692 return this.request;
693 }
694
695 public void setRequest(final HttpRequest request) {
696 this.request = request;
697 }
698
699 public HttpResponse getResponse() {
700 return this.response;
701 }
702
703 public void setResponse(final HttpResponse response) {
704 this.response = response;
705 }
706
707 public boolean isExpectationFailed() {
708 return expectationFailure;
709 }
710
711 public void setExpectationFailed(final boolean b) {
712 this.expectationFailure = b;
713 }
714
715 public void close() {
716 this.inBuffer.close();
717 this.outbuffer.close();
718 this.inputState = SHUTDOWN;
719 this.outputState = SHUTDOWN;
720 }
721
722 public void shutdown() {
723 this.inBuffer.shutdown();
724 this.outbuffer.shutdown();
725 this.inputState = SHUTDOWN;
726 this.outputState = SHUTDOWN;
727 }
728
729 public void resetInput() {
730 this.inBuffer.reset();
731 this.request = null;
732 this.inputState = READY;
733 }
734
735 public void resetOutput() {
736 this.outbuffer.reset();
737 this.response = null;
738 this.outputState = READY;
739 this.expectationFailure = false;
740 }
741
742 }
743
744 }