View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http.examples;
28  
29  import java.io.IOException;
30  import java.io.InterruptedIOException;
31  import java.net.InetSocketAddress;
32  import java.nio.ByteBuffer;
33  import java.nio.CharBuffer;
34  import java.nio.charset.StandardCharsets;
35  import java.util.Arrays;
36  import java.util.Collections;
37  import java.util.HashSet;
38  import java.util.Iterator;
39  import java.util.List;
40  import java.util.Set;
41  import java.util.concurrent.TimeUnit;
42  import java.util.concurrent.atomic.AtomicLong;
43  
44  import org.apache.hc.core5.concurrent.FutureCallback;
45  import org.apache.hc.core5.http.ConnectionClosedException;
46  import org.apache.hc.core5.http.ContentType;
47  import org.apache.hc.core5.http.EntityDetails;
48  import org.apache.hc.core5.http.Header;
49  import org.apache.hc.core5.http.HeaderElements;
50  import org.apache.hc.core5.http.HttpConnection;
51  import org.apache.hc.core5.http.HttpException;
52  import org.apache.hc.core5.http.HttpHeaders;
53  import org.apache.hc.core5.http.HttpHost;
54  import org.apache.hc.core5.http.HttpRequest;
55  import org.apache.hc.core5.http.HttpResponse;
56  import org.apache.hc.core5.http.HttpStatus;
57  import org.apache.hc.core5.http.URIScheme;
58  import org.apache.hc.core5.http.impl.BasicEntityDetails;
59  import org.apache.hc.core5.http.impl.Http1StreamListener;
60  import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap;
61  import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
62  import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
63  import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
64  import org.apache.hc.core5.http.impl.nio.BufferedData;
65  import org.apache.hc.core5.http.message.BasicHttpRequest;
66  import org.apache.hc.core5.http.message.BasicHttpResponse;
67  import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
68  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
69  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
70  import org.apache.hc.core5.http.nio.CapacityChannel;
71  import org.apache.hc.core5.http.nio.DataStreamChannel;
72  import org.apache.hc.core5.http.nio.RequestChannel;
73  import org.apache.hc.core5.http.nio.ResponseChannel;
74  import org.apache.hc.core5.http.protocol.HttpContext;
75  import org.apache.hc.core5.http.protocol.HttpCoreContext;
76  import org.apache.hc.core5.http.protocol.HttpDateGenerator;
77  import org.apache.hc.core5.io.CloseMode;
78  import org.apache.hc.core5.pool.ConnPoolListener;
79  import org.apache.hc.core5.pool.ConnPoolStats;
80  import org.apache.hc.core5.pool.PoolStats;
81  import org.apache.hc.core5.reactor.IOReactorConfig;
82  import org.apache.hc.core5.util.TextUtils;
83  import org.apache.hc.core5.util.TimeValue;
84  import org.apache.hc.core5.util.Timeout;
85  
86  /**
87   * Example of asynchronous embedded  HTTP/1.1 reverse proxy with full content streaming.
88   */
89  public class AsyncReverseProxyExample {
90  
91      private static boolean quiet;
92  
93      public static void main(final String[] args) throws Exception {
94          if (args.length < 1) {
95              System.out.println("Usage: <hostname[:port]> [listener port] [--quiet]");
96              System.exit(1);
97          }
98          // Target host
99          final HttpHost targetHost = HttpHost.create(args[0]);
100         int port = 8080;
101         if (args.length > 1) {
102             port = Integer.parseInt(args[1]);
103         }
104         for (final String s : args) {
105             if ("--quiet".equalsIgnoreCase(s)) {
106                 quiet = true;
107                 break;
108             }
109         }
110 
111         println("Reverse proxy to " + targetHost);
112 
113         final IOReactorConfig config = IOReactorConfig.custom()
114             .setSoTimeout(1, TimeUnit.MINUTES)
115             .build();
116 
117         final HttpAsyncRequester requester = AsyncRequesterBootstrap.bootstrap()
118                 .setIOReactorConfig(config)
119                 .setConnPoolListener(new ConnPoolListener<HttpHost>() {
120 
121                     @Override
122                     public void onLease(final HttpHost route, final ConnPoolStats<HttpHost> connPoolStats) {
123                         final StringBuilder buf = new StringBuilder();
124                         buf.append("[proxy->origin] connection leased ").append(route);
125                         println(buf.toString());
126                     }
127 
128                     @Override
129                     public void onRelease(final HttpHost route, final ConnPoolStats<HttpHost> connPoolStats) {
130                         final StringBuilder buf = new StringBuilder();
131                         buf.append("[proxy->origin] connection released ").append(route);
132                         final PoolStats totals = connPoolStats.getTotalStats();
133                         buf.append("; total kept alive: ").append(totals.getAvailable()).append("; ");
134                         buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
135                         buf.append(" of ").append(totals.getMax());
136                         println(buf.toString());
137                     }
138 
139                 })
140                 .setStreamListener(new Http1StreamListener() {
141 
142                     @Override
143                     public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
144                         // empty
145                     }
146 
147                     @Override
148                     public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
149                         // empty
150                     }
151 
152                     @Override
153                     public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
154                         println("[proxy<-origin] connection " +
155                                 connection.getLocalAddress() + "->" + connection.getRemoteAddress() +
156                                 (keepAlive ? " kept alive" : " cannot be kept alive"));
157                     }
158 
159                 })
160                 .setMaxTotal(100)
161                 .setDefaultMaxPerRoute(20)
162                 .create();
163 
164         final HttpAsyncServer server = AsyncServerBootstrap.bootstrap()
165                 .setExceptionCallback(e -> e.printStackTrace())
166                 .setIOReactorConfig(config)
167                 .setStreamListener(new Http1StreamListener() {
168 
169                     @Override
170                     public void onRequestHead(final HttpConnection connection, final HttpRequest request) {
171                         // empty
172                     }
173 
174                     @Override
175                     public void onResponseHead(final HttpConnection connection, final HttpResponse response) {
176                         // empty
177                     }
178 
179                     @Override
180                     public void onExchangeComplete(final HttpConnection connection, final boolean keepAlive) {
181                         println("[client<-proxy] connection " +
182                                 connection.getLocalAddress() + "->" + connection.getRemoteAddress() +
183                                 (keepAlive ? " kept alive" : " cannot be kept alive"));
184                     }
185 
186                 })
187                 .register("*", () -> new IncomingExchangeHandler(targetHost, requester))
188                 .create();
189 
190         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
191             println("Reverse proxy shutting down");
192             server.close(CloseMode.GRACEFUL);
193             requester.close(CloseMode.GRACEFUL);
194         }));
195 
196         requester.start();
197         server.start();
198         server.listen(new InetSocketAddress(port), URIScheme.HTTP);
199         println("Listening on port " + port);
200 
201         server.awaitShutdown(TimeValue.MAX_VALUE);
202     }
203 
204     private static class ProxyBuffer extends BufferedData {
205 
206         ProxyBuffer(final int bufferSize) {
207             super(bufferSize);
208         }
209 
210         int write(final DataStreamChannel channel) throws IOException {
211             setOutputMode();
212             if (buffer().hasRemaining()) {
213                 return channel.write(buffer());
214             }
215             return 0;
216         }
217 
218     }
219 
220     private static final AtomicLong COUNT = new AtomicLong(0);
221 
222     private static class ProxyExchangeState {
223 
224         final String id;
225 
226         HttpRequest request;
227         EntityDetails requestEntityDetails;
228         DataStreamChannel requestDataChannel;
229         CapacityChannel requestCapacityChannel;
230         ProxyBuffer inBuf;
231         boolean inputEnd;
232 
233         HttpResponse response;
234         EntityDetails responseEntityDetails;
235         ResponseChannel responseMessageChannel;
236         DataStreamChannel responseDataChannel;
237         CapacityChannel responseCapacityChannel;
238         ProxyBuffer outBuf;
239         boolean outputEnd;
240 
241         AsyncClientEndpoint clientEndpoint;
242 
243         ProxyExchangeState() {
244             this.id = String.format("%010d", COUNT.getAndIncrement());
245         }
246 
247     }
248 
249     private static final int INIT_BUFFER_SIZE = 4096;
250 
251     private static class IncomingExchangeHandler implements AsyncServerExchangeHandler {
252 
253         private final HttpHost targetHost;
254         private final HttpAsyncRequester requester;
255         private final ProxyExchangeState exchangeState;
256 
257         IncomingExchangeHandler(final HttpHost targetHost, final HttpAsyncRequester requester) {
258             super();
259             this.targetHost = targetHost;
260             this.requester = requester;
261             this.exchangeState = new ProxyExchangeState();
262         }
263 
264         @Override
265         public void handleRequest(
266                 final HttpRequest incomingRequest,
267                 final EntityDetails entityDetails,
268                 final ResponseChannel responseChannel,
269                 final HttpContext httpContext) throws HttpException, IOException {
270 
271             synchronized (exchangeState) {
272                 println("[client->proxy] " + exchangeState.id + " " +
273                         incomingRequest.getMethod() + " " + incomingRequest.getRequestUri());
274                 exchangeState.request = incomingRequest;
275                 exchangeState.requestEntityDetails = entityDetails;
276                 exchangeState.inputEnd = entityDetails == null;
277                 exchangeState.responseMessageChannel = responseChannel;
278 
279                 if (entityDetails != null) {
280                     final Header h = incomingRequest.getFirstHeader(HttpHeaders.EXPECT);
281                     if (h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue())) {
282                         responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), httpContext);
283                     }
284                 }
285             }
286 
287             println("[proxy->origin] " + exchangeState.id + " request connection to " + targetHost);
288 
289             requester.connect(targetHost, Timeout.ofSeconds(30), null, new FutureCallback<AsyncClientEndpoint>() {
290 
291                 @Override
292                 public void completed(final AsyncClientEndpoint clientEndpoint) {
293                     println("[proxy->origin] " + exchangeState.id + " connection leased");
294                     synchronized (exchangeState) {
295                         exchangeState.clientEndpoint = clientEndpoint;
296                     }
297                     clientEndpoint.execute(
298                             new OutgoingExchangeHandler(targetHost, clientEndpoint, exchangeState),
299                             HttpCoreContext.create());
300                 }
301 
302                 @Override
303                 public void failed(final Exception cause) {
304                     final HttpResponse outgoingResponse = new BasicHttpResponse(HttpStatus.SC_SERVICE_UNAVAILABLE);
305                     outgoingResponse.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
306                     final ByteBuffer msg = StandardCharsets.US_ASCII.encode(CharBuffer.wrap(cause.getMessage()));
307                     final EntityDetails exEntityDetails = new BasicEntityDetails(msg.remaining(),
308                                     ContentType.TEXT_PLAIN);
309                     synchronized (exchangeState) {
310                         exchangeState.response = outgoingResponse;
311                         exchangeState.responseEntityDetails = exEntityDetails;
312                         exchangeState.outBuf = new ProxyBuffer(1024);
313                         exchangeState.outBuf.put(msg);
314                         exchangeState.outputEnd = true;
315                     }
316                     println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
317 
318                     try {
319                         responseChannel.sendResponse(outgoingResponse, exEntityDetails, httpContext);
320                     } catch (final HttpException | IOException ignore) {
321                         // ignore
322                     }
323                 }
324 
325                 @Override
326                 public void cancelled() {
327                     failed(new InterruptedIOException());
328                 }
329 
330             });
331 
332         }
333 
334         @Override
335         public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
336             synchronized (exchangeState) {
337                 exchangeState.requestCapacityChannel = capacityChannel;
338                 final int capacity = exchangeState.inBuf != null ? exchangeState.inBuf.capacity() : INIT_BUFFER_SIZE;
339                 if (capacity > 0) {
340                     println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
341                     capacityChannel.update(capacity);
342                 }
343             }
344         }
345 
346         @Override
347         public void consume(final ByteBuffer src) throws IOException {
348             synchronized (exchangeState) {
349                 println("[client->proxy] " + exchangeState.id + " " + src.remaining() + " bytes received");
350                 final DataStreamChannel dataChannel = exchangeState.requestDataChannel;
351                 if (dataChannel != null && exchangeState.inBuf != null) {
352                     if (exchangeState.inBuf.hasData()) {
353                         final int bytesWritten = exchangeState.inBuf.write(dataChannel);
354                         println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
355                     }
356                     if (!exchangeState.inBuf.hasData()) {
357                         final int bytesWritten = dataChannel.write(src);
358                         println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
359                     }
360                 }
361                 if (src.hasRemaining()) {
362                     if (exchangeState.inBuf == null) {
363                         exchangeState.inBuf = new ProxyBuffer(INIT_BUFFER_SIZE);
364                     }
365                     exchangeState.inBuf.put(src);
366                 }
367                 final int capacity = exchangeState.inBuf != null ? exchangeState.inBuf.capacity() : INIT_BUFFER_SIZE;
368                 println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
369                 if (dataChannel != null) {
370                     dataChannel.requestOutput();
371                 }
372             }
373         }
374 
375         @Override
376         public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
377             synchronized (exchangeState) {
378                 println("[client->proxy] " + exchangeState.id + " end of input");
379                 exchangeState.inputEnd = true;
380                 final DataStreamChannel dataChannel = exchangeState.requestDataChannel;
381                 if (dataChannel != null && (exchangeState.inBuf == null || !exchangeState.inBuf.hasData())) {
382                     println("[proxy->origin] " + exchangeState.id + " end of output");
383                     dataChannel.endStream();
384                 }
385             }
386         }
387 
388         @Override
389         public int available() {
390             synchronized (exchangeState) {
391                 final int available = exchangeState.outBuf != null ? exchangeState.outBuf.length() : 0;
392                 println("[client<-proxy] " + exchangeState.id + " output available: " + available);
393                 return available;
394             }
395         }
396 
397         @Override
398         public void produce(final DataStreamChannel channel) throws IOException {
399             synchronized (exchangeState) {
400                 println("[client<-proxy] " + exchangeState.id + " produce output");
401                 exchangeState.responseDataChannel = channel;
402 
403                 if (exchangeState.outBuf != null) {
404                     if (exchangeState.outBuf.hasData()) {
405                         final int bytesWritten = exchangeState.outBuf.write(channel);
406                         println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
407                     }
408                     if (exchangeState.outputEnd && !exchangeState.outBuf.hasData()) {
409                         channel.endStream();
410                         println("[client<-proxy] " + exchangeState.id + " end of output");
411                     }
412                     if (!exchangeState.outputEnd) {
413                         final CapacityChannel capacityChannel = exchangeState.responseCapacityChannel;
414                         if (capacityChannel != null) {
415                             final int capacity = exchangeState.outBuf.capacity();
416                             if (capacity > 0) {
417                                 println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
418                                 capacityChannel.update(capacity);
419                             }
420                         }
421                     }
422                 }
423             }
424         }
425 
426         @Override
427         public void failed(final Exception cause) {
428             println("[client<-proxy] " + exchangeState.id + " " + cause.getMessage());
429             if (!(cause instanceof ConnectionClosedException)) {
430                 cause.printStackTrace(System.out);
431             }
432             synchronized (exchangeState) {
433                 if (exchangeState.clientEndpoint != null) {
434                     exchangeState.clientEndpoint.releaseAndDiscard();
435                 }
436             }
437         }
438 
439         @Override
440         public void releaseResources() {
441             synchronized (exchangeState) {
442                 exchangeState.responseMessageChannel = null;
443                 exchangeState.responseDataChannel = null;
444                 exchangeState.requestCapacityChannel = null;
445             }
446         }
447 
448     }
449 
450     private static class OutgoingExchangeHandler implements AsyncClientExchangeHandler {
451 
452         private final static Set<String> HOP_BY_HOP = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
453                 TextUtils.toLowerCase(HttpHeaders.HOST),
454                 TextUtils.toLowerCase(HttpHeaders.CONTENT_LENGTH),
455                 TextUtils.toLowerCase(HttpHeaders.TRANSFER_ENCODING),
456                 TextUtils.toLowerCase(HttpHeaders.CONNECTION),
457                 TextUtils.toLowerCase(HttpHeaders.KEEP_ALIVE),
458                 TextUtils.toLowerCase(HttpHeaders.PROXY_AUTHENTICATE),
459                 TextUtils.toLowerCase(HttpHeaders.TE),
460                 TextUtils.toLowerCase(HttpHeaders.TRAILER),
461                 TextUtils.toLowerCase(HttpHeaders.UPGRADE))));
462 
463         private final HttpHost targetHost;
464         private final AsyncClientEndpoint clientEndpoint;
465         private final ProxyExchangeState exchangeState;
466 
467         OutgoingExchangeHandler(
468                 final HttpHost targetHost,
469                 final AsyncClientEndpoint clientEndpoint,
470                 final ProxyExchangeState exchangeState) {
471             this.targetHost = targetHost;
472             this.clientEndpoint = clientEndpoint;
473             this.exchangeState = exchangeState;
474         }
475 
476         @Override
477         public void produceRequest(
478                 final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
479             synchronized (exchangeState) {
480                 final HttpRequest incomingRequest = exchangeState.request;
481                 final EntityDetails entityDetails = exchangeState.requestEntityDetails;
482                 final HttpRequest outgoingRequest = new BasicHttpRequest(
483                         incomingRequest.getMethod(),
484                         targetHost,
485                         incomingRequest.getPath());
486                 for (final Iterator<Header> it = incomingRequest.headerIterator(); it.hasNext(); ) {
487                     final Header header = it.next();
488                     if (!HOP_BY_HOP.contains(TextUtils.toLowerCase(header.getName()))) {
489                         outgoingRequest.addHeader(header);
490                     }
491                 }
492 
493                 println("[proxy->origin] " + exchangeState.id + " " +
494                         outgoingRequest.getMethod() + " " + outgoingRequest.getRequestUri());
495 
496                 channel.sendRequest(outgoingRequest, entityDetails, httpContext);
497             }
498         }
499 
500         @Override
501         public int available() {
502             synchronized (exchangeState) {
503                 final int available = exchangeState.inBuf != null ? exchangeState.inBuf.length() : 0;
504                 println("[proxy->origin] " + exchangeState.id + " output available: " + available);
505                 return available;
506             }
507         }
508 
509         @Override
510         public void produce(final DataStreamChannel channel) throws IOException {
511             synchronized (exchangeState) {
512                 println("[proxy->origin] " + exchangeState.id + " produce output");
513                 exchangeState.requestDataChannel = channel;
514                 if (exchangeState.inBuf != null) {
515                     if (exchangeState.inBuf.hasData()) {
516                         final int bytesWritten = exchangeState.inBuf.write(channel);
517                         println("[proxy->origin] " + exchangeState.id + " " + bytesWritten + " bytes sent");
518                     }
519                     if (exchangeState.inputEnd && !exchangeState.inBuf.hasData()) {
520                         channel.endStream();
521                         println("[proxy->origin] " + exchangeState.id + " end of output");
522                     }
523                     if (!exchangeState.inputEnd) {
524                         final CapacityChannel capacityChannel = exchangeState.requestCapacityChannel;
525                         if (capacityChannel != null) {
526                             final int capacity = exchangeState.inBuf.capacity();
527                             if (capacity > 0) {
528                                 println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity);
529                                 capacityChannel.update(capacity);
530                             }
531                         }
532                     }
533                 }
534             }
535         }
536 
537         @Override
538         public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
539             // ignore
540         }
541 
542         @Override
543         public void consumeResponse(
544                 final HttpResponse incomingResponse,
545                 final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
546             synchronized (exchangeState) {
547                 println("[proxy<-origin] " + exchangeState.id + " status " + incomingResponse.getCode());
548                 if (entityDetails == null) {
549                     println("[proxy<-origin] " + exchangeState.id + " end of input");
550                 }
551 
552                 final HttpResponse outgoingResponse = new BasicHttpResponse(incomingResponse.getCode());
553                 for (final Iterator<Header> it = incomingResponse.headerIterator(); it.hasNext(); ) {
554                     final Header header = it.next();
555                     if (!HOP_BY_HOP.contains(TextUtils.toLowerCase(header.getName()))) {
556                         outgoingResponse.addHeader(header);
557                     }
558                 }
559 
560                 exchangeState.response = outgoingResponse;
561                 exchangeState.responseEntityDetails = entityDetails;
562                 exchangeState.outputEnd = entityDetails == null;
563 
564                 final ResponseChannel responseChannel = exchangeState.responseMessageChannel;
565                 if (responseChannel != null) {
566                     // responseChannel can be null under load.
567                     responseChannel.sendResponse(outgoingResponse, entityDetails, httpContext);
568                 }
569 
570                 println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
571                 if (entityDetails == null) {
572                     println("[client<-proxy] " + exchangeState.id + " end of output");
573                     clientEndpoint.releaseAndReuse();
574                 }
575             }
576         }
577 
578         @Override
579         public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
580             synchronized (exchangeState) {
581                 exchangeState.responseCapacityChannel = capacityChannel;
582                 final int capacity = exchangeState.outBuf != null ? exchangeState.outBuf.capacity() : INIT_BUFFER_SIZE;
583                 if (capacity > 0) {
584                     println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
585                     capacityChannel.update(capacity);
586                 }
587             }
588         }
589 
590         @Override
591         public void consume(final ByteBuffer src) throws IOException {
592             synchronized (exchangeState) {
593                 println("[proxy<-origin] " + exchangeState.id + " " + src.remaining() + " bytes received");
594                 final DataStreamChannel dataChannel = exchangeState.responseDataChannel;
595                 if (dataChannel != null && exchangeState.outBuf != null) {
596                     if (exchangeState.outBuf.hasData()) {
597                         final int bytesWritten = exchangeState.outBuf.write(dataChannel);
598                         println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
599                     }
600                     if (!exchangeState.outBuf.hasData()) {
601                         final int bytesWritten = dataChannel.write(src);
602                         println("[client<-proxy] " + exchangeState.id + " " + bytesWritten + " bytes sent");
603                     }
604                 }
605                 if (src.hasRemaining()) {
606                     if (exchangeState.outBuf == null) {
607                         exchangeState.outBuf = new ProxyBuffer(INIT_BUFFER_SIZE);
608                     }
609                     exchangeState.outBuf.put(src);
610                 }
611                 final int capacity = exchangeState.outBuf != null ? exchangeState.outBuf.capacity() : INIT_BUFFER_SIZE;
612                 println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity);
613                 if (dataChannel != null) {
614                     dataChannel.requestOutput();
615                 }
616             }
617         }
618 
619         @Override
620         public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
621             synchronized (exchangeState) {
622                 println("[proxy<-origin] " + exchangeState.id + " end of input");
623                 exchangeState.outputEnd = true;
624                 final DataStreamChannel dataChannel = exchangeState.responseDataChannel;
625                 if (dataChannel != null && (exchangeState.outBuf == null || !exchangeState.outBuf.hasData())) {
626                     println("[client<-proxy] " + exchangeState.id + " end of output");
627                     dataChannel.endStream();
628                     clientEndpoint.releaseAndReuse();
629                 }
630             }
631         }
632 
633         @Override
634         public void cancel() {
635             clientEndpoint.releaseAndDiscard();
636         }
637 
638         @Override
639         public void failed(final Exception cause) {
640             println("[client<-proxy] " + exchangeState.id + " " + cause.getMessage());
641             if (!(cause instanceof ConnectionClosedException)) {
642                 cause.printStackTrace(System.out);
643             }
644             synchronized (exchangeState) {
645                 if (exchangeState.response == null) {
646                     final int status = cause instanceof IOException ? HttpStatus.SC_SERVICE_UNAVAILABLE : HttpStatus.SC_INTERNAL_SERVER_ERROR;
647                     final HttpResponse outgoingResponse = new BasicHttpResponse(status);
648                     outgoingResponse.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
649                     exchangeState.response = outgoingResponse;
650 
651                     final ByteBuffer msg = StandardCharsets.US_ASCII.encode(CharBuffer.wrap(cause.getMessage()));
652                     final int contentLen = msg.remaining();
653                     exchangeState.outBuf = new ProxyBuffer(1024);
654                     exchangeState.outBuf.put(msg);
655                     exchangeState.outputEnd = true;
656 
657                     println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode());
658 
659                     try {
660                         final EntityDetails entityDetails = new BasicEntityDetails(contentLen, ContentType.TEXT_PLAIN);
661                         exchangeState.responseMessageChannel.sendResponse(outgoingResponse, entityDetails, null);
662                     } catch (final HttpException | IOException ignore) {
663                         // ignore
664                     }
665                 } else {
666                     exchangeState.outputEnd = true;
667                 }
668                 clientEndpoint.releaseAndDiscard();
669             }
670         }
671 
672         @Override
673         public void releaseResources() {
674             synchronized (exchangeState) {
675                 exchangeState.requestDataChannel = null;
676                 exchangeState.responseCapacityChannel = null;
677                 clientEndpoint.releaseAndDiscard();
678             }
679         }
680 
681     }
682 
683     static void println(final String msg) {
684         if (!quiet) {
685             System.out.println(HttpDateGenerator.INSTANCE.getCurrentDate() + " | " + msg);
686         }
687     }
688 }