1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.examples;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.net.InetSocketAddress;
32 import java.nio.ByteBuffer;
33 import java.nio.CharBuffer;
34 import java.nio.charset.StandardCharsets;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.HashSet;
38 import java.util.Iterator;
39 import java.util.List;
40 import java.util.Set;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.atomic.AtomicLong;
43
44 import org.apache.hc.core5.concurrent.FutureCallback;
45 import org.apache.hc.core5.http.ConnectionClosedException;
46 import org.apache.hc.core5.http.ContentType;
47 import org.apache.hc.core5.http.EntityDetails;
48 import org.apache.hc.core5.http.Header;
49 import org.apache.hc.core5.http.HeaderElements;
50 import org.apache.hc.core5.http.HttpConnection;
51 import org.apache.hc.core5.http.HttpException;
52 import org.apache.hc.core5.http.HttpHeaders;
53 import org.apache.hc.core5.http.HttpHost;
54 import org.apache.hc.core5.http.HttpRequest;
55 import org.apache.hc.core5.http.HttpResponse;
56 import org.apache.hc.core5.http.HttpStatus;
57 import org.apache.hc.core5.http.URIScheme;
58 import org.apache.hc.core5.http.impl.BasicEntityDetails;
59 import org.apache.hc.core5.http.impl.Http1StreamListener;
60 import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap;
61 import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
62 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
63 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
64 import org.apache.hc.core5.http.impl.nio.BufferedData;
65 import org.apache.hc.core5.http.message.BasicHttpRequest;
66 import org.apache.hc.core5.http.message.BasicHttpResponse;
67 import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
68 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
69 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
70 import org.apache.hc.core5.http.nio.CapacityChannel;
71 import org.apache.hc.core5.http.nio.DataStreamChannel;
72 import org.apache.hc.core5.http.nio.RequestChannel;
73 import org.apache.hc.core5.http.nio.ResponseChannel;
74 import org.apache.hc.core5.http.protocol.HttpContext;
75 import org.apache.hc.core5.http.protocol.HttpCoreContext;
76 import org.apache.hc.core5.http.protocol.HttpDateGenerator;
77 import org.apache.hc.core5.io.CloseMode;
78 import org.apache.hc.core5.pool.ConnPoolListener;
79 import org.apache.hc.core5.pool.ConnPoolStats;
80 import org.apache.hc.core5.pool.PoolStats;
81 import org.apache.hc.core5.reactor.IOReactorConfig;
82 import org.apache.hc.core5.util.TextUtils;
83 import org.apache.hc.core5.util.TimeValue;
84 import org.apache.hc.core5.util.Timeout;
85
86
87
88
89 public class AsyncReverseProxyExample {
90
91 private static boolean quiet;
92
93 public static void main(final String[] args) throws Exception {
94 if (args.length < 1) {
95 System.out.println("Usage: <hostname[:port]> [listener port] [--quiet]");
96 System.exit(1);
97 }
98
99 final HttpHost targetHost = HttpHost.create(args[0]);
100 int port = 8080;
101 if (args.length > 1) {
102 port = Integer.parseInt(args[1]);
103 }
104 for (final String s : args) {
105 if ("--quiet".equalsIgnoreCase(s)) {
106 quiet = true;
107 break;
108 }
109 }
110
111 println("Reverse proxy to " + targetHost);
112
113 final IOReactorConfig config = IOReactorConfig.custom()
114 .setSoTimeout(1, TimeUnit.MINUTES)
115 .build();
116
117 final HttpAsyncRequester requester = AsyncRequesterBootstrap.bootstrap()
118 .setIOReactorConfig(config)
119 .setConnPoolListener(new ConnPoolListener<HttpHost>() {
120
121 @Override
122 public void onLease(final HttpHost route, final ConnPoolStats<HttpHost> connPoolStats) {
123 final StringBuilder buf = new StringBuilder();
124 buf.append("[proxy->origin] connection leased ").append(route);
125 println(buf.toString());
126 }
127
128 @Override
129 public void onRelease(final HttpHost route, final ConnPoolStats<HttpHost> connPoolStats) {
130 final StringBuilder buf = new StringBuilder();
131 buf.append("[proxy->origin] connection released ").append(route);
132 final PoolStats totals = connPoolStats.getTotalStats();
133 buf.append("; total kept alive: ").append(totals.getAvailable()).append("; ");
134 buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
135 buf.append(" of ").append(totals.getMax());
136 println(buf.toString());
137 }
138
139 })
140 .setStreamListener(new Http1StreamListener() {
141
142 @Override
143 public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
144
145 }
146
147 @Override
148 public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
149
150 }
151
152 @Override
153 public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
154 println("[proxy<-origin] connection " +
155 connection.getLocalAddress() + "->" + connection.getRemoteAddress() +
156 (keepAlive ? " kept alive" : " cannot be kept alive"));
157 }
158
159 })
160 .setMaxTotal(100)
161 .setDefaultMaxPerRoute(20)
162 .create();
163
164 final HttpAsyncServer server = AsyncServerBootstrap.bootstrap()
165 .setExceptionCallback(e -> e.printStackTrace())
166 .setIOReactorConfig(config)
167 .setStreamListener(new Http1StreamListener() {
168
169 @Override
170 public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
171
172 }
173
174 @Override
175 public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
176
177 }
178
179 @Override
180 public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
181 println("[client<-proxy] connection " +
182 connection.getLocalAddress() + "->" + connection.getRemoteAddress() +
183 (keepAlive ? " kept alive" : " cannot be kept alive"));
184 }
185
186 })
187 .register("*", () -> new IncomingExchangeHandler(targetHost, requester))
188 .create();
189
190 Runtime.getRuntime().addShutdownHook(new Thread(() -> {
191 println("Reverse proxy shutting down");
192 server.close(CloseMode.GRACEFUL);
193 requester.close(CloseMode.GRACEFUL);
194 }));
195
196 requester.start();
197 server.start();
198 server.listen(new InetSocketAddress(port), URIScheme.HTTP);
199 println("Listening on port " + port);
200
201 server.awaitShutdown(TimeValue.MAX_VALUE);
202 }
203
204 private static class ProxyBuffer extends BufferedData {
205
206 ProxyBuffer(final int bufferSize) {
207 super(bufferSize);
208 }
209
210 int write(final DataStreamChannel channel) throws IOException {
211 setOutputMode();
212 if (buffer().hasRemaining()) {
213 return channel.write(buffer());
214 }
215 return 0;
216 }
217
218 }
219
220 private static final AtomicLong COUNT = new AtomicLong(0);
221
222 private static class ProxyExchangeState {
223
224 final String id;
225
226 HttpRequest request;
227 EntityDetails requestEntityDetails;
228 DataStreamChannel requestDataChannel;
229 CapacityChannel requestCapacityChannel;
230 ProxyBuffer inBuf;
231 boolean inputEnd;
232
233 HttpResponse response;
234 EntityDetails responseEntityDetails;
235 ResponseChannel responseMessageChannel;
236 DataStreamChannel responseDataChannel;
237 CapacityChannel responseCapacityChannel;
238 ProxyBuffer outBuf;
239 boolean outputEnd;
240
241 AsyncClientEndpoint clientEndpoint;
242
243 ProxyExchangeState() {
244 this.id = String.format("%010d", COUNT.getAndIncrement());
245 }
246
247 }
248
249 private static final int INIT_BUFFER_SIZE = 4096;
250
251 private static class IncomingExchangeHandler implements AsyncServerExchangeHandler {
252
253 private final HttpHost targetHost;
254 private final HttpAsyncRequester requester;
255 private final ProxyExchangeState exchangeState;
256
257 IncomingExchangeHandler(final HttpHost targetHost, final HttpAsyncRequester requester) {
258 super();
259 this.targetHost = targetHost;
260 this.requester = requester;
261 this.exchangeState = new ProxyExchangeState();
262 }
263
264 @Override
265 public void handleRequest(
266 final HttpRequest incomingRequest,
267 final EntityDetails entityDetails,
268 final ResponseChannel responseChannel,
269 final HttpContext httpContext) throws HttpException, IOException {
270
271 synchronized (exchangeState) {
272 println("[client->proxy] " + exchangeState.id + " " +
273 incomingRequest.getMethod() + " " + incomingRequest.getRequestUri());
274 exchangeState.request = incomingRequest;
275 exchangeState.requestEntityDetails = entityDetails;
276 exchangeState.inputEnd = entityDetails == null;
277 exchangeState.responseMessageChannel = responseChannel;
278
279 if (entityDetails != null) {
280 final Header h = incomingRequest.getFirstHeader(HttpHeaders.EXPECT);
281 if (h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue())) {
282 responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), httpContext);
283 }
284 }
285 }
286
287 println("[proxy->origin] " + exchangeState.id + " request connection to " + targetHost);
288
289 requester.connect(targetHost, Timeout.ofSeconds(30), null, new FutureCallback<AsyncClientEndpoint>() {
290
291 @Override
292 public void completed(final AsyncClientEndpoint clientEndpoint) {
293 println("[proxy->origin] " + exchangeState.id + " connection leased");
294 synchronized (exchangeState) {
295 exchangeState.clientEndpoint = clientEndpoint;
296 }
297 clientEndpoint.execute(
298 new OutgoingExchangeHandler(targetHost, clientEndpoint, exchangeState),
299 HttpCoreContext.create());
300 }
301
302 @Override
303 public void failed(final Exception cause) {
304 final HttpResponse outgoingResponse = new BasicHttpResponse(HttpStatus.SC_SERVICE_UNAVAILABLE);
305 outgoingResponse.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
306 final ByteBuffer msg = StandardCharsets.US_ASCII.encode(CharBuffer.wrap(cause.getMessage()));
307 final EntityDetails exEntityDetails = new BasicEntityDetails(msg.remaining(),
308 ContentType.TEXT_PLAIN);
309 synchronized (exchangeState) {
310 exchangeState.response = outgoingResponse;
311 exchangeState.responseEntityDetails = exEntityDetails;
312 exchangeState.outBuf = new ProxyBuffer(1024);
313 exchangeState.outBuf.put(msg);
314 exchangeState.outputEnd = true;
315 }
316 println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
317
318 try {
319 responseChannel.sendResponse(outgoingResponse, exEntityDetails, httpContext);
320 } catch (final HttpException | IOException ignore) {
321
322 }
323 }
324
325 @Override
326 public void cancelled() {
327 failed(new InterruptedIOException());
328 }
329
330 });
331
332 }
333
334 @Override
335 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
336 synchronized (exchangeState) {
337 exchangeState.requestCapacityChannel = capacityChannel;
338 final int capacity = exchangeState.inBuf != null ? exchangeState.inBuf.capacity() : INIT_BUFFER_SIZE;
339 if (capacity > 0) {
340 println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
341 capacityChannel.update(capacity);
342 }
343 }
344 }
345
346 @Override
347 public void consume(final ByteBuffer src) throws IOException {
348 synchronized (exchangeState) {
349 println("[client->proxy] " + exchangeState.id + " " + src.remaining() + " bytes received");
350 final DataStreamChannel dataChannel = exchangeState.requestDataChannel;
351 if (dataChannel != null && exchangeState.inBuf != null) {
352 if (exchangeState.inBuf.hasData()) {
353 final int bytesWritten = exchangeState.inBuf.write(dataChannel);
354 println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
355 }
356 if (!exchangeState.inBuf.hasData()) {
357 final int bytesWritten = dataChannel.write(src);
358 println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
359 }
360 }
361 if (src.hasRemaining()) {
362 if (exchangeState.inBuf == null) {
363 exchangeState.inBuf = new ProxyBuffer(INIT_BUFFER_SIZE);
364 }
365 exchangeState.inBuf.put(src);
366 }
367 final int capacity = exchangeState.inBuf != null ? exchangeState.inBuf.capacity() : INIT_BUFFER_SIZE;
368 println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
369 if (dataChannel != null) {
370 dataChannel.requestOutput();
371 }
372 }
373 }
374
375 @Override
376 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
377 synchronized (exchangeState) {
378 println("[client->proxy] " + exchangeState.id + " end of input");
379 exchangeState.inputEnd = true;
380 final DataStreamChannel dataChannel = exchangeState.requestDataChannel;
381 if (dataChannel != null && (exchangeState.inBuf == null || !exchangeState.inBuf.hasData())) {
382 println("[proxy->origin] " + exchangeState.id + " end of output");
383 dataChannel.endStream();
384 }
385 }
386 }
387
388 @Override
389 public int available() {
390 synchronized (exchangeState) {
391 final int available = exchangeState.outBuf != null ? exchangeState.outBuf.length() : 0;
392 println("[client<-proxy] " + exchangeState.id + " output available: " + available);
393 return available;
394 }
395 }
396
397 @Override
398 public void produce(final DataStreamChannel channel) throws IOException {
399 synchronized (exchangeState) {
400 println("[client<-proxy] " + exchangeState.id + " produce output");
401 exchangeState.responseDataChannel = channel;
402
403 if (exchangeState.outBuf != null) {
404 if (exchangeState.outBuf.hasData()) {
405 final int bytesWritten = exchangeState.outBuf.write(channel);
406 println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
407 }
408 if (exchangeState.outputEnd && !exchangeState.outBuf.hasData()) {
409 channel.endStream();
410 println("[client<-proxy] " + exchangeState.id + " end of output");
411 }
412 if (!exchangeState.outputEnd) {
413 final CapacityChannel capacityChannel = exchangeState.responseCapacityChannel;
414 if (capacityChannel != null) {
415 final int capacity = exchangeState.outBuf.capacity();
416 if (capacity > 0) {
417 println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
418 capacityChannel.update(capacity);
419 }
420 }
421 }
422 }
423 }
424 }
425
426 @Override
427 public void failed(final Exception cause) {
428 println("[client<-proxy] " + exchangeState.id + " " + cause.getMessage());
429 if (!(cause instanceof ConnectionClosedException)) {
430 cause.printStackTrace(System.out);
431 }
432 synchronized (exchangeState) {
433 if (exchangeState.clientEndpoint != null) {
434 exchangeState.clientEndpoint.releaseAndDiscard();
435 }
436 }
437 }
438
439 @Override
440 public void releaseResources() {
441 synchronized (exchangeState) {
442 exchangeState.responseMessageChannel = null;
443 exchangeState.responseDataChannel = null;
444 exchangeState.requestCapacityChannel = null;
445 }
446 }
447
448 }
449
450 private static class OutgoingExchangeHandler implements AsyncClientExchangeHandler {
451
452 private final static Set<String> HOP_BY_HOP = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
453 TextUtils.toLowerCase(HttpHeaders.HOST),
454 TextUtils.toLowerCase(HttpHeaders.CONTENT_LENGTH),
455 TextUtils.toLowerCase(HttpHeaders.TRANSFER_ENCODING),
456 TextUtils.toLowerCase(HttpHeaders.CONNECTION),
457 TextUtils.toLowerCase(HttpHeaders.KEEP_ALIVE),
458 TextUtils.toLowerCase(HttpHeaders.PROXY_AUTHENTICATE),
459 TextUtils.toLowerCase(HttpHeaders.TE),
460 TextUtils.toLowerCase(HttpHeaders.TRAILER),
461 TextUtils.toLowerCase(HttpHeaders.UPGRADE))));
462
463 private final HttpHost targetHost;
464 private final AsyncClientEndpoint clientEndpoint;
465 private final ProxyExchangeState exchangeState;
466
467 OutgoingExchangeHandler(
468 final HttpHost targetHost,
469 final AsyncClientEndpoint clientEndpoint,
470 final ProxyExchangeState exchangeState) {
471 this.targetHost = targetHost;
472 this.clientEndpoint = clientEndpoint;
473 this.exchangeState = exchangeState;
474 }
475
476 @Override
477 public void produceRequest(
478 final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
479 synchronized (exchangeState) {
480 final HttpRequest incomingRequest = exchangeState.request;
481 final EntityDetails entityDetails = exchangeState.requestEntityDetails;
482 final HttpRequest outgoingRequest = new BasicHttpRequest(
483 incomingRequest.getMethod(),
484 targetHost,
485 incomingRequest.getPath());
486 for (final Iterator<Header> it = incomingRequest.headerIterator(); it.hasNext(); ) {
487 final Header header = it.next();
488 if (!HOP_BY_HOP.contains(TextUtils.toLowerCase(header.getName()))) {
489 outgoingRequest.addHeader(header);
490 }
491 }
492
493 println("[proxy->origin] " + exchangeState.id + " " +
494 outgoingRequest.getMethod() + " " + outgoingRequest.getRequestUri());
495
496 channel.sendRequest(outgoingRequest, entityDetails, httpContext);
497 }
498 }
499
500 @Override
501 public int available() {
502 synchronized (exchangeState) {
503 final int available = exchangeState.inBuf != null ? exchangeState.inBuf.length() : 0;
504 println("[proxy->origin] " + exchangeState.id + " output available: " + available);
505 return available;
506 }
507 }
508
509 @Override
510 public void produce(final DataStreamChannel channel) throws IOException {
511 synchronized (exchangeState) {
512 println("[proxy->origin] " + exchangeState.id + " produce output");
513 exchangeState.requestDataChannel = channel;
514 if (exchangeState.inBuf != null) {
515 if (exchangeState.inBuf.hasData()) {
516 final int bytesWritten = exchangeState.inBuf.write(channel);
517 println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
518 }
519 if (exchangeState.inputEnd && !exchangeState.inBuf.hasData()) {
520 channel.endStream();
521 println("[proxy->origin] " + exchangeState.id + " end of output");
522 }
523 if (!exchangeState.inputEnd) {
524 final CapacityChannel capacityChannel = exchangeState.requestCapacityChannel;
525 if (capacityChannel != null) {
526 final int capacity = exchangeState.inBuf.capacity();
527 if (capacity > 0) {
528 println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
529 capacityChannel.update(capacity);
530 }
531 }
532 }
533 }
534 }
535 }
536
537 @Override
538 public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
539
540 }
541
542 @Override
543 public void consumeResponse(
544 final HttpResponse incomingResponse,
545 final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
546 synchronized (exchangeState) {
547 println("[proxy<-origin] " + exchangeState.id + " status " + incomingResponse.getCode());
548 if (entityDetails == null) {
549 println("[proxy<-origin] " + exchangeState.id + " end of input");
550 }
551
552 final HttpResponse outgoingResponse = new BasicHttpResponse(incomingResponse.getCode());
553 for (final Iterator<Header> it = incomingResponse.headerIterator(); it.hasNext(); ) {
554 final Header header = it.next();
555 if (!HOP_BY_HOP.contains(TextUtils.toLowerCase(header.getName()))) {
556 outgoingResponse.addHeader(header);
557 }
558 }
559
560 exchangeState.response = outgoingResponse;
561 exchangeState.responseEntityDetails = entityDetails;
562 exchangeState.outputEnd = entityDetails == null;
563
564 final ResponseChannel responseChannel = exchangeState.responseMessageChannel;
565 if (responseChannel != null) {
566
567 responseChannel.sendResponse(outgoingResponse, entityDetails, httpContext);
568 }
569
570 println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
571 if (entityDetails == null) {
572 println("[client<-proxy] " + exchangeState.id + " end of output");
573 clientEndpoint.releaseAndReuse();
574 }
575 }
576 }
577
578 @Override
579 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
580 synchronized (exchangeState) {
581 exchangeState.responseCapacityChannel = capacityChannel;
582 final int capacity = exchangeState.outBuf != null ? exchangeState.outBuf.capacity() : INIT_BUFFER_SIZE;
583 if (capacity > 0) {
584 println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
585 capacityChannel.update(capacity);
586 }
587 }
588 }
589
590 @Override
591 public void consume(final ByteBuffer src) throws IOException {
592 synchronized (exchangeState) {
593 println("[proxy<-origin] " + exchangeState.id + " " + src.remaining() + " bytes received");
594 final DataStreamChannel dataChannel = exchangeState.responseDataChannel;
595 if (dataChannel != null && exchangeState.outBuf != null) {
596 if (exchangeState.outBuf.hasData()) {
597 final int bytesWritten = exchangeState.outBuf.write(dataChannel);
598 println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
599 }
600 if (!exchangeState.outBuf.hasData()) {
601 final int bytesWritten = dataChannel.write(src);
602 println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
603 }
604 }
605 if (src.hasRemaining()) {
606 if (exchangeState.outBuf == null) {
607 exchangeState.outBuf = new ProxyBuffer(INIT_BUFFER_SIZE);
608 }
609 exchangeState.outBuf.put(src);
610 }
611 final int capacity = exchangeState.outBuf != null ? exchangeState.outBuf.capacity() : INIT_BUFFER_SIZE;
612 println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
613 if (dataChannel != null) {
614 dataChannel.requestOutput();
615 }
616 }
617 }
618
619 @Override
620 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
621 synchronized (exchangeState) {
622 println("[proxy<-origin] " + exchangeState.id + " end of input");
623 exchangeState.outputEnd = true;
624 final DataStreamChannel dataChannel = exchangeState.responseDataChannel;
625 if (dataChannel != null && (exchangeState.outBuf == null || !exchangeState.outBuf.hasData())) {
626 println("[client<-proxy] " + exchangeState.id + " end of output");
627 dataChannel.endStream();
628 clientEndpoint.releaseAndReuse();
629 }
630 }
631 }
632
633 @Override
634 public void cancel() {
635 clientEndpoint.releaseAndDiscard();
636 }
637
638 @Override
639 public void failed(final Exception cause) {
640 println("[client<-proxy] " + exchangeState.id + " " + cause.getMessage());
641 if (!(cause instanceof ConnectionClosedException)) {
642 cause.printStackTrace(System.out);
643 }
644 synchronized (exchangeState) {
645 if (exchangeState.response == null) {
646 final int status = cause instanceof IOException ? HttpStatus.SC_SERVICE_UNAVAILABLE : HttpStatus.SC_INTERNAL_SERVER_ERROR;
647 final HttpResponse outgoingResponse = new BasicHttpResponse(status);
648 outgoingResponse.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
649 exchangeState.response = outgoingResponse;
650
651 final ByteBuffer msg = StandardCharsets.US_ASCII.encode(CharBuffer.wrap(cause.getMessage()));
652 final int contentLen = msg.remaining();
653 exchangeState.outBuf = new ProxyBuffer(1024);
654 exchangeState.outBuf.put(msg);
655 exchangeState.outputEnd = true;
656
657 println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
658
659 try {
660 final EntityDetails entityDetails = new BasicEntityDetails(contentLen, ContentType.TEXT_PLAIN);
661 exchangeState.responseMessageChannel.sendResponse(outgoingResponse, entityDetails, null);
662 } catch (final HttpException | IOException ignore) {
663
664 }
665 } else {
666 exchangeState.outputEnd = true;
667 }
668 clientEndpoint.releaseAndDiscard();
669 }
670 }
671
672 @Override
673 public void releaseResources() {
674 synchronized (exchangeState) {
675 exchangeState.requestDataChannel = null;
676 exchangeState.responseCapacityChannel = null;
677 clientEndpoint.releaseAndDiscard();
678 }
679 }
680
681 }
682
683 static void println(final String msg) {
684 if (!quiet) {
685 System.out.println(HttpDateGenerator.INSTANCE.getCurrentDate() + " | " + msg);
686 }
687 }
688 }