1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.nio.protocol;
29
30 import java.io.IOException;
31
32 import org.apache.http.ConnectionReuseStrategy;
33 import org.apache.http.HttpEntity;
34 import org.apache.http.HttpEntityEnclosingRequest;
35 import org.apache.http.HttpException;
36 import org.apache.http.HttpRequest;
37 import org.apache.http.HttpResponse;
38 import org.apache.http.HttpStatus;
39 import org.apache.http.annotation.ThreadingBehavior;
40 import org.apache.http.annotation.Contract;
41 import org.apache.http.nio.ContentDecoder;
42 import org.apache.http.nio.ContentEncoder;
43 import org.apache.http.nio.NHttpClientConnection;
44 import org.apache.http.nio.NHttpClientHandler;
45 import org.apache.http.nio.entity.ConsumingNHttpEntity;
46 import org.apache.http.nio.entity.NHttpEntityWrapper;
47 import org.apache.http.nio.entity.ProducingNHttpEntity;
48 import org.apache.http.nio.util.ByteBufferAllocator;
49 import org.apache.http.nio.util.HeapByteBufferAllocator;
50 import org.apache.http.params.CoreProtocolPNames;
51 import org.apache.http.params.DefaultedHttpParams;
52 import org.apache.http.params.HttpParams;
53 import org.apache.http.protocol.ExecutionContext;
54 import org.apache.http.protocol.HttpContext;
55 import org.apache.http.protocol.HttpProcessor;
56 import org.apache.http.util.Args;
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94 @Deprecated
95 @Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
96 public class AsyncNHttpClientHandler extends NHttpHandlerBase
97 implements NHttpClientHandler {
98
99 protected NHttpRequestExecutionHandler execHandler;
100
101 public AsyncNHttpClientHandler(
102 final HttpProcessor httpProcessor,
103 final NHttpRequestExecutionHandler execHandler,
104 final ConnectionReuseStrategy connStrategy,
105 final ByteBufferAllocator allocator,
106 final HttpParams params) {
107 super(httpProcessor, connStrategy, allocator, params);
108 this.execHandler = Args.notNull(execHandler, "HTTP request execution handler");
109 }
110
111 public AsyncNHttpClientHandler(
112 final HttpProcessor httpProcessor,
113 final NHttpRequestExecutionHandler execHandler,
114 final ConnectionReuseStrategy connStrategy,
115 final HttpParams params) {
116 this(httpProcessor, execHandler, connStrategy, HeapByteBufferAllocator.INSTANCE, params);
117 }
118
119 @Override
120 public void connected(final NHttpClientConnection conn, final Object attachment) {
121 final HttpContext context = conn.getContext();
122
123 initialize(conn, attachment);
124
125 final ClientConnState connState = new ClientConnState();
126 context.setAttribute(CONN_STATE, connState);
127
128 if (this.eventListener != null) {
129 this.eventListener.connectionOpen(conn);
130 }
131
132 requestReady(conn);
133 }
134
135 @Override
136 public void closed(final NHttpClientConnection conn) {
137 final HttpContext context = conn.getContext();
138
139 final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
140 try {
141 connState.reset();
142 } catch (final IOException ex) {
143 if (this.eventListener != null) {
144 this.eventListener.fatalIOException(ex, conn);
145 }
146 }
147
148 this.execHandler.finalizeContext(context);
149
150 if (this.eventListener != null) {
151 this.eventListener.connectionClosed(conn);
152 }
153 }
154
155 @Override
156 public void exception(final NHttpClientConnection conn, final HttpException ex) {
157 closeConnection(conn, ex);
158 if (this.eventListener != null) {
159 this.eventListener.fatalProtocolException(ex, conn);
160 }
161 }
162
163 @Override
164 public void exception(final NHttpClientConnection conn, final IOException ex) {
165 shutdownConnection(conn, ex);
166 if (this.eventListener != null) {
167 this.eventListener.fatalIOException(ex, conn);
168 }
169 }
170
171 @Override
172 public void requestReady(final NHttpClientConnection conn) {
173 final HttpContext context = conn.getContext();
174
175 final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
176 if (connState.getOutputState() != ClientConnState.READY) {
177 return;
178 }
179
180 try {
181
182 final HttpRequest request = this.execHandler.submitRequest(context);
183 if (request == null) {
184 return;
185 }
186
187 request.setParams(
188 new DefaultedHttpParams(request.getParams(), this.params));
189
190 context.setAttribute(ExecutionContext.HTTP_REQUEST, request);
191 this.httpProcessor.process(request, context);
192
193 HttpEntityEnclosingRequest entityReq = null;
194 HttpEntity entity = null;
195
196 if (request instanceof HttpEntityEnclosingRequest) {
197 entityReq = (HttpEntityEnclosingRequest) request;
198 entity = entityReq.getEntity();
199 }
200
201 if (entity instanceof ProducingNHttpEntity) {
202 connState.setProducingEntity((ProducingNHttpEntity) entity);
203 } else if (entity != null) {
204 connState.setProducingEntity(new NHttpEntityWrapper(entity));
205 }
206
207 connState.setRequest(request);
208 conn.submitRequest(request);
209 connState.setOutputState(ClientConnState.REQUEST_SENT);
210
211 if (entityReq != null && entityReq.expectContinue()) {
212 int timeout = conn.getSocketTimeout();
213 connState.setTimeout(timeout);
214 timeout = this.params.getIntParameter(
215 CoreProtocolPNames.WAIT_FOR_CONTINUE, 3000);
216 conn.setSocketTimeout(timeout);
217 connState.setOutputState(ClientConnState.EXPECT_CONTINUE);
218 } else if (connState.getProducingEntity() != null) {
219 connState.setOutputState(ClientConnState.REQUEST_BODY_STREAM);
220 }
221
222 } catch (final IOException ex) {
223 shutdownConnection(conn, ex);
224 if (this.eventListener != null) {
225 this.eventListener.fatalIOException(ex, conn);
226 }
227 } catch (final HttpException ex) {
228 closeConnection(conn, ex);
229 if (this.eventListener != null) {
230 this.eventListener.fatalProtocolException(ex, conn);
231 }
232 }
233 }
234
235 @Override
236 public void inputReady(final NHttpClientConnection conn, final ContentDecoder decoder) {
237 final HttpContext context = conn.getContext();
238
239 final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
240
241 final ConsumingNHttpEntity consumingEntity = connState.getConsumingEntity();
242
243 try {
244 consumingEntity.consumeContent(decoder, conn);
245 if (decoder.isCompleted()) {
246 processResponse(conn, connState);
247 }
248
249 } catch (final IOException ex) {
250 shutdownConnection(conn, ex);
251 if (this.eventListener != null) {
252 this.eventListener.fatalIOException(ex, conn);
253 }
254 } catch (final HttpException ex) {
255 closeConnection(conn, ex);
256 if (this.eventListener != null) {
257 this.eventListener.fatalProtocolException(ex, conn);
258 }
259 }
260 }
261
262 @Override
263 public void outputReady(final NHttpClientConnection conn, final ContentEncoder encoder) {
264 final HttpContext context = conn.getContext();
265 final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
266
267 try {
268 if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
269 conn.suspendOutput();
270 return;
271 }
272
273 final ProducingNHttpEntity entity = connState.getProducingEntity();
274
275 entity.produceContent(encoder, conn);
276 if (encoder.isCompleted()) {
277 connState.setOutputState(ClientConnState.REQUEST_BODY_DONE);
278 }
279 } catch (final IOException ex) {
280 shutdownConnection(conn, ex);
281 if (this.eventListener != null) {
282 this.eventListener.fatalIOException(ex, conn);
283 }
284 }
285 }
286
287 @Override
288 public void responseReceived(final NHttpClientConnection conn) {
289 final HttpContext context = conn.getContext();
290 final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
291
292 final HttpResponse response = conn.getHttpResponse();
293 response.setParams(
294 new DefaultedHttpParams(response.getParams(), this.params));
295
296 final HttpRequest request = connState.getRequest();
297 try {
298
299 final int statusCode = response.getStatusLine().getStatusCode();
300 if (statusCode < HttpStatus.SC_OK) {
301
302 if (statusCode == HttpStatus.SC_CONTINUE
303 && connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
304 continueRequest(conn, connState);
305 }
306 return;
307 } else {
308 connState.setResponse(response);
309 if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
310 cancelRequest(conn, connState);
311 } else if (connState.getOutputState() == ClientConnState.REQUEST_BODY_STREAM) {
312
313 cancelRequest(conn, connState);
314 connState.invalidate();
315 conn.suspendOutput();
316 }
317 }
318
319 context.setAttribute(ExecutionContext.HTTP_RESPONSE, response);
320
321 if (!canResponseHaveBody(request, response)) {
322 conn.resetInput();
323 response.setEntity(null);
324 this.httpProcessor.process(response, context);
325 processResponse(conn, connState);
326 } else {
327 final HttpEntity entity = response.getEntity();
328 if (entity != null) {
329 ConsumingNHttpEntity consumingEntity = this.execHandler.responseEntity(
330 response, context);
331 if (consumingEntity == null) {
332 consumingEntity = new NullNHttpEntity(entity);
333 }
334 response.setEntity(consumingEntity);
335 connState.setConsumingEntity(consumingEntity);
336 this.httpProcessor.process(response, context);
337 }
338 }
339
340
341 } catch (final IOException ex) {
342 shutdownConnection(conn, ex);
343 if (this.eventListener != null) {
344 this.eventListener.fatalIOException(ex, conn);
345 }
346 } catch (final HttpException ex) {
347 closeConnection(conn, ex);
348 if (this.eventListener != null) {
349 this.eventListener.fatalProtocolException(ex, conn);
350 }
351 }
352 }
353
354 @Override
355 public void timeout(final NHttpClientConnection conn) {
356 final HttpContext context = conn.getContext();
357 final ClientConnState connState = (ClientConnState) context.getAttribute(CONN_STATE);
358
359 try {
360
361 if (connState.getOutputState() == ClientConnState.EXPECT_CONTINUE) {
362 continueRequest(conn, connState);
363 return;
364 }
365
366 } catch (final IOException ex) {
367 shutdownConnection(conn, ex);
368 if (this.eventListener != null) {
369 this.eventListener.fatalIOException(ex, conn);
370 }
371 }
372
373 handleTimeout(conn);
374 }
375
376 private void initialize(
377 final NHttpClientConnection conn,
378 final Object attachment) {
379 final HttpContext context = conn.getContext();
380
381 context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
382 this.execHandler.initalizeContext(context, attachment);
383 }
384
385
386
387
388 private void continueRequest(
389 final NHttpClientConnection conn,
390 final ClientConnState connState) throws IOException {
391
392 final int timeout = connState.getTimeout();
393 conn.setSocketTimeout(timeout);
394
395 conn.requestOutput();
396 connState.setOutputState(ClientConnState.REQUEST_BODY_STREAM);
397 }
398
399 private void cancelRequest(
400 final NHttpClientConnection conn,
401 final ClientConnState connState) throws IOException {
402
403 final int timeout = connState.getTimeout();
404 conn.setSocketTimeout(timeout);
405
406 conn.resetOutput();
407 connState.resetOutput();
408 }
409
410
411
412
413 private void processResponse(
414 final NHttpClientConnection conn,
415 final ClientConnState connState) throws IOException, HttpException {
416
417 if (!connState.isValid()) {
418 conn.close();
419 }
420
421 final HttpContext context = conn.getContext();
422 final HttpResponse response = connState.getResponse();
423 this.execHandler.handleResponse(response, context);
424 if (!this.connStrategy.keepAlive(response, context)) {
425 conn.close();
426 }
427
428 if (conn.isOpen()) {
429
430 connState.resetInput();
431 connState.resetOutput();
432 conn.requestOutput();
433 }
434 }
435
436 protected static class ClientConnState {
437
438 public static final int READY = 0;
439 public static final int REQUEST_SENT = 1;
440 public static final int EXPECT_CONTINUE = 2;
441 public static final int REQUEST_BODY_STREAM = 4;
442 public static final int REQUEST_BODY_DONE = 8;
443 public static final int RESPONSE_RECEIVED = 16;
444 public static final int RESPONSE_BODY_STREAM = 32;
445 public static final int RESPONSE_BODY_DONE = 64;
446
447 private int outputState;
448
449 private HttpRequest request;
450 private HttpResponse response;
451 private ConsumingNHttpEntity consumingEntity;
452 private ProducingNHttpEntity producingEntity;
453 private boolean valid;
454 private int timeout;
455
456 public ClientConnState() {
457 super();
458 this.valid = true;
459 }
460
461 public void setConsumingEntity(final ConsumingNHttpEntity consumingEntity) {
462 this.consumingEntity = consumingEntity;
463 }
464
465 public void setProducingEntity(final ProducingNHttpEntity producingEntity) {
466 this.producingEntity = producingEntity;
467 }
468
469 public ProducingNHttpEntity getProducingEntity() {
470 return producingEntity;
471 }
472
473 public ConsumingNHttpEntity getConsumingEntity() {
474 return consumingEntity;
475 }
476
477 public int getOutputState() {
478 return this.outputState;
479 }
480
481 public void setOutputState(final int outputState) {
482 this.outputState = outputState;
483 }
484
485 public HttpRequest getRequest() {
486 return this.request;
487 }
488
489 public void setRequest(final HttpRequest request) {
490 this.request = request;
491 }
492
493 public HttpResponse getResponse() {
494 return this.response;
495 }
496
497 public void setResponse(final HttpResponse response) {
498 this.response = response;
499 }
500
501 public int getTimeout() {
502 return this.timeout;
503 }
504
505 public void setTimeout(final int timeout) {
506 this.timeout = timeout;
507 }
508
509 public void resetInput() throws IOException {
510 this.response = null;
511 if (this.consumingEntity != null) {
512 this.consumingEntity.finish();
513 this.consumingEntity = null;
514 }
515 }
516
517 public void resetOutput() throws IOException {
518 this.request = null;
519 if (this.producingEntity != null) {
520 this.producingEntity.finish();
521 this.producingEntity = null;
522 }
523 this.outputState = READY;
524 }
525
526 public void reset() throws IOException {
527 resetInput();
528 resetOutput();
529 }
530
531 public boolean isValid() {
532 return this.valid;
533 }
534
535 public void invalidate() {
536 this.valid = false;
537 }
538
539 }
540
541 }