View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http.examples;
28  
29  import java.io.IOException;
30  import java.io.InterruptedIOException;
31  import java.net.InetSocketAddress;
32  import java.nio.ByteBuffer;
33  import java.nio.CharBuffer;
34  import java.nio.charset.StandardCharsets;
35  import java.util.Arrays;
36  import java.util.Collections;
37  import java.util.HashSet;
38  import java.util.Iterator;
39  import java.util.List;
40  import java.util.Locale;
41  import java.util.Set;
42  import java.util.concurrent.TimeUnit;
43  import java.util.concurrent.atomic.AtomicLong;
44  
45  import org.apache.hc.core5.concurrent.FutureCallback;
46  import org.apache.hc.core5.function.Supplier;
47  import org.apache.hc.core5.http.ConnectionClosedException;
48  import org.apache.hc.core5.http.ContentType;
49  import org.apache.hc.core5.http.EntityDetails;
50  import org.apache.hc.core5.http.Header;
51  import org.apache.hc.core5.http.HeaderElements;
52  import org.apache.hc.core5.http.HttpConnection;
53  import org.apache.hc.core5.http.HttpException;
54  import org.apache.hc.core5.http.HttpHeaders;
55  import org.apache.hc.core5.http.HttpHost;
56  import org.apache.hc.core5.http.HttpRequest;
57  import org.apache.hc.core5.http.HttpResponse;
58  import org.apache.hc.core5.http.HttpStatus;
59  import org.apache.hc.core5.http.impl.BasicEntityDetails;
60  import org.apache.hc.core5.http.impl.Http1StreamListener;
61  import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap;
62  import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
63  import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
64  import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
65  import org.apache.hc.core5.http.impl.nio.BufferedData;
66  import org.apache.hc.core5.http.message.BasicHttpRequest;
67  import org.apache.hc.core5.http.message.BasicHttpResponse;
68  import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
69  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
70  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
71  import org.apache.hc.core5.http.nio.CapacityChannel;
72  import org.apache.hc.core5.http.nio.DataStreamChannel;
73  import org.apache.hc.core5.http.nio.RequestChannel;
74  import org.apache.hc.core5.http.nio.ResponseChannel;
75  import org.apache.hc.core5.http.protocol.HttpContext;
76  import org.apache.hc.core5.http.protocol.HttpCoreContext;
77  import org.apache.hc.core5.http.protocol.HttpDateGenerator;
78  import org.apache.hc.core5.io.CloseMode;
79  import org.apache.hc.core5.pool.ConnPoolListener;
80  import org.apache.hc.core5.pool.ConnPoolStats;
81  import org.apache.hc.core5.pool.PoolStats;
82  import org.apache.hc.core5.reactor.IOReactorConfig;
83  import org.apache.hc.core5.util.TimeValue;
84  import org.apache.hc.core5.util.Timeout;
85  
86  /**
87   * Example of asynchronous embedded  HTTP/1.1 reverse proxy with full content streaming.
88   */
89  public class AsyncReverseProxyExample {
90  
91      private static boolean quiet;
92  
93      public static void main(final String[] args) throws Exception {
94          if (args.length < 1) {
95              System.out.println("Usage: <hostname[:port]> [listener port] [--quiet]");
96              System.exit(1);
97          }
98          // Target host
99          final HttpHost targetHost = HttpHost.create(args[0]);
100         int port = 8080;
101         if (args.length > 1) {
102             port = Integer.parseInt(args[1]);
103         }
104         for (final String s : args) {
105             if ("--quiet".equalsIgnoreCase(s)) {
106                 quiet = true;
107                 break;
108             }
109         }
110 
111         println("Reverse proxy to " + targetHost);
112 
113         final IOReactorConfig config = IOReactorConfig.custom()
114             .setSoTimeout(1, TimeUnit.MINUTES)
115             .build();
116 
117         final HttpAsyncRequester requester = AsyncRequesterBootstrap.bootstrap()
118                 .setIOReactorConfig(config)
119                 .setConnPoolListener(new ConnPoolListener<HttpHost>() {
120 
121                     @Override
122                     public void onLease(final HttpHost route, final ConnPoolStats<HttpHost> connPoolStats) {
123                         final StringBuilder buf = new StringBuilder();
124                         buf.append("[proxy->origin] connection leased ").append(route);
125                         println(buf.toString());
126                     }
127 
128                     @Override
129                     public void onRelease(final HttpHost route, final ConnPoolStats<HttpHost> connPoolStats) {
130                         final StringBuilder buf = new StringBuilder();
131                         buf.append("[proxy->origin] connection released ").append(route);
132                         final PoolStats totals = connPoolStats.getTotalStats();
133                         buf.append("; total kept alive: ").append(totals.getAvailable()).append("; ");
134                         buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
135                         buf.append(" of ").append(totals.getMax());
136                         println(buf.toString());
137                     }
138 
139                 })
140                 .setStreamListener(new Http1StreamListener() {
141 
142                     @Override
143                     public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
144                         // empty
145                     }
146 
147                     @Override
148                     public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
149                         // empty
150                     }
151 
152                     @Override
153                     public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
154                         println("[proxy<-origin] connection " +
155                                 connection.getLocalAddress() + "->" + connection.getRemoteAddress() +
156                                 (keepAlive ? " kept alive" : " cannot be kept alive"));
157                     }
158 
159                 })
160                 .setMaxTotal(100)
161                 .setDefaultMaxPerRoute(20)
162                 .create();
163 
164         final HttpAsyncServer server = AsyncServerBootstrap.bootstrap()
165                 .setIOReactorConfig(config)
166                 .setStreamListener(new Http1StreamListener() {
167 
168                     @Override
169                     public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
170                         // empty
171                     }
172 
173                     @Override
174                     public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
175                         // empty
176                     }
177 
178                     @Override
179                     public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
180                         println("[client<-proxy] connection " +
181                                 connection.getLocalAddress() + "->" + connection.getRemoteAddress() +
182                                 (keepAlive ? " kept alive" : " cannot be kept alive"));
183                     }
184 
185                 })
186                 .register("*", new Supplier<AsyncServerExchangeHandler>() {
187 
188                     @Override
189                     public AsyncServerExchangeHandler get() {
190                         return new IncomingExchangeHandler(targetHost, requester);
191                     }
192 
193                 })
194                 .create();
195 
196         Runtime.getRuntime().addShutdownHook(new Thread() {
197             @Override
198             public void run() {
199                 println("Reverse proxy shutting down");
200                 server.close(CloseMode.GRACEFUL);
201                 requester.close(CloseMode.GRACEFUL);
202             }
203         });
204 
205         requester.start();
206         server.start();
207         server.listen(new InetSocketAddress(port));
208         println("Listening on port " + port);
209 
210         server.awaitShutdown(TimeValue.MAX_VALUE);
211     }
212 
213     private static class ProxyBuffer extends BufferedData {
214 
215         ProxyBuffer(final int bufferSize) {
216             super(bufferSize);
217         }
218 
219         int write(final DataStreamChannel channel) throws IOException {
220             setOutputMode();
221             if (buffer().hasRemaining()) {
222                 return channel.write(buffer());
223             }
224             return 0;
225         }
226 
227     }
228 
229     private static final AtomicLong COUNT = new AtomicLong(0);
230 
231     private static class ProxyExchangeState {
232 
233         final String id;
234 
235         HttpRequest request;
236         EntityDetails requestEntityDetails;
237         DataStreamChannel requestDataChannel;
238         CapacityChannel requestCapacityChannel;
239         ProxyBuffer inBuf;
240         boolean inputEnd;
241 
242         HttpResponse response;
243         EntityDetails responseEntityDetails;
244         ResponseChannel responseMessageChannel;
245         DataStreamChannel responseDataChannel;
246         CapacityChannel responseCapacityChannel;
247         ProxyBuffer outBuf;
248         boolean outputEnd;
249 
250         AsyncClientEndpoint clientEndpoint;
251 
252         ProxyExchangeState() {
253             this.id = String.format("%08X", COUNT.getAndIncrement());
254         }
255 
256     }
257 
258     private static final int INIT_BUFFER_SIZE = 4096;
259 
260     private static class IncomingExchangeHandler implements AsyncServerExchangeHandler {
261 
262         private final HttpHost targetHost;
263         private final HttpAsyncRequester requester;
264         private final ProxyExchangeState exchangeState;
265 
266         IncomingExchangeHandler(final HttpHost targetHost, final HttpAsyncRequester requester) {
267             super();
268             this.targetHost = targetHost;
269             this.requester = requester;
270             this.exchangeState = new ProxyExchangeState();
271         }
272 
273         @Override
274         public void handleRequest(
275                 final HttpRequest incomingRequest,
276                 final EntityDetails entityDetails,
277                 final ResponseChannel responseChannel,
278                 final HttpContext httpContext) throws HttpException, IOException {
279 
280             synchronized (exchangeState) {
281                 println("[client->proxy] " + exchangeState.id + " " +
282                         incomingRequest.getMethod() + " " + incomingRequest.getRequestUri());
283                 exchangeState.request = incomingRequest;
284                 exchangeState.requestEntityDetails = entityDetails;
285                 exchangeState.inputEnd = entityDetails == null;
286                 exchangeState.responseMessageChannel = responseChannel;
287 
288                 if (entityDetails != null) {
289                     final Header h = incomingRequest.getFirstHeader(HttpHeaders.EXPECT);
290                     if (h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue())) {
291                         responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), httpContext);
292                     }
293                 }
294             }
295 
296             println("[proxy->origin] " + exchangeState.id + " request connection to " + targetHost);
297 
298             requester.connect(targetHost, Timeout.ofSeconds(30), null, new FutureCallback<AsyncClientEndpoint>() {
299 
300                 @Override
301                 public void completed(final AsyncClientEndpoint clientEndpoint) {
302                     println("[proxy->origin] " + exchangeState.id + " connection leased");
303                     synchronized (exchangeState) {
304                         exchangeState.clientEndpoint = clientEndpoint;
305                     }
306                     clientEndpoint.execute(
307                             new OutgoingExchangeHandler(targetHost, clientEndpoint, exchangeState),
308                             HttpCoreContext.create());
309                 }
310 
311                 @Override
312                 public void failed(final Exception cause) {
313                     final HttpResponse outgoingResponse = new BasicHttpResponse(HttpStatus.SC_SERVICE_UNAVAILABLE);
314                     outgoingResponse.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
315                     final ByteBuffer msg = StandardCharsets.US_ASCII.encode(CharBuffer.wrap(cause.getMessage()));
316                     final EntityDetails exEntityDetails = new BasicEntityDetails(msg.remaining(),
317                                     ContentType.TEXT_PLAIN);
318                     synchronized (exchangeState) {
319                         exchangeState.response = outgoingResponse;
320                         exchangeState.responseEntityDetails = exEntityDetails;
321                         exchangeState.outBuf = new ProxyBuffer(1024);
322                         exchangeState.outBuf.put(msg);
323                         exchangeState.outputEnd = true;
324                     }
325                     println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
326 
327                     try {
328                         responseChannel.sendResponse(outgoingResponse, exEntityDetails, httpContext);
329                     } catch (final HttpException | IOException ignore) {
330                         // ignore
331                     }
332                 }
333 
334                 @Override
335                 public void cancelled() {
336                     failed(new InterruptedIOException());
337                 }
338 
339             });
340 
341         }
342 
343         @Override
344         public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
345             synchronized (exchangeState) {
346                 exchangeState.requestCapacityChannel = capacityChannel;
347                 final int capacity = exchangeState.inBuf != null ? exchangeState.inBuf.capacity() : INIT_BUFFER_SIZE;
348                 if (capacity > 0) {
349                     println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
350                     capacityChannel.update(capacity);
351                 }
352             }
353         }
354 
355         @Override
356         public void consume(final ByteBuffer src) throws IOException {
357             synchronized (exchangeState) {
358                 println("[client->proxy] " + exchangeState.id + " " + src.remaining() + " bytes received");
359                 final DataStreamChannel dataChannel = exchangeState.requestDataChannel;
360                 if (dataChannel != null && exchangeState.inBuf != null) {
361                     if (exchangeState.inBuf.hasData()) {
362                         final int bytesWritten = exchangeState.inBuf.write(dataChannel);
363                         println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
364                     }
365                     if (!exchangeState.inBuf.hasData()) {
366                         final int bytesWritten = dataChannel.write(src);
367                         println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
368                     }
369                 }
370                 if (src.hasRemaining()) {
371                     if (exchangeState.inBuf == null) {
372                         exchangeState.inBuf = new ProxyBuffer(INIT_BUFFER_SIZE);
373                     }
374                     exchangeState.inBuf.put(src);
375                 }
376                 final int capacity = exchangeState.inBuf != null ? exchangeState.inBuf.capacity() : INIT_BUFFER_SIZE;
377                 println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
378                 if (dataChannel != null) {
379                     dataChannel.requestOutput();
380                 }
381             }
382         }
383 
384         @Override
385         public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
386             synchronized (exchangeState) {
387                 println("[client->proxy] " + exchangeState.id + " end of input");
388                 exchangeState.inputEnd = true;
389                 final DataStreamChannel dataChannel = exchangeState.requestDataChannel;
390                 if (dataChannel != null && (exchangeState.inBuf == null || !exchangeState.inBuf.hasData())) {
391                     println("[proxy->origin] " + exchangeState.id + " end of output");
392                     dataChannel.endStream();
393                 }
394             }
395         }
396 
397         @Override
398         public int available() {
399             synchronized (exchangeState) {
400                 final int available = exchangeState.outBuf != null ? exchangeState.outBuf.length() : 0;
401                 println("[client<-proxy] " + exchangeState.id + " output available: " + available);
402                 return available;
403             }
404         }
405 
406         @Override
407         public void produce(final DataStreamChannel channel) throws IOException {
408             synchronized (exchangeState) {
409                 println("[client<-proxy] " + exchangeState.id + " produce output");
410                 exchangeState.responseDataChannel = channel;
411 
412                 if (exchangeState.outBuf != null) {
413                     if (exchangeState.outBuf.hasData()) {
414                         final int bytesWritten = exchangeState.outBuf.write(channel);
415                         println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
416                     }
417                     if (exchangeState.outputEnd && !exchangeState.outBuf.hasData()) {
418                         channel.endStream();
419                         println("[client<-proxy] " + exchangeState.id + " end of output");
420                     }
421                     if (!exchangeState.outputEnd) {
422                         final CapacityChannel capacityChannel = exchangeState.responseCapacityChannel;
423                         if (capacityChannel != null) {
424                             final int capacity = exchangeState.outBuf.capacity();
425                             if (capacity > 0) {
426                                 println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
427                                 capacityChannel.update(capacity);
428                             }
429                         }
430                     }
431                 }
432             }
433         }
434 
435         @Override
436         public void failed(final Exception cause) {
437             println("[client<-proxy] " + exchangeState.id + " " + cause.getMessage());
438             if (!(cause instanceof ConnectionClosedException)) {
439                 cause.printStackTrace(System.out);
440             }
441             synchronized (exchangeState) {
442                 if (exchangeState.clientEndpoint != null) {
443                     exchangeState.clientEndpoint.releaseAndDiscard();
444                 }
445             }
446         }
447 
448         @Override
449         public void releaseResources() {
450             synchronized (exchangeState) {
451                 exchangeState.responseMessageChannel = null;
452                 exchangeState.responseDataChannel = null;
453                 exchangeState.requestCapacityChannel = null;
454             }
455         }
456 
457     }
458 
459     private static class OutgoingExchangeHandler implements AsyncClientExchangeHandler {
460 
461         private final static Set<String> HOP_BY_HOP = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
462                         HttpHeaders.HOST.toLowerCase(Locale.ROOT),
463                         HttpHeaders.CONTENT_LENGTH.toLowerCase(Locale.ROOT),
464                         HttpHeaders.TRANSFER_ENCODING.toLowerCase(Locale.ROOT),
465                         HttpHeaders.CONNECTION.toLowerCase(Locale.ROOT),
466                         HttpHeaders.KEEP_ALIVE.toLowerCase(Locale.ROOT),
467                         HttpHeaders.PROXY_AUTHENTICATE.toLowerCase(Locale.ROOT),
468                         HttpHeaders.TE.toLowerCase(Locale.ROOT),
469                         HttpHeaders.TRAILER.toLowerCase(Locale.ROOT),
470                         HttpHeaders.UPGRADE.toLowerCase(Locale.ROOT))));
471 
472         private final HttpHost targetHost;
473         private final AsyncClientEndpoint clientEndpoint;
474         private final ProxyExchangeState exchangeState;
475 
476         OutgoingExchangeHandler(
477                 final HttpHost targetHost,
478                 final AsyncClientEndpoint clientEndpoint,
479                 final ProxyExchangeState exchangeState) {
480             this.targetHost = targetHost;
481             this.clientEndpoint = clientEndpoint;
482             this.exchangeState = exchangeState;
483         }
484 
485         @Override
486         public void produceRequest(
487                 final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
488             synchronized (exchangeState) {
489                 final HttpRequest incomingRequest = exchangeState.request;
490                 final EntityDetails entityDetails = exchangeState.requestEntityDetails;
491                 final HttpRequest outgoingRequest = new BasicHttpRequest(
492                         incomingRequest.getMethod(),
493                         targetHost,
494                         incomingRequest.getPath());
495                 for (final Iterator<Header> it = incomingRequest.headerIterator(); it.hasNext(); ) {
496                     final Header header = it.next();
497                     if (!HOP_BY_HOP.contains(header.getName().toLowerCase(Locale.ROOT))) {
498                         outgoingRequest.addHeader(header);
499                     }
500                 }
501 
502                 println("[proxy->origin] " + exchangeState.id + " " +
503                         outgoingRequest.getMethod() + " " + outgoingRequest.getRequestUri());
504 
505                 channel.sendRequest(outgoingRequest, entityDetails, httpContext);
506             }
507         }
508 
509         @Override
510         public int available() {
511             synchronized (exchangeState) {
512                 final int available = exchangeState.inBuf != null ? exchangeState.inBuf.length() : 0;
513                 println("[proxy->origin] " + exchangeState.id + " output available: " + available);
514                 return available;
515             }
516         }
517 
518         @Override
519         public void produce(final DataStreamChannel channel) throws IOException {
520             synchronized (exchangeState) {
521                 println("[proxy->origin] " + exchangeState.id + " produce output");
522                 exchangeState.requestDataChannel = channel;
523                 if (exchangeState.inBuf != null) {
524                     if (exchangeState.inBuf.hasData()) {
525                         final int bytesWritten = exchangeState.inBuf.write(channel);
526                         println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
527                     }
528                     if (exchangeState.inputEnd && !exchangeState.inBuf.hasData()) {
529                         channel.endStream();
530                         println("[proxy->origin] " + exchangeState.id + " end of output");
531                     }
532                     if (!exchangeState.inputEnd) {
533                         final CapacityChannel capacityChannel = exchangeState.requestCapacityChannel;
534                         if (capacityChannel != null) {
535                             final int capacity = exchangeState.inBuf.capacity();
536                             if (capacity > 0) {
537                                 println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
538                                 capacityChannel.update(capacity);
539                             }
540                         }
541                     }
542                 }
543             }
544         }
545 
546         @Override
547         public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
548             // ignore
549         }
550 
551         @Override
552         public void consumeResponse(
553                 final HttpResponse incomingResponse,
554                 final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
555             synchronized (exchangeState) {
556                 println("[proxy<-origin] " + exchangeState.id + " status " + incomingResponse.getCode());
557                 if (entityDetails == null) {
558                     println("[proxy<-origin] " + exchangeState.id + " end of input");
559                 }
560 
561                 final HttpResponse outgoingResponse = new BasicHttpResponse(incomingResponse.getCode());
562                 for (final Iterator<Header> it = incomingResponse.headerIterator(); it.hasNext(); ) {
563                     final Header header = it.next();
564                     if (!HOP_BY_HOP.contains(header.getName().toLowerCase(Locale.ROOT))) {
565                         outgoingResponse.addHeader(header);
566                     }
567                 }
568 
569                 exchangeState.response = outgoingResponse;
570                 exchangeState.responseEntityDetails = entityDetails;
571                 exchangeState.outputEnd = entityDetails == null;
572 
573                 final ResponseChannel responseChannel = exchangeState.responseMessageChannel;
574                 if (responseChannel != null) {
575                     // responseChannel can be null under load.
576                     responseChannel.sendResponse(outgoingResponse, entityDetails, httpContext);
577                 }
578 
579                 println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
580                 if (entityDetails == null) {
581                     println("[client<-proxy] " + exchangeState.id + " end of output");
582                     clientEndpoint.releaseAndReuse();
583                 }
584             }
585         }
586 
587         @Override
588         public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
589             synchronized (exchangeState) {
590                 exchangeState.responseCapacityChannel = capacityChannel;
591                 final int capacity = exchangeState.outBuf != null ? exchangeState.outBuf.capacity() : INIT_BUFFER_SIZE;
592                 if (capacity > 0) {
593                     println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
594                     capacityChannel.update(capacity);
595                 }
596             }
597         }
598 
599         @Override
600         public void consume(final ByteBuffer src) throws IOException {
601             synchronized (exchangeState) {
602                 println("[proxy<-origin] " + exchangeState.id + " " + src.remaining() + " bytes received");
603                 final DataStreamChannel dataChannel = exchangeState.responseDataChannel;
604                 if (dataChannel != null && exchangeState.outBuf != null) {
605                     if (exchangeState.outBuf.hasData()) {
606                         final int bytesWritten = exchangeState.outBuf.write(dataChannel);
607                         println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
608                     }
609                     if (!exchangeState.outBuf.hasData()) {
610                         final int bytesWritten = dataChannel.write(src);
611                         println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
612                     }
613                 }
614                 if (src.hasRemaining()) {
615                     if (exchangeState.outBuf == null) {
616                         exchangeState.outBuf = new ProxyBuffer(INIT_BUFFER_SIZE);
617                     }
618                     exchangeState.outBuf.put(src);
619                 }
620                 final int capacity = exchangeState.outBuf != null ? exchangeState.outBuf.capacity() : INIT_BUFFER_SIZE;
621                 println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
622                 if (dataChannel != null) {
623                     dataChannel.requestOutput();
624                 }
625             }
626         }
627 
628         @Override
629         public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
630             synchronized (exchangeState) {
631                 println("[proxy<-origin] " + exchangeState.id + " end of input");
632                 exchangeState.outputEnd = true;
633                 final DataStreamChannel dataChannel = exchangeState.responseDataChannel;
634                 if (dataChannel != null && (exchangeState.outBuf == null || !exchangeState.outBuf.hasData())) {
635                     println("[client<-proxy] " + exchangeState.id + " end of output");
636                     dataChannel.endStream();
637                     clientEndpoint.releaseAndReuse();
638                 }
639             }
640         }
641 
642         @Override
643         public void cancel() {
644             clientEndpoint.releaseAndDiscard();
645         }
646 
647         @Override
648         public void failed(final Exception cause) {
649             println("[client<-proxy] " + exchangeState.id + " " + cause.getMessage());
650             if (!(cause instanceof ConnectionClosedException)) {
651                 cause.printStackTrace(System.out);
652             }
653             synchronized (exchangeState) {
654                 if (exchangeState.response == null) {
655                     final int status = cause instanceof IOException ? HttpStatus.SC_SERVICE_UNAVAILABLE : HttpStatus.SC_INTERNAL_SERVER_ERROR;
656                     final HttpResponse outgoingResponse = new BasicHttpResponse(status);
657                     outgoingResponse.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
658                     exchangeState.response = outgoingResponse;
659 
660                     final ByteBuffer msg = StandardCharsets.US_ASCII.encode(CharBuffer.wrap(cause.getMessage()));
661                     final int contentLen = msg.remaining();
662                     exchangeState.outBuf = new ProxyBuffer(1024);
663                     exchangeState.outBuf.put(msg);
664                     exchangeState.outputEnd = true;
665 
666                     println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
667 
668                     try {
669                         final EntityDetails entityDetails = new BasicEntityDetails(contentLen, ContentType.TEXT_PLAIN);
670                         exchangeState.responseMessageChannel.sendResponse(outgoingResponse, entityDetails, null);
671                     } catch (final HttpException | IOException ignore) {
672                         // ignore
673                     }
674                 } else {
675                     exchangeState.outputEnd = true;
676                 }
677                 clientEndpoint.releaseAndDiscard();
678             }
679         }
680 
681         @Override
682         public void releaseResources() {
683             synchronized (exchangeState) {
684                 exchangeState.requestDataChannel = null;
685                 exchangeState.responseCapacityChannel = null;
686                 clientEndpoint.releaseAndDiscard();
687             }
688         }
689 
690     }
691 
692     static final void println(final String msg) {
693         if (!quiet) {
694             System.out.println(HttpDateGenerator.INSTANCE.getCurrentDate() + " | " + msg);
695         }
696     }
697 }