View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http.examples;
28  
29  import java.io.IOException;
30  import java.io.InterruptedIOException;
31  import java.net.InetSocketAddress;
32  import java.nio.ByteBuffer;
33  import java.nio.CharBuffer;
34  import java.nio.charset.StandardCharsets;
35  import java.util.Arrays;
36  import java.util.Collections;
37  import java.util.HashSet;
38  import java.util.Iterator;
39  import java.util.List;
40  import java.util.Locale;
41  import java.util.Set;
42  import java.util.concurrent.TimeUnit;
43  import java.util.concurrent.atomic.AtomicLong;
44  
45  import org.apache.hc.core5.concurrent.FutureCallback;
46  import org.apache.hc.core5.function.Supplier;
47  import org.apache.hc.core5.http.ConnectionClosedException;
48  import org.apache.hc.core5.http.ContentType;
49  import org.apache.hc.core5.http.EntityDetails;
50  import org.apache.hc.core5.http.Header;
51  import org.apache.hc.core5.http.HeaderElements;
52  import org.apache.hc.core5.http.HttpConnection;
53  import org.apache.hc.core5.http.HttpException;
54  import org.apache.hc.core5.http.HttpHeaders;
55  import org.apache.hc.core5.http.HttpHost;
56  import org.apache.hc.core5.http.HttpRequest;
57  import org.apache.hc.core5.http.HttpResponse;
58  import org.apache.hc.core5.http.HttpStatus;
59  import org.apache.hc.core5.http.URIScheme;
60  import org.apache.hc.core5.http.impl.BasicEntityDetails;
61  import org.apache.hc.core5.http.impl.Http1StreamListener;
62  import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap;
63  import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
64  import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
65  import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
66  import org.apache.hc.core5.http.impl.nio.BufferedData;
67  import org.apache.hc.core5.http.message.BasicHttpRequest;
68  import org.apache.hc.core5.http.message.BasicHttpResponse;
69  import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
70  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
71  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
72  import org.apache.hc.core5.http.nio.CapacityChannel;
73  import org.apache.hc.core5.http.nio.DataStreamChannel;
74  import org.apache.hc.core5.http.nio.RequestChannel;
75  import org.apache.hc.core5.http.nio.ResponseChannel;
76  import org.apache.hc.core5.http.protocol.HttpContext;
77  import org.apache.hc.core5.http.protocol.HttpCoreContext;
78  import org.apache.hc.core5.http.protocol.HttpDateGenerator;
79  import org.apache.hc.core5.io.CloseMode;
80  import org.apache.hc.core5.pool.ConnPoolListener;
81  import org.apache.hc.core5.pool.ConnPoolStats;
82  import org.apache.hc.core5.pool.PoolStats;
83  import org.apache.hc.core5.reactor.IOReactorConfig;
84  import org.apache.hc.core5.util.TimeValue;
85  import org.apache.hc.core5.util.Timeout;
86  
87  /**
88   * Example of an asynchronous embedded HTTP/1.1 reverse proxy with full content streaming.
89   */
90  public class AsyncReverseProxyExample {
91  
92      private static boolean quiet;
93  
94      public static void main(final String[] args) throws Exception {
95          if (args.length < 1) {
96              System.out.println("Usage: <hostname[:port]> [listener port] [--quiet]");
97              System.exit(1);
98          }
99          // Target host
100         final HttpHost targetHost = HttpHost.create(args[0]);
101         int port = 8080;
102         if (args.length > 1) {
103             port = Integer.parseInt(args[1]);
104         }
105         for (final String s : args) {
106             if ("--quiet".equalsIgnoreCase(s)) {
107                 quiet = true;
108                 break;
109             }
110         }
111 
112         println("Reverse proxy to " + targetHost);
113 
114         final IOReactorConfig config = IOReactorConfig.custom()
115             .setSoTimeout(1, TimeUnit.MINUTES)
116             .build();
117 
118         final HttpAsyncRequester requester = AsyncRequesterBootstrap.bootstrap()
119                 .setIOReactorConfig(config)
120                 .setConnPoolListener(new ConnPoolListener<HttpHost>() {
121 
122                     @Override
123                     public void onLease(final HttpHost route, final ConnPoolStats<HttpHost> connPoolStats) {
124                         final StringBuilder buf = new StringBuilder();
125                         buf.append("[proxy->origin] connection leased ").append(route);
126                         println(buf.toString());
127                     }
128 
129                     @Override
130                     public void onRelease(final HttpHost route, final ConnPoolStats<HttpHost> connPoolStats) {
131                         final StringBuilder buf = new StringBuilder();
132                         buf.append("[proxy->origin] connection released ").append(route);
133                         final PoolStats totals = connPoolStats.getTotalStats();
134                         buf.append("; total kept alive: ").append(totals.getAvailable()).append("; ");
135                         buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
136                         buf.append(" of ").append(totals.getMax());
137                         println(buf.toString());
138                     }
139 
140                 })
141                 .setStreamListener(new Http1StreamListener() {
142 
143                     @Override
144                     public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
145                         // empty
146                     }
147 
148                     @Override
149                     public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
150                         // empty
151                     }
152 
153                     @Override
154                     public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
155                         println("[proxy<-origin] connection " +
156                                 connection.getLocalAddress() + "->" + connection.getRemoteAddress() +
157                                 (keepAlive ? " kept alive" : " cannot be kept alive"));
158                     }
159 
160                 })
161                 .setMaxTotal(100)
162                 .setDefaultMaxPerRoute(20)
163                 .create();
164 
165         final HttpAsyncServer server = AsyncServerBootstrap.bootstrap()
166                 .setIOReactorConfig(config)
167                 .setStreamListener(new Http1StreamListener() {
168 
169                     @Override
170                     public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
171                         // empty
172                     }
173 
174                     @Override
175                     public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
176                         // empty
177                     }
178 
179                     @Override
180                     public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
181                         println("[client<-proxy] connection " +
182                                 connection.getLocalAddress() + "->" + connection.getRemoteAddress() +
183                                 (keepAlive ? " kept alive" : " cannot be kept alive"));
184                     }
185 
186                 })
187                 .register("*", new Supplier<AsyncServerExchangeHandler>() {
188 
189                     @Override
190                     public AsyncServerExchangeHandler get() {
191                         return new IncomingExchangeHandler(targetHost, requester);
192                     }
193 
194                 })
195                 .create();
196 
197         Runtime.getRuntime().addShutdownHook(new Thread() {
198             @Override
199             public void run() {
200                 println("Reverse proxy shutting down");
201                 server.close(CloseMode.GRACEFUL);
202                 requester.close(CloseMode.GRACEFUL);
203             }
204         });
205 
206         requester.start();
207         server.start();
208         server.listen(new InetSocketAddress(port), URIScheme.HTTP);
209         println("Listening on port " + port);
210 
211         server.awaitShutdown(TimeValue.MAX_VALUE);
212     }
213 
214     private static class ProxyBuffer extends BufferedData {
215 
216         ProxyBuffer(final int bufferSize) {
217             super(bufferSize);
218         }
219 
220         int write(final DataStreamChannel channel) throws IOException {
221             setOutputMode();
222             if (buffer().hasRemaining()) {
223                 return channel.write(buffer());
224             }
225             return 0;
226         }
227 
228     }
229 
230     private static final AtomicLong COUNT = new AtomicLong(0);
231 
232     private static class ProxyExchangeState {
233 
234         final String id;
235 
236         HttpRequest request;
237         EntityDetails requestEntityDetails;
238         DataStreamChannel requestDataChannel;
239         CapacityChannel requestCapacityChannel;
240         ProxyBuffer inBuf;
241         boolean inputEnd;
242 
243         HttpResponse response;
244         EntityDetails responseEntityDetails;
245         ResponseChannel responseMessageChannel;
246         DataStreamChannel responseDataChannel;
247         CapacityChannel responseCapacityChannel;
248         ProxyBuffer outBuf;
249         boolean outputEnd;
250 
251         AsyncClientEndpoint clientEndpoint;
252 
253         ProxyExchangeState() {
254             this.id = String.format("%010d", COUNT.getAndIncrement());
255         }
256 
257     }
258 
259     private static final int INIT_BUFFER_SIZE = 4096;
260 
261     private static class IncomingExchangeHandler implements AsyncServerExchangeHandler {
262 
263         private final HttpHost targetHost;
264         private final HttpAsyncRequester requester;
265         private final ProxyExchangeState exchangeState;
266 
267         IncomingExchangeHandler(final HttpHost targetHost, final HttpAsyncRequester requester) {
268             super();
269             this.targetHost = targetHost;
270             this.requester = requester;
271             this.exchangeState = new ProxyExchangeState();
272         }
273 
274         @Override
275         public void handleRequest(
276                 final HttpRequest incomingRequest,
277                 final EntityDetails entityDetails,
278                 final ResponseChannel responseChannel,
279                 final HttpContext httpContext) throws HttpException, IOException {
280 
281             synchronized (exchangeState) {
282                 println("[client->proxy] " + exchangeState.id + " " +
283                         incomingRequest.getMethod() + " " + incomingRequest.getRequestUri());
284                 exchangeState.request = incomingRequest;
285                 exchangeState.requestEntityDetails = entityDetails;
286                 exchangeState.inputEnd = entityDetails == null;
287                 exchangeState.responseMessageChannel = responseChannel;
288 
289                 if (entityDetails != null) {
290                     final Header h = incomingRequest.getFirstHeader(HttpHeaders.EXPECT);
291                     if (h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue())) {
292                         responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), httpContext);
293                     }
294                 }
295             }
296 
297             println("[proxy->origin] " + exchangeState.id + " request connection to " + targetHost);
298 
299             requester.connect(targetHost, Timeout.ofSeconds(30), null, new FutureCallback<AsyncClientEndpoint>() {
300 
301                 @Override
302                 public void completed(final AsyncClientEndpoint clientEndpoint) {
303                     println("[proxy->origin] " + exchangeState.id + " connection leased");
304                     synchronized (exchangeState) {
305                         exchangeState.clientEndpoint = clientEndpoint;
306                     }
307                     clientEndpoint.execute(
308                             new OutgoingExchangeHandler(targetHost, clientEndpoint, exchangeState),
309                             HttpCoreContext.create());
310                 }
311 
312                 @Override
313                 public void failed(final Exception cause) {
314                     final HttpResponse outgoingResponse = new BasicHttpResponse(HttpStatus.SC_SERVICE_UNAVAILABLE);
315                     outgoingResponse.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
316                     final ByteBuffer msg = StandardCharsets.US_ASCII.encode(CharBuffer.wrap(cause.getMessage()));
317                     final EntityDetails exEntityDetails = new BasicEntityDetails(msg.remaining(),
318                                     ContentType.TEXT_PLAIN);
319                     synchronized (exchangeState) {
320                         exchangeState.response = outgoingResponse;
321                         exchangeState.responseEntityDetails = exEntityDetails;
322                         exchangeState.outBuf = new ProxyBuffer(1024);
323                         exchangeState.outBuf.put(msg);
324                         exchangeState.outputEnd = true;
325                     }
326                     println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
327 
328                     try {
329                         responseChannel.sendResponse(outgoingResponse, exEntityDetails, httpContext);
330                     } catch (final HttpException | IOException ignore) {
331                         // ignore
332                     }
333                 }
334 
335                 @Override
336                 public void cancelled() {
337                     failed(new InterruptedIOException());
338                 }
339 
340             });
341 
342         }
343 
344         @Override
345         public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
346             synchronized (exchangeState) {
347                 exchangeState.requestCapacityChannel = capacityChannel;
348                 final int capacity = exchangeState.inBuf != null ? exchangeState.inBuf.capacity() : INIT_BUFFER_SIZE;
349                 if (capacity > 0) {
350                     println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
351                     capacityChannel.update(capacity);
352                 }
353             }
354         }
355 
356         @Override
357         public void consume(final ByteBuffer src) throws IOException {
358             synchronized (exchangeState) {
359                 println("[client->proxy] " + exchangeState.id + " " + src.remaining() + " bytes received");
360                 final DataStreamChannel dataChannel = exchangeState.requestDataChannel;
361                 if (dataChannel != null && exchangeState.inBuf != null) {
362                     if (exchangeState.inBuf.hasData()) {
363                         final int bytesWritten = exchangeState.inBuf.write(dataChannel);
364                         println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
365                     }
366                     if (!exchangeState.inBuf.hasData()) {
367                         final int bytesWritten = dataChannel.write(src);
368                         println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
369                     }
370                 }
371                 if (src.hasRemaining()) {
372                     if (exchangeState.inBuf == null) {
373                         exchangeState.inBuf = new ProxyBuffer(INIT_BUFFER_SIZE);
374                     }
375                     exchangeState.inBuf.put(src);
376                 }
377                 final int capacity = exchangeState.inBuf != null ? exchangeState.inBuf.capacity() : INIT_BUFFER_SIZE;
378                 println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
379                 if (dataChannel != null) {
380                     dataChannel.requestOutput();
381                 }
382             }
383         }
384 
385         @Override
386         public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
387             synchronized (exchangeState) {
388                 println("[client->proxy] " + exchangeState.id + " end of input");
389                 exchangeState.inputEnd = true;
390                 final DataStreamChannel dataChannel = exchangeState.requestDataChannel;
391                 if (dataChannel != null && (exchangeState.inBuf == null || !exchangeState.inBuf.hasData())) {
392                     println("[proxy->origin] " + exchangeState.id + " end of output");
393                     dataChannel.endStream();
394                 }
395             }
396         }
397 
398         @Override
399         public int available() {
400             synchronized (exchangeState) {
401                 final int available = exchangeState.outBuf != null ? exchangeState.outBuf.length() : 0;
402                 println("[client<-proxy] " + exchangeState.id + " output available: " + available);
403                 return available;
404             }
405         }
406 
407         @Override
408         public void produce(final DataStreamChannel channel) throws IOException {
409             synchronized (exchangeState) {
410                 println("[client<-proxy] " + exchangeState.id + " produce output");
411                 exchangeState.responseDataChannel = channel;
412 
413                 if (exchangeState.outBuf != null) {
414                     if (exchangeState.outBuf.hasData()) {
415                         final int bytesWritten = exchangeState.outBuf.write(channel);
416                         println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
417                     }
418                     if (exchangeState.outputEnd && !exchangeState.outBuf.hasData()) {
419                         channel.endStream();
420                         println("[client<-proxy] " + exchangeState.id + " end of output");
421                     }
422                     if (!exchangeState.outputEnd) {
423                         final CapacityChannel capacityChannel = exchangeState.responseCapacityChannel;
424                         if (capacityChannel != null) {
425                             final int capacity = exchangeState.outBuf.capacity();
426                             if (capacity > 0) {
427                                 println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
428                                 capacityChannel.update(capacity);
429                             }
430                         }
431                     }
432                 }
433             }
434         }
435 
436         @Override
437         public void failed(final Exception cause) {
438             println("[client<-proxy] " + exchangeState.id + " " + cause.getMessage());
439             if (!(cause instanceof ConnectionClosedException)) {
440                 cause.printStackTrace(System.out);
441             }
442             synchronized (exchangeState) {
443                 if (exchangeState.clientEndpoint != null) {
444                     exchangeState.clientEndpoint.releaseAndDiscard();
445                 }
446             }
447         }
448 
449         @Override
450         public void releaseResources() {
451             synchronized (exchangeState) {
452                 exchangeState.responseMessageChannel = null;
453                 exchangeState.responseDataChannel = null;
454                 exchangeState.requestCapacityChannel = null;
455             }
456         }
457 
458     }
459 
460     private static class OutgoingExchangeHandler implements AsyncClientExchangeHandler {
461 
462         private final static Set<String> HOP_BY_HOP = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
463                         HttpHeaders.HOST.toLowerCase(Locale.ROOT),
464                         HttpHeaders.CONTENT_LENGTH.toLowerCase(Locale.ROOT),
465                         HttpHeaders.TRANSFER_ENCODING.toLowerCase(Locale.ROOT),
466                         HttpHeaders.CONNECTION.toLowerCase(Locale.ROOT),
467                         HttpHeaders.KEEP_ALIVE.toLowerCase(Locale.ROOT),
468                         HttpHeaders.PROXY_AUTHENTICATE.toLowerCase(Locale.ROOT),
469                         HttpHeaders.TE.toLowerCase(Locale.ROOT),
470                         HttpHeaders.TRAILER.toLowerCase(Locale.ROOT),
471                         HttpHeaders.UPGRADE.toLowerCase(Locale.ROOT))));
472 
473         private final HttpHost targetHost;
474         private final AsyncClientEndpoint clientEndpoint;
475         private final ProxyExchangeState exchangeState;
476 
477         OutgoingExchangeHandler(
478                 final HttpHost targetHost,
479                 final AsyncClientEndpoint clientEndpoint,
480                 final ProxyExchangeState exchangeState) {
481             this.targetHost = targetHost;
482             this.clientEndpoint = clientEndpoint;
483             this.exchangeState = exchangeState;
484         }
485 
486         @Override
487         public void produceRequest(
488                 final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
489             synchronized (exchangeState) {
490                 final HttpRequest incomingRequest = exchangeState.request;
491                 final EntityDetails entityDetails = exchangeState.requestEntityDetails;
492                 final HttpRequest outgoingRequest = new BasicHttpRequest(
493                         incomingRequest.getMethod(),
494                         targetHost,
495                         incomingRequest.getPath());
496                 for (final Iterator<Header> it = incomingRequest.headerIterator(); it.hasNext(); ) {
497                     final Header header = it.next();
498                     if (!HOP_BY_HOP.contains(header.getName().toLowerCase(Locale.ROOT))) {
499                         outgoingRequest.addHeader(header);
500                     }
501                 }
502 
503                 println("[proxy->origin] " + exchangeState.id + " " +
504                         outgoingRequest.getMethod() + " " + outgoingRequest.getRequestUri());
505 
506                 channel.sendRequest(outgoingRequest, entityDetails, httpContext);
507             }
508         }
509 
510         @Override
511         public int available() {
512             synchronized (exchangeState) {
513                 final int available = exchangeState.inBuf != null ? exchangeState.inBuf.length() : 0;
514                 println("[proxy->origin] " + exchangeState.id + " output available: " + available);
515                 return available;
516             }
517         }
518 
519         @Override
520         public void produce(final DataStreamChannel channel) throws IOException {
521             synchronized (exchangeState) {
522                 println("[proxy->origin] " + exchangeState.id + " produce output");
523                 exchangeState.requestDataChannel = channel;
524                 if (exchangeState.inBuf != null) {
525                     if (exchangeState.inBuf.hasData()) {
526                         final int bytesWritten = exchangeState.inBuf.write(channel);
527                         println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
528                     }
529                     if (exchangeState.inputEnd && !exchangeState.inBuf.hasData()) {
530                         channel.endStream();
531                         println("[proxy->origin] " + exchangeState.id + " end of output");
532                     }
533                     if (!exchangeState.inputEnd) {
534                         final CapacityChannel capacityChannel = exchangeState.requestCapacityChannel;
535                         if (capacityChannel != null) {
536                             final int capacity = exchangeState.inBuf.capacity();
537                             if (capacity > 0) {
538                                 println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
539                                 capacityChannel.update(capacity);
540                             }
541                         }
542                     }
543                 }
544             }
545         }
546 
547         @Override
548         public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
549             // ignore
550         }
551 
552         @Override
553         public void consumeResponse(
554                 final HttpResponse incomingResponse,
555                 final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
556             synchronized (exchangeState) {
557                 println("[proxy<-origin] " + exchangeState.id + " status " + incomingResponse.getCode());
558                 if (entityDetails == null) {
559                     println("[proxy<-origin] " + exchangeState.id + " end of input");
560                 }
561 
562                 final HttpResponse outgoingResponse = new BasicHttpResponse(incomingResponse.getCode());
563                 for (final Iterator<Header> it = incomingResponse.headerIterator(); it.hasNext(); ) {
564                     final Header header = it.next();
565                     if (!HOP_BY_HOP.contains(header.getName().toLowerCase(Locale.ROOT))) {
566                         outgoingResponse.addHeader(header);
567                     }
568                 }
569 
570                 exchangeState.response = outgoingResponse;
571                 exchangeState.responseEntityDetails = entityDetails;
572                 exchangeState.outputEnd = entityDetails == null;
573 
574                 final ResponseChannel responseChannel = exchangeState.responseMessageChannel;
575                 if (responseChannel != null) {
576                     // responseChannel can be null under load.
577                     responseChannel.sendResponse(outgoingResponse, entityDetails, httpContext);
578                 }
579 
580                 println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
581                 if (entityDetails == null) {
582                     println("[client<-proxy] " + exchangeState.id + " end of output");
583                     clientEndpoint.releaseAndReuse();
584                 }
585             }
586         }
587 
588         @Override
589         public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
590             synchronized (exchangeState) {
591                 exchangeState.responseCapacityChannel = capacityChannel;
592                 final int capacity = exchangeState.outBuf != null ? exchangeState.outBuf.capacity() : INIT_BUFFER_SIZE;
593                 if (capacity > 0) {
594                     println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
595                     capacityChannel.update(capacity);
596                 }
597             }
598         }
599 
600         @Override
601         public void consume(final ByteBuffer src) throws IOException {
602             synchronized (exchangeState) {
603                 println("[proxy<-origin] " + exchangeState.id + " " + src.remaining() + " bytes received");
604                 final DataStreamChannel dataChannel = exchangeState.responseDataChannel;
605                 if (dataChannel != null && exchangeState.outBuf != null) {
606                     if (exchangeState.outBuf.hasData()) {
607                         final int bytesWritten = exchangeState.outBuf.write(dataChannel);
608                         println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
609                     }
610                     if (!exchangeState.outBuf.hasData()) {
611                         final int bytesWritten = dataChannel.write(src);
612                         println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
613                     }
614                 }
615                 if (src.hasRemaining()) {
616                     if (exchangeState.outBuf == null) {
617                         exchangeState.outBuf = new ProxyBuffer(INIT_BUFFER_SIZE);
618                     }
619                     exchangeState.outBuf.put(src);
620                 }
621                 final int capacity = exchangeState.outBuf != null ? exchangeState.outBuf.capacity() : INIT_BUFFER_SIZE;
622                 println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
623                 if (dataChannel != null) {
624                     dataChannel.requestOutput();
625                 }
626             }
627         }
628 
629         @Override
630         public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
631             synchronized (exchangeState) {
632                 println("[proxy<-origin] " + exchangeState.id + " end of input");
633                 exchangeState.outputEnd = true;
634                 final DataStreamChannel dataChannel = exchangeState.responseDataChannel;
635                 if (dataChannel != null && (exchangeState.outBuf == null || !exchangeState.outBuf.hasData())) {
636                     println("[client<-proxy] " + exchangeState.id + " end of output");
637                     dataChannel.endStream();
638                     clientEndpoint.releaseAndReuse();
639                 }
640             }
641         }
642 
643         @Override
644         public void cancel() {
645             clientEndpoint.releaseAndDiscard();
646         }
647 
648         @Override
649         public void failed(final Exception cause) {
650             println("[client<-proxy] " + exchangeState.id + " " + cause.getMessage());
651             if (!(cause instanceof ConnectionClosedException)) {
652                 cause.printStackTrace(System.out);
653             }
654             synchronized (exchangeState) {
655                 if (exchangeState.response == null) {
656                     final int status = cause instanceof IOException ? HttpStatus.SC_SERVICE_UNAVAILABLE : HttpStatus.SC_INTERNAL_SERVER_ERROR;
657                     final HttpResponse outgoingResponse = new BasicHttpResponse(status);
658                     outgoingResponse.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
659                     exchangeState.response = outgoingResponse;
660 
661                     final ByteBuffer msg = StandardCharsets.US_ASCII.encode(CharBuffer.wrap(cause.getMessage()));
662                     final int contentLen = msg.remaining();
663                     exchangeState.outBuf = new ProxyBuffer(1024);
664                     exchangeState.outBuf.put(msg);
665                     exchangeState.outputEnd = true;
666 
667                     println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
668 
669                     try {
670                         final EntityDetails entityDetails = new BasicEntityDetails(contentLen, ContentType.TEXT_PLAIN);
671                         exchangeState.responseMessageChannel.sendResponse(outgoingResponse, entityDetails, null);
672                     } catch (final HttpException | IOException ignore) {
673                         // ignore
674                     }
675                 } else {
676                     exchangeState.outputEnd = true;
677                 }
678                 clientEndpoint.releaseAndDiscard();
679             }
680         }
681 
682         @Override
683         public void releaseResources() {
684             synchronized (exchangeState) {
685                 exchangeState.requestDataChannel = null;
686                 exchangeState.responseCapacityChannel = null;
687                 clientEndpoint.releaseAndDiscard();
688             }
689         }
690 
691     }
692 
693     static final void println(final String msg) {
694         if (!quiet) {
695             System.out.println(HttpDateGenerator.INSTANCE.getCurrentDate() + " | " + msg);
696         }
697     }
698 }