1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.nio.protocol;
29
30 import java.io.IOException;
31 import java.io.OutputStream;
32 import java.util.concurrent.Executor;
33
34 import org.apache.http.ConnectionReuseStrategy;
35 import org.apache.http.HttpEntity;
36 import org.apache.http.HttpEntityEnclosingRequest;
37 import org.apache.http.HttpException;
38 import org.apache.http.HttpRequest;
39 import org.apache.http.HttpResponse;
40 import org.apache.http.HttpStatus;
41 import org.apache.http.annotation.ThreadingBehavior;
42 import org.apache.http.annotation.Contract;
43 import org.apache.http.nio.ContentDecoder;
44 import org.apache.http.nio.ContentEncoder;
45 import org.apache.http.nio.IOControl;
46 import org.apache.http.nio.NHttpClientConnection;
47 import org.apache.http.nio.NHttpClientHandler;
48 import org.apache.http.nio.entity.ContentBufferEntity;
49 import org.apache.http.nio.entity.ContentOutputStream;
50 import org.apache.http.nio.params.NIOReactorPNames;
51 import org.apache.http.nio.protocol.ThrottlingHttpServiceHandler.ServerConnState;
52 import org.apache.http.nio.util.ByteBufferAllocator;
53 import org.apache.http.nio.util.ContentInputBuffer;
54 import org.apache.http.nio.util.ContentOutputBuffer;
55 import org.apache.http.nio.util.DirectByteBufferAllocator;
56 import org.apache.http.nio.util.SharedInputBuffer;
57 import org.apache.http.nio.util.SharedOutputBuffer;
58 import org.apache.http.params.CoreProtocolPNames;
59 import org.apache.http.params.DefaultedHttpParams;
60 import org.apache.http.params.HttpParams;
61 import org.apache.http.protocol.ExecutionContext;
62 import org.apache.http.protocol.HttpContext;
63 import org.apache.http.protocol.HttpProcessor;
64 import org.apache.http.util.Args;
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101 @Deprecated
102 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
103 public class ThrottlingHttpClientHandler extends NHttpHandlerBase
104 implements NHttpClientHandler {
105
106 protected HttpRequestExecutionHandler execHandler;
107 protected final Executor executor;
108
109 private final int bufsize;
110
111 public ThrottlingHttpClientHandler(
112 final HttpProcessor httpProcessor,
113 final HttpRequestExecutionHandler execHandler,
114 final ConnectionReuseStrategy connStrategy,
115 final ByteBufferAllocator allocator,
116 final Executor executor,
117 final HttpParams params) {
118 super(httpProcessor, connStrategy, allocator, params);
119 Args.notNull(execHandler, "HTTP request execution handler");
120 Args.notNull(executor, "Executor");
121 this.execHandler = execHandler;
122 this.executor = executor;
123 this.bufsize = this.params.getIntParameter(NIOReactorPNames.CONTENT_BUFFER_SIZE, 20480);
124 }
125
/**
 * Creates a handler that uses {@code DirectByteBufferAllocator.INSTANCE} as its
 * allocator; all other arguments are passed to the six-argument constructor
 * unchanged.
 *
 * @param httpProcessor see the six-argument constructor.
 * @param execHandler see the six-argument constructor.
 * @param connStrategy see the six-argument constructor.
 * @param executor see the six-argument constructor.
 * @param params see the six-argument constructor.
 */
public ThrottlingHttpClientHandler(
        final HttpProcessor httpProcessor,
        final HttpRequestExecutionHandler execHandler,
        final ConnectionReuseStrategy connStrategy,
        final Executor executor,
        final HttpParams params) {
    this(
            httpProcessor,
            execHandler,
            connStrategy,
            DirectByteBufferAllocator.INSTANCE,
            executor,
            params);
}
135
136 @Override
137 public void connected(final NHttpClientConnection conn, final Object attachment) {
138 final HttpContext context = conn.getContext();
139
140 initialize(conn, attachment);
141
142 final ClientConnState connState = new ClientConnState(this.bufsize, conn, this.allocator);
143 context.setAttribute(CONN_STATE, connState);
144
145 if (this.eventListener != null) {
146 this.eventListener.connectionOpen(conn);
147 }
148
149 requestReady(conn);
150 }
151
152 @Override
153 public void closed(final NHttpClientConnection conn) {
154 final HttpContext context = conn.getContext();
155 final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
156
157 if (connState != null) {
158 synchronized (connState) {
159 connState.close();
160 connState.notifyAll();
161 }
162 }
163 this.execHandler.finalizeContext(context);
164
165 if (this.eventListener != null) {
166 this.eventListener.connectionClosed(conn);
167 }
168 }
169
170 @Override
171 public void exception(final NHttpClientConnection conn, final HttpException ex) {
172 closeConnection(conn, ex);
173 if (this.eventListener != null) {
174 this.eventListener.fatalProtocolException(ex, conn);
175 }
176 }
177
178 @Override
179 public void exception(final NHttpClientConnection conn, final IOException ex) {
180 shutdownConnection(conn, ex);
181 if (this.eventListener != null) {
182 this.eventListener.fatalIOException(ex, conn);
183 }
184 }
185
186
187 @Override
188 public void requestReady(final NHttpClientConnection conn) {
189 final HttpContext context = conn.getContext();
190
191 final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
192
193 try {
194
195 synchronized (connState) {
196 if (connState.getOutputState() != ClientConnState.READY) {
197 return;
198 }
199
200 final HttpRequest request = this.execHandler.submitRequest(context);
201 if (request == null) {
202 return;
203 }
204
205 request.setParams(
206 new DefaultedHttpParams(request.getParams(), this.params));
207
208 context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
209 this.httpProcessor.process(request, context);
210 connState.setRequest(request);
211 conn.submitRequest(request);
212 connState.setOutputState(ClientConnState.REQUEST_SENT);
213
214 conn.requestInput();
215
216 if (request instanceof HttpEntityEnclosingRequest) {
217 if (((HttpEntityEnclosingRequest) request).expectContinue()) {
218 int timeout = conn.getSocketTimeout();
219 connState.setTimeout(timeout);
220 timeout = this.params.getIntParameter(
221 CoreProtocolPNames.WAIT_FOR_CONTINUE, 3000);
222 conn.setSocketTimeout(timeout);
223 connState.setOutputState(ClientConnState.EXPECT_CONTINUE);
224 } else {
225 sendRequestBody(
226 (HttpEntityEnclosingRequest) request,
227 connState,
228 conn);
229 }
230 }
231
232 connState.notifyAll();
233 }
234
235 } catch (final IOException ex) {
236 shutdownConnection(conn, ex);
237 if (this.eventListener != null) {
238 this.eventListener.fatalIOException(ex, conn);
239 }
240 } catch (final HttpException ex) {
241 closeConnection(conn, ex);
242 if (this.eventListener != null) {
243 this.eventListener.fatalProtocolException(ex, conn);
244 }
245 }
246 }
247
248 @Override
249 public void outputReady(final NHttpClientConnection conn, final ContentEncoder encoder) {
250 final HttpContext context = conn.getContext();
251
252 final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
253
254 try {
255
256 synchronized (connState) {
257 if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
258 conn.suspendOutput();
259 return;
260 }
261 final ContentOutputBuffer buffer = connState.getOutbuffer();
262 buffer.produceContent(encoder);
263 if (encoder.isCompleted()) {
264 connState.setInputState(ClientConnState.REQUEST_BODY_DONE);
265 } else {
266 connState.setInputState(ClientConnState.REQUEST_BODY_STREAM);
267 }
268
269 connState.notifyAll();
270 }
271
272 } catch (final IOException ex) {
273 shutdownConnection(conn, ex);
274 if (this.eventListener != null) {
275 this.eventListener.fatalIOException(ex, conn);
276 }
277 }
278 }
279
280 @Override
281 public void responseReceived(final NHttpClientConnection conn) {
282 final HttpContext context = conn.getContext();
283 final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
284
285 try {
286
287 synchronized (connState) {
288 final HttpResponse response = conn.getHttpResponse();
289 response.setParams(
290 new DefaultedHttpParams(response.getParams(), this.params));
291
292 final HttpRequest request = connState.getRequest();
293
294 final int statusCode = response.getStatusLine().getStatusCode();
295 if (statusCode < HttpStatus.SC_OK) {
296
297 if (statusCode == HttpStatus.SC_CONTINUE
298 && connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
299 connState.setOutputState(ClientConnState.REQUEST_SENT);
300 continueRequest(conn, connState);
301 }
302 return;
303 } else {
304 connState.setResponse(response);
305 connState.setInputState(ClientConnState.RESPONSE_RECEIVED);
306
307 if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
308 final int timeout = connState.getTimeout();
309 conn.setSocketTimeout(timeout);
310 conn.resetOutput();
311 }
312 }
313
314 if (!canResponseHaveBody(request, response)) {
315 conn.resetInput();
316 response.setEntity(null);
317 connState.setInputState(ClientConnState.RESPONSE_DONE);
318
319 if (!this.connStrategy.keepAlive(response, context)) {
320 conn.close();
321 }
322 }
323
324 if (response.getEntity() != null) {
325 response.setEntity(new ContentBufferEntity(
326 response.getEntity(),
327 connState.getInbuffer()));
328 }
329
330 context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
331
332 this.httpProcessor.process(response, context);
333
334 handleResponse(response, connState, conn);
335
336 connState.notifyAll();
337 }
338
339 } catch (final IOException ex) {
340 shutdownConnection(conn, ex);
341 if (this.eventListener != null) {
342 this.eventListener.fatalIOException(ex, conn);
343 }
344 } catch (final HttpException ex) {
345 closeConnection(conn, ex);
346 if (this.eventListener != null) {
347 this.eventListener.fatalProtocolException(ex, conn);
348 }
349 }
350 }
351
352 @Override
353 public void inputReady(final NHttpClientConnection conn, final ContentDecoder decoder) {
354 final HttpContext context = conn.getContext();
355
356 final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
357 try {
358
359 synchronized (connState) {
360 final HttpResponse response = connState.getResponse();
361 final ContentInputBuffer buffer = connState.getInbuffer();
362
363 buffer.consumeContent(decoder);
364 if (decoder.isCompleted()) {
365 connState.setInputState(ClientConnState.RESPONSE_BODY_DONE);
366
367 if (!this.connStrategy.keepAlive(response, context)) {
368 conn.close();
369 }
370 } else {
371 connState.setInputState(ClientConnState.RESPONSE_BODY_STREAM);
372 }
373
374 connState.notifyAll();
375 }
376
377 } catch (final IOException ex) {
378 shutdownConnection(conn, ex);
379 if (this.eventListener != null) {
380 this.eventListener.fatalIOException(ex, conn);
381 }
382 }
383 }
384
385 @Override
386 public void timeout(final NHttpClientConnection conn) {
387 final HttpContext context = conn.getContext();
388 final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
389
390 try {
391
392 synchronized (connState) {
393 if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
394 connState.setOutputState(ClientConnState.REQUEST_SENT);
395 continueRequest(conn, connState);
396
397 connState.notifyAll();
398 return;
399 }
400 }
401
402 } catch (final IOException ex) {
403 shutdownConnection(conn, ex);
404 if (this.eventListener != null) {
405 this.eventListener.fatalIOException(ex, conn);
406 }
407 }
408
409 handleTimeout(conn);
410 }
411
412 private void initialize(
413 final NHttpClientConnection conn,
414 final Object attachment) {
415 final HttpContext context = conn.getContext();
416
417 context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
418 this.execHandler.initalizeContext(context, attachment);
419 }
420
421 private void continueRequest(
422 final NHttpClientConnection conn,
423 final ClientConnState connState) throws IOException {
424
425 final HttpRequest request = connState.getRequest();
426
427 final int timeout = connState.getTimeout();
428 conn.setSocketTimeout(timeout);
429
430 sendRequestBody(
431 (HttpEntityEnclosingRequest) request,
432 connState,
433 conn);
434 }
435
436
437
438
439 private void sendRequestBody(
440 final HttpEntityEnclosingRequest request,
441 final ClientConnState connState,
442 final NHttpClientConnection conn) throws IOException {
443 final HttpEntity entity = request.getEntity();
444 if (entity != null) {
445
446 this.executor.execute(new Runnable() {
447
448 @Override
449 public void run() {
450 try {
451
452
453
454 synchronized (connState) {
455 try {
456 for (;;) {
457 final int currentState = connState.getOutputState();
458 if (!connState.isWorkerRunning()) {
459 break;
460 }
461 if (currentState == ServerConnState.SHUTDOWN) {
462 return;
463 }
464 connState.wait();
465 }
466 } catch (final InterruptedException ex) {
467 connState.shutdown();
468 return;
469 }
470 connState.setWorkerRunning(true);
471 }
472
473 final OutputStream outStream = new ContentOutputStream(
474 connState.getOutbuffer());
475 request.getEntity().writeTo(outStream);
476 outStream.flush();
477 outStream.close();
478
479 synchronized (connState) {
480 connState.setWorkerRunning(false);
481 connState.notifyAll();
482 }
483
484 } catch (final IOException ex) {
485 shutdownConnection(conn, ex);
486 if (eventListener != null) {
487 eventListener.fatalIOException(ex, conn);
488 }
489 }
490 }
491
492 });
493 }
494 }
495
496 private void handleResponse(
497 final HttpResponse response,
498 final ClientConnState connState,
499 final NHttpClientConnection conn) {
500
501 final HttpContext context = conn.getContext();
502
503 this.executor.execute(new Runnable() {
504
505 @Override
506 public void run() {
507 try {
508
509
510
511 synchronized (connState) {
512 try {
513 for (;;) {
514 final int currentState = connState.getOutputState();
515 if (!connState.isWorkerRunning()) {
516 break;
517 }
518 if (currentState == ServerConnState.SHUTDOWN) {
519 return;
520 }
521 connState.wait();
522 }
523 } catch (final InterruptedException ex) {
524 connState.shutdown();
525 return;
526 }
527 connState.setWorkerRunning(true);
528 }
529
530 execHandler.handleResponse(response, context);
531
532 synchronized (connState) {
533
534 try {
535 for (;;) {
536 final int currentState = connState.getInputState();
537 if (currentState == ClientConnState.RESPONSE_DONE) {
538 break;
539 }
540 if (currentState == ServerConnState.SHUTDOWN) {
541 return;
542 }
543 connState.wait();
544 }
545 } catch (final InterruptedException ex) {
546 connState.shutdown();
547 }
548
549 connState.resetInput();
550 connState.resetOutput();
551 if (conn.isOpen()) {
552 conn.requestOutput();
553 }
554 connState.setWorkerRunning(false);
555 connState.notifyAll();
556 }
557
558 } catch (final IOException ex) {
559 shutdownConnection(conn, ex);
560 if (eventListener != null) {
561 eventListener.fatalIOException(ex, conn);
562 }
563 }
564 }
565
566 });
567
568 }
569
570 static class ClientConnState {
571
572 public static final int SHUTDOWN = -1;
573 public static final int READY = 0;
574 public static final int REQUEST_SENT = 1;
575 public static final int EXPECT_CONTINUE = 2;
576 public static final int REQUEST_BODY_STREAM = 4;
577 public static final int REQUEST_BODY_DONE = 8;
578 public static final int RESPONSE_RECEIVED = 16;
579 public static final int RESPONSE_BODY_STREAM = 32;
580 public static final int RESPONSE_BODY_DONE = 64;
581 public static final int RESPONSE_DONE = 64;
582
583 private final SharedInputBuffer inBuffer;
584 private final SharedOutputBuffer outbuffer;
585
586 private volatile int inputState;
587 private volatile int outputState;
588
589 private volatile HttpRequest request;
590 private volatile HttpResponse response;
591
592 private volatile int timeout;
593
594 private volatile boolean workerRunning;
595
596 public ClientConnState(
597 final int bufsize,
598 final IOControl ioControl,
599 final ByteBufferAllocator allocator) {
600 super();
601 this.inBuffer = new SharedInputBuffer(bufsize, ioControl, allocator);
602 this.outbuffer = new SharedOutputBuffer(bufsize, ioControl, allocator);
603 this.inputState = READY;
604 this.outputState = READY;
605 }
606
607 public ContentInputBuffer getInbuffer() {
608 return this.inBuffer;
609 }
610
611 public ContentOutputBuffer getOutbuffer() {
612 return this.outbuffer;
613 }
614
615 public int getInputState() {
616 return this.inputState;
617 }
618
619 public void setInputState(final int inputState) {
620 this.inputState = inputState;
621 }
622
623 public int getOutputState() {
624 return this.outputState;
625 }
626
627 public void setOutputState(final int outputState) {
628 this.outputState = outputState;
629 }
630
631 public HttpRequest getRequest() {
632 return this.request;
633 }
634
635 public void setRequest(final HttpRequest request) {
636 this.request = request;
637 }
638
639 public HttpResponse getResponse() {
640 return this.response;
641 }
642
643 public void setResponse(final HttpResponse response) {
644 this.response = response;
645 }
646
647 public int getTimeout() {
648 return this.timeout;
649 }
650
651 public void setTimeout(final int timeout) {
652 this.timeout = timeout;
653 }
654
655 public boolean isWorkerRunning() {
656 return this.workerRunning;
657 }
658
659 public void setWorkerRunning(final boolean b) {
660 this.workerRunning = b;
661 }
662
663 public void close() {
664 this.inBuffer.close();
665 this.outbuffer.close();
666 this.inputState = SHUTDOWN;
667 this.outputState = SHUTDOWN;
668 }
669
670 public void shutdown() {
671 this.inBuffer.shutdown();
672 this.outbuffer.shutdown();
673 this.inputState = SHUTDOWN;
674 this.outputState = SHUTDOWN;
675 }
676
677 public void resetInput() {
678 this.inBuffer.reset();
679 this.request = null;
680 this.inputState = READY;
681 }
682
683 public void resetOutput() {
684 this.outbuffer.reset();
685 this.response = null;
686 this.outputState = READY;
687 }
688
689 }
690
691 }