View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http.examples;
28  
29  import java.io.IOException;
30  import java.io.InterruptedIOException;
31  import java.net.InetSocketAddress;
32  import java.nio.ByteBuffer;
33  import java.nio.CharBuffer;
34  import java.nio.charset.StandardCharsets;
35  import java.util.Arrays;
36  import java.util.Collections;
37  import java.util.HashSet;
38  import java.util.Iterator;
39  import java.util.List;
40  import java.util.Set;
41  import java.util.concurrent.TimeUnit;
42  import java.util.concurrent.atomic.AtomicLong;
43  
44  import org.apache.hc.core5.concurrent.FutureCallback;
45  import org.apache.hc.core5.http.ConnectionClosedException;
46  import org.apache.hc.core5.http.ContentType;
47  import org.apache.hc.core5.http.EntityDetails;
48  import org.apache.hc.core5.http.Header;
49  import org.apache.hc.core5.http.HeaderElements;
50  import org.apache.hc.core5.http.HttpConnection;
51  import org.apache.hc.core5.http.HttpException;
52  import org.apache.hc.core5.http.HttpHeaders;
53  import org.apache.hc.core5.http.HttpHost;
54  import org.apache.hc.core5.http.HttpRequest;
55  import org.apache.hc.core5.http.HttpResponse;
56  import org.apache.hc.core5.http.HttpStatus;
57  import org.apache.hc.core5.http.URIScheme;
58  import org.apache.hc.core5.http.impl.BasicEntityDetails;
59  import org.apache.hc.core5.http.impl.Http1StreamListener;
60  import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap;
61  import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
62  import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
63  import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
64  import org.apache.hc.core5.http.impl.nio.BufferedData;
65  import org.apache.hc.core5.http.message.BasicHttpRequest;
66  import org.apache.hc.core5.http.message.BasicHttpResponse;
67  import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
68  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
69  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
70  import org.apache.hc.core5.http.nio.CapacityChannel;
71  import org.apache.hc.core5.http.nio.DataStreamChannel;
72  import org.apache.hc.core5.http.nio.RequestChannel;
73  import org.apache.hc.core5.http.nio.ResponseChannel;
74  import org.apache.hc.core5.http.protocol.HttpContext;
75  import org.apache.hc.core5.http.protocol.HttpCoreContext;
76  import org.apache.hc.core5.http.protocol.HttpDateGenerator;
77  import org.apache.hc.core5.io.CloseMode;
78  import org.apache.hc.core5.pool.ConnPoolListener;
79  import org.apache.hc.core5.pool.ConnPoolStats;
80  import org.apache.hc.core5.pool.PoolStats;
81  import org.apache.hc.core5.reactor.IOReactorConfig;
82  import org.apache.hc.core5.util.TextUtils;
83  import org.apache.hc.core5.util.TimeValue;
84  import org.apache.hc.core5.util.Timeout;
85  
86  /**
 * Example of an asynchronous embedded HTTP/1.1 reverse proxy with full content streaming.
88   */
89  public class AsyncReverseProxyExample {
90  
91      private static boolean quiet;
92  
93      public static void main(final String[] args) throws Exception {
94          if (args.length < 1) {
95              System.out.println("Usage: <hostname[:port]> [listener port] [--quiet]");
96              System.exit(1);
97          }
98          // Target host
99          final HttpHost targetHost = HttpHost.create(args[0]);
100         int port = 8080;
101         if (args.length > 1) {
102             port = Integer.parseInt(args[1]);
103         }
104         for (final String s : args) {
105             if ("--quiet".equalsIgnoreCase(s)) {
106                 quiet = true;
107                 break;
108             }
109         }
110 
111         println("Reverse proxy to " + targetHost);
112 
113         final IOReactorConfig config = IOReactorConfig.custom()
114             .setSoTimeout(1, TimeUnit.MINUTES)
115             .build();
116 
117         final HttpAsyncRequester requester = AsyncRequesterBootstrap.bootstrap()
118                 .setIOReactorConfig(config)
119                 .setConnPoolListener(new ConnPoolListener<HttpHost>() {
120 
121                     @Override
122                     public void onLease(final HttpHost route, final ConnPoolStats<HttpHost> connPoolStats) {
123                         final StringBuilder buf = new StringBuilder();
124                         buf.append("[proxy->origin] connection leased ").append(route);
125                         println(buf.toString());
126                     }
127 
128                     @Override
129                     public void onRelease(final HttpHost route, final ConnPoolStats<HttpHost> connPoolStats) {
130                         final StringBuilder buf = new StringBuilder();
131                         buf.append("[proxy->origin] connection released ").append(route);
132                         final PoolStats totals = connPoolStats.getTotalStats();
133                         buf.append("; total kept alive: ").append(totals.getAvailable()).append("; ");
134                         buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
135                         buf.append(" of ").append(totals.getMax());
136                         println(buf.toString());
137                     }
138 
139                 })
140                 .setStreamListener(new Http1StreamListener() {
141 
142                     @Override
143                     public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
144                         // empty
145                     }
146 
147                     @Override
148                     public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
149                         // empty
150                     }
151 
152                     @Override
153                     public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
154                         println("[proxy<-origin] connection " +
155                                 connection.getLocalAddress() + "->" + connection.getRemoteAddress() +
156                                 (keepAlive ? " kept alive" : " cannot be kept alive"));
157                     }
158 
159                 })
160                 .setMaxTotal(100)
161                 .setDefaultMaxPerRoute(20)
162                 .create();
163 
164         final HttpAsyncServer server = AsyncServerBootstrap.bootstrap()
165                 .setIOReactorConfig(config)
166                 .setStreamListener(new Http1StreamListener() {
167 
168                     @Override
169                     public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
170                         // empty
171                     }
172 
173                     @Override
174                     public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
175                         // empty
176                     }
177 
178                     @Override
179                     public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
180                         println("[client<-proxy] connection " +
181                                 connection.getLocalAddress() + "->" + connection.getRemoteAddress() +
182                                 (keepAlive ? " kept alive" : " cannot be kept alive"));
183                     }
184 
185                 })
186                 .register("*", () -> new IncomingExchangeHandler(targetHost, requester))
187                 .create();
188 
189         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
190             println("Reverse proxy shutting down");
191             server.close(CloseMode.GRACEFUL);
192             requester.close(CloseMode.GRACEFUL);
193         }));
194 
195         requester.start();
196         server.start();
197         server.listen(new InetSocketAddress(port), URIScheme.HTTP);
198         println("Listening on port " + port);
199 
200         server.awaitShutdown(TimeValue.MAX_VALUE);
201     }
202 
203     private static class ProxyBuffer extends BufferedData {
204 
205         ProxyBuffer(final int bufferSize) {
206             super(bufferSize);
207         }
208 
209         int write(final DataStreamChannel channel) throws IOException {
210             setOutputMode();
211             if (buffer().hasRemaining()) {
212                 return channel.write(buffer());
213             }
214             return 0;
215         }
216 
217     }
218 
219     private static final AtomicLong COUNT = new AtomicLong(0);
220 
221     private static class ProxyExchangeState {
222 
223         final String id;
224 
225         HttpRequest request;
226         EntityDetails requestEntityDetails;
227         DataStreamChannel requestDataChannel;
228         CapacityChannel requestCapacityChannel;
229         ProxyBuffer inBuf;
230         boolean inputEnd;
231 
232         HttpResponse response;
233         EntityDetails responseEntityDetails;
234         ResponseChannel responseMessageChannel;
235         DataStreamChannel responseDataChannel;
236         CapacityChannel responseCapacityChannel;
237         ProxyBuffer outBuf;
238         boolean outputEnd;
239 
240         AsyncClientEndpoint clientEndpoint;
241 
242         ProxyExchangeState() {
243             this.id = String.format("%010d", COUNT.getAndIncrement());
244         }
245 
246     }
247 
248     private static final int INIT_BUFFER_SIZE = 4096;
249 
250     private static class IncomingExchangeHandler implements AsyncServerExchangeHandler {
251 
252         private final HttpHost targetHost;
253         private final HttpAsyncRequester requester;
254         private final ProxyExchangeState exchangeState;
255 
256         IncomingExchangeHandler(final HttpHost targetHost, final HttpAsyncRequester requester) {
257             super();
258             this.targetHost = targetHost;
259             this.requester = requester;
260             this.exchangeState = new ProxyExchangeState();
261         }
262 
263         @Override
264         public void handleRequest(
265                 final HttpRequest incomingRequest,
266                 final EntityDetails entityDetails,
267                 final ResponseChannel responseChannel,
268                 final HttpContext httpContext) throws HttpException, IOException {
269 
270             synchronized (exchangeState) {
271                 println("[client->proxy] " + exchangeState.id + " " +
272                         incomingRequest.getMethod() + " " + incomingRequest.getRequestUri());
273                 exchangeState.request = incomingRequest;
274                 exchangeState.requestEntityDetails = entityDetails;
275                 exchangeState.inputEnd = entityDetails == null;
276                 exchangeState.responseMessageChannel = responseChannel;
277 
278                 if (entityDetails != null) {
279                     final Header h = incomingRequest.getFirstHeader(HttpHeaders.EXPECT);
280                     if (h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue())) {
281                         responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), httpContext);
282                     }
283                 }
284             }
285 
286             println("[proxy->origin] " + exchangeState.id + " request connection to " + targetHost);
287 
288             requester.connect(targetHost, Timeout.ofSeconds(30), null, new FutureCallback<AsyncClientEndpoint>() {
289 
290                 @Override
291                 public void completed(final AsyncClientEndpoint clientEndpoint) {
292                     println("[proxy->origin] " + exchangeState.id + " connection leased");
293                     synchronized (exchangeState) {
294                         exchangeState.clientEndpoint = clientEndpoint;
295                     }
296                     clientEndpoint.execute(
297                             new OutgoingExchangeHandler(targetHost, clientEndpoint, exchangeState),
298                             HttpCoreContext.create());
299                 }
300 
301                 @Override
302                 public void failed(final Exception cause) {
303                     final HttpResponse outgoingResponse = new BasicHttpResponse(HttpStatus.SC_SERVICE_UNAVAILABLE);
304                     outgoingResponse.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
305                     final ByteBuffer msg = StandardCharsets.US_ASCII.encode(CharBuffer.wrap(cause.getMessage()));
306                     final EntityDetails exEntityDetails = new BasicEntityDetails(msg.remaining(),
307                                     ContentType.TEXT_PLAIN);
308                     synchronized (exchangeState) {
309                         exchangeState.response = outgoingResponse;
310                         exchangeState.responseEntityDetails = exEntityDetails;
311                         exchangeState.outBuf = new ProxyBuffer(1024);
312                         exchangeState.outBuf.put(msg);
313                         exchangeState.outputEnd = true;
314                     }
315                     println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
316 
317                     try {
318                         responseChannel.sendResponse(outgoingResponse, exEntityDetails, httpContext);
319                     } catch (final HttpException | IOException ignore) {
320                         // ignore
321                     }
322                 }
323 
324                 @Override
325                 public void cancelled() {
326                     failed(new InterruptedIOException());
327                 }
328 
329             });
330 
331         }
332 
333         @Override
334         public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
335             synchronized (exchangeState) {
336                 exchangeState.requestCapacityChannel = capacityChannel;
337                 final int capacity = exchangeState.inBuf != null ? exchangeState.inBuf.capacity() : INIT_BUFFER_SIZE;
338                 if (capacity > 0) {
339                     println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
340                     capacityChannel.update(capacity);
341                 }
342             }
343         }
344 
345         @Override
346         public void consume(final ByteBuffer src) throws IOException {
347             synchronized (exchangeState) {
348                 println("[client->proxy] " + exchangeState.id + " " + src.remaining() + " bytes received");
349                 final DataStreamChannel dataChannel = exchangeState.requestDataChannel;
350                 if (dataChannel != null && exchangeState.inBuf != null) {
351                     if (exchangeState.inBuf.hasData()) {
352                         final int bytesWritten = exchangeState.inBuf.write(dataChannel);
353                         println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
354                     }
355                     if (!exchangeState.inBuf.hasData()) {
356                         final int bytesWritten = dataChannel.write(src);
357                         println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
358                     }
359                 }
360                 if (src.hasRemaining()) {
361                     if (exchangeState.inBuf == null) {
362                         exchangeState.inBuf = new ProxyBuffer(INIT_BUFFER_SIZE);
363                     }
364                     exchangeState.inBuf.put(src);
365                 }
366                 final int capacity = exchangeState.inBuf != null ? exchangeState.inBuf.capacity() : INIT_BUFFER_SIZE;
367                 println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
368                 if (dataChannel != null) {
369                     dataChannel.requestOutput();
370                 }
371             }
372         }
373 
374         @Override
375         public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
376             synchronized (exchangeState) {
377                 println("[client->proxy] " + exchangeState.id + " end of input");
378                 exchangeState.inputEnd = true;
379                 final DataStreamChannel dataChannel = exchangeState.requestDataChannel;
380                 if (dataChannel != null && (exchangeState.inBuf == null || !exchangeState.inBuf.hasData())) {
381                     println("[proxy->origin] " + exchangeState.id + " end of output");
382                     dataChannel.endStream();
383                 }
384             }
385         }
386 
387         @Override
388         public int available() {
389             synchronized (exchangeState) {
390                 final int available = exchangeState.outBuf != null ? exchangeState.outBuf.length() : 0;
391                 println("[client<-proxy] " + exchangeState.id + " output available: " + available);
392                 return available;
393             }
394         }
395 
396         @Override
397         public void produce(final DataStreamChannel channel) throws IOException {
398             synchronized (exchangeState) {
399                 println("[client<-proxy] " + exchangeState.id + " produce output");
400                 exchangeState.responseDataChannel = channel;
401 
402                 if (exchangeState.outBuf != null) {
403                     if (exchangeState.outBuf.hasData()) {
404                         final int bytesWritten = exchangeState.outBuf.write(channel);
405                         println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
406                     }
407                     if (exchangeState.outputEnd && !exchangeState.outBuf.hasData()) {
408                         channel.endStream();
409                         println("[client<-proxy] " + exchangeState.id + " end of output");
410                     }
411                     if (!exchangeState.outputEnd) {
412                         final CapacityChannel capacityChannel = exchangeState.responseCapacityChannel;
413                         if (capacityChannel != null) {
414                             final int capacity = exchangeState.outBuf.capacity();
415                             if (capacity > 0) {
416                                 println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
417                                 capacityChannel.update(capacity);
418                             }
419                         }
420                     }
421                 }
422             }
423         }
424 
425         @Override
426         public void failed(final Exception cause) {
427             println("[client<-proxy] " + exchangeState.id + " " + cause.getMessage());
428             if (!(cause instanceof ConnectionClosedException)) {
429                 cause.printStackTrace(System.out);
430             }
431             synchronized (exchangeState) {
432                 if (exchangeState.clientEndpoint != null) {
433                     exchangeState.clientEndpoint.releaseAndDiscard();
434                 }
435             }
436         }
437 
438         @Override
439         public void releaseResources() {
440             synchronized (exchangeState) {
441                 exchangeState.responseMessageChannel = null;
442                 exchangeState.responseDataChannel = null;
443                 exchangeState.requestCapacityChannel = null;
444             }
445         }
446 
447     }
448 
449     private static class OutgoingExchangeHandler implements AsyncClientExchangeHandler {
450 
451         private final static Set<String> HOP_BY_HOP = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
452                 TextUtils.toLowerCase(HttpHeaders.HOST),
453                 TextUtils.toLowerCase(HttpHeaders.CONTENT_LENGTH),
454                 TextUtils.toLowerCase(HttpHeaders.TRANSFER_ENCODING),
455                 TextUtils.toLowerCase(HttpHeaders.CONNECTION),
456                 TextUtils.toLowerCase(HttpHeaders.KEEP_ALIVE),
457                 TextUtils.toLowerCase(HttpHeaders.PROXY_AUTHENTICATE),
458                 TextUtils.toLowerCase(HttpHeaders.TE),
459                 TextUtils.toLowerCase(HttpHeaders.TRAILER),
460                 TextUtils.toLowerCase(HttpHeaders.UPGRADE))));
461 
462         private final HttpHost targetHost;
463         private final AsyncClientEndpoint clientEndpoint;
464         private final ProxyExchangeState exchangeState;
465 
466         OutgoingExchangeHandler(
467                 final HttpHost targetHost,
468                 final AsyncClientEndpoint clientEndpoint,
469                 final ProxyExchangeState exchangeState) {
470             this.targetHost = targetHost;
471             this.clientEndpoint = clientEndpoint;
472             this.exchangeState = exchangeState;
473         }
474 
475         @Override
476         public void produceRequest(
477                 final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
478             synchronized (exchangeState) {
479                 final HttpRequest incomingRequest = exchangeState.request;
480                 final EntityDetails entityDetails = exchangeState.requestEntityDetails;
481                 final HttpRequest outgoingRequest = new BasicHttpRequest(
482                         incomingRequest.getMethod(),
483                         targetHost,
484                         incomingRequest.getPath());
485                 for (final Iterator<Header> it = incomingRequest.headerIterator(); it.hasNext(); ) {
486                     final Header header = it.next();
487                     if (!HOP_BY_HOP.contains(TextUtils.toLowerCase(header.getName()))) {
488                         outgoingRequest.addHeader(header);
489                     }
490                 }
491 
492                 println("[proxy->origin] " + exchangeState.id + " " +
493                         outgoingRequest.getMethod() + " " + outgoingRequest.getRequestUri());
494 
495                 channel.sendRequest(outgoingRequest, entityDetails, httpContext);
496             }
497         }
498 
499         @Override
500         public int available() {
501             synchronized (exchangeState) {
502                 final int available = exchangeState.inBuf != null ? exchangeState.inBuf.length() : 0;
503                 println("[proxy->origin] " + exchangeState.id + " output available: " + available);
504                 return available;
505             }
506         }
507 
508         @Override
509         public void produce(final DataStreamChannel channel) throws IOException {
510             synchronized (exchangeState) {
511                 println("[proxy->origin] " + exchangeState.id + " produce output");
512                 exchangeState.requestDataChannel = channel;
513                 if (exchangeState.inBuf != null) {
514                     if (exchangeState.inBuf.hasData()) {
515                         final int bytesWritten = exchangeState.inBuf.write(channel);
516                         println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
517                     }
518                     if (exchangeState.inputEnd && !exchangeState.inBuf.hasData()) {
519                         channel.endStream();
520                         println("[proxy->origin] " + exchangeState.id + " end of output");
521                     }
522                     if (!exchangeState.inputEnd) {
523                         final CapacityChannel capacityChannel = exchangeState.requestCapacityChannel;
524                         if (capacityChannel != null) {
525                             final int capacity = exchangeState.inBuf.capacity();
526                             if (capacity > 0) {
527                                 println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
528                                 capacityChannel.update(capacity);
529                             }
530                         }
531                     }
532                 }
533             }
534         }
535 
536         @Override
537         public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
538             // ignore
539         }
540 
541         @Override
542         public void consumeResponse(
543                 final HttpResponse incomingResponse,
544                 final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
545             synchronized (exchangeState) {
546                 println("[proxy<-origin] " + exchangeState.id + " status " + incomingResponse.getCode());
547                 if (entityDetails == null) {
548                     println("[proxy<-origin] " + exchangeState.id + " end of input");
549                 }
550 
551                 final HttpResponse outgoingResponse = new BasicHttpResponse(incomingResponse.getCode());
552                 for (final Iterator<Header> it = incomingResponse.headerIterator(); it.hasNext(); ) {
553                     final Header header = it.next();
554                     if (!HOP_BY_HOP.contains(TextUtils.toLowerCase(header.getName()))) {
555                         outgoingResponse.addHeader(header);
556                     }
557                 }
558 
559                 exchangeState.response = outgoingResponse;
560                 exchangeState.responseEntityDetails = entityDetails;
561                 exchangeState.outputEnd = entityDetails == null;
562 
563                 final ResponseChannel responseChannel = exchangeState.responseMessageChannel;
564                 if (responseChannel != null) {
565                     // responseChannel can be null under load.
566                     responseChannel.sendResponse(outgoingResponse, entityDetails, httpContext);
567                 }
568 
569                 println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
570                 if (entityDetails == null) {
571                     println("[client<-proxy] " + exchangeState.id + " end of output");
572                     clientEndpoint.releaseAndReuse();
573                 }
574             }
575         }
576 
577         @Override
578         public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
579             synchronized (exchangeState) {
580                 exchangeState.responseCapacityChannel = capacityChannel;
581                 final int capacity = exchangeState.outBuf != null ? exchangeState.outBuf.capacity() : INIT_BUFFER_SIZE;
582                 if (capacity > 0) {
583                     println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
584                     capacityChannel.update(capacity);
585                 }
586             }
587         }
588 
589         @Override
590         public void consume(final ByteBuffer src) throws IOException {
591             synchronized (exchangeState) {
592                 println("[proxy<-origin] " + exchangeState.id + " " + src.remaining() + " bytes received");
593                 final DataStreamChannel dataChannel = exchangeState.responseDataChannel;
594                 if (dataChannel != null && exchangeState.outBuf != null) {
595                     if (exchangeState.outBuf.hasData()) {
596                         final int bytesWritten = exchangeState.outBuf.write(dataChannel);
597                         println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
598                     }
599                     if (!exchangeState.outBuf.hasData()) {
600                         final int bytesWritten = dataChannel.write(src);
601                         println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
602                     }
603                 }
604                 if (src.hasRemaining()) {
605                     if (exchangeState.outBuf == null) {
606                         exchangeState.outBuf = new ProxyBuffer(INIT_BUFFER_SIZE);
607                     }
608                     exchangeState.outBuf.put(src);
609                 }
610                 final int capacity = exchangeState.outBuf != null ? exchangeState.outBuf.capacity() : INIT_BUFFER_SIZE;
611                 println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
612                 if (dataChannel != null) {
613                     dataChannel.requestOutput();
614                 }
615             }
616         }
617 
618         @Override
619         public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
620             synchronized (exchangeState) {
621                 println("[proxy<-origin] " + exchangeState.id + " end of input");
622                 exchangeState.outputEnd = true;
623                 final DataStreamChannel dataChannel = exchangeState.responseDataChannel;
624                 if (dataChannel != null && (exchangeState.outBuf == null || !exchangeState.outBuf.hasData())) {
625                     println("[client<-proxy] " + exchangeState.id + " end of output");
626                     dataChannel.endStream();
627                     clientEndpoint.releaseAndReuse();
628                 }
629             }
630         }
631 
632         @Override
633         public void cancel() {
634             clientEndpoint.releaseAndDiscard();
635         }
636 
637         @Override
638         public void failed(final Exception cause) {
639             println("[client<-proxy] " + exchangeState.id + " " + cause.getMessage());
640             if (!(cause instanceof ConnectionClosedException)) {
641                 cause.printStackTrace(System.out);
642             }
643             synchronized (exchangeState) {
644                 if (exchangeState.response == null) {
645                     final int status = cause instanceof IOException ? HttpStatus.SC_SERVICE_UNAVAILABLE : HttpStatus.SC_INTERNAL_SERVER_ERROR;
646                     final HttpResponse outgoingResponse = new BasicHttpResponse(status);
647                     outgoingResponse.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
648                     exchangeState.response = outgoingResponse;
649 
650                     final ByteBuffer msg = StandardCharsets.US_ASCII.encode(CharBuffer.wrap(cause.getMessage()));
651                     final int contentLen = msg.remaining();
652                     exchangeState.outBuf = new ProxyBuffer(1024);
653                     exchangeState.outBuf.put(msg);
654                     exchangeState.outputEnd = true;
655 
656                     println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
657 
658                     try {
659                         final EntityDetails entityDetails = new BasicEntityDetails(contentLen, ContentType.TEXT_PLAIN);
660                         exchangeState.responseMessageChannel.sendResponse(outgoingResponse, entityDetails, null);
661                     } catch (final HttpException | IOException ignore) {
662                         // ignore
663                     }
664                 } else {
665                     exchangeState.outputEnd = true;
666                 }
667                 clientEndpoint.releaseAndDiscard();
668             }
669         }
670 
671         @Override
672         public void releaseResources() {
673             synchronized (exchangeState) {
674                 exchangeState.requestDataChannel = null;
675                 exchangeState.responseCapacityChannel = null;
676                 clientEndpoint.releaseAndDiscard();
677             }
678         }
679 
680     }
681 
682     static void println(final String msg) {
683         if (!quiet) {
684             System.out.println(HttpDateGenerator.INSTANCE.getCurrentDate() + " | " + msg);
685         }
686     }
687 }