1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.core5.http.examples;
28
29 import java.io.IOException;
30 import java.io.InterruptedIOException;
31 import java.net.InetSocketAddress;
32 import java.nio.ByteBuffer;
33 import java.nio.CharBuffer;
34 import java.nio.charset.StandardCharsets;
35 import java.util.Arrays;
36 import java.util.Collections;
37 import java.util.HashSet;
38 import java.util.Iterator;
39 import java.util.List;
40 import java.util.Set;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.atomic.AtomicLong;
43 import java.util.concurrent.locks.ReentrantLock;
44
45 import org.apache.hc.core5.concurrent.FutureCallback;
46 import org.apache.hc.core5.http.ConnectionClosedException;
47 import org.apache.hc.core5.http.ContentType;
48 import org.apache.hc.core5.http.EntityDetails;
49 import org.apache.hc.core5.http.Header;
50 import org.apache.hc.core5.http.HeaderElements;
51 import org.apache.hc.core5.http.HttpConnection;
52 import org.apache.hc.core5.http.HttpException;
53 import org.apache.hc.core5.http.HttpHeaders;
54 import org.apache.hc.core5.http.HttpHost;
55 import org.apache.hc.core5.http.HttpRequest;
56 import org.apache.hc.core5.http.HttpResponse;
57 import org.apache.hc.core5.http.HttpStatus;
58 import org.apache.hc.core5.http.URIScheme;
59 import org.apache.hc.core5.http.impl.BasicEntityDetails;
60 import org.apache.hc.core5.http.impl.Http1StreamListener;
61 import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap;
62 import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
63 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
64 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
65 import org.apache.hc.core5.http.impl.nio.BufferedData;
66 import org.apache.hc.core5.http.message.BasicHttpRequest;
67 import org.apache.hc.core5.http.message.BasicHttpResponse;
68 import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
69 import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
70 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
71 import org.apache.hc.core5.http.nio.CapacityChannel;
72 import org.apache.hc.core5.http.nio.DataStreamChannel;
73 import org.apache.hc.core5.http.nio.RequestChannel;
74 import org.apache.hc.core5.http.nio.ResponseChannel;
75 import org.apache.hc.core5.http.protocol.HttpContext;
76 import org.apache.hc.core5.http.protocol.HttpCoreContext;
77 import org.apache.hc.core5.http.protocol.HttpDateGenerator;
78 import org.apache.hc.core5.io.CloseMode;
79 import org.apache.hc.core5.pool.ConnPoolListener;
80 import org.apache.hc.core5.pool.ConnPoolStats;
81 import org.apache.hc.core5.pool.PoolStats;
82 import org.apache.hc.core5.reactor.IOReactorConfig;
83 import org.apache.hc.core5.util.TextUtils;
84 import org.apache.hc.core5.util.TimeValue;
85 import org.apache.hc.core5.util.Timeout;
86
87
88
89
90 public class AsyncReverseProxyExample {
91
92 private static boolean quiet;
93
94 public static void main(final String[] args) throws Exception {
95 if (args.length < 1) {
96 System.out.println("Usage: <hostname[:port]> [listener port] [--quiet]");
97 System.exit(1);
98 }
99
100 final HttpHost targetHost = HttpHost.create(args[0]);
101 int port = 8080;
102 if (args.length > 1) {
103 port = Integer.parseInt(args[1]);
104 }
105 for (final String s : args) {
106 if ("--quiet".equalsIgnoreCase(s)) {
107 quiet = true;
108 break;
109 }
110 }
111
112 println("Reverse proxy to " + targetHost);
113
114 final IOReactorConfig config = IOReactorConfig.custom()
115 .setSoTimeout(1, TimeUnit.MINUTES)
116 .build();
117
118 final HttpAsyncRequester requester = AsyncRequesterBootstrap.bootstrap()
119 .setIOReactorConfig(config)
120 .setConnPoolListener(new ConnPoolListener<HttpHost>() {
121
122 @Override
123 public void onLease(final HttpHost route, final ConnPoolStats<HttpHost> connPoolStats) {
124 final StringBuilder buf = new StringBuilder();
125 buf.append("[proxy->origin] connection leased ").append(route);
126 println(buf.toString());
127 }
128
129 @Override
130 public void onRelease(final HttpHost route, final ConnPoolStats<HttpHost> connPoolStats) {
131 final StringBuilder buf = new StringBuilder();
132 buf.append("[proxy->origin] connection released ").append(route);
133 final PoolStats totals = connPoolStats.getTotalStats();
134 buf.append("; total kept alive: ").append(totals.getAvailable()).append("; ");
135 buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
136 buf.append(" of ").append(totals.getMax());
137 println(buf.toString());
138 }
139
140 })
141 .setStreamListener(new Http1StreamListener() {
142
143 @Override
144 public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
145
146 }
147
148 @Override
149 public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
150
151 }
152
153 @Override
154 public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
155 println("[proxy<-origin] connection " +
156 connection.getLocalAddress() + "->" + connection.getRemoteAddress() +
157 (keepAlive ? " kept alive" : " cannot be kept alive"));
158 }
159
160 })
161 .setMaxTotal(100)
162 .setDefaultMaxPerRoute(20)
163 .create();
164
165 final HttpAsyncServer server = AsyncServerBootstrap.bootstrap()
166 .setExceptionCallback(e -> e.printStackTrace())
167 .setIOReactorConfig(config)
168 .setStreamListener(new Http1StreamListener() {
169
170 @Override
171 public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
172
173 }
174
175 @Override
176 public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
177
178 }
179
180 @Override
181 public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
182 println("[client<-proxy] connection " +
183 connection.getLocalAddress() + "->" + connection.getRemoteAddress() +
184 (keepAlive ? " kept alive" : " cannot be kept alive"));
185 }
186
187 })
188 .register("*", () -> new IncomingExchangeHandler(targetHost, requester))
189 .create();
190
191 Runtime.getRuntime().addShutdownHook(new Thread(() -> {
192 println("Reverse proxy shutting down");
193 server.close(CloseMode.GRACEFUL);
194 requester.close(CloseMode.GRACEFUL);
195 }));
196
197 requester.start();
198 server.start();
199 server.listen(new InetSocketAddress(port), URIScheme.HTTP);
200 println("Listening on port " + port);
201
202 server.awaitShutdown(TimeValue.MAX_VALUE);
203 }
204
205 private static class ProxyBuffer extends BufferedData {
206
207 ProxyBuffer(final int bufferSize) {
208 super(bufferSize);
209 }
210
211 int write(final DataStreamChannel channel) throws IOException {
212 setOutputMode();
213 if (buffer().hasRemaining()) {
214 return channel.write(buffer());
215 }
216 return 0;
217 }
218
219 }
220
221 private static final AtomicLong COUNT = new AtomicLong(0);
222
223 private static class ProxyExchangeState {
224
225 final String id;
226
227 HttpRequest request;
228 EntityDetails requestEntityDetails;
229 DataStreamChannel requestDataChannel;
230 CapacityChannel requestCapacityChannel;
231 ProxyBuffer inBuf;
232 boolean inputEnd;
233
234 HttpResponse response;
235 EntityDetails responseEntityDetails;
236 ResponseChannel responseMessageChannel;
237 DataStreamChannel responseDataChannel;
238 CapacityChannel responseCapacityChannel;
239 ProxyBuffer outBuf;
240 boolean outputEnd;
241
242 AsyncClientEndpoint clientEndpoint;
243
244 ProxyExchangeState() {
245 this.id = String.format("%010d", COUNT.getAndIncrement());
246 }
247
248 }
249
250 private static final int INIT_BUFFER_SIZE = 4096;
251
252 private static class IncomingExchangeHandler implements AsyncServerExchangeHandler {
253
254 private final HttpHost targetHost;
255 private final HttpAsyncRequester requester;
256 private final ProxyExchangeState exchangeState;
257 private final ReentrantLock lock;
258
259 IncomingExchangeHandler(final HttpHost targetHost, final HttpAsyncRequester requester) {
260 super();
261 this.targetHost = targetHost;
262 this.requester = requester;
263 this.exchangeState = new ProxyExchangeState();
264 this.lock = new ReentrantLock();
265 }
266
267 @Override
268 public void handleRequest(
269 final HttpRequest incomingRequest,
270 final EntityDetails entityDetails,
271 final ResponseChannel responseChannel,
272 final HttpContext httpContext) throws HttpException, IOException {
273
274 lock.lock();
275 try {
276 println("[client->proxy] " + exchangeState.id + " " +
277 incomingRequest.getMethod() + " " + incomingRequest.getRequestUri());
278 exchangeState.request = incomingRequest;
279 exchangeState.requestEntityDetails = entityDetails;
280 exchangeState.inputEnd = entityDetails == null;
281 exchangeState.responseMessageChannel = responseChannel;
282
283 if (entityDetails != null) {
284 final Header h = incomingRequest.getFirstHeader(HttpHeaders.EXPECT);
285 if (h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue())) {
286 responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), httpContext);
287 }
288 }
289 } finally {
290 lock.unlock();
291 }
292
293 println("[proxy->origin] " + exchangeState.id + " request connection to " + targetHost);
294
295 requester.connect(targetHost, Timeout.ofSeconds(30), null, new FutureCallback<AsyncClientEndpoint>() {
296
297 @Override
298 public void completed(final AsyncClientEndpoint clientEndpoint) {
299 println("[proxy->origin] " + exchangeState.id + " connection leased");
300 lock.lock();
301 try {
302 exchangeState.clientEndpoint = clientEndpoint;
303 } finally {
304 lock.unlock();
305 }
306 clientEndpoint.execute(
307 new OutgoingExchangeHandler(targetHost, clientEndpoint, exchangeState),
308 HttpCoreContext.create());
309 }
310
311 @Override
312 public void failed(final Exception cause) {
313 final HttpResponse outgoingResponse = new BasicHttpResponse(HttpStatus.SC_SERVICE_UNAVAILABLE);
314 outgoingResponse.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
315 final ByteBuffer msg = StandardCharsets.US_ASCII.encode(CharBuffer.wrap(cause.getMessage()));
316 final EntityDetails exEntityDetails = new BasicEntityDetails(msg.remaining(),
317 ContentType.TEXT_PLAIN);
318 lock.lock();
319 try {
320 exchangeState.response = outgoingResponse;
321 exchangeState.responseEntityDetails = exEntityDetails;
322 exchangeState.outBuf = new ProxyBuffer(1024);
323 exchangeState.outBuf.put(msg);
324 exchangeState.outputEnd = true;
325 } finally {
326 lock.unlock();
327 }
328 println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
329
330 try {
331 responseChannel.sendResponse(outgoingResponse, exEntityDetails, httpContext);
332 } catch (final HttpException | IOException ignore) {
333
334 }
335 }
336
337 @Override
338 public void cancelled() {
339 failed(new InterruptedIOException());
340 }
341
342 });
343
344 }
345
346 @Override
347 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
348 lock.lock();
349 try {
350 exchangeState.requestCapacityChannel = capacityChannel;
351 final int capacity = exchangeState.inBuf != null ? exchangeState.inBuf.capacity() : INIT_BUFFER_SIZE;
352 if (capacity > 0) {
353 println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
354 capacityChannel.update(capacity);
355 }
356 } finally {
357 lock.unlock();
358 }
359 }
360
361 @Override
362 public void consume(final ByteBuffer src) throws IOException {
363 lock.lock();
364 try {
365 println("[client->proxy] " + exchangeState.id + " " + src.remaining() + " bytes received");
366 final DataStreamChannel dataChannel = exchangeState.requestDataChannel;
367 if (dataChannel != null && exchangeState.inBuf != null) {
368 if (exchangeState.inBuf.hasData()) {
369 final int bytesWritten = exchangeState.inBuf.write(dataChannel);
370 println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
371 }
372 if (!exchangeState.inBuf.hasData()) {
373 final int bytesWritten = dataChannel.write(src);
374 println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
375 }
376 }
377 if (src.hasRemaining()) {
378 if (exchangeState.inBuf == null) {
379 exchangeState.inBuf = new ProxyBuffer(INIT_BUFFER_SIZE);
380 }
381 exchangeState.inBuf.put(src);
382 }
383 final int capacity = exchangeState.inBuf != null ? exchangeState.inBuf.capacity() : INIT_BUFFER_SIZE;
384 println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
385 if (dataChannel != null) {
386 dataChannel.requestOutput();
387 }
388 } finally {
389 lock.unlock();
390 }
391 }
392
393 @Override
394 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
395 lock.lock();
396 try {
397 println("[client->proxy] " + exchangeState.id + " end of input");
398 exchangeState.inputEnd = true;
399 final DataStreamChannel dataChannel = exchangeState.requestDataChannel;
400 if (dataChannel != null && (exchangeState.inBuf == null || !exchangeState.inBuf.hasData())) {
401 println("[proxy->origin] " + exchangeState.id + " end of output");
402 dataChannel.endStream();
403 }
404 } finally {
405 lock.unlock();
406 }
407 }
408
409 @Override
410 public int available() {
411 lock.lock();
412 try {
413 final int available = exchangeState.outBuf != null ? exchangeState.outBuf.length() : 0;
414 println("[client<-proxy] " + exchangeState.id + " output available: " + available);
415 return available;
416 } finally {
417 lock.unlock();
418 }
419 }
420
421 @Override
422 public void produce(final DataStreamChannel channel) throws IOException {
423 lock.lock();
424 try {
425 println("[client<-proxy] " + exchangeState.id + " produce output");
426 exchangeState.responseDataChannel = channel;
427
428 if (exchangeState.outBuf != null) {
429 if (exchangeState.outBuf.hasData()) {
430 final int bytesWritten = exchangeState.outBuf.write(channel);
431 println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
432 }
433 if (exchangeState.outputEnd && !exchangeState.outBuf.hasData()) {
434 channel.endStream();
435 println("[client<-proxy] " + exchangeState.id + " end of output");
436 }
437 if (!exchangeState.outputEnd) {
438 final CapacityChannel capacityChannel = exchangeState.responseCapacityChannel;
439 if (capacityChannel != null) {
440 final int capacity = exchangeState.outBuf.capacity();
441 if (capacity > 0) {
442 println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
443 capacityChannel.update(capacity);
444 }
445 }
446 }
447 }
448 } finally {
449 lock.unlock();
450 }
451 }
452
453 @Override
454 public void failed(final Exception cause) {
455 println("[client<-proxy] " + exchangeState.id + " " + cause.getMessage());
456 if (!(cause instanceof ConnectionClosedException)) {
457 cause.printStackTrace(System.out);
458 }
459 lock.lock();
460 try {
461 if (exchangeState.clientEndpoint != null) {
462 exchangeState.clientEndpoint.releaseAndDiscard();
463 }
464 } finally {
465 lock.unlock();
466 }
467 }
468
469 @Override
470 public void releaseResources() {
471 lock.lock();
472 try {
473 exchangeState.responseMessageChannel = null;
474 exchangeState.responseDataChannel = null;
475 exchangeState.requestCapacityChannel = null;
476 } finally {
477 lock.unlock();
478 }
479 }
480
481 }
482
483 private static class OutgoingExchangeHandler implements AsyncClientExchangeHandler {
484
485 private final static Set<String> HOP_BY_HOP = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
486 TextUtils.toLowerCase(HttpHeaders.HOST),
487 TextUtils.toLowerCase(HttpHeaders.CONTENT_LENGTH),
488 TextUtils.toLowerCase(HttpHeaders.TRANSFER_ENCODING),
489 TextUtils.toLowerCase(HttpHeaders.CONNECTION),
490 TextUtils.toLowerCase(HttpHeaders.KEEP_ALIVE),
491 TextUtils.toLowerCase(HttpHeaders.PROXY_AUTHENTICATE),
492 TextUtils.toLowerCase(HttpHeaders.TE),
493 TextUtils.toLowerCase(HttpHeaders.TRAILER),
494 TextUtils.toLowerCase(HttpHeaders.UPGRADE))));
495
496 private final HttpHost targetHost;
497 private final AsyncClientEndpoint clientEndpoint;
498 private final ProxyExchangeState exchangeState;
499 private final ReentrantLock lock;
500
501 OutgoingExchangeHandler(
502 final HttpHost targetHost,
503 final AsyncClientEndpoint clientEndpoint,
504 final ProxyExchangeState exchangeState) {
505 this.targetHost = targetHost;
506 this.clientEndpoint = clientEndpoint;
507 this.exchangeState = exchangeState;
508 this.lock = new ReentrantLock();
509 }
510
511 @Override
512 public void produceRequest(
513 final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
514 lock.lock();
515 try {
516 final HttpRequest incomingRequest = exchangeState.request;
517 final EntityDetails entityDetails = exchangeState.requestEntityDetails;
518 final HttpRequest outgoingRequest = new BasicHttpRequest(
519 incomingRequest.getMethod(),
520 targetHost,
521 incomingRequest.getPath());
522 for (final Iterator<Header> it = incomingRequest.headerIterator(); it.hasNext(); ) {
523 final Header header = it.next();
524 if (!HOP_BY_HOP.contains(TextUtils.toLowerCase(header.getName()))) {
525 outgoingRequest.addHeader(header);
526 }
527 }
528
529 println("[proxy->origin] " + exchangeState.id + " " +
530 outgoingRequest.getMethod() + " " + outgoingRequest.getRequestUri());
531
532 channel.sendRequest(outgoingRequest, entityDetails, httpContext);
533 } finally {
534 lock.unlock();
535 }
536 }
537
538 @Override
539 public int available() {
540 lock.lock();
541 try {
542 final int available = exchangeState.inBuf != null ? exchangeState.inBuf.length() : 0;
543 println("[proxy->origin] " + exchangeState.id + " output available: " + available);
544 return available;
545 } finally {
546 lock.unlock();
547 }
548 }
549
550 @Override
551 public void produce(final DataStreamChannel channel) throws IOException {
552 lock.lock();
553 try {
554 println("[proxy->origin] " + exchangeState.id + " produce output");
555 exchangeState.requestDataChannel = channel;
556 if (exchangeState.inBuf != null) {
557 if (exchangeState.inBuf.hasData()) {
558 final int bytesWritten = exchangeState.inBuf.write(channel);
559 println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
560 }
561 if (exchangeState.inputEnd && !exchangeState.inBuf.hasData()) {
562 channel.endStream();
563 println("[proxy->origin] " + exchangeState.id + " end of output");
564 }
565 if (!exchangeState.inputEnd) {
566 final CapacityChannel capacityChannel = exchangeState.requestCapacityChannel;
567 if (capacityChannel != null) {
568 final int capacity = exchangeState.inBuf.capacity();
569 if (capacity > 0) {
570 println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
571 capacityChannel.update(capacity);
572 }
573 }
574 }
575 }
576 } finally {
577 lock.unlock();
578 }
579 }
580
581 @Override
582 public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
583
584 }
585
586 @Override
587 public void consumeResponse(
588 final HttpResponse incomingResponse,
589 final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
590 lock.lock();
591 try {
592 println("[proxy<-origin] " + exchangeState.id + " status " + incomingResponse.getCode());
593 if (entityDetails == null) {
594 println("[proxy<-origin] " + exchangeState.id + " end of input");
595 }
596
597 final HttpResponse outgoingResponse = new BasicHttpResponse(incomingResponse.getCode());
598 for (final Iterator<Header> it = incomingResponse.headerIterator(); it.hasNext(); ) {
599 final Header header = it.next();
600 if (!HOP_BY_HOP.contains(TextUtils.toLowerCase(header.getName()))) {
601 outgoingResponse.addHeader(header);
602 }
603 }
604
605 exchangeState.response = outgoingResponse;
606 exchangeState.responseEntityDetails = entityDetails;
607 exchangeState.outputEnd = entityDetails == null;
608
609 final ResponseChannel responseChannel = exchangeState.responseMessageChannel;
610 if (responseChannel != null) {
611
612 responseChannel.sendResponse(outgoingResponse, entityDetails, httpContext);
613 }
614
615 println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
616 if (entityDetails == null) {
617 println("[client<-proxy] " + exchangeState.id + " end of output");
618 clientEndpoint.releaseAndReuse();
619 }
620 } finally {
621 lock.unlock();
622 }
623 }
624
625 @Override
626 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
627 lock.lock();
628 try {
629 exchangeState.responseCapacityChannel = capacityChannel;
630 final int capacity = exchangeState.outBuf != null ? exchangeState.outBuf.capacity() : INIT_BUFFER_SIZE;
631 if (capacity > 0) {
632 println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
633 capacityChannel.update(capacity);
634 }
635 } finally {
636 lock.unlock();
637 }
638 }
639
640 @Override
641 public void consume(final ByteBuffer src) throws IOException {
642 lock.lock();
643 try {
644 println("[proxy<-origin] " + exchangeState.id + " " + src.remaining() + " bytes received");
645 final DataStreamChannel dataChannel = exchangeState.responseDataChannel;
646 if (dataChannel != null && exchangeState.outBuf != null) {
647 if (exchangeState.outBuf.hasData()) {
648 final int bytesWritten = exchangeState.outBuf.write(dataChannel);
649 println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
650 }
651 if (!exchangeState.outBuf.hasData()) {
652 final int bytesWritten = dataChannel.write(src);
653 println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
654 }
655 }
656 if (src.hasRemaining()) {
657 if (exchangeState.outBuf == null) {
658 exchangeState.outBuf = new ProxyBuffer(INIT_BUFFER_SIZE);
659 }
660 exchangeState.outBuf.put(src);
661 }
662 final int capacity = exchangeState.outBuf != null ? exchangeState.outBuf.capacity() : INIT_BUFFER_SIZE;
663 println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
664 if (dataChannel != null) {
665 dataChannel.requestOutput();
666 }
667 } finally {
668 lock.unlock();
669 }
670 }
671
672 @Override
673 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
674 lock.lock();
675 try {
676 println("[proxy<-origin] " + exchangeState.id + " end of input");
677 exchangeState.outputEnd = true;
678 final DataStreamChannel dataChannel = exchangeState.responseDataChannel;
679 if (dataChannel != null && (exchangeState.outBuf == null || !exchangeState.outBuf.hasData())) {
680 println("[client<-proxy] " + exchangeState.id + " end of output");
681 dataChannel.endStream();
682 clientEndpoint.releaseAndReuse();
683 }
684 } finally {
685 lock.unlock();
686 }
687 }
688
689 @Override
690 public void cancel() {
691 clientEndpoint.releaseAndDiscard();
692 }
693
694 @Override
695 public void failed(final Exception cause) {
696 println("[client<-proxy] " + exchangeState.id + " " + cause.getMessage());
697 if (!(cause instanceof ConnectionClosedException)) {
698 cause.printStackTrace(System.out);
699 }
700 lock.lock();
701 try {
702 if (exchangeState.response == null) {
703 final int status = cause instanceof IOException ? HttpStatus.SC_SERVICE_UNAVAILABLE : HttpStatus.SC_INTERNAL_SERVER_ERROR;
704 final HttpResponse outgoingResponse = new BasicHttpResponse(status);
705 outgoingResponse.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
706 exchangeState.response = outgoingResponse;
707
708 final ByteBuffer msg = StandardCharsets.US_ASCII.encode(CharBuffer.wrap(cause.getMessage()));
709 final int contentLen = msg.remaining();
710 exchangeState.outBuf = new ProxyBuffer(1024);
711 exchangeState.outBuf.put(msg);
712 exchangeState.outputEnd = true;
713
714 println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
715
716 try {
717 final EntityDetails entityDetails = new BasicEntityDetails(contentLen, ContentType.TEXT_PLAIN);
718 exchangeState.responseMessageChannel.sendResponse(outgoingResponse, entityDetails, null);
719 } catch (final HttpException | IOException ignore) {
720
721 }
722 } else {
723 exchangeState.outputEnd = true;
724 }
725 clientEndpoint.releaseAndDiscard();
726 } finally {
727 lock.unlock();
728 }
729 }
730
731 @Override
732 public void releaseResources() {
733 lock.lock();
734 try {
735 exchangeState.requestDataChannel = null;
736 exchangeState.responseCapacityChannel = null;
737 clientEndpoint.releaseAndDiscard();
738 } finally {
739 lock.unlock();
740 }
741 }
742
743 }
744
745 static void println(final String msg) {
746 if (!quiet) {
747 System.out.println(HttpDateGenerator.INSTANCE.getCurrentDate() + " | " + msg);
748 }
749 }
750 }