1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.examples;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.net.InetSocketAddress;
32 import java.nio.ByteBuffer;
33 import java.nio.CharBuffer;
34 import java.nio.charset.StandardCharsets;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.HashSet;
38 import java.util.Iterator;
39 import java.util.List;
40 import java.util.Locale;
41 import java.util.Set;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicLong;
44
45 import org.apache.hc.core5.concurrent.FutureCallback;
46 import org.apache.hc.core5.function.Supplier;
47 import org.apache.hc.core5.http.ConnectionClosedException;
48 import org.apache.hc.core5.http.ContentType;
49 import org.apache.hc.core5.http.EntityDetails;
50 import org.apache.hc.core5.http.Header;
51 import org.apache.hc.core5.http.HeaderElements;
52 import org.apache.hc.core5.http.HttpConnection;
53 import org.apache.hc.core5.http.HttpException;
54 import org.apache.hc.core5.http.HttpHeaders;
55 import org.apache.hc.core5.http.HttpHost;
56 import org.apache.hc.core5.http.HttpRequest;
57 import org.apache.hc.core5.http.HttpResponse;
58 import org.apache.hc.core5.http.HttpStatus;
59 import org.apache.hc.core5.http.impl.BasicEntityDetails;
60 import org.apache.hc.core5.http.impl.Http1StreamListener;
61 import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap;
62 import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
63 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
64 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
65 import org.apache.hc.core5.http.impl.nio.BufferedData;
66 import org.apache.hc.core5.http.message.BasicHttpRequest;
67 import org.apache.hc.core5.http.message.BasicHttpResponse;
68 import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
69 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
70 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
71 import org.apache.hc.core5.http.nio.CapacityChannel;
72 import org.apache.hc.core5.http.nio.DataStreamChannel;
73 import org.apache.hc.core5.http.nio.RequestChannel;
74 import org.apache.hc.core5.http.nio.ResponseChannel;
75 import org.apache.hc.core5.http.protocol.HttpContext;
76 import org.apache.hc.core5.http.protocol.HttpCoreContext;
77 import org.apache.hc.core5.http.protocol.HttpDateGenerator;
78 import org.apache.hc.core5.io.CloseMode;
79 import org.apache.hc.core5.pool.ConnPoolListener;
80 import org.apache.hc.core5.pool.ConnPoolStats;
81 import org.apache.hc.core5.pool.PoolStats;
82 import org.apache.hc.core5.reactor.IOReactorConfig;
83 import org.apache.hc.core5.util.TimeValue;
84 import org.apache.hc.core5.util.Timeout;
85
86
87
88
89 public class AsyncReverseProxyExample {
90
91 private static boolean quiet;
92
93 public static void main(final String[] args) throws Exception {
94 if (args.length < 1) {
95 System.out.println("Usage: <hostname[:port]> [listener port] [--quiet]");
96 System.exit(1);
97 }
98
99 final HttpHost targetHost = HttpHost.create(args[0]);
100 int port = 8080;
101 if (args.length > 1) {
102 port = Integer.parseInt(args[1]);
103 }
104 for (final String s : args) {
105 if ("--quiet".equalsIgnoreCase(s)) {
106 quiet = true;
107 break;
108 }
109 }
110
111 println("Reverse proxy to " + targetHost);
112
113 final IOReactorConfig config = IOReactorConfig.custom()
114 .setSoTimeout(1, TimeUnit.MINUTES)
115 .build();
116
117 final HttpAsyncRequester requester = AsyncRequesterBootstrap.bootstrap()
118 .setIOReactorConfig(config)
119 .setConnPoolListener(new ConnPoolListener<HttpHost>() {
120
121 @Override
122 public void onLease(final HttpHost route, final ConnPoolStats<HttpHost> connPoolStats) {
123 final StringBuilder buf = new StringBuilder();
124 buf.append("[proxy->origin] connection leased ").append(route);
125 println(buf.toString());
126 }
127
128 @Override
129 public void onRelease(final HttpHost route, final ConnPoolStats<HttpHost> connPoolStats) {
130 final StringBuilder buf = new StringBuilder();
131 buf.append("[proxy->origin] connection released ").append(route);
132 final PoolStats totals = connPoolStats.getTotalStats();
133 buf.append("; total kept alive: ").append(totals.getAvailable()).append("; ");
134 buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
135 buf.append(" of ").append(totals.getMax());
136 println(buf.toString());
137 }
138
139 })
140 .setStreamListener(new Http1StreamListener() {
141
142 @Override
143 public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
144
145 }
146
147 @Override
148 public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
149
150 }
151
152 @Override
153 public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
154 println("[proxy<-origin] connection " +
155 connection.getLocalAddress() + "->" + connection.getRemoteAddress() +
156 (keepAlive ? " kept alive" : " cannot be kept alive"));
157 }
158
159 })
160 .setMaxTotal(100)
161 .setDefaultMaxPerRoute(20)
162 .create();
163
164 final HttpAsyncServer server = AsyncServerBootstrap.bootstrap()
165 .setIOReactorConfig(config)
166 .setStreamListener(new Http1StreamListener() {
167
168 @Override
169 public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
170
171 }
172
173 @Override
174 public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
175
176 }
177
178 @Override
179 public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
180 println("[client<-proxy] connection " +
181 connection.getLocalAddress() + "->" + connection.getRemoteAddress() +
182 (keepAlive ? " kept alive" : " cannot be kept alive"));
183 }
184
185 })
186 .register("*", new Supplier<AsyncServerExchangeHandler>() {
187
188 @Override
189 public AsyncServerExchangeHandler get() {
190 return new IncomingExchangeHandler(targetHost, requester);
191 }
192
193 })
194 .create();
195
196 Runtime.getRuntime().addShutdownHook(new Thread() {
197 @Override
198 public void run() {
199 println("Reverse proxy shutting down");
200 server.close(CloseMode.GRACEFUL);
201 requester.close(CloseMode.GRACEFUL);
202 }
203 });
204
205 requester.start();
206 server.start();
207 server.listen(new InetSocketAddress(port));
208 println("Listening on port " + port);
209
210 server.awaitShutdown(TimeValue.MAX_VALUE);
211 }
212
213 private static class ProxyBuffer extends BufferedData {
214
215 ProxyBuffer(final int bufferSize) {
216 super(bufferSize);
217 }
218
219 int write(final DataStreamChannel channel) throws IOException {
220 setOutputMode();
221 if (buffer().hasRemaining()) {
222 return channel.write(buffer());
223 }
224 return 0;
225 }
226
227 }
228
229 private static final AtomicLong COUNT = new AtomicLong(0);
230
231 private static class ProxyExchangeState {
232
233 final String id;
234
235 HttpRequest request;
236 EntityDetails requestEntityDetails;
237 DataStreamChannel requestDataChannel;
238 CapacityChannel requestCapacityChannel;
239 ProxyBuffer inBuf;
240 boolean inputEnd;
241
242 HttpResponse response;
243 EntityDetails responseEntityDetails;
244 ResponseChannel responseMessageChannel;
245 DataStreamChannel responseDataChannel;
246 CapacityChannel responseCapacityChannel;
247 ProxyBuffer outBuf;
248 boolean outputEnd;
249
250 AsyncClientEndpoint clientEndpoint;
251
252 ProxyExchangeState() {
253 this.id = String.format("%08X", COUNT.getAndIncrement());
254 }
255
256 }
257
258 private static final int INIT_BUFFER_SIZE = 4096;
259
260 private static class IncomingExchangeHandler implements AsyncServerExchangeHandler {
261
262 private final HttpHost targetHost;
263 private final HttpAsyncRequester requester;
264 private final ProxyExchangeState exchangeState;
265
266 IncomingExchangeHandler(final HttpHost targetHost, final HttpAsyncRequester requester) {
267 super();
268 this.targetHost = targetHost;
269 this.requester = requester;
270 this.exchangeState = new ProxyExchangeState();
271 }
272
273 @Override
274 public void handleRequest(
275 final HttpRequest incomingRequest,
276 final EntityDetails entityDetails,
277 final ResponseChannel responseChannel,
278 final HttpContext httpContext) throws HttpException, IOException {
279
280 synchronized (exchangeState) {
281 println("[client->proxy] " + exchangeState.id + " " +
282 incomingRequest.getMethod() + " " + incomingRequest.getRequestUri());
283 exchangeState.request = incomingRequest;
284 exchangeState.requestEntityDetails = entityDetails;
285 exchangeState.inputEnd = entityDetails == null;
286 exchangeState.responseMessageChannel = responseChannel;
287
288 if (entityDetails != null) {
289 final Header h = incomingRequest.getFirstHeader(HttpHeaders.EXPECT);
290 if (h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue())) {
291 responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), httpContext);
292 }
293 }
294 }
295
296 println("[proxy->origin] " + exchangeState.id + " request connection to " + targetHost);
297
298 requester.connect(targetHost, Timeout.ofSeconds(30), null, new FutureCallback<AsyncClientEndpoint>() {
299
300 @Override
301 public void completed(final AsyncClientEndpoint clientEndpoint) {
302 println("[proxy->origin] " + exchangeState.id + " connection leased");
303 synchronized (exchangeState) {
304 exchangeState.clientEndpoint = clientEndpoint;
305 }
306 clientEndpoint.execute(
307 new OutgoingExchangeHandler(targetHost, clientEndpoint, exchangeState),
308 HttpCoreContext.create());
309 }
310
311 @Override
312 public void failed(final Exception cause) {
313 final HttpResponse outgoingResponse = new BasicHttpResponse(HttpStatus.SC_SERVICE_UNAVAILABLE);
314 outgoingResponse.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
315 final ByteBuffer msg = StandardCharsets.US_ASCII.encode(CharBuffer.wrap(cause.getMessage()));
316 final EntityDetails exEntityDetails = new BasicEntityDetails(msg.remaining(),
317 ContentType.TEXT_PLAIN);
318 synchronized (exchangeState) {
319 exchangeState.response = outgoingResponse;
320 exchangeState.responseEntityDetails = exEntityDetails;
321 exchangeState.outBuf = new ProxyBuffer(1024);
322 exchangeState.outBuf.put(msg);
323 exchangeState.outputEnd = true;
324 }
325 println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
326
327 try {
328 responseChannel.sendResponse(outgoingResponse, exEntityDetails, httpContext);
329 } catch (final HttpException | IOException ignore) {
330
331 }
332 }
333
334 @Override
335 public void cancelled() {
336 failed(new InterruptedIOException());
337 }
338
339 });
340
341 }
342
343 @Override
344 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
345 synchronized (exchangeState) {
346 exchangeState.requestCapacityChannel = capacityChannel;
347 final int capacity = exchangeState.inBuf != null ? exchangeState.inBuf.capacity() : INIT_BUFFER_SIZE;
348 if (capacity > 0) {
349 println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
350 capacityChannel.update(capacity);
351 }
352 }
353 }
354
355 @Override
356 public void consume(final ByteBuffer src) throws IOException {
357 synchronized (exchangeState) {
358 println("[client->proxy] " + exchangeState.id + " " + src.remaining() + " bytes received");
359 final DataStreamChannel dataChannel = exchangeState.requestDataChannel;
360 if (dataChannel != null && exchangeState.inBuf != null) {
361 if (exchangeState.inBuf.hasData()) {
362 final int bytesWritten = exchangeState.inBuf.write(dataChannel);
363 println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
364 }
365 if (!exchangeState.inBuf.hasData()) {
366 final int bytesWritten = dataChannel.write(src);
367 println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
368 }
369 }
370 if (src.hasRemaining()) {
371 if (exchangeState.inBuf == null) {
372 exchangeState.inBuf = new ProxyBuffer(INIT_BUFFER_SIZE);
373 }
374 exchangeState.inBuf.put(src);
375 }
376 final int capacity = exchangeState.inBuf != null ? exchangeState.inBuf.capacity() : INIT_BUFFER_SIZE;
377 println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
378 if (dataChannel != null) {
379 dataChannel.requestOutput();
380 }
381 }
382 }
383
384 @Override
385 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
386 synchronized (exchangeState) {
387 println("[client->proxy] " + exchangeState.id + " end of input");
388 exchangeState.inputEnd = true;
389 final DataStreamChannel dataChannel = exchangeState.requestDataChannel;
390 if (dataChannel != null && (exchangeState.inBuf == null || !exchangeState.inBuf.hasData())) {
391 println("[proxy->origin] " + exchangeState.id + " end of output");
392 dataChannel.endStream();
393 }
394 }
395 }
396
397 @Override
398 public int available() {
399 synchronized (exchangeState) {
400 final int available = exchangeState.outBuf != null ? exchangeState.outBuf.length() : 0;
401 println("[client<-proxy] " + exchangeState.id + " output available: " + available);
402 return available;
403 }
404 }
405
406 @Override
407 public void produce(final DataStreamChannel channel) throws IOException {
408 synchronized (exchangeState) {
409 println("[client<-proxy] " + exchangeState.id + " produce output");
410 exchangeState.responseDataChannel = channel;
411
412 if (exchangeState.outBuf != null) {
413 if (exchangeState.outBuf.hasData()) {
414 final int bytesWritten = exchangeState.outBuf.write(channel);
415 println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
416 }
417 if (exchangeState.outputEnd && !exchangeState.outBuf.hasData()) {
418 channel.endStream();
419 println("[client<-proxy] " + exchangeState.id + " end of output");
420 }
421 if (!exchangeState.outputEnd) {
422 final CapacityChannel capacityChannel = exchangeState.responseCapacityChannel;
423 if (capacityChannel != null) {
424 final int capacity = exchangeState.outBuf.capacity();
425 if (capacity > 0) {
426 println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
427 capacityChannel.update(capacity);
428 }
429 }
430 }
431 }
432 }
433 }
434
435 @Override
436 public void failed(final Exception cause) {
437 println("[client<-proxy] " + exchangeState.id + " " + cause.getMessage());
438 if (!(cause instanceof ConnectionClosedException)) {
439 cause.printStackTrace(System.out);
440 }
441 synchronized (exchangeState) {
442 if (exchangeState.clientEndpoint != null) {
443 exchangeState.clientEndpoint.releaseAndDiscard();
444 }
445 }
446 }
447
448 @Override
449 public void releaseResources() {
450 synchronized (exchangeState) {
451 exchangeState.responseMessageChannel = null;
452 exchangeState.responseDataChannel = null;
453 exchangeState.requestCapacityChannel = null;
454 }
455 }
456
457 }
458
459 private static class OutgoingExchangeHandler implements AsyncClientExchangeHandler {
460
461 private final static Set<String> HOP_BY_HOP = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
462 HttpHeaders.HOST.toLowerCase(Locale.ROOT),
463 HttpHeaders.CONTENT_LENGTH.toLowerCase(Locale.ROOT),
464 HttpHeaders.TRANSFER_ENCODING.toLowerCase(Locale.ROOT),
465 HttpHeaders.CONNECTION.toLowerCase(Locale.ROOT),
466 HttpHeaders.KEEP_ALIVE.toLowerCase(Locale.ROOT),
467 HttpHeaders.PROXY_AUTHENTICATE.toLowerCase(Locale.ROOT),
468 HttpHeaders.TE.toLowerCase(Locale.ROOT),
469 HttpHeaders.TRAILER.toLowerCase(Locale.ROOT),
470 HttpHeaders.UPGRADE.toLowerCase(Locale.ROOT))));
471
472 private final HttpHost targetHost;
473 private final AsyncClientEndpoint clientEndpoint;
474 private final ProxyExchangeState exchangeState;
475
476 OutgoingExchangeHandler(
477 final HttpHost targetHost,
478 final AsyncClientEndpoint clientEndpoint,
479 final ProxyExchangeState exchangeState) {
480 this.targetHost = targetHost;
481 this.clientEndpoint = clientEndpoint;
482 this.exchangeState = exchangeState;
483 }
484
485 @Override
486 public void produceRequest(
487 final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
488 synchronized (exchangeState) {
489 final HttpRequest incomingRequest = exchangeState.request;
490 final EntityDetails entityDetails = exchangeState.requestEntityDetails;
491 final HttpRequest outgoingRequest = new BasicHttpRequest(
492 incomingRequest.getMethod(),
493 targetHost,
494 incomingRequest.getPath());
495 for (final Iterator<Header> it = incomingRequest.headerIterator(); it.hasNext(); ) {
496 final Header header = it.next();
497 if (!HOP_BY_HOP.contains(header.getName().toLowerCase(Locale.ROOT))) {
498 outgoingRequest.addHeader(header);
499 }
500 }
501
502 println("[proxy->origin] " + exchangeState.id + " " +
503 outgoingRequest.getMethod() + " " + outgoingRequest.getRequestUri());
504
505 channel.sendRequest(outgoingRequest, entityDetails, httpContext);
506 }
507 }
508
509 @Override
510 public int available() {
511 synchronized (exchangeState) {
512 final int available = exchangeState.inBuf != null ? exchangeState.inBuf.length() : 0;
513 println("[proxy->origin] " + exchangeState.id + " output available: " + available);
514 return available;
515 }
516 }
517
518 @Override
519 public void produce(final DataStreamChannel channel) throws IOException {
520 synchronized (exchangeState) {
521 println("[proxy->origin] " + exchangeState.id + " produce output");
522 exchangeState.requestDataChannel = channel;
523 if (exchangeState.inBuf != null) {
524 if (exchangeState.inBuf.hasData()) {
525 final int bytesWritten = exchangeState.inBuf.write(channel);
526 println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
527 }
528 if (exchangeState.inputEnd && !exchangeState.inBuf.hasData()) {
529 channel.endStream();
530 println("[proxy->origin] " + exchangeState.id + " end of output");
531 }
532 if (!exchangeState.inputEnd) {
533 final CapacityChannel capacityChannel = exchangeState.requestCapacityChannel;
534 if (capacityChannel != null) {
535 final int capacity = exchangeState.inBuf.capacity();
536 if (capacity > 0) {
537 println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
538 capacityChannel.update(capacity);
539 }
540 }
541 }
542 }
543 }
544 }
545
546 @Override
547 public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
548
549 }
550
551 @Override
552 public void consumeResponse(
553 final HttpResponse incomingResponse,
554 final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
555 synchronized (exchangeState) {
556 println("[proxy<-origin] " + exchangeState.id + " status " + incomingResponse.getCode());
557 if (entityDetails == null) {
558 println("[proxy<-origin] " + exchangeState.id + " end of input");
559 }
560
561 final HttpResponse outgoingResponse = new BasicHttpResponse(incomingResponse.getCode());
562 for (final Iterator<Header> it = incomingResponse.headerIterator(); it.hasNext(); ) {
563 final Header header = it.next();
564 if (!HOP_BY_HOP.contains(header.getName().toLowerCase(Locale.ROOT))) {
565 outgoingResponse.addHeader(header);
566 }
567 }
568
569 exchangeState.response = outgoingResponse;
570 exchangeState.responseEntityDetails = entityDetails;
571 exchangeState.outputEnd = entityDetails == null;
572
573 final ResponseChannel responseChannel = exchangeState.responseMessageChannel;
574 if (responseChannel != null) {
575
576 responseChannel.sendResponse(outgoingResponse, entityDetails, httpContext);
577 }
578
579 println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
580 if (entityDetails == null) {
581 println("[client<-proxy] " + exchangeState.id + " end of output");
582 clientEndpoint.releaseAndReuse();
583 }
584 }
585 }
586
587 @Override
588 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
589 synchronized (exchangeState) {
590 exchangeState.responseCapacityChannel = capacityChannel;
591 final int capacity = exchangeState.outBuf != null ? exchangeState.outBuf.capacity() : INIT_BUFFER_SIZE;
592 if (capacity > 0) {
593 println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
594 capacityChannel.update(capacity);
595 }
596 }
597 }
598
599 @Override
600 public void consume(final ByteBuffer src) throws IOException {
601 synchronized (exchangeState) {
602 println("[proxy<-origin] " + exchangeState.id + " " + src.remaining() + " bytes received");
603 final DataStreamChannel dataChannel = exchangeState.responseDataChannel;
604 if (dataChannel != null && exchangeState.outBuf != null) {
605 if (exchangeState.outBuf.hasData()) {
606 final int bytesWritten = exchangeState.outBuf.write(dataChannel);
607 println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
608 }
609 if (!exchangeState.outBuf.hasData()) {
610 final int bytesWritten = dataChannel.write(src);
611 println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
612 }
613 }
614 if (src.hasRemaining()) {
615 if (exchangeState.outBuf == null) {
616 exchangeState.outBuf = new ProxyBuffer(INIT_BUFFER_SIZE);
617 }
618 exchangeState.outBuf.put(src);
619 }
620 final int capacity = exchangeState.outBuf != null ? exchangeState.outBuf.capacity() : INIT_BUFFER_SIZE;
621 println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
622 if (dataChannel != null) {
623 dataChannel.requestOutput();
624 }
625 }
626 }
627
628 @Override
629 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
630 synchronized (exchangeState) {
631 println("[proxy<-origin] " + exchangeState.id + " end of input");
632 exchangeState.outputEnd = true;
633 final DataStreamChannel dataChannel = exchangeState.responseDataChannel;
634 if (dataChannel != null && (exchangeState.outBuf == null || !exchangeState.outBuf.hasData())) {
635 println("[client<-proxy] " + exchangeState.id + " end of output");
636 dataChannel.endStream();
637 clientEndpoint.releaseAndReuse();
638 }
639 }
640 }
641
642 @Override
643 public void cancel() {
644 clientEndpoint.releaseAndDiscard();
645 }
646
647 @Override
648 public void failed(final Exception cause) {
649 println("[client<-proxy] " + exchangeState.id + " " + cause.getMessage());
650 if (!(cause instanceof ConnectionClosedException)) {
651 cause.printStackTrace(System.out);
652 }
653 synchronized (exchangeState) {
654 if (exchangeState.response == null) {
655 final int status = cause instanceof IOException ? HttpStatus.SC_SERVICE_UNAVAILABLE : HttpStatus.SC_INTERNAL_SERVER_ERROR;
656 final HttpResponse outgoingResponse = new BasicHttpResponse(status);
657 outgoingResponse.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
658 exchangeState.response = outgoingResponse;
659
660 final ByteBuffer msg = StandardCharsets.US_ASCII.encode(CharBuffer.wrap(cause.getMessage()));
661 final int contentLen = msg.remaining();
662 exchangeState.outBuf = new ProxyBuffer(1024);
663 exchangeState.outBuf.put(msg);
664 exchangeState.outputEnd = true;
665
666 println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
667
668 try {
669 final EntityDetails entityDetails = new BasicEntityDetails(contentLen, ContentType.TEXT_PLAIN);
670 exchangeState.responseMessageChannel.sendResponse(outgoingResponse, entityDetails, null);
671 } catch (final HttpException | IOException ignore) {
672
673 }
674 } else {
675 exchangeState.outputEnd = true;
676 }
677 clientEndpoint.releaseAndDiscard();
678 }
679 }
680
681 @Override
682 public void releaseResources() {
683 synchronized (exchangeState) {
684 exchangeState.requestDataChannel = null;
685 exchangeState.responseCapacityChannel = null;
686 clientEndpoint.releaseAndDiscard();
687 }
688 }
689
690 }
691
692 static final void println(final String msg) {
693 if (!quiet) {
694 System.out.println(HttpDateGenerator.INSTANCE.getCurrentDate() + " | " + msg);
695 }
696 }
697 }