1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.http.impl.nio;
29
30 import java.io.IOException;
31 import java.net.InetAddress;
32 import java.net.InetSocketAddress;
33 import java.net.Socket;
34 import java.net.SocketAddress;
35 import java.nio.channels.ReadableByteChannel;
36 import java.nio.channels.WritableByteChannel;
37 import java.nio.charset.Charset;
38 import java.nio.charset.CharsetDecoder;
39 import java.nio.charset.CharsetEncoder;
40 import java.nio.charset.CodingErrorAction;
41
42 import org.apache.http.ConnectionClosedException;
43 import org.apache.http.Consts;
44 import org.apache.http.Header;
45 import org.apache.http.HttpConnectionMetrics;
46 import org.apache.http.HttpEntity;
47 import org.apache.http.HttpException;
48 import org.apache.http.HttpInetConnection;
49 import org.apache.http.HttpMessage;
50 import org.apache.http.HttpRequest;
51 import org.apache.http.HttpResponse;
52 import org.apache.http.config.MessageConstraints;
53 import org.apache.http.entity.BasicHttpEntity;
54 import org.apache.http.entity.ContentLengthStrategy;
55 import org.apache.http.impl.HttpConnectionMetricsImpl;
56 import org.apache.http.impl.entity.LaxContentLengthStrategy;
57 import org.apache.http.impl.entity.StrictContentLengthStrategy;
58 import org.apache.http.impl.io.HttpTransportMetricsImpl;
59 import org.apache.http.impl.nio.codecs.ChunkDecoder;
60 import org.apache.http.impl.nio.codecs.ChunkEncoder;
61 import org.apache.http.impl.nio.codecs.IdentityDecoder;
62 import org.apache.http.impl.nio.codecs.IdentityEncoder;
63 import org.apache.http.impl.nio.codecs.LengthDelimitedDecoder;
64 import org.apache.http.impl.nio.codecs.LengthDelimitedEncoder;
65 import org.apache.http.impl.nio.reactor.SessionInputBufferImpl;
66 import org.apache.http.impl.nio.reactor.SessionOutputBufferImpl;
67 import org.apache.http.io.HttpTransportMetrics;
68 import org.apache.http.nio.ContentDecoder;
69 import org.apache.http.nio.ContentEncoder;
70 import org.apache.http.nio.NHttpConnection;
71 import org.apache.http.nio.reactor.EventMask;
72 import org.apache.http.nio.reactor.IOSession;
73 import org.apache.http.nio.reactor.SessionBufferStatus;
74 import org.apache.http.nio.reactor.SessionInputBuffer;
75 import org.apache.http.nio.reactor.SessionOutputBuffer;
76 import org.apache.http.nio.reactor.SocketAccessor;
77 import org.apache.http.nio.util.ByteBufferAllocator;
78 import org.apache.http.params.CoreConnectionPNames;
79 import org.apache.http.params.CoreProtocolPNames;
80 import org.apache.http.params.HttpParams;
81 import org.apache.http.protocol.HTTP;
82 import org.apache.http.protocol.HttpContext;
83 import org.apache.http.util.Args;
84 import org.apache.http.util.CharsetUtils;
85 import org.apache.http.util.NetUtils;
86
87
88
89
90
91
92
93 @SuppressWarnings("deprecation")
94 public class NHttpConnectionBase
95 implements NHttpConnection, HttpInetConnection, SessionBufferStatus, SocketAccessor {
96
97 protected final ContentLengthStrategy incomingContentStrategy;
98 protected final ContentLengthStrategy outgoingContentStrategy;
99
100 protected final SessionInputBufferImpl inbuf;
101 protected final SessionOutputBufferImpl outbuf;
102 private final int fragmentSizeHint;
103 private final MessageConstraints constraints;
104
105 protected final HttpTransportMetricsImpl inTransportMetrics;
106 protected final HttpTransportMetricsImpl outTransportMetrics;
107 protected final HttpConnectionMetricsImpl connMetrics;
108
109 protected HttpContext context;
110 protected IOSession session;
111 protected SocketAddress remote;
112 protected volatile ContentDecoder contentDecoder;
113 protected volatile boolean hasBufferedInput;
114 protected volatile ContentEncoder contentEncoder;
115 protected volatile boolean hasBufferedOutput;
116 protected volatile HttpRequest request;
117 protected volatile HttpResponse response;
118
119 protected volatile int status;
120
121
122
123
124
125
126
127
128
129
130
131
132 @Deprecated
133 public NHttpConnectionBase(
134 final IOSession session,
135 final ByteBufferAllocator allocator,
136 final HttpParams params) {
137 super();
138 Args.notNull(session, "I/O session");
139 Args.notNull(params, "HTTP params");
140
141 int bufferSize = params.getIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, -1);
142 if (bufferSize <= 0) {
143 bufferSize = 4096;
144 }
145 int lineBufferSize = bufferSize;
146 if (lineBufferSize > 512) {
147 lineBufferSize = 512;
148 }
149
150 CharsetDecoder decoder = null;
151 CharsetEncoder encoder = null;
152 Charset charset = CharsetUtils.lookup(
153 (String) params.getParameter(CoreProtocolPNames.HTTP_ELEMENT_CHARSET));
154 if (charset != null) {
155 charset = Consts.ASCII;
156 decoder = charset.newDecoder();
157 encoder = charset.newEncoder();
158 final CodingErrorAction malformedCharAction = (CodingErrorAction) params.getParameter(
159 CoreProtocolPNames.HTTP_MALFORMED_INPUT_ACTION);
160 final CodingErrorAction unmappableCharAction = (CodingErrorAction) params.getParameter(
161 CoreProtocolPNames.HTTP_UNMAPPABLE_INPUT_ACTION);
162 decoder.onMalformedInput(malformedCharAction).onUnmappableCharacter(unmappableCharAction);
163 encoder.onMalformedInput(malformedCharAction).onUnmappableCharacter(unmappableCharAction);
164 }
165 this.inbuf = new SessionInputBufferImpl(bufferSize, lineBufferSize, decoder, allocator);
166 this.outbuf = new SessionOutputBufferImpl(bufferSize, lineBufferSize, encoder, allocator);
167 this.fragmentSizeHint = bufferSize;
168 this.constraints = MessageConstraints.DEFAULT;
169
170 this.incomingContentStrategy = createIncomingContentStrategy();
171 this.outgoingContentStrategy = createOutgoingContentStrategy();
172
173 this.inTransportMetrics = createTransportMetrics();
174 this.outTransportMetrics = createTransportMetrics();
175 this.connMetrics = createConnectionMetrics(
176 this.inTransportMetrics,
177 this.outTransportMetrics);
178
179 setSession(session);
180 this.status = ACTIVE;
181 }
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205 protected NHttpConnectionBase(
206 final IOSession session,
207 final int bufferSize,
208 final int fragmentSizeHint,
209 final ByteBufferAllocator allocator,
210 final CharsetDecoder charDecoder,
211 final CharsetEncoder charEncoder,
212 final MessageConstraints constraints,
213 final ContentLengthStrategy incomingContentStrategy,
214 final ContentLengthStrategy outgoingContentStrategy) {
215 Args.notNull(session, "I/O session");
216 Args.positive(bufferSize, "Buffer size");
217 int lineBufferSize = bufferSize;
218 if (lineBufferSize > 512) {
219 lineBufferSize = 512;
220 }
221 this.inbuf = new SessionInputBufferImpl(bufferSize, lineBufferSize, charDecoder, allocator);
222 this.outbuf = new SessionOutputBufferImpl(bufferSize, lineBufferSize, charEncoder, allocator);
223 this.fragmentSizeHint = fragmentSizeHint >= 0 ? fragmentSizeHint : bufferSize;
224
225 this.inTransportMetrics = new HttpTransportMetricsImpl();
226 this.outTransportMetrics = new HttpTransportMetricsImpl();
227 this.connMetrics = new HttpConnectionMetricsImpl(this.inTransportMetrics, this.outTransportMetrics);
228 this.constraints = constraints != null ? constraints : MessageConstraints.DEFAULT;
229 this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
230 LaxContentLengthStrategy.INSTANCE;
231 this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
232 StrictContentLengthStrategy.INSTANCE;
233
234 setSession(session);
235 this.status = ACTIVE;
236 }
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258 protected NHttpConnectionBase(
259 final IOSession session,
260 final int bufferSize,
261 final int fragmentSizeHint,
262 final ByteBufferAllocator allocator,
263 final CharsetDecoder charDecoder,
264 final CharsetEncoder charEncoder,
265 final ContentLengthStrategy incomingContentStrategy,
266 final ContentLengthStrategy outgoingContentStrategy) {
267 this(session, bufferSize, fragmentSizeHint, allocator, charDecoder, charEncoder,
268 null, incomingContentStrategy, outgoingContentStrategy);
269 }
270
271 private void setSession(final IOSession session) {
272 this.session = session;
273 this.context = new SessionHttpContext(this.session);
274 this.session.setBufferStatus(this);
275 this.remote = this.session.getRemoteAddress();
276 }
277
278
279
280
281
282
283
284 protected void bind(final IOSession session) {
285 Args.notNull(session, "I/O session");
286 setSession(session);
287 }
288
289
290
291
292
293
294 @Deprecated
295 protected ContentLengthStrategy createIncomingContentStrategy() {
296 return new LaxContentLengthStrategy();
297 }
298
299
300
301
302
303
304 @Deprecated
305 protected ContentLengthStrategy createOutgoingContentStrategy() {
306 return new StrictContentLengthStrategy();
307 }
308
309
310
311
312
313
314 @Deprecated
315 protected HttpTransportMetricsImpl createTransportMetrics() {
316 return new HttpTransportMetricsImpl();
317 }
318
319
320
321
322
323
324 @Deprecated
325 protected HttpConnectionMetricsImpl createConnectionMetrics(
326 final HttpTransportMetrics inTransportMetric,
327 final HttpTransportMetrics outTransportMetric) {
328 return new HttpConnectionMetricsImpl(inTransportMetric, outTransportMetric);
329 }
330
331 @Override
332 public int getStatus() {
333 return this.status;
334 }
335
336 @Override
337 public HttpContext getContext() {
338 return this.context;
339 }
340
341 @Override
342 public HttpRequest getHttpRequest() {
343 return this.request;
344 }
345
346 @Override
347 public HttpResponse getHttpResponse() {
348 return this.response;
349 }
350
351 @Override
352 public void requestInput() {
353 this.session.setEvent(EventMask.READ);
354 }
355
356 @Override
357 public void requestOutput() {
358 this.session.setEvent(EventMask.WRITE);
359 }
360
361 @Override
362 public void suspendInput() {
363 this.session.clearEvent(EventMask.READ);
364 }
365
366 @Override
367 public void suspendOutput() {
368 synchronized (this.session) {
369 if (!this.outbuf.hasData()) {
370 this.session.clearEvent(EventMask.WRITE);
371 }
372 }
373 }
374
375
376
377
378
379
380
381
382
383
384 protected HttpEntity prepareDecoder(final HttpMessage message) throws HttpException {
385 final BasicHttpEntity entity = new BasicHttpEntity();
386 final long len = this.incomingContentStrategy.determineLength(message);
387 this.contentDecoder = createContentDecoder(
388 len,
389 this.session.channel(),
390 this.inbuf,
391 this.inTransportMetrics);
392 if (len == ContentLengthStrategy.CHUNKED) {
393 entity.setChunked(true);
394 entity.setContentLength(-1);
395 } else if (len == ContentLengthStrategy.IDENTITY) {
396 entity.setChunked(false);
397 entity.setContentLength(-1);
398 } else {
399 entity.setChunked(false);
400 entity.setContentLength(len);
401 }
402
403 final Header contentTypeHeader = message.getFirstHeader(HTTP.CONTENT_TYPE);
404 if (contentTypeHeader != null) {
405 entity.setContentType(contentTypeHeader);
406 }
407 final Header contentEncodingHeader = message.getFirstHeader(HTTP.CONTENT_ENCODING);
408 if (contentEncodingHeader != null) {
409 entity.setContentEncoding(contentEncodingHeader);
410 }
411 return entity;
412 }
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427 protected ContentDecoder createContentDecoder(
428 final long len,
429 final ReadableByteChannel channel,
430 final SessionInputBuffer buffer,
431 final HttpTransportMetricsImpl metrics) {
432 if (len == ContentLengthStrategy.CHUNKED) {
433 return new ChunkDecoder(channel, buffer, this.constraints, metrics);
434 } else if (len == ContentLengthStrategy.IDENTITY) {
435 return new IdentityDecoder(channel, buffer, metrics);
436 } else {
437 return new LengthDelimitedDecoder(channel, buffer, metrics, len);
438 }
439 }
440
441
442
443
444
445
446
447
448 protected void prepareEncoder(final HttpMessage message) throws HttpException {
449 final long len = this.outgoingContentStrategy.determineLength(message);
450 this.contentEncoder = createContentEncoder(
451 len,
452 this.session.channel(),
453 this.outbuf,
454 this.outTransportMetrics);
455 }
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470 protected ContentEncoder createContentEncoder(
471 final long len,
472 final WritableByteChannel channel,
473 final SessionOutputBuffer buffer,
474 final HttpTransportMetricsImpl metrics) {
475 if (len == ContentLengthStrategy.CHUNKED) {
476 return new ChunkEncoder(channel, buffer, metrics, this.fragmentSizeHint);
477 } else if (len == ContentLengthStrategy.IDENTITY) {
478 return new IdentityEncoder(channel, buffer, metrics, this.fragmentSizeHint);
479 } else {
480 return new LengthDelimitedEncoder(channel, buffer, metrics, len, this.fragmentSizeHint);
481 }
482 }
483
484 @Override
485 public boolean hasBufferedInput() {
486 return this.hasBufferedInput;
487 }
488
489 @Override
490 public boolean hasBufferedOutput() {
491 return this.hasBufferedOutput;
492 }
493
494
495
496
497
498
499
500 protected void assertNotClosed() throws ConnectionClosedException {
501 if (this.status != ACTIVE) {
502 throw new ConnectionClosedException();
503 }
504 }
505
506 @Override
507 public void close() throws IOException {
508 if (this.status != ACTIVE) {
509 return;
510 }
511 this.status = CLOSING;
512 this.inbuf.clear();
513 this.hasBufferedInput = false;
514 if (this.outbuf.hasData()) {
515 this.session.setEvent(EventMask.WRITE);
516 } else {
517 this.session.close();
518 this.status = CLOSED;
519 }
520 }
521
522 @Override
523 public boolean isOpen() {
524 return this.status == ACTIVE && !this.session.isClosed();
525 }
526
527 @Override
528 public boolean isStale() {
529 return this.session.isClosed();
530 }
531
532 @Override
533 public InetAddress getLocalAddress() {
534 final SocketAddress address = this.session.getLocalAddress();
535 return address instanceof InetSocketAddress ? ((InetSocketAddress) address).getAddress() : null;
536 }
537
538 @Override
539 public int getLocalPort() {
540 final SocketAddress address = this.session.getLocalAddress();
541 return address instanceof InetSocketAddress ? ((InetSocketAddress) address).getPort() : -1;
542 }
543
544 @Override
545 public InetAddress getRemoteAddress() {
546 final SocketAddress address = this.session.getRemoteAddress();
547 return address instanceof InetSocketAddress ? ((InetSocketAddress) address).getAddress() : null;
548 }
549
550 @Override
551 public int getRemotePort() {
552 final SocketAddress address = this.session.getRemoteAddress();
553 return address instanceof InetSocketAddress ? ((InetSocketAddress) address).getPort() : -1;
554 }
555
556 @Override
557 public void setSocketTimeout(final int timeout) {
558 this.session.setSocketTimeout(timeout);
559 }
560
561 @Override
562 public int getSocketTimeout() {
563 return this.session.getSocketTimeout();
564 }
565
566 @Override
567 public void shutdown() throws IOException {
568 this.status = CLOSED;
569 this.session.shutdown();
570 }
571
572 @Override
573 public HttpConnectionMetrics getMetrics() {
574 return this.connMetrics;
575 }
576
577 @Override
578 public String toString() {
579 final SocketAddress remoteAddress = this.session.getRemoteAddress();
580 final SocketAddress localAddress = this.session.getLocalAddress();
581 if (remoteAddress != null && localAddress != null) {
582 final StringBuilder buffer = new StringBuilder();
583 NetUtils.formatAddress(buffer, localAddress);
584 buffer.append("<->");
585 NetUtils.formatAddress(buffer, remoteAddress);
586 return buffer.toString();
587 }
588 return "[Not bound]";
589 }
590
591 @Override
592 public Socket getSocket() {
593 return this.session instanceof SocketAccessor/../../../../org/apache/http/nio/reactor/SocketAccessor.html#SocketAccessor">SocketAccessor ? ((SocketAccessor) this.session).getSocket() : null;
594 }
595
596 }