1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.examples;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.net.InetSocketAddress;
32 import java.nio.ByteBuffer;
33 import java.nio.CharBuffer;
34 import java.nio.charset.StandardCharsets;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.HashSet;
38 import java.util.Iterator;
39 import java.util.List;
40 import java.util.Locale;
41 import java.util.Set;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicLong;
44
45 import org.apache.hc.core5.concurrent.FutureCallback;
46 import org.apache.hc.core5.function.Supplier;
47 import org.apache.hc.core5.http.ConnectionClosedException;
48 import org.apache.hc.core5.http.ContentType;
49 import org.apache.hc.core5.http.EntityDetails;
50 import org.apache.hc.core5.http.Header;
51 import org.apache.hc.core5.http.HeaderElements;
52 import org.apache.hc.core5.http.HttpConnection;
53 import org.apache.hc.core5.http.HttpException;
54 import org.apache.hc.core5.http.HttpHeaders;
55 import org.apache.hc.core5.http.HttpHost;
56 import org.apache.hc.core5.http.HttpRequest;
57 import org.apache.hc.core5.http.HttpResponse;
58 import org.apache.hc.core5.http.HttpStatus;
59 import org.apache.hc.core5.http.URIScheme;
60 import org.apache.hc.core5.http.impl.BasicEntityDetails;
61 import org.apache.hc.core5.http.impl.Http1StreamListener;
62 import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap;
63 import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
64 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
65 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
66 import org.apache.hc.core5.http.impl.nio.BufferedData;
67 import org.apache.hc.core5.http.message.BasicHttpRequest;
68 import org.apache.hc.core5.http.message.BasicHttpResponse;
69 import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
70 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
71 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
72 import org.apache.hc.core5.http.nio.CapacityChannel;
73 import org.apache.hc.core5.http.nio.DataStreamChannel;
74 import org.apache.hc.core5.http.nio.RequestChannel;
75 import org.apache.hc.core5.http.nio.ResponseChannel;
76 import org.apache.hc.core5.http.protocol.HttpContext;
77 import org.apache.hc.core5.http.protocol.HttpCoreContext;
78 import org.apache.hc.core5.http.protocol.HttpDateGenerator;
79 import org.apache.hc.core5.io.CloseMode;
80 import org.apache.hc.core5.pool.ConnPoolListener;
81 import org.apache.hc.core5.pool.ConnPoolStats;
82 import org.apache.hc.core5.pool.PoolStats;
83 import org.apache.hc.core5.reactor.IOReactorConfig;
84 import org.apache.hc.core5.util.TimeValue;
85 import org.apache.hc.core5.util.Timeout;
86
87
88
89
90 public class AsyncReverseProxyExample {
91
92 private static boolean quiet;
93
94 public static void main(final String[] args) throws Exception {
95 if (args.length < 1) {
96 System.out.println("Usage: <hostname[:port]> [listener port] [--quiet]");
97 System.exit(1);
98 }
99
100 final HttpHost targetHost = HttpHost.create(args[0]);
101 int port = 8080;
102 if (args.length > 1) {
103 port = Integer.parseInt(args[1]);
104 }
105 for (final String s : args) {
106 if ("--quiet".equalsIgnoreCase(s)) {
107 quiet = true;
108 break;
109 }
110 }
111
112 println("Reverse proxy to " + targetHost);
113
114 final IOReactorConfig config = IOReactorConfig.custom()
115 .setSoTimeout(1, TimeUnit.MINUTES)
116 .build();
117
118 final HttpAsyncRequester requester = AsyncRequesterBootstrap.bootstrap()
119 .setIOReactorConfig(config)
120 .setConnPoolListener(new ConnPoolListener<HttpHost>() {
121
122 @Override
123 public void onLease(final HttpHost route, final ConnPoolStats<HttpHost> connPoolStats) {
124 final StringBuilder buf = new StringBuilder();
125 buf.append("[proxy->origin] connection leased ").append(route);
126 println(buf.toString());
127 }
128
129 @Override
130 public void onRelease(final HttpHost route, final ConnPoolStats<HttpHost> connPoolStats) {
131 final StringBuilder buf = new StringBuilder();
132 buf.append("[proxy->origin] connection released ").append(route);
133 final PoolStats totals = connPoolStats.getTotalStats();
134 buf.append("; total kept alive: ").append(totals.getAvailable()).append("; ");
135 buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
136 buf.append(" of ").append(totals.getMax());
137 println(buf.toString());
138 }
139
140 })
141 .setStreamListener(new Http1StreamListener() {
142
143 @Override
144 public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
145
146 }
147
148 @Override
149 public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
150
151 }
152
153 @Override
154 public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
155 println("[proxy<-origin] connection " +
156 connection.getLocalAddress() + "->" + connection.getRemoteAddress() +
157 (keepAlive ? " kept alive" : " cannot be kept alive"));
158 }
159
160 })
161 .setMaxTotal(100)
162 .setDefaultMaxPerRoute(20)
163 .create();
164
165 final HttpAsyncServer server = AsyncServerBootstrap.bootstrap()
166 .setIOReactorConfig(config)
167 .setStreamListener(new Http1StreamListener() {
168
169 @Override
170 public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
171
172 }
173
174 @Override
175 public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
176
177 }
178
179 @Override
180 public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
181 println("[client<-proxy] connection " +
182 connection.getLocalAddress() + "->" + connection.getRemoteAddress() +
183 (keepAlive ? " kept alive" : " cannot be kept alive"));
184 }
185
186 })
187 .register("*", new Supplier<AsyncServerExchangeHandler>() {
188
189 @Override
190 public AsyncServerExchangeHandler get() {
191 return new IncomingExchangeHandler(targetHost, requester);
192 }
193
194 })
195 .create();
196
197 Runtime.getRuntime().addShutdownHook(new Thread() {
198 @Override
199 public void run() {
200 println("Reverse proxy shutting down");
201 server.close(CloseMode.GRACEFUL);
202 requester.close(CloseMode.GRACEFUL);
203 }
204 });
205
206 requester.start();
207 server.start();
208 server.listen(new InetSocketAddress(port), URIScheme.HTTP);
209 println("Listening on port " + port);
210
211 server.awaitShutdown(TimeValue.MAX_VALUE);
212 }
213
214 private static class ProxyBuffer extends BufferedData {
215
216 ProxyBuffer(final int bufferSize) {
217 super(bufferSize);
218 }
219
220 int write(final DataStreamChannel channel) throws IOException {
221 setOutputMode();
222 if (buffer().hasRemaining()) {
223 return channel.write(buffer());
224 }
225 return 0;
226 }
227
228 }
229
230 private static final AtomicLong COUNT = new AtomicLong(0);
231
232 private static class ProxyExchangeState {
233
234 final String id;
235
236 HttpRequest request;
237 EntityDetails requestEntityDetails;
238 DataStreamChannel requestDataChannel;
239 CapacityChannel requestCapacityChannel;
240 ProxyBuffer inBuf;
241 boolean inputEnd;
242
243 HttpResponse response;
244 EntityDetails responseEntityDetails;
245 ResponseChannel responseMessageChannel;
246 DataStreamChannel responseDataChannel;
247 CapacityChannel responseCapacityChannel;
248 ProxyBuffer outBuf;
249 boolean outputEnd;
250
251 AsyncClientEndpoint clientEndpoint;
252
253 ProxyExchangeState() {
254 this.id = String.format("%010d", COUNT.getAndIncrement());
255 }
256
257 }
258
259 private static final int INIT_BUFFER_SIZE = 4096;
260
261 private static class IncomingExchangeHandler implements AsyncServerExchangeHandler {
262
263 private final HttpHost targetHost;
264 private final HttpAsyncRequester requester;
265 private final ProxyExchangeState exchangeState;
266
267 IncomingExchangeHandler(final HttpHost targetHost, final HttpAsyncRequester requester) {
268 super();
269 this.targetHost = targetHost;
270 this.requester = requester;
271 this.exchangeState = new ProxyExchangeState();
272 }
273
274 @Override
275 public void handleRequest(
276 final HttpRequest incomingRequest,
277 final EntityDetails entityDetails,
278 final ResponseChannel responseChannel,
279 final HttpContext httpContext) throws HttpException, IOException {
280
281 synchronized (exchangeState) {
282 println("[client->proxy] " + exchangeState.id + " " +
283 incomingRequest.getMethod() + " " + incomingRequest.getRequestUri());
284 exchangeState.request = incomingRequest;
285 exchangeState.requestEntityDetails = entityDetails;
286 exchangeState.inputEnd = entityDetails == null;
287 exchangeState.responseMessageChannel = responseChannel;
288
289 if (entityDetails != null) {
290 final Header h = incomingRequest.getFirstHeader(HttpHeaders.EXPECT);
291 if (h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue())) {
292 responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), httpContext);
293 }
294 }
295 }
296
297 println("[proxy->origin] " + exchangeState.id + " request connection to " + targetHost);
298
299 requester.connect(targetHost, Timeout.ofSeconds(30), null, new FutureCallback<AsyncClientEndpoint>() {
300
301 @Override
302 public void completed(final AsyncClientEndpoint clientEndpoint) {
303 println("[proxy->origin] " + exchangeState.id + " connection leased");
304 synchronized (exchangeState) {
305 exchangeState.clientEndpoint = clientEndpoint;
306 }
307 clientEndpoint.execute(
308 new OutgoingExchangeHandler(targetHost, clientEndpoint, exchangeState),
309 HttpCoreContext.create());
310 }
311
312 @Override
313 public void failed(final Exception cause) {
314 final HttpResponse outgoingResponse = new BasicHttpResponse(HttpStatus.SC_SERVICE_UNAVAILABLE);
315 outgoingResponse.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
316 final ByteBuffer msg = StandardCharsets.US_ASCII.encode(CharBuffer.wrap(cause.getMessage()));
317 final EntityDetails exEntityDetails = new BasicEntityDetails(msg.remaining(),
318 ContentType.TEXT_PLAIN);
319 synchronized (exchangeState) {
320 exchangeState.response = outgoingResponse;
321 exchangeState.responseEntityDetails = exEntityDetails;
322 exchangeState.outBuf = new ProxyBuffer(1024);
323 exchangeState.outBuf.put(msg);
324 exchangeState.outputEnd = true;
325 }
326 println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
327
328 try {
329 responseChannel.sendResponse(outgoingResponse, exEntityDetails, httpContext);
330 } catch (final HttpException | IOException ignore) {
331
332 }
333 }
334
335 @Override
336 public void cancelled() {
337 failed(new InterruptedIOException());
338 }
339
340 });
341
342 }
343
344 @Override
345 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
346 synchronized (exchangeState) {
347 exchangeState.requestCapacityChannel = capacityChannel;
348 final int capacity = exchangeState.inBuf != null ? exchangeState.inBuf.capacity() : INIT_BUFFER_SIZE;
349 if (capacity > 0) {
350 println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
351 capacityChannel.update(capacity);
352 }
353 }
354 }
355
356 @Override
357 public void consume(final ByteBuffer src) throws IOException {
358 synchronized (exchangeState) {
359 println("[client->proxy] " + exchangeState.id + " " + src.remaining() + " bytes received");
360 final DataStreamChannel dataChannel = exchangeState.requestDataChannel;
361 if (dataChannel != null && exchangeState.inBuf != null) {
362 if (exchangeState.inBuf.hasData()) {
363 final int bytesWritten = exchangeState.inBuf.write(dataChannel);
364 println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
365 }
366 if (!exchangeState.inBuf.hasData()) {
367 final int bytesWritten = dataChannel.write(src);
368 println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
369 }
370 }
371 if (src.hasRemaining()) {
372 if (exchangeState.inBuf == null) {
373 exchangeState.inBuf = new ProxyBuffer(INIT_BUFFER_SIZE);
374 }
375 exchangeState.inBuf.put(src);
376 }
377 final int capacity = exchangeState.inBuf != null ? exchangeState.inBuf.capacity() : INIT_BUFFER_SIZE;
378 println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
379 if (dataChannel != null) {
380 dataChannel.requestOutput();
381 }
382 }
383 }
384
385 @Override
386 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
387 synchronized (exchangeState) {
388 println("[client->proxy] " + exchangeState.id + " end of input");
389 exchangeState.inputEnd = true;
390 final DataStreamChannel dataChannel = exchangeState.requestDataChannel;
391 if (dataChannel != null && (exchangeState.inBuf == null || !exchangeState.inBuf.hasData())) {
392 println("[proxy->origin] " + exchangeState.id + " end of output");
393 dataChannel.endStream();
394 }
395 }
396 }
397
398 @Override
399 public int available() {
400 synchronized (exchangeState) {
401 final int available = exchangeState.outBuf != null ? exchangeState.outBuf.length() : 0;
402 println("[client<-proxy] " + exchangeState.id + " output available: " + available);
403 return available;
404 }
405 }
406
407 @Override
408 public void produce(final DataStreamChannel channel) throws IOException {
409 synchronized (exchangeState) {
410 println("[client<-proxy] " + exchangeState.id + " produce output");
411 exchangeState.responseDataChannel = channel;
412
413 if (exchangeState.outBuf != null) {
414 if (exchangeState.outBuf.hasData()) {
415 final int bytesWritten = exchangeState.outBuf.write(channel);
416 println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
417 }
418 if (exchangeState.outputEnd && !exchangeState.outBuf.hasData()) {
419 channel.endStream();
420 println("[client<-proxy] " + exchangeState.id + " end of output");
421 }
422 if (!exchangeState.outputEnd) {
423 final CapacityChannel capacityChannel = exchangeState.responseCapacityChannel;
424 if (capacityChannel != null) {
425 final int capacity = exchangeState.outBuf.capacity();
426 if (capacity > 0) {
427 println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
428 capacityChannel.update(capacity);
429 }
430 }
431 }
432 }
433 }
434 }
435
436 @Override
437 public void failed(final Exception cause) {
438 println("[client<-proxy] " + exchangeState.id + " " + cause.getMessage());
439 if (!(cause instanceof ConnectionClosedException)) {
440 cause.printStackTrace(System.out);
441 }
442 synchronized (exchangeState) {
443 if (exchangeState.clientEndpoint != null) {
444 exchangeState.clientEndpoint.releaseAndDiscard();
445 }
446 }
447 }
448
449 @Override
450 public void releaseResources() {
451 synchronized (exchangeState) {
452 exchangeState.responseMessageChannel = null;
453 exchangeState.responseDataChannel = null;
454 exchangeState.requestCapacityChannel = null;
455 }
456 }
457
458 }
459
460 private static class OutgoingExchangeHandler implements AsyncClientExchangeHandler {
461
462 private final static Set<String> HOP_BY_HOP = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
463 HttpHeaders.HOST.toLowerCase(Locale.ROOT),
464 HttpHeaders.CONTENT_LENGTH.toLowerCase(Locale.ROOT),
465 HttpHeaders.TRANSFER_ENCODING.toLowerCase(Locale.ROOT),
466 HttpHeaders.CONNECTION.toLowerCase(Locale.ROOT),
467 HttpHeaders.KEEP_ALIVE.toLowerCase(Locale.ROOT),
468 HttpHeaders.PROXY_AUTHENTICATE.toLowerCase(Locale.ROOT),
469 HttpHeaders.TE.toLowerCase(Locale.ROOT),
470 HttpHeaders.TRAILER.toLowerCase(Locale.ROOT),
471 HttpHeaders.UPGRADE.toLowerCase(Locale.ROOT))));
472
473 private final HttpHost targetHost;
474 private final AsyncClientEndpoint clientEndpoint;
475 private final ProxyExchangeState exchangeState;
476
477 OutgoingExchangeHandler(
478 final HttpHost targetHost,
479 final AsyncClientEndpoint clientEndpoint,
480 final ProxyExchangeState exchangeState) {
481 this.targetHost = targetHost;
482 this.clientEndpoint = clientEndpoint;
483 this.exchangeState = exchangeState;
484 }
485
486 @Override
487 public void produceRequest(
488 final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
489 synchronized (exchangeState) {
490 final HttpRequest incomingRequest = exchangeState.request;
491 final EntityDetails entityDetails = exchangeState.requestEntityDetails;
492 final HttpRequest outgoingRequest = new BasicHttpRequest(
493 incomingRequest.getMethod(),
494 targetHost,
495 incomingRequest.getPath());
496 for (final Iterator<Header> it = incomingRequest.headerIterator(); it.hasNext(); ) {
497 final Header header = it.next();
498 if (!HOP_BY_HOP.contains(header.getName().toLowerCase(Locale.ROOT))) {
499 outgoingRequest.addHeader(header);
500 }
501 }
502
503 println("[proxy->origin] " + exchangeState.id + " " +
504 outgoingRequest.getMethod() + " " + outgoingRequest.getRequestUri());
505
506 channel.sendRequest(outgoingRequest, entityDetails, httpContext);
507 }
508 }
509
510 @Override
511 public int available() {
512 synchronized (exchangeState) {
513 final int available = exchangeState.inBuf != null ? exchangeState.inBuf.length() : 0;
514 println("[proxy->origin] " + exchangeState.id + " output available: " + available);
515 return available;
516 }
517 }
518
519 @Override
520 public void produce(final DataStreamChannel channel) throws IOException {
521 synchronized (exchangeState) {
522 println("[proxy->origin] " + exchangeState.id + " produce output");
523 exchangeState.requestDataChannel = channel;
524 if (exchangeState.inBuf != null) {
525 if (exchangeState.inBuf.hasData()) {
526 final int bytesWritten = exchangeState.inBuf.write(channel);
527 println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
528 }
529 if (exchangeState.inputEnd && !exchangeState.inBuf.hasData()) {
530 channel.endStream();
531 println("[proxy->origin] " + exchangeState.id + " end of output");
532 }
533 if (!exchangeState.inputEnd) {
534 final CapacityChannel capacityChannel = exchangeState.requestCapacityChannel;
535 if (capacityChannel != null) {
536 final int capacity = exchangeState.inBuf.capacity();
537 if (capacity > 0) {
538 println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
539 capacityChannel.update(capacity);
540 }
541 }
542 }
543 }
544 }
545 }
546
547 @Override
548 public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
549
550 }
551
552 @Override
553 public void consumeResponse(
554 final HttpResponse incomingResponse,
555 final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
556 synchronized (exchangeState) {
557 println("[proxy<-origin] " + exchangeState.id + " status " + incomingResponse.getCode());
558 if (entityDetails == null) {
559 println("[proxy<-origin] " + exchangeState.id + " end of input");
560 }
561
562 final HttpResponse outgoingResponse = new BasicHttpResponse(incomingResponse.getCode());
563 for (final Iterator<Header> it = incomingResponse.headerIterator(); it.hasNext(); ) {
564 final Header header = it.next();
565 if (!HOP_BY_HOP.contains(header.getName().toLowerCase(Locale.ROOT))) {
566 outgoingResponse.addHeader(header);
567 }
568 }
569
570 exchangeState.response = outgoingResponse;
571 exchangeState.responseEntityDetails = entityDetails;
572 exchangeState.outputEnd = entityDetails == null;
573
574 final ResponseChannel responseChannel = exchangeState.responseMessageChannel;
575 if (responseChannel != null) {
576
577 responseChannel.sendResponse(outgoingResponse, entityDetails, httpContext);
578 }
579
580 println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
581 if (entityDetails == null) {
582 println("[client<-proxy] " + exchangeState.id + " end of output");
583 clientEndpoint.releaseAndReuse();
584 }
585 }
586 }
587
588 @Override
589 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
590 synchronized (exchangeState) {
591 exchangeState.responseCapacityChannel = capacityChannel;
592 final int capacity = exchangeState.outBuf != null ? exchangeState.outBuf.capacity() : INIT_BUFFER_SIZE;
593 if (capacity > 0) {
594 println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
595 capacityChannel.update(capacity);
596 }
597 }
598 }
599
600 @Override
601 public void consume(final ByteBuffer src) throws IOException {
602 synchronized (exchangeState) {
603 println("[proxy<-origin] " + exchangeState.id + " " + src.remaining() + " bytes received");
604 final DataStreamChannel dataChannel = exchangeState.responseDataChannel;
605 if (dataChannel != null && exchangeState.outBuf != null) {
606 if (exchangeState.outBuf.hasData()) {
607 final int bytesWritten = exchangeState.outBuf.write(dataChannel);
608 println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
609 }
610 if (!exchangeState.outBuf.hasData()) {
611 final int bytesWritten = dataChannel.write(src);
612 println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
613 }
614 }
615 if (src.hasRemaining()) {
616 if (exchangeState.outBuf == null) {
617 exchangeState.outBuf = new ProxyBuffer(INIT_BUFFER_SIZE);
618 }
619 exchangeState.outBuf.put(src);
620 }
621 final int capacity = exchangeState.outBuf != null ? exchangeState.outBuf.capacity() : INIT_BUFFER_SIZE;
622 println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
623 if (dataChannel != null) {
624 dataChannel.requestOutput();
625 }
626 }
627 }
628
629 @Override
630 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
631 synchronized (exchangeState) {
632 println("[proxy<-origin] " + exchangeState.id + " end of input");
633 exchangeState.outputEnd = true;
634 final DataStreamChannel dataChannel = exchangeState.responseDataChannel;
635 if (dataChannel != null && (exchangeState.outBuf == null || !exchangeState.outBuf.hasData())) {
636 println("[client<-proxy] " + exchangeState.id + " end of output");
637 dataChannel.endStream();
638 clientEndpoint.releaseAndReuse();
639 }
640 }
641 }
642
643 @Override
644 public void cancel() {
645 clientEndpoint.releaseAndDiscard();
646 }
647
648 @Override
649 public void failed(final Exception cause) {
650 println("[client<-proxy] " + exchangeState.id + " " + cause.getMessage());
651 if (!(cause instanceof ConnectionClosedException)) {
652 cause.printStackTrace(System.out);
653 }
654 synchronized (exchangeState) {
655 if (exchangeState.response == null) {
656 final int status = cause instanceof IOException ? HttpStatus.SC_SERVICE_UNAVAILABLE : HttpStatus.SC_INTERNAL_SERVER_ERROR;
657 final HttpResponse outgoingResponse = new BasicHttpResponse(status);
658 outgoingResponse.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
659 exchangeState.response = outgoingResponse;
660
661 final ByteBuffer msg = StandardCharsets.US_ASCII.encode(CharBuffer.wrap(cause.getMessage()));
662 final int contentLen = msg.remaining();
663 exchangeState.outBuf = new ProxyBuffer(1024);
664 exchangeState.outBuf.put(msg);
665 exchangeState.outputEnd = true;
666
667 println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
668
669 try {
670 final EntityDetails entityDetails = new BasicEntityDetails(contentLen, ContentType.TEXT_PLAIN);
671 exchangeState.responseMessageChannel.sendResponse(outgoingResponse, entityDetails, null);
672 } catch (final HttpException | IOException ignore) {
673
674 }
675 } else {
676 exchangeState.outputEnd = true;
677 }
678 clientEndpoint.releaseAndDiscard();
679 }
680 }
681
682 @Override
683 public void releaseResources() {
684 synchronized (exchangeState) {
685 exchangeState.requestDataChannel = null;
686 exchangeState.responseCapacityChannel = null;
687 clientEndpoint.releaseAndDiscard();
688 }
689 }
690
691 }
692
693 static final void println(final String msg) {
694 if (!quiet) {
695 System.out.println(HttpDateGenerator.INSTANCE.getCurrentDate() + " | " + msg);
696 }
697 }
698 }