View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.nio.protocol;
29  
30  import java.io.IOException;
31  import java.io.OutputStream;
32  import java.util.concurrent.Executor;
33  
34  import org.apache.http.ConnectionReuseStrategy;
35  import org.apache.http.HttpEntity;
36  import org.apache.http.HttpEntityEnclosingRequest;
37  import org.apache.http.HttpException;
38  import org.apache.http.HttpRequest;
39  import org.apache.http.HttpResponse;
40  import org.apache.http.HttpStatus;
41  import org.apache.http.annotation.ThreadingBehavior;
42  import org.apache.http.annotation.Contract;
43  import org.apache.http.nio.ContentDecoder;
44  import org.apache.http.nio.ContentEncoder;
45  import org.apache.http.nio.IOControl;
46  import org.apache.http.nio.NHttpClientConnection;
47  import org.apache.http.nio.NHttpClientHandler;
48  import org.apache.http.nio.entity.ContentBufferEntity;
49  import org.apache.http.nio.entity.ContentOutputStream;
50  import org.apache.http.nio.params.NIOReactorPNames;
51  import org.apache.http.nio.protocol.ThrottlingHttpServiceHandler.ServerConnState;
52  import org.apache.http.nio.util.ByteBufferAllocator;
53  import org.apache.http.nio.util.ContentInputBuffer;
54  import org.apache.http.nio.util.ContentOutputBuffer;
55  import org.apache.http.nio.util.DirectByteBufferAllocator;
56  import org.apache.http.nio.util.SharedInputBuffer;
57  import org.apache.http.nio.util.SharedOutputBuffer;
58  import org.apache.http.params.CoreProtocolPNames;
59  import org.apache.http.params.DefaultedHttpParams;
60  import org.apache.http.params.HttpParams;
61  import org.apache.http.protocol.ExecutionContext;
62  import org.apache.http.protocol.HttpContext;
63  import org.apache.http.protocol.HttpProcessor;
64  import org.apache.http.util.Args;
65  
66  /**
 * Client protocol handler implementation that provides compatibility with
 * blocking I/O by utilizing shared content buffers and a fairly small pool
 * of worker threads. The throttling protocol handler allocates input / output
 * buffers of a constant length upon initialization and controls the rate of
 * I/O events in order to ensure those content buffers never overflow.
 * This helps ensure a nearly constant memory footprint for HTTP
 * connections and avoids out of memory conditions while streaming content
 * in and out. The {@link HttpRequestExecutionHandler#handleResponse(HttpResponse, HttpContext)}
 * method will fire immediately when a message is received. The protocol handler
 * delegates the task of generating request content and processing responses to
 * an {@link Executor}, which is expected to perform those tasks using
 * dedicated worker threads in order to avoid blocking the I/O thread.
79   * <p>
 * Usually throttling protocol handlers need only a modest number of worker
 * threads, much fewer than the number of concurrent connections. If the length
 * of the message is smaller than or about the size of the shared content buffer,
 * the worker thread will just store content in the buffer and terminate almost
 * immediately without blocking. The I/O dispatch thread in its turn will take
 * care of sending out the buffered content asynchronously. The worker thread
 * will have to block only when processing large messages and the shared buffer
 * fills up. It is generally advisable to allocate shared buffers of the size of
 * an average content body for optimal performance.
89   * <p>
90   * The following parameters can be used to customize the behavior of this
91   * class:
92   * <ul>
93   *  <li>{@link org.apache.http.nio.params.NIOReactorPNames#CONTENT_BUFFER_SIZE}</li>
94   *  <li>{@link org.apache.http.params.CoreProtocolPNames#WAIT_FOR_CONTINUE}</li>
95   * </ul>
96   *
97   * @since 4.0
98   *
99   * @deprecated (4.2) use {@link HttpAsyncRequestExecutor} and {@link HttpAsyncRequester}
100  */
101 @Deprecated
102 @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
103 public class ThrottlingHttpClientHandler extends NHttpHandlerBase
104                                          implements NHttpClientHandler {
105 
    // Callback that supplies requests and receives responses; assigned in the
    // constructor after a null check.
    protected HttpRequestExecutionHandler execHandler;
    // Executor that runs the request-body writing and response handling tasks
    // submitted by this handler.
    protected final Executor executor;

    // Size handed to the per-connection shared input/output buffers; read from
    // NIOReactorPNames.CONTENT_BUFFER_SIZE with a default of 20480.
    private final int bufsize;
110 
111     public ThrottlingHttpClientHandler(
112             final HttpProcessor httpProcessor,
113             final HttpRequestExecutionHandler execHandler,
114             final ConnectionReuseStrategy connStrategy,
115             final ByteBufferAllocator allocator,
116             final Executor executor,
117             final HttpParams params) {
118         super(httpProcessor, connStrategy, allocator, params);
119         Args.notNull(execHandler, "HTTP request execution handler");
120         Args.notNull(executor, "Executor");
121         this.execHandler = execHandler;
122         this.executor = executor;
123         this.bufsize = this.params.getIntParameter(NIOReactorPNames.CONTENT_BUFFER_SIZE, 20480);
124     }
125 
126     public ThrottlingHttpClientHandler(
127             final HttpProcessor httpProcessor,
128             final HttpRequestExecutionHandler execHandler,
129             final ConnectionReuseStrategy connStrategy,
130             final Executor executor,
131             final HttpParams params) {
132         this(httpProcessor, execHandler, connStrategy,
133                 DirectByteBufferAllocator.INSTANCE, executor, params);
134     }
135 
136     @Override
137     public void connected(final NHttpClientConnection conn, final Object attachment) {
138         final HttpContext context = conn.getContext();
139 
140         initialize(conn, attachment);
141 
142         final ClientConnState connState = new ClientConnState(this.bufsize, conn, this.allocator);
143         context.setAttribute(CONN_STATE, connState);
144 
145         if (this.eventListener != null) {
146             this.eventListener.connectionOpen(conn);
147         }
148 
149         requestReady(conn);
150     }
151 
152     @Override
153     public void closed(final NHttpClientConnection conn) {
154         final HttpContext context = conn.getContext();
155         final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
156 
157         if (connState != null) {
158             synchronized (connState) {
159                 connState.close();
160                 connState.notifyAll();
161             }
162         }
163         this.execHandler.finalizeContext(context);
164 
165         if (this.eventListener != null) {
166             this.eventListener.connectionClosed(conn);
167         }
168     }
169 
170     @Override
171     public void exception(final NHttpClientConnection conn, final HttpException ex) {
172         closeConnection(conn, ex);
173         if (this.eventListener != null) {
174             this.eventListener.fatalProtocolException(ex, conn);
175         }
176     }
177 
178     @Override
179     public void exception(final NHttpClientConnection conn, final IOException ex) {
180         shutdownConnection(conn, ex);
181         if (this.eventListener != null) {
182             this.eventListener.fatalIOException(ex, conn);
183         }
184     }
185 
186 
187     @Override
188     public void requestReady(final NHttpClientConnection conn) {
189         final HttpContext context = conn.getContext();
190 
191         final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
192 
193         try {
194 
195             synchronized (connState) {
196                 if (connState.getOutputState() != ClientConnState.READY) {
197                     return;
198                 }
199 
200                 final HttpRequest request = this.execHandler.submitRequest(context);
201                 if (request == null) {
202                     return;
203                 }
204 
205                 request.setParams(
206                         new DefaultedHttpParams(request.getParams(), this.params));
207 
208                 context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
209                 this.httpProcessor.process(request, context);
210                 connState.setRequest(request);
211                 conn.submitRequest(request);
212                 connState.setOutputState(ClientConnState.REQUEST_SENT);
213 
214                 conn.requestInput();
215 
216                 if (request instanceof HttpEntityEnclosingRequest) {
217                     if (((HttpEntityEnclosingRequest) request).expectContinue()) {
218                         int timeout = conn.getSocketTimeout();
219                         connState.setTimeout(timeout);
220                         timeout = this.params.getIntParameter(
221                                 CoreProtocolPNames.WAIT_FOR_CONTINUE, 3000);
222                         conn.setSocketTimeout(timeout);
223                         connState.setOutputState(ClientConnState.EXPECT_CONTINUE);
224                     } else {
225                         sendRequestBody(
226                                 (HttpEntityEnclosingRequest) request,
227                                 connState,
228                                 conn);
229                     }
230                 }
231 
232                 connState.notifyAll();
233             }
234 
235         } catch (final IOException ex) {
236             shutdownConnection(conn, ex);
237             if (this.eventListener != null) {
238                 this.eventListener.fatalIOException(ex, conn);
239             }
240         } catch (final HttpException ex) {
241             closeConnection(conn, ex);
242             if (this.eventListener != null) {
243                 this.eventListener.fatalProtocolException(ex, conn);
244             }
245         }
246     }
247 
248     @Override
249     public void outputReady(final NHttpClientConnection conn, final ContentEncoder encoder) {
250         final HttpContext context = conn.getContext();
251 
252         final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
253 
254         try {
255 
256             synchronized (connState) {
257                 if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
258                     conn.suspendOutput();
259                     return;
260                 }
261                 final ContentOutputBuffer buffer = connState.getOutbuffer();
262                 buffer.produceContent(encoder);
263                 if (encoder.isCompleted()) {
264                     connState.setInputState(ClientConnState.REQUEST_BODY_DONE);
265                 } else {
266                     connState.setInputState(ClientConnState.REQUEST_BODY_STREAM);
267                 }
268 
269                 connState.notifyAll();
270             }
271 
272         } catch (final IOException ex) {
273             shutdownConnection(conn, ex);
274             if (this.eventListener != null) {
275                 this.eventListener.fatalIOException(ex, conn);
276             }
277         }
278     }
279 
280     @Override
281     public void responseReceived(final NHttpClientConnection conn) {
282         final HttpContext context = conn.getContext();
283         final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
284 
285         try {
286 
287             synchronized (connState) {
288                 final HttpResponse response = conn.getHttpResponse();
289                 response.setParams(
290                         new DefaultedHttpParams(response.getParams(), this.params));
291 
292                 final HttpRequest request = connState.getRequest();
293 
294                 final int statusCode = response.getStatusLine().getStatusCode();
295                 if (statusCode < HttpStatus.SC_OK) {
296                     // 1xx intermediate response
297                     if (statusCode == HttpStatus.SC_CONTINUE
298                             && connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
299                         connState.setOutputState(ClientConnState.REQUEST_SENT);
300                         continueRequest(conn, connState);
301                     }
302                     return;
303                 } else {
304                     connState.setResponse(response);
305                     connState.setInputState(ClientConnState.RESPONSE_RECEIVED);
306 
307                     if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
308                         final int timeout = connState.getTimeout();
309                         conn.setSocketTimeout(timeout);
310                         conn.resetOutput();
311                     }
312                 }
313 
314                 if (!canResponseHaveBody(request, response)) {
315                     conn.resetInput();
316                     response.setEntity(null);
317                     connState.setInputState(ClientConnState.RESPONSE_DONE);
318 
319                     if (!this.connStrategy.keepAlive(response, context)) {
320                         conn.close();
321                     }
322                 }
323 
324                 if (response.getEntity() != null) {
325                     response.setEntity(new ContentBufferEntity(
326                             response.getEntity(),
327                             connState.getInbuffer()));
328                 }
329 
330                 context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
331 
332                 this.httpProcessor.process(response, context);
333 
334                 handleResponse(response, connState, conn);
335 
336                 connState.notifyAll();
337             }
338 
339         } catch (final IOException ex) {
340             shutdownConnection(conn, ex);
341             if (this.eventListener != null) {
342                 this.eventListener.fatalIOException(ex, conn);
343             }
344         } catch (final HttpException ex) {
345             closeConnection(conn, ex);
346             if (this.eventListener != null) {
347                 this.eventListener.fatalProtocolException(ex, conn);
348             }
349         }
350     }
351 
352     @Override
353     public void inputReady(final NHttpClientConnection conn, final ContentDecoder decoder) {
354         final HttpContext context = conn.getContext();
355 
356         final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
357         try {
358 
359             synchronized (connState) {
360                 final HttpResponse response = connState.getResponse();
361                 final ContentInputBuffer buffer = connState.getInbuffer();
362 
363                 buffer.consumeContent(decoder);
364                 if (decoder.isCompleted()) {
365                     connState.setInputState(ClientConnState.RESPONSE_BODY_DONE);
366 
367                     if (!this.connStrategy.keepAlive(response, context)) {
368                         conn.close();
369                     }
370                 } else {
371                     connState.setInputState(ClientConnState.RESPONSE_BODY_STREAM);
372                 }
373 
374                 connState.notifyAll();
375             }
376 
377         } catch (final IOException ex) {
378             shutdownConnection(conn, ex);
379             if (this.eventListener != null) {
380                 this.eventListener.fatalIOException(ex, conn);
381             }
382         }
383     }
384 
385     @Override
386     public void timeout(final NHttpClientConnection conn) {
387         final HttpContext context = conn.getContext();
388         final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
389 
390         try {
391 
392             synchronized (connState) {
393                 if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
394                     connState.setOutputState(ClientConnState.REQUEST_SENT);
395                     continueRequest(conn, connState);
396 
397                     connState.notifyAll();
398                     return;
399                 }
400             }
401 
402         } catch (final IOException ex) {
403             shutdownConnection(conn, ex);
404             if (this.eventListener != null) {
405                 this.eventListener.fatalIOException(ex, conn);
406             }
407         }
408 
409         handleTimeout(conn);
410     }
411 
412     private void initialize(
413             final NHttpClientConnection conn,
414             final Object attachment) {
415         final HttpContext context = conn.getContext();
416 
417         context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
418         this.execHandler.initalizeContext(context, attachment);
419     }
420 
421     private void continueRequest(
422             final NHttpClientConnection conn,
423             final ClientConnState connState) throws IOException {
424 
425         final HttpRequest request = connState.getRequest();
426 
427         final int timeout = connState.getTimeout();
428         conn.setSocketTimeout(timeout);
429 
430         sendRequestBody(
431                 (HttpEntityEnclosingRequest) request,
432                 connState,
433                 conn);
434     }
435 
436     /**
437      * @throws IOException - not thrown currently
438      */
439     private void sendRequestBody(
440             final HttpEntityEnclosingRequest request,
441             final ClientConnState connState,
442             final NHttpClientConnection conn) throws IOException {
443         final HttpEntity entity = request.getEntity();
444         if (entity != null) {
445 
446             this.executor.execute(new Runnable() {
447 
448                 @Override
449                 public void run() {
450                     try {
451 
452                         // Block until previous request is fully processed and
453                         // the worker thread no longer holds the shared buffer
454                         synchronized (connState) {
455                             try {
456                                 for (;;) {
457                                     final int currentState = connState.getOutputState();
458                                     if (!connState.isWorkerRunning()) {
459                                         break;
460                                     }
461                                     if (currentState == ServerConnState.SHUTDOWN) {
462                                         return;
463                                     }
464                                     connState.wait();
465                                 }
466                             } catch (final InterruptedException ex) {
467                                 connState.shutdown();
468                                 return;
469                             }
470                             connState.setWorkerRunning(true);
471                         }
472 
473                         final OutputStream outStream = new ContentOutputStream(
474                                 connState.getOutbuffer());
475                         request.getEntity().writeTo(outStream);
476                         outStream.flush();
477                         outStream.close();
478 
479                         synchronized (connState) {
480                             connState.setWorkerRunning(false);
481                             connState.notifyAll();
482                         }
483 
484                     } catch (final IOException ex) {
485                         shutdownConnection(conn, ex);
486                         if (eventListener != null) {
487                             eventListener.fatalIOException(ex, conn);
488                         }
489                     }
490                 }
491 
492             });
493         }
494     }
495 
496     private void handleResponse(
497             final HttpResponse response,
498             final ClientConnState connState,
499             final NHttpClientConnection conn) {
500 
501         final HttpContext context = conn.getContext();
502 
503         this.executor.execute(new Runnable() {
504 
505             @Override
506             public void run() {
507                 try {
508 
509                     // Block until previous request is fully processed and
510                     // the worker thread no longer holds the shared buffer
511                     synchronized (connState) {
512                         try {
513                             for (;;) {
514                                 final int currentState = connState.getOutputState();
515                                 if (!connState.isWorkerRunning()) {
516                                     break;
517                                 }
518                                 if (currentState == ServerConnState.SHUTDOWN) {
519                                     return;
520                                 }
521                                 connState.wait();
522                             }
523                         } catch (final InterruptedException ex) {
524                             connState.shutdown();
525                             return;
526                         }
527                         connState.setWorkerRunning(true);
528                     }
529 
530                     execHandler.handleResponse(response, context);
531 
532                     synchronized (connState) {
533 
534                         try {
535                             for (;;) {
536                                 final int currentState = connState.getInputState();
537                                 if (currentState == ClientConnState.RESPONSE_DONE) {
538                                     break;
539                                 }
540                                 if (currentState == ServerConnState.SHUTDOWN) {
541                                     return;
542                                 }
543                                 connState.wait();
544                             }
545                         } catch (final InterruptedException ex) {
546                             connState.shutdown();
547                         }
548 
549                         connState.resetInput();
550                         connState.resetOutput();
551                         if (conn.isOpen()) {
552                             conn.requestOutput();
553                         }
554                         connState.setWorkerRunning(false);
555                         connState.notifyAll();
556                     }
557 
558                 } catch (final IOException ex) {
559                     shutdownConnection(conn, ex);
560                     if (eventListener != null) {
561                         eventListener.fatalIOException(ex, conn);
562                     }
563                 }
564             }
565 
566         });
567 
568     }
569 
570     static class ClientConnState {
571 
572         public static final int SHUTDOWN                   = -1;
573         public static final int READY                      = 0;
574         public static final int REQUEST_SENT               = 1;
575         public static final int EXPECT_CONTINUE            = 2;
576         public static final int REQUEST_BODY_STREAM        = 4;
577         public static final int REQUEST_BODY_DONE          = 8;
578         public static final int RESPONSE_RECEIVED          = 16;
579         public static final int RESPONSE_BODY_STREAM       = 32;
580         public static final int RESPONSE_BODY_DONE         = 64;
581         public static final int RESPONSE_DONE              = 64;
582 
583         private final SharedInputBuffer inBuffer;
584         private final SharedOutputBuffer outbuffer;
585 
586         private volatile int inputState;
587         private volatile int outputState;
588 
589         private volatile HttpRequest request;
590         private volatile HttpResponse response;
591 
592         private volatile int timeout;
593 
594         private volatile boolean workerRunning;
595 
596         public ClientConnState(
597                 final int bufsize,
598                 final IOControl ioControl,
599                 final ByteBufferAllocator allocator) {
600             super();
601             this.inBuffer = new SharedInputBuffer(bufsize, ioControl, allocator);
602             this.outbuffer = new SharedOutputBuffer(bufsize, ioControl, allocator);
603             this.inputState = READY;
604             this.outputState = READY;
605         }
606 
607         public ContentInputBuffer getInbuffer() {
608             return this.inBuffer;
609         }
610 
611         public ContentOutputBuffer getOutbuffer() {
612             return this.outbuffer;
613         }
614 
615         public int getInputState() {
616             return this.inputState;
617         }
618 
619         public void setInputState(final int inputState) {
620             this.inputState = inputState;
621         }
622 
623         public int getOutputState() {
624             return this.outputState;
625         }
626 
627         public void setOutputState(final int outputState) {
628             this.outputState = outputState;
629         }
630 
631         public HttpRequest getRequest() {
632             return this.request;
633         }
634 
635         public void setRequest(final HttpRequest request) {
636             this.request = request;
637         }
638 
639         public HttpResponse getResponse() {
640             return this.response;
641         }
642 
643         public void setResponse(final HttpResponse response) {
644             this.response = response;
645         }
646 
647         public int getTimeout() {
648             return this.timeout;
649         }
650 
651         public void setTimeout(final int timeout) {
652             this.timeout = timeout;
653         }
654 
655         public boolean isWorkerRunning() {
656             return this.workerRunning;
657         }
658 
659         public void setWorkerRunning(final boolean b) {
660             this.workerRunning = b;
661         }
662 
663         public void close() {
664             this.inBuffer.close();
665             this.outbuffer.close();
666             this.inputState = SHUTDOWN;
667             this.outputState = SHUTDOWN;
668         }
669 
670         public void shutdown() {
671             this.inBuffer.shutdown();
672             this.outbuffer.shutdown();
673             this.inputState = SHUTDOWN;
674             this.outputState = SHUTDOWN;
675         }
676 
677         public void resetInput() {
678             this.inBuffer.reset();
679             this.request = null;
680             this.inputState = READY;
681         }
682 
683         public void resetOutput() {
684             this.outbuffer.reset();
685             this.response = null;
686             this.outputState = READY;
687         }
688 
689     }
690 
691 }