View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http.examples;
28  
29  import java.io.IOException;
30  import java.io.InterruptedIOException;
31  import java.net.InetSocketAddress;
32  import java.nio.ByteBuffer;
33  import java.nio.CharBuffer;
34  import java.nio.charset.StandardCharsets;
35  import java.util.Arrays;
36  import java.util.Collections;
37  import java.util.HashSet;
38  import java.util.Iterator;
39  import java.util.List;
40  import java.util.Set;
41  import java.util.concurrent.TimeUnit;
42  import java.util.concurrent.atomic.AtomicLong;
43  import java.util.concurrent.locks.ReentrantLock;
44  
45  import org.apache.hc.core5.concurrent.FutureCallback;
46  import org.apache.hc.core5.http.ConnectionClosedException;
47  import org.apache.hc.core5.http.ContentType;
48  import org.apache.hc.core5.http.EntityDetails;
49  import org.apache.hc.core5.http.Header;
50  import org.apache.hc.core5.http.HeaderElements;
51  import org.apache.hc.core5.http.HttpConnection;
52  import org.apache.hc.core5.http.HttpException;
53  import org.apache.hc.core5.http.HttpHeaders;
54  import org.apache.hc.core5.http.HttpHost;
55  import org.apache.hc.core5.http.HttpRequest;
56  import org.apache.hc.core5.http.HttpResponse;
57  import org.apache.hc.core5.http.HttpStatus;
58  import org.apache.hc.core5.http.URIScheme;
59  import org.apache.hc.core5.http.impl.BasicEntityDetails;
60  import org.apache.hc.core5.http.impl.Http1StreamListener;
61  import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap;
62  import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
63  import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
64  import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
65  import org.apache.hc.core5.http.impl.nio.BufferedData;
66  import org.apache.hc.core5.http.message.BasicHttpRequest;
67  import org.apache.hc.core5.http.message.BasicHttpResponse;
68  import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
69  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
70  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
71  import org.apache.hc.core5.http.nio.CapacityChannel;
72  import org.apache.hc.core5.http.nio.DataStreamChannel;
73  import org.apache.hc.core5.http.nio.RequestChannel;
74  import org.apache.hc.core5.http.nio.ResponseChannel;
75  import org.apache.hc.core5.http.protocol.HttpContext;
76  import org.apache.hc.core5.http.protocol.HttpCoreContext;
77  import org.apache.hc.core5.http.protocol.HttpDateGenerator;
78  import org.apache.hc.core5.io.CloseMode;
79  import org.apache.hc.core5.pool.ConnPoolListener;
80  import org.apache.hc.core5.pool.ConnPoolStats;
81  import org.apache.hc.core5.pool.PoolStats;
82  import org.apache.hc.core5.reactor.IOReactorConfig;
83  import org.apache.hc.core5.util.TextUtils;
84  import org.apache.hc.core5.util.TimeValue;
85  import org.apache.hc.core5.util.Timeout;
86  
/**
 * Example of an asynchronous embedded HTTP/1.1 reverse proxy with full content streaming.
 */
90  public class AsyncReverseProxyExample {
91  
    // When true, println(String) prints nothing; main sets it when "--quiet" is passed.
    private static boolean quiet;
93  
94      public static void main(final String[] args) throws Exception {
95          if (args.length < 1) {
96              System.out.println("Usage: <hostname[:port]> [listener port] [--quiet]");
97              System.exit(1);
98          }
99          // Target host
100         final HttpHost targetHost = HttpHost.create(args[0]);
101         int port = 8080;
102         if (args.length > 1) {
103             port = Integer.parseInt(args[1]);
104         }
105         for (final String s : args) {
106             if ("--quiet".equalsIgnoreCase(s)) {
107                 quiet = true;
108                 break;
109             }
110         }
111 
112         println("Reverse proxy to " + targetHost);
113 
114         final IOReactorConfig config = IOReactorConfig.custom()
115             .setSoTimeout(1, TimeUnit.MINUTES)
116             .build();
117 
118         final HttpAsyncRequester requester = AsyncRequesterBootstrap.bootstrap()
119                 .setIOReactorConfig(config)
120                 .setConnPoolListener(new ConnPoolListener<HttpHost>() {
121 
122                     @Override
123                     public void onLease(final HttpHost route, final ConnPoolStats<HttpHost> connPoolStats) {
124                         final StringBuilder buf = new StringBuilder();
125                         buf.append("[proxy->origin] connection leased ").append(route);
126                         println(buf.toString());
127                     }
128 
129                     @Override
130                     public void onRelease(final HttpHost route, final ConnPoolStats<HttpHost> connPoolStats) {
131                         final StringBuilder buf = new StringBuilder();
132                         buf.append("[proxy->origin] connection released ").append(route);
133                         final PoolStats totals = connPoolStats.getTotalStats();
134                         buf.append("; total kept alive: ").append(totals.getAvailable()).append("; ");
135                         buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
136                         buf.append(" of ").append(totals.getMax());
137                         println(buf.toString());
138                     }
139 
140                 })
141                 .setStreamListener(new Http1StreamListener() {
142 
143                     @Override
144                     public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
145                         // empty
146                     }
147 
148                     @Override
149                     public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
150                         // empty
151                     }
152 
153                     @Override
154                     public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
155                         println("[proxy<-origin] connection " +
156                                 connection.getLocalAddress() + "->" + connection.getRemoteAddress() +
157                                 (keepAlive ? " kept alive" : " cannot be kept alive"));
158                     }
159 
160                 })
161                 .setMaxTotal(100)
162                 .setDefaultMaxPerRoute(20)
163                 .create();
164 
165         final HttpAsyncServer server = AsyncServerBootstrap.bootstrap()
166                 .setExceptionCallback(e -> e.printStackTrace())
167                 .setIOReactorConfig(config)
168                 .setStreamListener(new Http1StreamListener() {
169 
170                     @Override
171                     public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
172                         // empty
173                     }
174 
175                     @Override
176                     public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
177                         // empty
178                     }
179 
180                     @Override
181                     public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
182                         println("[client<-proxy] connection " +
183                                 connection.getLocalAddress() + "->" + connection.getRemoteAddress() +
184                                 (keepAlive ? " kept alive" : " cannot be kept alive"));
185                     }
186 
187                 })
188                 .register("*", () -> new IncomingExchangeHandler(targetHost, requester))
189                 .create();
190 
191         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
192             println("Reverse proxy shutting down");
193             server.close(CloseMode.GRACEFUL);
194             requester.close(CloseMode.GRACEFUL);
195         }));
196 
197         requester.start();
198         server.start();
199         server.listen(new InetSocketAddress(port), URIScheme.HTTP);
200         println("Listening on port " + port);
201 
202         server.awaitShutdown(TimeValue.MAX_VALUE);
203     }
204 
205     private static class ProxyBuffer extends BufferedData {
206 
207         ProxyBuffer(final int bufferSize) {
208             super(bufferSize);
209         }
210 
211         int write(final DataStreamChannel channel) throws IOException {
212             setOutputMode();
213             if (buffer().hasRemaining()) {
214                 return channel.write(buffer());
215             }
216             return 0;
217         }
218 
219     }
220 
221     private static final AtomicLong COUNT = new AtomicLong(0);
222 
223     private static class ProxyExchangeState {
224 
225         final String id;
226 
227         HttpRequest request;
228         EntityDetails requestEntityDetails;
229         DataStreamChannel requestDataChannel;
230         CapacityChannel requestCapacityChannel;
231         ProxyBuffer inBuf;
232         boolean inputEnd;
233 
234         HttpResponse response;
235         EntityDetails responseEntityDetails;
236         ResponseChannel responseMessageChannel;
237         DataStreamChannel responseDataChannel;
238         CapacityChannel responseCapacityChannel;
239         ProxyBuffer outBuf;
240         boolean outputEnd;
241 
242         AsyncClientEndpoint clientEndpoint;
243 
244         ProxyExchangeState() {
245             this.id = String.format("%010d", COUNT.getAndIncrement());
246         }
247 
248     }
249 
    // Initial size of the lazily created ProxyBuffers; also the capacity value the handlers
    // report while the corresponding buffer has not been created yet.
    private static final int INIT_BUFFER_SIZE = 4096;
251 
252     private static class IncomingExchangeHandler implements AsyncServerExchangeHandler {
253 
254         private final HttpHost targetHost;
255         private final HttpAsyncRequester requester;
256         private final ProxyExchangeState exchangeState;
257         private final ReentrantLock lock;
258 
259         IncomingExchangeHandler(final HttpHost targetHost, final HttpAsyncRequester requester) {
260             super();
261             this.targetHost = targetHost;
262             this.requester = requester;
263             this.exchangeState = new ProxyExchangeState();
264             this.lock = new ReentrantLock();
265         }
266 
267         @Override
268         public void handleRequest(
269                 final HttpRequest incomingRequest,
270                 final EntityDetails entityDetails,
271                 final ResponseChannel responseChannel,
272                 final HttpContext httpContext) throws HttpException, IOException {
273 
274             lock.lock();
275             try {
276                 println("[client->proxy] " + exchangeState.id + " " +
277                         incomingRequest.getMethod() + " " + incomingRequest.getRequestUri());
278                 exchangeState.request = incomingRequest;
279                 exchangeState.requestEntityDetails = entityDetails;
280                 exchangeState.inputEnd = entityDetails == null;
281                 exchangeState.responseMessageChannel = responseChannel;
282 
283                 if (entityDetails != null) {
284                     final Header h = incomingRequest.getFirstHeader(HttpHeaders.EXPECT);
285                     if (h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue())) {
286                         responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), httpContext);
287                     }
288                 }
289             } finally {
290                 lock.unlock();
291             }
292 
293             println("[proxy->origin] " + exchangeState.id + " request connection to " + targetHost);
294 
295             requester.connect(targetHost, Timeout.ofSeconds(30), null, new FutureCallback<AsyncClientEndpoint>() {
296 
297                 @Override
298                 public void completed(final AsyncClientEndpoint clientEndpoint) {
299                     println("[proxy->origin] " + exchangeState.id + " connection leased");
300                     lock.lock();
301                     try {
302                         exchangeState.clientEndpoint = clientEndpoint;
303                     } finally {
304                         lock.unlock();
305                     }
306                     clientEndpoint.execute(
307                             new OutgoingExchangeHandler(targetHost, clientEndpoint, exchangeState),
308                             HttpCoreContext.create());
309                 }
310 
311                 @Override
312                 public void failed(final Exception cause) {
313                     final HttpResponse outgoingResponse = new BasicHttpResponse(HttpStatus.SC_SERVICE_UNAVAILABLE);
314                     outgoingResponse.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
315                     final ByteBuffer msg = StandardCharsets.US_ASCII.encode(CharBuffer.wrap(cause.getMessage()));
316                     final EntityDetails exEntityDetails = new BasicEntityDetails(msg.remaining(),
317                                     ContentType.TEXT_PLAIN);
318                     lock.lock();
319                     try {
320                         exchangeState.response = outgoingResponse;
321                         exchangeState.responseEntityDetails = exEntityDetails;
322                         exchangeState.outBuf = new ProxyBuffer(1024);
323                         exchangeState.outBuf.put(msg);
324                         exchangeState.outputEnd = true;
325                     } finally {
326                         lock.unlock();
327                     }
328                     println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
329 
330                     try {
331                         responseChannel.sendResponse(outgoingResponse, exEntityDetails, httpContext);
332                     } catch (final HttpException | IOException ignore) {
333                         // ignore
334                     }
335                 }
336 
337                 @Override
338                 public void cancelled() {
339                     failed(new InterruptedIOException());
340                 }
341 
342             });
343 
344         }
345 
346         @Override
347         public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
348             lock.lock();
349             try {
350                 exchangeState.requestCapacityChannel = capacityChannel;
351                 final int capacity = exchangeState.inBuf != null ? exchangeState.inBuf.capacity() : INIT_BUFFER_SIZE;
352                 if (capacity > 0) {
353                     println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
354                     capacityChannel.update(capacity);
355                 }
356             } finally {
357                 lock.unlock();
358             }
359         }
360 
361         @Override
362         public void consume(final ByteBuffer src) throws IOException {
363             lock.lock();
364             try {
365                 println("[client->proxy] " + exchangeState.id + " " + src.remaining() + " bytes received");
366                 final DataStreamChannel dataChannel = exchangeState.requestDataChannel;
367                 if (dataChannel != null && exchangeState.inBuf != null) {
368                     if (exchangeState.inBuf.hasData()) {
369                         final int bytesWritten = exchangeState.inBuf.write(dataChannel);
370                         println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
371                     }
372                     if (!exchangeState.inBuf.hasData()) {
373                         final int bytesWritten = dataChannel.write(src);
374                         println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
375                     }
376                 }
377                 if (src.hasRemaining()) {
378                     if (exchangeState.inBuf == null) {
379                         exchangeState.inBuf = new ProxyBuffer(INIT_BUFFER_SIZE);
380                     }
381                     exchangeState.inBuf.put(src);
382                 }
383                 final int capacity = exchangeState.inBuf != null ? exchangeState.inBuf.capacity() : INIT_BUFFER_SIZE;
384                 println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
385                 if (dataChannel != null) {
386                     dataChannel.requestOutput();
387                 }
388             } finally {
389                 lock.unlock();
390             }
391         }
392 
393         @Override
394         public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
395             lock.lock();
396             try {
397                 println("[client->proxy] " + exchangeState.id + " end of input");
398                 exchangeState.inputEnd = true;
399                 final DataStreamChannel dataChannel = exchangeState.requestDataChannel;
400                 if (dataChannel != null && (exchangeState.inBuf == null || !exchangeState.inBuf.hasData())) {
401                     println("[proxy->origin] " + exchangeState.id + " end of output");
402                     dataChannel.endStream();
403                 }
404             } finally {
405                 lock.unlock();
406             }
407         }
408 
409         @Override
410         public int available() {
411             lock.lock();
412             try {
413                 final int available = exchangeState.outBuf != null ? exchangeState.outBuf.length() : 0;
414                 println("[client<-proxy] " + exchangeState.id + " output available: " + available);
415                 return available;
416             } finally {
417                 lock.unlock();
418             }
419         }
420 
421         @Override
422         public void produce(final DataStreamChannel channel) throws IOException {
423             lock.lock();
424             try {
425                 println("[client<-proxy] " + exchangeState.id + " produce output");
426                 exchangeState.responseDataChannel = channel;
427 
428                 if (exchangeState.outBuf != null) {
429                     if (exchangeState.outBuf.hasData()) {
430                         final int bytesWritten = exchangeState.outBuf.write(channel);
431                         println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
432                     }
433                     if (exchangeState.outputEnd && !exchangeState.outBuf.hasData()) {
434                         channel.endStream();
435                         println("[client<-proxy] " + exchangeState.id + " end of output");
436                     }
437                     if (!exchangeState.outputEnd) {
438                         final CapacityChannel capacityChannel = exchangeState.responseCapacityChannel;
439                         if (capacityChannel != null) {
440                             final int capacity = exchangeState.outBuf.capacity();
441                             if (capacity > 0) {
442                                 println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
443                                 capacityChannel.update(capacity);
444                             }
445                         }
446                     }
447                 }
448             } finally {
449                 lock.unlock();
450             }
451         }
452 
453         @Override
454         public void failed(final Exception cause) {
455             println("[client<-proxy] " + exchangeState.id + " " + cause.getMessage());
456             if (!(cause instanceof ConnectionClosedException)) {
457                 cause.printStackTrace(System.out);
458             }
459             lock.lock();
460             try {
461                 if (exchangeState.clientEndpoint != null) {
462                     exchangeState.clientEndpoint.releaseAndDiscard();
463                 }
464             } finally {
465                 lock.unlock();
466             }
467         }
468 
469         @Override
470         public void releaseResources() {
471             lock.lock();
472             try {
473                 exchangeState.responseMessageChannel = null;
474                 exchangeState.responseDataChannel = null;
475                 exchangeState.requestCapacityChannel = null;
476             } finally {
477                 lock.unlock();
478             }
479         }
480 
481     }
482 
483     private static class OutgoingExchangeHandler implements AsyncClientExchangeHandler {
484 
485         private final static Set<String> HOP_BY_HOP = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
486                 TextUtils.toLowerCase(HttpHeaders.HOST),
487                 TextUtils.toLowerCase(HttpHeaders.CONTENT_LENGTH),
488                 TextUtils.toLowerCase(HttpHeaders.TRANSFER_ENCODING),
489                 TextUtils.toLowerCase(HttpHeaders.CONNECTION),
490                 TextUtils.toLowerCase(HttpHeaders.KEEP_ALIVE),
491                 TextUtils.toLowerCase(HttpHeaders.PROXY_AUTHENTICATE),
492                 TextUtils.toLowerCase(HttpHeaders.TE),
493                 TextUtils.toLowerCase(HttpHeaders.TRAILER),
494                 TextUtils.toLowerCase(HttpHeaders.UPGRADE))));
495 
496         private final HttpHost targetHost;
497         private final AsyncClientEndpoint clientEndpoint;
498         private final ProxyExchangeState exchangeState;
499         private final ReentrantLock lock;
500 
501         OutgoingExchangeHandler(
502                 final HttpHost targetHost,
503                 final AsyncClientEndpoint clientEndpoint,
504                 final ProxyExchangeState exchangeState) {
505             this.targetHost = targetHost;
506             this.clientEndpoint = clientEndpoint;
507             this.exchangeState = exchangeState;
508             this.lock = new ReentrantLock();
509         }
510 
511         @Override
512         public void produceRequest(
513                 final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
514             lock.lock();
515             try {
516                 final HttpRequest incomingRequest = exchangeState.request;
517                 final EntityDetails entityDetails = exchangeState.requestEntityDetails;
518                 final HttpRequest outgoingRequest = new BasicHttpRequest(
519                         incomingRequest.getMethod(),
520                         targetHost,
521                         incomingRequest.getPath());
522                 for (final Iterator<Header> it = incomingRequest.headerIterator(); it.hasNext(); ) {
523                     final Header header = it.next();
524                     if (!HOP_BY_HOP.contains(TextUtils.toLowerCase(header.getName()))) {
525                         outgoingRequest.addHeader(header);
526                     }
527                 }
528 
529                 println("[proxy->origin] " + exchangeState.id + " " +
530                         outgoingRequest.getMethod() + " " + outgoingRequest.getRequestUri());
531 
532                 channel.sendRequest(outgoingRequest, entityDetails, httpContext);
533             } finally {
534                 lock.unlock();
535             }
536         }
537 
538         @Override
539         public int available() {
540             lock.lock();
541             try {
542                 final int available = exchangeState.inBuf != null ? exchangeState.inBuf.length() : 0;
543                 println("[proxy->origin] " + exchangeState.id + " output available: " + available);
544                 return available;
545             } finally {
546                 lock.unlock();
547             }
548         }
549 
550         @Override
551         public void produce(final DataStreamChannel channel) throws IOException {
552             lock.lock();
553             try {
554                 println("[proxy->origin] " + exchangeState.id + " produce output");
555                 exchangeState.requestDataChannel = channel;
556                 if (exchangeState.inBuf != null) {
557                     if (exchangeState.inBuf.hasData()) {
558                         final int bytesWritten = exchangeState.inBuf.write(channel);
559                         println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
560                     }
561                     if (exchangeState.inputEnd && !exchangeState.inBuf.hasData()) {
562                         channel.endStream();
563                         println("[proxy->origin] " + exchangeState.id + " end of output");
564                     }
565                     if (!exchangeState.inputEnd) {
566                         final CapacityChannel capacityChannel = exchangeState.requestCapacityChannel;
567                         if (capacityChannel != null) {
568                             final int capacity = exchangeState.inBuf.capacity();
569                             if (capacity > 0) {
570                                 println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
571                                 capacityChannel.update(capacity);
572                             }
573                         }
574                     }
575                 }
576             } finally {
577                 lock.unlock();
578             }
579         }
580 
        /**
         * Receives an informational (1xx) response from the origin. This implementation
         * does nothing with it.
         *
         * @param response    the informational response
         * @param httpContext the context of the origin-side exchange
         */
        @Override
        public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
            // ignore
        }
585 
586         @Override
587         public void consumeResponse(
588                 final HttpResponse incomingResponse,
589                 final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
590             lock.lock();
591             try {
592                 println("[proxy<-origin] " + exchangeState.id + " status " + incomingResponse.getCode());
593                 if (entityDetails == null) {
594                     println("[proxy<-origin] " + exchangeState.id + " end of input");
595                 }
596 
597                 final HttpResponse outgoingResponse = new BasicHttpResponse(incomingResponse.getCode());
598                 for (final Iterator<Header> it = incomingResponse.headerIterator(); it.hasNext(); ) {
599                     final Header header = it.next();
600                     if (!HOP_BY_HOP.contains(TextUtils.toLowerCase(header.getName()))) {
601                         outgoingResponse.addHeader(header);
602                     }
603                 }
604 
605                 exchangeState.response = outgoingResponse;
606                 exchangeState.responseEntityDetails = entityDetails;
607                 exchangeState.outputEnd = entityDetails == null;
608 
609                 final ResponseChannel responseChannel = exchangeState.responseMessageChannel;
610                 if (responseChannel != null) {
611                     // responseChannel can be null under load.
612                     responseChannel.sendResponse(outgoingResponse, entityDetails, httpContext);
613                 }
614 
615                 println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
616                 if (entityDetails == null) {
617                     println("[client<-proxy] " + exchangeState.id + " end of output");
618                     clientEndpoint.releaseAndReuse();
619                 }
620             } finally {
621                 lock.unlock();
622             }
623         }
624 
625         @Override
626         public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
627             lock.lock();
628             try {
629                 exchangeState.responseCapacityChannel = capacityChannel;
630                 final int capacity = exchangeState.outBuf != null ? exchangeState.outBuf.capacity() : INIT_BUFFER_SIZE;
631                 if (capacity > 0) {
632                     println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
633                     capacityChannel.update(capacity);
634                 }
635             } finally {
636                 lock.unlock();
637             }
638         }
639 
640         @Override
641         public void consume(final ByteBuffer src) throws IOException {
642             lock.lock();
643             try {
644                 println("[proxy<-origin] " + exchangeState.id + " " + src.remaining() + " bytes received");
645                 final DataStreamChannel dataChannel = exchangeState.responseDataChannel;
646                 if (dataChannel != null && exchangeState.outBuf != null) {
647                     if (exchangeState.outBuf.hasData()) {
648                         final int bytesWritten = exchangeState.outBuf.write(dataChannel);
649                         println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
650                     }
651                     if (!exchangeState.outBuf.hasData()) {
652                         final int bytesWritten = dataChannel.write(src);
653                         println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
654                     }
655                 }
656                 if (src.hasRemaining()) {
657                     if (exchangeState.outBuf == null) {
658                         exchangeState.outBuf = new ProxyBuffer(INIT_BUFFER_SIZE);
659                     }
660                     exchangeState.outBuf.put(src);
661                 }
662                 final int capacity = exchangeState.outBuf != null ? exchangeState.outBuf.capacity() : INIT_BUFFER_SIZE;
663                 println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
664                 if (dataChannel != null) {
665                     dataChannel.requestOutput();
666                 }
667             } finally {
668                 lock.unlock();
669             }
670         }
671 
672         @Override
673         public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
674             lock.lock();
675             try {
676                 println("[proxy<-origin] " + exchangeState.id + " end of input");
677                 exchangeState.outputEnd = true;
678                 final DataStreamChannel dataChannel = exchangeState.responseDataChannel;
679                 if (dataChannel != null && (exchangeState.outBuf == null || !exchangeState.outBuf.hasData())) {
680                     println("[client<-proxy] " + exchangeState.id + " end of output");
681                     dataChannel.endStream();
682                     clientEndpoint.releaseAndReuse();
683                 }
684             } finally {
685                 lock.unlock();
686             }
687         }
688 
        /**
         * Cancels the origin-side exchange by releasing the endpoint without returning
         * its connection for reuse.
         */
        @Override
        public void cancel() {
            clientEndpoint.releaseAndDiscard();
        }
693 
694         @Override
695         public void failed(final Exception cause) {
696             println("[client<-proxy] " + exchangeState.id + " " + cause.getMessage());
697             if (!(cause instanceof ConnectionClosedException)) {
698                 cause.printStackTrace(System.out);
699             }
700             lock.lock();
701             try {
702                 if (exchangeState.response == null) {
703                     final int status = cause instanceof IOException ? HttpStatus.SC_SERVICE_UNAVAILABLE : HttpStatus.SC_INTERNAL_SERVER_ERROR;
704                     final HttpResponse outgoingResponse = new BasicHttpResponse(status);
705                     outgoingResponse.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
706                     exchangeState.response = outgoingResponse;
707 
708                     final ByteBuffer msg = StandardCharsets.US_ASCII.encode(CharBuffer.wrap(cause.getMessage()));
709                     final int contentLen = msg.remaining();
710                     exchangeState.outBuf = new ProxyBuffer(1024);
711                     exchangeState.outBuf.put(msg);
712                     exchangeState.outputEnd = true;
713 
714                     println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
715 
716                     try {
717                         final EntityDetails entityDetails = new BasicEntityDetails(contentLen, ContentType.TEXT_PLAIN);
718                         exchangeState.responseMessageChannel.sendResponse(outgoingResponse, entityDetails, null);
719                     } catch (final HttpException | IOException ignore) {
720                         // ignore
721                     }
722                 } else {
723                     exchangeState.outputEnd = true;
724                 }
725                 clientEndpoint.releaseAndDiscard();
726             } finally {
727                 lock.unlock();
728             }
729         }
730 
731         @Override
732         public void releaseResources() {
733             lock.lock();
734             try {
735                 exchangeState.requestDataChannel = null;
736                 exchangeState.responseCapacityChannel = null;
737                 clientEndpoint.releaseAndDiscard();
738             } finally {
739                 lock.unlock();
740             }
741         }
742 
743     }
744 
745     static void println(final String msg) {
746         if (!quiet) {
747             System.out.println(HttpDateGenerator.INSTANCE.getCurrentDate() + " | " + msg);
748         }
749     }
750 }