1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.examples;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.net.InetSocketAddress;
32 import java.nio.ByteBuffer;
33 import java.nio.CharBuffer;
34 import java.nio.charset.StandardCharsets;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.HashSet;
38 import java.util.Iterator;
39 import java.util.List;
40 import java.util.Set;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.atomic.AtomicLong;
43
44 import org.apache.hc.core5.concurrent.FutureCallback;
45 import org.apache.hc.core5.http.ConnectionClosedException;
46 import org.apache.hc.core5.http.ContentType;
47 import org.apache.hc.core5.http.EntityDetails;
48 import org.apache.hc.core5.http.Header;
49 import org.apache.hc.core5.http.HeaderElements;
50 import org.apache.hc.core5.http.HttpConnection;
51 import org.apache.hc.core5.http.HttpException;
52 import org.apache.hc.core5.http.HttpHeaders;
53 import org.apache.hc.core5.http.HttpHost;
54 import org.apache.hc.core5.http.HttpRequest;
55 import org.apache.hc.core5.http.HttpResponse;
56 import org.apache.hc.core5.http.HttpStatus;
57 import org.apache.hc.core5.http.URIScheme;
58 import org.apache.hc.core5.http.impl.BasicEntityDetails;
59 import org.apache.hc.core5.http.impl.Http1StreamListener;
60 import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap;
61 import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
62 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
63 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
64 import org.apache.hc.core5.http.impl.nio.BufferedData;
65 import org.apache.hc.core5.http.message.BasicHttpRequest;
66 import org.apache.hc.core5.http.message.BasicHttpResponse;
67 import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
68 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
69 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
70 import org.apache.hc.core5.http.nio.CapacityChannel;
71 import org.apache.hc.core5.http.nio.DataStreamChannel;
72 import org.apache.hc.core5.http.nio.RequestChannel;
73 import org.apache.hc.core5.http.nio.ResponseChannel;
74 import org.apache.hc.core5.http.protocol.HttpContext;
75 import org.apache.hc.core5.http.protocol.HttpCoreContext;
76 import org.apache.hc.core5.http.protocol.HttpDateGenerator;
77 import org.apache.hc.core5.io.CloseMode;
78 import org.apache.hc.core5.pool.ConnPoolListener;
79 import org.apache.hc.core5.pool.ConnPoolStats;
80 import org.apache.hc.core5.pool.PoolStats;
81 import org.apache.hc.core5.reactor.IOReactorConfig;
82 import org.apache.hc.core5.util.TextUtils;
83 import org.apache.hc.core5.util.TimeValue;
84 import org.apache.hc.core5.util.Timeout;
85
86
87
88
89 public class AsyncReverseProxyExample {
90
91 private static boolean quiet;
92
93 public static void main(final String[] args) throws Exception {
94 if (args.length < 1) {
95 System.out.println("Usage: <hostname[:port]> [listener port] [--quiet]");
96 System.exit(1);
97 }
98
99 final HttpHost targetHost = HttpHost.create(args[0]);
100 int port = 8080;
101 if (args.length > 1) {
102 port = Integer.parseInt(args[1]);
103 }
104 for (final String s : args) {
105 if ("--quiet".equalsIgnoreCase(s)) {
106 quiet = true;
107 break;
108 }
109 }
110
111 println("Reverse proxy to " + targetHost);
112
113 final IOReactorConfig config = IOReactorConfig.custom()
114 .setSoTimeout(1, TimeUnit.MINUTES)
115 .build();
116
117 final HttpAsyncRequester requester = AsyncRequesterBootstrap.bootstrap()
118 .setIOReactorConfig(config)
119 .setConnPoolListener(new ConnPoolListener<HttpHost>() {
120
121 @Override
122 public void onLease(final HttpHost route, final ConnPoolStats<HttpHost> connPoolStats) {
123 final StringBuilder buf = new StringBuilder();
124 buf.append("[proxy->origin] connection leased ").append(route);
125 println(buf.toString());
126 }
127
128 @Override
129 public void onRelease(final HttpHost route, final ConnPoolStats<HttpHost> connPoolStats) {
130 final StringBuilder buf = new StringBuilder();
131 buf.append("[proxy->origin] connection released ").append(route);
132 final PoolStats totals = connPoolStats.getTotalStats();
133 buf.append("; total kept alive: ").append(totals.getAvailable()).append("; ");
134 buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
135 buf.append(" of ").append(totals.getMax());
136 println(buf.toString());
137 }
138
139 })
140 .setStreamListener(new Http1StreamListener() {
141
142 @Override
143 public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
144
145 }
146
147 @Override
148 public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
149
150 }
151
152 @Override
153 public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
154 println("[proxy<-origin] connection " +
155 connection.getLocalAddress() + "->" + connection.getRemoteAddress() +
156 (keepAlive ? " kept alive" : " cannot be kept alive"));
157 }
158
159 })
160 .setMaxTotal(100)
161 .setDefaultMaxPerRoute(20)
162 .create();
163
164 final HttpAsyncServer server = AsyncServerBootstrap.bootstrap()
165 .setIOReactorConfig(config)
166 .setStreamListener(new Http1StreamListener() {
167
168 @Override
169 public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
170
171 }
172
173 @Override
174 public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
175
176 }
177
178 @Override
179 public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
180 println("[client<-proxy] connection " +
181 connection.getLocalAddress() + "->" + connection.getRemoteAddress() +
182 (keepAlive ? " kept alive" : " cannot be kept alive"));
183 }
184
185 })
186 .register("*", () -> new IncomingExchangeHandler(targetHost, requester))
187 .create();
188
189 Runtime.getRuntime().addShutdownHook(new Thread(() -> {
190 println("Reverse proxy shutting down");
191 server.close(CloseMode.GRACEFUL);
192 requester.close(CloseMode.GRACEFUL);
193 }));
194
195 requester.start();
196 server.start();
197 server.listen(new InetSocketAddress(port), URIScheme.HTTP);
198 println("Listening on port " + port);
199
200 server.awaitShutdown(TimeValue.MAX_VALUE);
201 }
202
203 private static class ProxyBuffer extends BufferedData {
204
205 ProxyBuffer(final int bufferSize) {
206 super(bufferSize);
207 }
208
209 int write(final DataStreamChannel channel) throws IOException {
210 setOutputMode();
211 if (buffer().hasRemaining()) {
212 return channel.write(buffer());
213 }
214 return 0;
215 }
216
217 }
218
219 private static final AtomicLong COUNT = new AtomicLong(0);
220
221 private static class ProxyExchangeState {
222
223 final String id;
224
225 HttpRequest request;
226 EntityDetails requestEntityDetails;
227 DataStreamChannel requestDataChannel;
228 CapacityChannel requestCapacityChannel;
229 ProxyBuffer inBuf;
230 boolean inputEnd;
231
232 HttpResponse response;
233 EntityDetails responseEntityDetails;
234 ResponseChannel responseMessageChannel;
235 DataStreamChannel responseDataChannel;
236 CapacityChannel responseCapacityChannel;
237 ProxyBuffer outBuf;
238 boolean outputEnd;
239
240 AsyncClientEndpoint clientEndpoint;
241
242 ProxyExchangeState() {
243 this.id = String.format("%010d", COUNT.getAndIncrement());
244 }
245
246 }
247
248 private static final int INIT_BUFFER_SIZE = 4096;
249
250 private static class IncomingExchangeHandler implements AsyncServerExchangeHandler {
251
252 private final HttpHost targetHost;
253 private final HttpAsyncRequester requester;
254 private final ProxyExchangeState exchangeState;
255
256 IncomingExchangeHandler(final HttpHost targetHost, final HttpAsyncRequester requester) {
257 super();
258 this.targetHost = targetHost;
259 this.requester = requester;
260 this.exchangeState = new ProxyExchangeState();
261 }
262
263 @Override
264 public void handleRequest(
265 final HttpRequest incomingRequest,
266 final EntityDetails entityDetails,
267 final ResponseChannel responseChannel,
268 final HttpContext httpContext) throws HttpException, IOException {
269
270 synchronized (exchangeState) {
271 println("[client->proxy] " + exchangeState.id + " " +
272 incomingRequest.getMethod() + " " + incomingRequest.getRequestUri());
273 exchangeState.request = incomingRequest;
274 exchangeState.requestEntityDetails = entityDetails;
275 exchangeState.inputEnd = entityDetails == null;
276 exchangeState.responseMessageChannel = responseChannel;
277
278 if (entityDetails != null) {
279 final Header h = incomingRequest.getFirstHeader(HttpHeaders.EXPECT);
280 if (h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue())) {
281 responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), httpContext);
282 }
283 }
284 }
285
286 println("[proxy->origin] " + exchangeState.id + " request connection to " + targetHost);
287
288 requester.connect(targetHost, Timeout.ofSeconds(30), null, new FutureCallback<AsyncClientEndpoint>() {
289
290 @Override
291 public void completed(final AsyncClientEndpoint clientEndpoint) {
292 println("[proxy->origin] " + exchangeState.id + " connection leased");
293 synchronized (exchangeState) {
294 exchangeState.clientEndpoint = clientEndpoint;
295 }
296 clientEndpoint.execute(
297 new OutgoingExchangeHandler(targetHost, clientEndpoint, exchangeState),
298 HttpCoreContext.create());
299 }
300
301 @Override
302 public void failed(final Exception cause) {
303 final HttpResponse outgoingResponse = new BasicHttpResponse(HttpStatus.SC_SERVICE_UNAVAILABLE);
304 outgoingResponse.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
305 final ByteBuffer msg = StandardCharsets.US_ASCII.encode(CharBuffer.wrap(cause.getMessage()));
306 final EntityDetails exEntityDetails = new BasicEntityDetails(msg.remaining(),
307 ContentType.TEXT_PLAIN);
308 synchronized (exchangeState) {
309 exchangeState.response = outgoingResponse;
310 exchangeState.responseEntityDetails = exEntityDetails;
311 exchangeState.outBuf = new ProxyBuffer(1024);
312 exchangeState.outBuf.put(msg);
313 exchangeState.outputEnd = true;
314 }
315 println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
316
317 try {
318 responseChannel.sendResponse(outgoingResponse, exEntityDetails, httpContext);
319 } catch (final HttpException | IOException ignore) {
320
321 }
322 }
323
324 @Override
325 public void cancelled() {
326 failed(new InterruptedIOException());
327 }
328
329 });
330
331 }
332
333 @Override
334 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
335 synchronized (exchangeState) {
336 exchangeState.requestCapacityChannel = capacityChannel;
337 final int capacity = exchangeState.inBuf != null ? exchangeState.inBuf.capacity() : INIT_BUFFER_SIZE;
338 if (capacity > 0) {
339 println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
340 capacityChannel.update(capacity);
341 }
342 }
343 }
344
345 @Override
346 public void consume(final ByteBuffer src) throws IOException {
347 synchronized (exchangeState) {
348 println("[client->proxy] " + exchangeState.id + " " + src.remaining() + " bytes received");
349 final DataStreamChannel dataChannel = exchangeState.requestDataChannel;
350 if (dataChannel != null && exchangeState.inBuf != null) {
351 if (exchangeState.inBuf.hasData()) {
352 final int bytesWritten = exchangeState.inBuf.write(dataChannel);
353 println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
354 }
355 if (!exchangeState.inBuf.hasData()) {
356 final int bytesWritten = dataChannel.write(src);
357 println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
358 }
359 }
360 if (src.hasRemaining()) {
361 if (exchangeState.inBuf == null) {
362 exchangeState.inBuf = new ProxyBuffer(INIT_BUFFER_SIZE);
363 }
364 exchangeState.inBuf.put(src);
365 }
366 final int capacity = exchangeState.inBuf != null ? exchangeState.inBuf.capacity() : INIT_BUFFER_SIZE;
367 println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
368 if (dataChannel != null) {
369 dataChannel.requestOutput();
370 }
371 }
372 }
373
374 @Override
375 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
376 synchronized (exchangeState) {
377 println("[client->proxy] " + exchangeState.id + " end of input");
378 exchangeState.inputEnd = true;
379 final DataStreamChannel dataChannel = exchangeState.requestDataChannel;
380 if (dataChannel != null && (exchangeState.inBuf == null || !exchangeState.inBuf.hasData())) {
381 println("[proxy->origin] " + exchangeState.id + " end of output");
382 dataChannel.endStream();
383 }
384 }
385 }
386
387 @Override
388 public int available() {
389 synchronized (exchangeState) {
390 final int available = exchangeState.outBuf != null ? exchangeState.outBuf.length() : 0;
391 println("[client<-proxy] " + exchangeState.id + " output available: " + available);
392 return available;
393 }
394 }
395
396 @Override
397 public void produce(final DataStreamChannel channel) throws IOException {
398 synchronized (exchangeState) {
399 println("[client<-proxy] " + exchangeState.id + " produce output");
400 exchangeState.responseDataChannel = channel;
401
402 if (exchangeState.outBuf != null) {
403 if (exchangeState.outBuf.hasData()) {
404 final int bytesWritten = exchangeState.outBuf.write(channel);
405 println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
406 }
407 if (exchangeState.outputEnd && !exchangeState.outBuf.hasData()) {
408 channel.endStream();
409 println("[client<-proxy] " + exchangeState.id + " end of output");
410 }
411 if (!exchangeState.outputEnd) {
412 final CapacityChannel capacityChannel = exchangeState.responseCapacityChannel;
413 if (capacityChannel != null) {
414 final int capacity = exchangeState.outBuf.capacity();
415 if (capacity > 0) {
416 println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
417 capacityChannel.update(capacity);
418 }
419 }
420 }
421 }
422 }
423 }
424
425 @Override
426 public void failed(final Exception cause) {
427 println("[client<-proxy] " + exchangeState.id + " " + cause.getMessage());
428 if (!(cause instanceof ConnectionClosedException)) {
429 cause.printStackTrace(System.out);
430 }
431 synchronized (exchangeState) {
432 if (exchangeState.clientEndpoint != null) {
433 exchangeState.clientEndpoint.releaseAndDiscard();
434 }
435 }
436 }
437
438 @Override
439 public void releaseResources() {
440 synchronized (exchangeState) {
441 exchangeState.responseMessageChannel = null;
442 exchangeState.responseDataChannel = null;
443 exchangeState.requestCapacityChannel = null;
444 }
445 }
446
447 }
448
449 private static class OutgoingExchangeHandler implements AsyncClientExchangeHandler {
450
451 private final static Set<String> HOP_BY_HOP = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
452 TextUtils.toLowerCase(HttpHeaders.HOST),
453 TextUtils.toLowerCase(HttpHeaders.CONTENT_LENGTH),
454 TextUtils.toLowerCase(HttpHeaders.TRANSFER_ENCODING),
455 TextUtils.toLowerCase(HttpHeaders.CONNECTION),
456 TextUtils.toLowerCase(HttpHeaders.KEEP_ALIVE),
457 TextUtils.toLowerCase(HttpHeaders.PROXY_AUTHENTICATE),
458 TextUtils.toLowerCase(HttpHeaders.TE),
459 TextUtils.toLowerCase(HttpHeaders.TRAILER),
460 TextUtils.toLowerCase(HttpHeaders.UPGRADE))));
461
462 private final HttpHost targetHost;
463 private final AsyncClientEndpoint clientEndpoint;
464 private final ProxyExchangeState exchangeState;
465
466 OutgoingExchangeHandler(
467 final HttpHost targetHost,
468 final AsyncClientEndpoint clientEndpoint,
469 final ProxyExchangeState exchangeState) {
470 this.targetHost = targetHost;
471 this.clientEndpoint = clientEndpoint;
472 this.exchangeState = exchangeState;
473 }
474
475 @Override
476 public void produceRequest(
477 final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
478 synchronized (exchangeState) {
479 final HttpRequest incomingRequest = exchangeState.request;
480 final EntityDetails entityDetails = exchangeState.requestEntityDetails;
481 final HttpRequest outgoingRequest = new BasicHttpRequest(
482 incomingRequest.getMethod(),
483 targetHost,
484 incomingRequest.getPath());
485 for (final Iterator<Header> it = incomingRequest.headerIterator(); it.hasNext(); ) {
486 final Header header = it.next();
487 if (!HOP_BY_HOP.contains(TextUtils.toLowerCase(header.getName()))) {
488 outgoingRequest.addHeader(header);
489 }
490 }
491
492 println("[proxy->origin] " + exchangeState.id + " " +
493 outgoingRequest.getMethod() + " " + outgoingRequest.getRequestUri());
494
495 channel.sendRequest(outgoingRequest, entityDetails, httpContext);
496 }
497 }
498
499 @Override
500 public int available() {
501 synchronized (exchangeState) {
502 final int available = exchangeState.inBuf != null ? exchangeState.inBuf.length() : 0;
503 println("[proxy->origin] " + exchangeState.id + " output available: " + available);
504 return available;
505 }
506 }
507
508 @Override
509 public void produce(final DataStreamChannel channel) throws IOException {
510 synchronized (exchangeState) {
511 println("[proxy->origin] " + exchangeState.id + " produce output");
512 exchangeState.requestDataChannel = channel;
513 if (exchangeState.inBuf != null) {
514 if (exchangeState.inBuf.hasData()) {
515 final int bytesWritten = exchangeState.inBuf.write(channel);
516 println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
517 }
518 if (exchangeState.inputEnd && !exchangeState.inBuf.hasData()) {
519 channel.endStream();
520 println("[proxy->origin] " + exchangeState.id + " end of output");
521 }
522 if (!exchangeState.inputEnd) {
523 final CapacityChannel capacityChannel = exchangeState.requestCapacityChannel;
524 if (capacityChannel != null) {
525 final int capacity = exchangeState.inBuf.capacity();
526 if (capacity > 0) {
527 println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
528 capacityChannel.update(capacity);
529 }
530 }
531 }
532 }
533 }
534 }
535
536 @Override
537 public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
538
539 }
540
541 @Override
542 public void consumeResponse(
543 final HttpResponse incomingResponse,
544 final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
545 synchronized (exchangeState) {
546 println("[proxy<-origin] " + exchangeState.id + " status " + incomingResponse.getCode());
547 if (entityDetails == null) {
548 println("[proxy<-origin] " + exchangeState.id + " end of input");
549 }
550
551 final HttpResponse outgoingResponse = new BasicHttpResponse(incomingResponse.getCode());
552 for (final Iterator<Header> it = incomingResponse.headerIterator(); it.hasNext(); ) {
553 final Header header = it.next();
554 if (!HOP_BY_HOP.contains(TextUtils.toLowerCase(header.getName()))) {
555 outgoingResponse.addHeader(header);
556 }
557 }
558
559 exchangeState.response = outgoingResponse;
560 exchangeState.responseEntityDetails = entityDetails;
561 exchangeState.outputEnd = entityDetails == null;
562
563 final ResponseChannel responseChannel = exchangeState.responseMessageChannel;
564 if (responseChannel != null) {
565
566 responseChannel.sendResponse(outgoingResponse, entityDetails, httpContext);
567 }
568
569 println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
570 if (entityDetails == null) {
571 println("[client<-proxy] " + exchangeState.id + " end of output");
572 clientEndpoint.releaseAndReuse();
573 }
574 }
575 }
576
577 @Override
578 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
579 synchronized (exchangeState) {
580 exchangeState.responseCapacityChannel = capacityChannel;
581 final int capacity = exchangeState.outBuf != null ? exchangeState.outBuf.capacity() : INIT_BUFFER_SIZE;
582 if (capacity > 0) {
583 println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
584 capacityChannel.update(capacity);
585 }
586 }
587 }
588
589 @Override
590 public void consume(final ByteBuffer src) throws IOException {
591 synchronized (exchangeState) {
592 println("[proxy<-origin] " + exchangeState.id + " " + src.remaining() + " bytes received");
593 final DataStreamChannel dataChannel = exchangeState.responseDataChannel;
594 if (dataChannel != null && exchangeState.outBuf != null) {
595 if (exchangeState.outBuf.hasData()) {
596 final int bytesWritten = exchangeState.outBuf.write(dataChannel);
597 println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
598 }
599 if (!exchangeState.outBuf.hasData()) {
600 final int bytesWritten = dataChannel.write(src);
601 println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
602 }
603 }
604 if (src.hasRemaining()) {
605 if (exchangeState.outBuf == null) {
606 exchangeState.outBuf = new ProxyBuffer(INIT_BUFFER_SIZE);
607 }
608 exchangeState.outBuf.put(src);
609 }
610 final int capacity = exchangeState.outBuf != null ? exchangeState.outBuf.capacity() : INIT_BUFFER_SIZE;
611 println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
612 if (dataChannel != null) {
613 dataChannel.requestOutput();
614 }
615 }
616 }
617
618 @Override
619 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
620 synchronized (exchangeState) {
621 println("[proxy<-origin] " + exchangeState.id + " end of input");
622 exchangeState.outputEnd = true;
623 final DataStreamChannel dataChannel = exchangeState.responseDataChannel;
624 if (dataChannel != null && (exchangeState.outBuf == null || !exchangeState.outBuf.hasData())) {
625 println("[client<-proxy] " + exchangeState.id + " end of output");
626 dataChannel.endStream();
627 clientEndpoint.releaseAndReuse();
628 }
629 }
630 }
631
632 @Override
633 public void cancel() {
634 clientEndpoint.releaseAndDiscard();
635 }
636
637 @Override
638 public void failed(final Exception cause) {
639 println("[client<-proxy] " + exchangeState.id + " " + cause.getMessage());
640 if (!(cause instanceof ConnectionClosedException)) {
641 cause.printStackTrace(System.out);
642 }
643 synchronized (exchangeState) {
644 if (exchangeState.response == null) {
645 final int status = cause instanceof IOException ? HttpStatus.SC_SERVICE_UNAVAILABLE : HttpStatus.SC_INTERNAL_SERVER_ERROR;
646 final HttpResponse outgoingResponse = new BasicHttpResponse(status);
647 outgoingResponse.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
648 exchangeState.response = outgoingResponse;
649
650 final ByteBuffer msg = StandardCharsets.US_ASCII.encode(CharBuffer.wrap(cause.getMessage()));
651 final int contentLen = msg.remaining();
652 exchangeState.outBuf = new ProxyBuffer(1024);
653 exchangeState.outBuf.put(msg);
654 exchangeState.outputEnd = true;
655
656 println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
657
658 try {
659 final EntityDetails entityDetails = new BasicEntityDetails(contentLen, ContentType.TEXT_PLAIN);
660 exchangeState.responseMessageChannel.sendResponse(outgoingResponse, entityDetails, null);
661 } catch (final HttpException | IOException ignore) {
662
663 }
664 } else {
665 exchangeState.outputEnd = true;
666 }
667 clientEndpoint.releaseAndDiscard();
668 }
669 }
670
671 @Override
672 public void releaseResources() {
673 synchronized (exchangeState) {
674 exchangeState.requestDataChannel = null;
675 exchangeState.responseCapacityChannel = null;
676 clientEndpoint.releaseAndDiscard();
677 }
678 }
679
680 }
681
682 static void println(final String msg) {
683 if (!quiet) {
684 System.out.println(HttpDateGenerator.INSTANCE.getCurrentDate() + " | " + msg);
685 }
686 }
687 }