View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.testing.nio;
29  
30  import java.io.BufferedReader;
31  import java.io.BufferedWriter;
32  import java.io.IOException;
33  import java.io.InputStream;
34  import java.io.InputStreamReader;
35  import java.io.InterruptedIOException;
36  import java.io.OutputStream;
37  import java.io.OutputStreamWriter;
38  import java.net.InetSocketAddress;
39  import java.net.URI;
40  import java.net.URISyntaxException;
41  import java.nio.ByteBuffer;
42  import java.nio.channels.WritableByteChannel;
43  import java.nio.charset.Charset;
44  import java.nio.charset.StandardCharsets;
45  import java.util.Arrays;
46  import java.util.Collection;
47  import java.util.HashMap;
48  import java.util.LinkedList;
49  import java.util.List;
50  import java.util.Locale;
51  import java.util.Map;
52  import java.util.Queue;
53  import java.util.Random;
54  import java.util.StringTokenizer;
55  import java.util.concurrent.CancellationException;
56  import java.util.concurrent.ExecutionException;
57  import java.util.concurrent.Executors;
58  import java.util.concurrent.Future;
59  import java.util.concurrent.atomic.AtomicReference;
60  
61  import org.apache.hc.core5.function.Decorator;
62  import org.apache.hc.core5.function.Supplier;
63  import org.apache.hc.core5.http.ConnectionReuseStrategy;
64  import org.apache.hc.core5.http.ContentLengthStrategy;
65  import org.apache.hc.core5.http.ContentType;
66  import org.apache.hc.core5.http.EntityDetails;
67  import org.apache.hc.core5.http.Header;
68  import org.apache.hc.core5.http.HeaderElements;
69  import org.apache.hc.core5.http.HttpException;
70  import org.apache.hc.core5.http.HttpHeaders;
71  import org.apache.hc.core5.http.HttpHost;
72  import org.apache.hc.core5.http.HttpRequest;
73  import org.apache.hc.core5.http.HttpResponse;
74  import org.apache.hc.core5.http.HttpStatus;
75  import org.apache.hc.core5.http.HttpVersion;
76  import org.apache.hc.core5.http.MalformedChunkCodingException;
77  import org.apache.hc.core5.http.Message;
78  import org.apache.hc.core5.http.Method;
79  import org.apache.hc.core5.http.ProtocolException;
80  import org.apache.hc.core5.http.URIScheme;
81  import org.apache.hc.core5.http.config.CharCodingConfig;
82  import org.apache.hc.core5.http.config.Http1Config;
83  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
84  import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
85  import org.apache.hc.core5.http.impl.Http1StreamListener;
86  import org.apache.hc.core5.http.impl.HttpProcessors;
87  import org.apache.hc.core5.http.impl.nio.AbstractContentEncoder;
88  import org.apache.hc.core5.http.impl.nio.ServerHttp1StreamDuplexer;
89  import org.apache.hc.core5.http.message.BasicHttpRequest;
90  import org.apache.hc.core5.http.message.BasicHttpResponse;
91  import org.apache.hc.core5.http.nio.AsyncEntityProducer;
92  import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
93  import org.apache.hc.core5.http.nio.AsyncRequestProducer;
94  import org.apache.hc.core5.http.nio.AsyncResponseProducer;
95  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
96  import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
97  import org.apache.hc.core5.http.nio.CapacityChannel;
98  import org.apache.hc.core5.http.nio.ContentEncoder;
99  import org.apache.hc.core5.http.nio.DataStreamChannel;
100 import org.apache.hc.core5.http.nio.HandlerFactory;
101 import org.apache.hc.core5.http.nio.NHttpMessageParser;
102 import org.apache.hc.core5.http.nio.NHttpMessageWriter;
103 import org.apache.hc.core5.http.nio.ResponseChannel;
104 import org.apache.hc.core5.http.nio.SessionOutputBuffer;
105 import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
106 import org.apache.hc.core5.http.nio.entity.DigestingEntityConsumer;
107 import org.apache.hc.core5.http.nio.entity.DigestingEntityProducer;
108 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
109 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
110 import org.apache.hc.core5.http.nio.support.AbstractServerExchangeHandler;
111 import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
112 import org.apache.hc.core5.http.nio.support.BasicAsyncServerExpectationDecorator;
113 import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
114 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
115 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
116 import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
117 import org.apache.hc.core5.http.nio.support.ImmediateResponseExchangeHandler;
118 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityConsumer;
119 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityProducer;
120 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicServerExchangeHandler;
121 import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
122 import org.apache.hc.core5.http.protocol.HttpContext;
123 import org.apache.hc.core5.http.protocol.HttpProcessor;
124 import org.apache.hc.core5.http.protocol.RequestConnControl;
125 import org.apache.hc.core5.http.protocol.RequestContent;
126 import org.apache.hc.core5.http.protocol.RequestTargetHost;
127 import org.apache.hc.core5.http.protocol.RequestValidateHost;
128 import org.apache.hc.core5.reactor.IOReactorConfig;
129 import org.apache.hc.core5.reactor.IOSession;
130 import org.apache.hc.core5.reactor.ProtocolIOSession;
131 import org.apache.hc.core5.testing.SSLTestContexts;
132 import org.apache.hc.core5.util.CharArrayBuffer;
133 import org.apache.hc.core5.util.TextUtils;
134 import org.apache.hc.core5.util.TimeValue;
135 import org.apache.hc.core5.util.Timeout;
136 import org.junit.After;
137 import org.junit.Assert;
138 import org.junit.Before;
139 import org.junit.Test;
140 import org.junit.runner.RunWith;
141 import org.junit.runners.Parameterized;
142 import org.slf4j.Logger;
143 import org.slf4j.LoggerFactory;
144 
145 @RunWith(Parameterized.class)
146 public class Http1IntegrationTest extends InternalHttp1ServerTestBase {
147 
148     private final Logger log = LoggerFactory.getLogger(getClass());
149 
150     @Parameterized.Parameters(name = "{0}")
151     public static Collection<Object[]> protocols() {
152         return Arrays.asList(new Object[][]{
153                 { URIScheme.HTTP },
154                 { URIScheme.HTTPS }
155         });
156     }
157 
158     public Http1IntegrationTest(final URIScheme scheme) {
159         super(scheme);
160     }
161 
162     private static final Timeout TIMEOUT = Timeout.ofSeconds(30);
163     private static final Timeout LONG_TIMEOUT = Timeout.ofSeconds(60);
164 
165     private Http1TestClient client;
166 
167     @Before
168     public void setup() throws Exception {
169         log.debug("Starting up test client");
170         client = new Http1TestClient(
171                 buildReactorConfig(),
172                 scheme == URIScheme.HTTPS ? SSLTestContexts.createClientSSLContext() : null, null, null);
173     }
174 
175     protected IOReactorConfig buildReactorConfig() {
176         return IOReactorConfig.DEFAULT;
177     }
178 
179     @After
180     public void cleanup() throws Exception {
181         log.debug("Shutting down test client");
182         if (client != null) {
183             client.shutdown(TimeValue.ofSeconds(5));
184         }
185     }
186 
187     private URI createRequestURI(final InetSocketAddress serverEndpoint, final String path) {
188         try {
189             return new URI(scheme.id, null, "localhost", serverEndpoint.getPort(), path, null, null);
190         } catch (final URISyntaxException e) {
191             throw new IllegalStateException();
192         }
193     }
194 
195     @Test
196     public void testSimpleGet() throws Exception {
197         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
198 
199             @Override
200             public AsyncServerExchangeHandler get() {
201                 return new SingleLineResponseHandler("Hi there");
202             }
203 
204         });
205         final InetSocketAddress serverEndpoint = server.start();
206 
207         client.start();
208         final Future<ClientSessionEndpoint> connectFuture = client.connect(
209                 "localhost", serverEndpoint.getPort(), TIMEOUT);
210         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
211 
212         for (int i = 0; i < 5; i++) {
213             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
214                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
215                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
216             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
217             Assert.assertNotNull(result);
218             final HttpResponse response1 = result.getHead();
219             final String entity1 = result.getBody();
220             Assert.assertNotNull(response1);
221             Assert.assertEquals(200, response1.getCode());
222             Assert.assertEquals("Hi there", entity1);
223         }
224     }
225 
226     @Test
227     public void testSimpleGetConnectionClose() throws Exception {
228         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
229 
230             @Override
231             public AsyncServerExchangeHandler get() {
232                 return new SingleLineResponseHandler("Hi there");
233             }
234 
235         });
236         final InetSocketAddress serverEndpoint = server.start();
237 
238         client.start();
239         final URI requestURI = createRequestURI(serverEndpoint, "/hello");
240         for (int i = 0; i < 5; i++) {
241             final Future<ClientSessionEndpoint> connectFuture = client.connect(
242                     "localhost", serverEndpoint.getPort(), TIMEOUT);
243             try (final ClientSessionEndpoint streamEndpoint = connectFuture.get()) {
244                 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
245                         AsyncRequestBuilder.get(requestURI)
246                                 .addHeader(HttpHeaders.CONNECTION, "close")
247                                 .build(),
248                         new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
249                 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
250                 Assert.assertNotNull(result);
251                 final HttpResponse response1 = result.getHead();
252                 final String entity1 = result.getBody();
253                 Assert.assertNotNull(response1);
254                 Assert.assertEquals(200, response1.getCode());
255                 Assert.assertEquals("Hi there", entity1);
256             }
257         }
258     }
259 
260     @Test
261     public void testSimpleGetIdentityTransfer() throws Exception {
262         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
263 
264             @Override
265             public AsyncServerExchangeHandler get() {
266                 return new SingleLineResponseHandler("Hi there");
267             }
268 
269         });
270         final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
271         final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
272 
273         client.start();
274 
275         final int reqNo = 5;
276 
277         for (int i = 0; i < reqNo; i++) {
278             final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
279             final ClientSessionEndpoint streamEndpoint = connectFuture.get();
280 
281             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
282                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
283                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
284             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
285 
286             streamEndpoint.close();
287 
288             Assert.assertNotNull(result);
289             final HttpResponse response = result.getHead();
290             final String entity = result.getBody();
291             Assert.assertNotNull(response);
292             Assert.assertEquals(200, response.getCode());
293             Assert.assertEquals("Hi there", entity);
294         }
295 
296     }
297 
298     @Test
299     public void testPostIdentityTransfer() throws Exception {
300         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
301 
302             @Override
303             public AsyncServerExchangeHandler get() {
304                 return new SingleLineResponseHandler("Hi there");
305             }
306 
307         });
308         final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
309         final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
310 
311         client.start();
312 
313         final int reqNo = 5;
314 
315         for (int i = 0; i < reqNo; i++) {
316             final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
317             final ClientSessionEndpoint streamEndpoint = connectFuture.get();
318 
319             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
320                     new BasicRequestProducer(Method.POST,
321                             createRequestURI(serverEndpoint, "/hello"),
322                             new MultiLineEntityProducer("Hello", 16 * i)),
323                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
324             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
325 
326             streamEndpoint.close();
327 
328             Assert.assertNotNull(result);
329             final HttpResponse response = result.getHead();
330             final String entity = result.getBody();
331             Assert.assertNotNull(response);
332             Assert.assertEquals(200, response.getCode());
333             Assert.assertEquals("Hi there", entity);
334         }
335     }
336 
337     @Test
338     public void testPostIdentityTransferOutOfSequenceResponse() throws Exception {
339         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
340 
341             @Override
342             public AsyncServerExchangeHandler get() {
343                 return new ImmediateResponseExchangeHandler(500, "Go away");
344             }
345 
346         });
347         final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
348         final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
349 
350         client.start();
351 
352         final int reqNo = 5;
353 
354         for (int i = 0; i < reqNo; i++) {
355             final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
356             final ClientSessionEndpoint streamEndpoint = connectFuture.get();
357 
358             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
359                     new BasicRequestProducer(Method.POST,
360                             createRequestURI(serverEndpoint, "/hello"),
361                             new MultiLineEntityProducer("Hello", 16 * i)),
362                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
363             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
364 
365             streamEndpoint.close();
366 
367             Assert.assertNotNull(result);
368             final HttpResponse response = result.getHead();
369             final String entity = result.getBody();
370             Assert.assertNotNull(response);
371             Assert.assertEquals(500, response.getCode());
372             Assert.assertEquals("Go away", entity);
373         }
374     }
375 
376     @Test
377     public void testSimpleGetsPipelined() throws Exception {
378         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
379 
380             @Override
381             public AsyncServerExchangeHandler get() {
382                 return new SingleLineResponseHandler("Hi there");
383             }
384 
385         });
386         final InetSocketAddress serverEndpoint = server.start();
387 
388         client.start();
389         final Future<ClientSessionEndpoint> connectFuture = client.connect(
390                 "localhost", serverEndpoint.getPort(), TIMEOUT);
391         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
392 
393         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
394         for (int i = 0; i < 5; i++) {
395             queue.add(streamEndpoint.execute(
396                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
397                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
398         }
399         while (!queue.isEmpty()) {
400             final Future<Message<HttpResponse, String>> future = queue.remove();
401             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
402             Assert.assertNotNull(result);
403             final HttpResponse response = result.getHead();
404             final String entity = result.getBody();
405             Assert.assertNotNull(response);
406             Assert.assertEquals(200, response.getCode());
407             Assert.assertEquals("Hi there", entity);
408         }
409     }
410 
411     @Test
412     public void testLargeGet() throws Exception {
413         server.register("/", new Supplier<AsyncServerExchangeHandler>() {
414 
415             @Override
416             public AsyncServerExchangeHandler get() {
417                 return new MultiLineResponseHandler("0123456789abcdef", 5000);
418             }
419 
420         });
421         final InetSocketAddress serverEndpoint = server.start();
422 
423         client.start();
424         final Future<ClientSessionEndpoint> connectFuture = client.connect(
425                 "localhost", serverEndpoint.getPort(), TIMEOUT);
426         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
427 
428         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
429                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
430                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
431 
432         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
433         Assert.assertNotNull(result1);
434         final HttpResponse response1 = result1.getHead();
435         Assert.assertNotNull(response1);
436         Assert.assertEquals(200, response1.getCode());
437         final String s1 = result1.getBody();
438         Assert.assertNotNull(s1);
439         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
440         while (t1.hasMoreTokens()) {
441             Assert.assertEquals("0123456789abcdef", t1.nextToken());
442         }
443 
444         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
445                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
446                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer(512)), null);
447 
448         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
449         Assert.assertNotNull(result2);
450         final HttpResponse response2 = result2.getHead();
451         Assert.assertNotNull(response2);
452         Assert.assertEquals(200, response2.getCode());
453         final String s2 = result2.getBody();
454         Assert.assertNotNull(s2);
455         final StringTokenizer t2 = new StringTokenizer(s2, "\r\n");
456         while (t2.hasMoreTokens()) {
457             Assert.assertEquals("0123456789abcdef", t2.nextToken());
458         }
459     }
460 
461     @Test
462     public void testLargeGetsPipelined() throws Exception {
463         server.register("/", new Supplier<AsyncServerExchangeHandler>() {
464 
465             @Override
466             public AsyncServerExchangeHandler get() {
467                 return new MultiLineResponseHandler("0123456789abcdef", 2000);
468             }
469 
470         });
471         final InetSocketAddress serverEndpoint = server.start();
472 
473         client.start();
474         final Future<ClientSessionEndpoint> connectFuture = client.connect(
475                 "localhost", serverEndpoint.getPort(), TIMEOUT);
476         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
477 
478         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
479         for (int i = 0; i < 5; i++) {
480             queue.add(streamEndpoint.execute(
481                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
482                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
483         }
484         while (!queue.isEmpty()) {
485             final Future<Message<HttpResponse, String>> future = queue.remove();
486             final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
487             Assert.assertNotNull(result);
488             final HttpResponse response = result.getHead();
489             Assert.assertNotNull(response);
490             Assert.assertEquals(200, response.getCode());
491             final String entity = result.getBody();
492             Assert.assertNotNull(entity);
493             final StringTokenizer t = new StringTokenizer(entity, "\r\n");
494             while (t.hasMoreTokens()) {
495                 Assert.assertEquals("0123456789abcdef", t.nextToken());
496             }
497         }
498     }
499 
500     @Test
501     public void testBasicPost() throws Exception {
502         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
503 
504             @Override
505             public AsyncServerExchangeHandler get() {
506                 return new SingleLineResponseHandler("Hi back");
507             }
508 
509         });
510         final InetSocketAddress serverEndpoint = server.start();
511 
512         client.start();
513         final Future<ClientSessionEndpoint> connectFuture = client.connect(
514                 "localhost", serverEndpoint.getPort(), TIMEOUT);
515         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
516 
517         for (int i = 0; i < 5; i++) {
518             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
519                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello"),
520                             AsyncEntityProducers.create("Hi there")),
521                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
522             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
523             Assert.assertNotNull(result);
524             final HttpResponse response1 = result.getHead();
525             final String entity1 = result.getBody();
526             Assert.assertNotNull(response1);
527             Assert.assertEquals(200, response1.getCode());
528             Assert.assertEquals("Hi back", entity1);
529         }
530     }
531 
532     @Test
533     public void testBasicPostPipelined() throws Exception {
534         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
535 
536             @Override
537             public AsyncServerExchangeHandler get() {
538                 return new SingleLineResponseHandler("Hi back");
539             }
540 
541         });
542         final InetSocketAddress serverEndpoint = server.start();
543 
544         client.start();
545         final Future<ClientSessionEndpoint> connectFuture = client.connect(
546                 "localhost", serverEndpoint.getPort(), TIMEOUT);
547         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
548 
549         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
550         for (int i = 0; i < 5; i++) {
551             queue.add(streamEndpoint.execute(
552                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello"),
553                             AsyncEntityProducers.create("Hi there")),
554                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
555         }
556         while (!queue.isEmpty()) {
557             final Future<Message<HttpResponse, String>> future = queue.remove();
558             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
559             Assert.assertNotNull(result);
560             final HttpResponse response = result.getHead();
561             final String entity = result.getBody();
562             Assert.assertNotNull(response);
563             Assert.assertEquals(200, response.getCode());
564             Assert.assertEquals("Hi back", entity);
565         }
566     }
567 
568     @Test
569     public void testHttp10Post() throws Exception {
570         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
571 
572             @Override
573             public AsyncServerExchangeHandler get() {
574                 return new SingleLineResponseHandler("Hi back");
575             }
576 
577         });
578         final InetSocketAddress serverEndpoint = server.start();
579 
580         client.start();
581         final Future<ClientSessionEndpoint> connectFuture = client.connect(
582                 "localhost", serverEndpoint.getPort(), TIMEOUT);
583         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
584 
585         for (int i = 0; i < 5; i++) {
586             final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
587             request.setVersion(HttpVersion.HTTP_1_0);
588             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
589                     new BasicRequestProducer(request, AsyncEntityProducers.create("Hi there")),
590                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
591             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
592             Assert.assertNotNull(result);
593             final HttpResponse response1 = result.getHead();
594             final String entity1 = result.getBody();
595             Assert.assertNotNull(response1);
596             Assert.assertEquals(200, response1.getCode());
597             Assert.assertEquals("Hi back", entity1);
598         }
599     }
600 
601     @Test
602     public void testNoEntityPost() throws Exception {
603         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
604 
605             @Override
606             public AsyncServerExchangeHandler get() {
607                 return new SingleLineResponseHandler("Hi back");
608             }
609 
610         });
611         final InetSocketAddress serverEndpoint = server.start();
612 
613         client.start();
614         final Future<ClientSessionEndpoint> connectFuture = client.connect(
615                 "localhost", serverEndpoint.getPort(), TIMEOUT);
616         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
617 
618         for (int i = 0; i < 5; i++) {
619             final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
620             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
621                     new BasicRequestProducer(request, null),
622                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
623             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
624             Assert.assertNotNull(result);
625             final HttpResponse response1 = result.getHead();
626             final String entity1 = result.getBody();
627             Assert.assertNotNull(response1);
628             Assert.assertEquals(200, response1.getCode());
629             Assert.assertEquals("Hi back", entity1);
630         }
631     }
632 
633     @Test
634     public void testLargePost() throws Exception {
635         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
636 
637             @Override
638             public AsyncServerExchangeHandler get() {
639                 return new EchoHandler(2048);
640             }
641 
642         });
643         final InetSocketAddress serverEndpoint = server.start();
644 
645         client.start();
646         final Future<ClientSessionEndpoint> connectFuture = client.connect(
647                 "localhost", serverEndpoint.getPort(), TIMEOUT);
648         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
649 
650         for (int i = 0; i < 5; i++) {
651             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
652                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
653                             new MultiLineEntityProducer("0123456789abcdef", 5000)),
654                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
655             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
656             Assert.assertNotNull(result);
657             final HttpResponse response = result.getHead();
658             Assert.assertNotNull(response);
659             Assert.assertEquals(200, response.getCode());
660             final String entity = result.getBody();
661             Assert.assertNotNull(entity);
662             final StringTokenizer t = new StringTokenizer(entity, "\r\n");
663             while (t.hasMoreTokens()) {
664                 Assert.assertEquals("0123456789abcdef", t.nextToken());
665             }
666         }
667     }
668 
669     @Test
670     public void testPostsPipelinedLargeResponse() throws Exception {
671         server.register("/", new Supplier<AsyncServerExchangeHandler>() {
672 
673             @Override
674             public AsyncServerExchangeHandler get() {
675                 return new MultiLineResponseHandler("0123456789abcdef", 2000);
676             }
677 
678         });
679         final InetSocketAddress serverEndpoint = server.start();
680 
681         client.start();
682         final Future<ClientSessionEndpoint> connectFuture = client.connect(
683                 "localhost", serverEndpoint.getPort(), TIMEOUT);
684         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
685 
686         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
687         for (int i = 0; i < 2; i++) {
688             queue.add(streamEndpoint.execute(
689                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/"),
690                             AsyncEntityProducers.create("Hi there")),
691                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
692         }
693         while (!queue.isEmpty()) {
694             final Future<Message<HttpResponse, String>> future = queue.remove();
695             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
696             Assert.assertNotNull(result);
697             final HttpResponse response = result.getHead();
698             Assert.assertNotNull(response);
699             Assert.assertEquals(200, response.getCode());
700             final String entity = result.getBody();
701             Assert.assertNotNull(entity);
702             final StringTokenizer t = new StringTokenizer(entity, "\r\n");
703             while (t.hasMoreTokens()) {
704                 Assert.assertEquals("0123456789abcdef", t.nextToken());
705             }
706         }
707     }
708 
709 
710     @Test
711     public void testLargePostsPipelined() throws Exception {
712         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
713 
714             @Override
715             public AsyncServerExchangeHandler get() {
716                 return new EchoHandler(2048);
717             }
718 
719         });
720         final InetSocketAddress serverEndpoint = server.start();
721 
722         client.start();
723         final Future<ClientSessionEndpoint> connectFuture = client.connect(
724                 "localhost", serverEndpoint.getPort(), TIMEOUT);
725         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
726 
727         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
728         for (int i = 0; i < 5; i++) {
729             queue.add(streamEndpoint.execute(
730                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
731                             new MultiLineEntityProducer("0123456789abcdef", 5000)),
732                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
733         }
734         while (!queue.isEmpty()) {
735             final Future<Message<HttpResponse, String>> future = queue.remove();
736             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
737             Assert.assertNotNull(result);
738             final HttpResponse response = result.getHead();
739             Assert.assertNotNull(response);
740             Assert.assertEquals(200, response.getCode());
741             final String entity = result.getBody();
742             Assert.assertNotNull(entity);
743             final StringTokenizer t = new StringTokenizer(entity, "\r\n");
744             while (t.hasMoreTokens()) {
745                 Assert.assertEquals("0123456789abcdef", t.nextToken());
746             }
747         }
748     }
749 
750     @Test
751     public void testSimpleHead() throws Exception {
752         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
753 
754             @Override
755             public AsyncServerExchangeHandler get() {
756                 return new SingleLineResponseHandler("Hi there");
757             }
758 
759         });
760         final InetSocketAddress serverEndpoint = server.start();
761 
762         client.start();
763         final Future<ClientSessionEndpoint> connectFuture = client.connect(
764                 "localhost", serverEndpoint.getPort(), TIMEOUT);
765         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
766 
767         for (int i = 0; i < 5; i++) {
768             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
769                     new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
770                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
771             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
772             Assert.assertNotNull(result);
773             final HttpResponse response1 = result.getHead();
774             Assert.assertNotNull(response1);
775             Assert.assertEquals(200, response1.getCode());
776             Assert.assertNull(result.getBody());
777         }
778     }
779 
780     @Test
781     public void testSimpleHeadConnectionClose() throws Exception {
782         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
783 
784             @Override
785             public AsyncServerExchangeHandler get() {
786                 return new SingleLineResponseHandler("Hi there");
787             }
788 
789         });
790         final InetSocketAddress serverEndpoint = server.start();
791 
792         client.start();
793         final URI requestURI = createRequestURI(serverEndpoint, "/hello");
794         for (int i = 0; i < 5; i++) {
795             final Future<ClientSessionEndpoint> connectFuture = client.connect(
796                     "localhost", serverEndpoint.getPort(), TIMEOUT);
797             try (final ClientSessionEndpoint streamEndpoint = connectFuture.get()) {
798                 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
799                         AsyncRequestBuilder.head(requestURI)
800                                 .addHeader(HttpHeaders.CONNECTION, "close")
801                                 .build(),
802                         new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
803                 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
804                 Assert.assertNotNull(result);
805                 final HttpResponse response1 = result.getHead();
806                 Assert.assertNotNull(response1);
807                 Assert.assertEquals(200, response1.getCode());
808                 Assert.assertNull(result.getBody());
809             }
810         }
811     }
812 
813     @Test
814     public void testHeadPipelined() throws Exception {
815         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
816 
817             @Override
818             public AsyncServerExchangeHandler get() {
819                 return new SingleLineResponseHandler("Hi there");
820             }
821 
822         });
823         final InetSocketAddress serverEndpoint = server.start();
824 
825         client.start();
826         final Future<ClientSessionEndpoint> connectFuture = client.connect(
827                 "localhost", serverEndpoint.getPort(), TIMEOUT);
828         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
829 
830         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
831         for (int i = 0; i < 5; i++) {
832             queue.add(streamEndpoint.execute(
833                     new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
834                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
835         }
836         while (!queue.isEmpty()) {
837             final Future<Message<HttpResponse, String>> future = queue.remove();
838             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
839             Assert.assertNotNull(result);
840             final HttpResponse response1 = result.getHead();
841             Assert.assertNotNull(response1);
842             Assert.assertEquals(200, response1.getCode());
843             Assert.assertNull(result.getBody());
844         }
845     }
846 
847     @Test
848     public void testExpectationFailed() throws Exception {
849         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
850 
851             @Override
852             public AsyncServerExchangeHandler get() {
853                 return new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
854 
855                     @Override
856                     protected void handle(
857                             final Message<HttpRequest, String> request,
858                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
859                             final HttpContext context) throws IOException, HttpException {
860                         responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
861 
862                     }
863                 };
864             }
865 
866         });
867         final InetSocketAddress serverEndpoint = server.start(null, new Decorator<AsyncServerExchangeHandler>() {
868 
869             @Override
870             public AsyncServerExchangeHandler decorate(final AsyncServerExchangeHandler handler) {
871 
872                 return new BasicAsyncServerExpectationDecorator(handler) {
873 
874                     @Override
875                     protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
876                         final Header h = request.getFirstHeader("password");
877                         if (h != null && "secret".equals(h.getValue())) {
878                             return null;
879                         } else {
880                             return new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
881                         }
882                     }
883                 };
884 
885             }
886         }, Http1Config.DEFAULT);
887 
888         client.start();
889         final Future<IOSession> sessionFuture = client.requestSession(
890                 new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
891         final IOSession ioSession = sessionFuture.get();
892         final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession);
893 
894         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
895         request1.addHeader("password", "secret");
896         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
897                 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 1000)),
898                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
899         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
900         Assert.assertNotNull(result1);
901         final HttpResponse response1 = result1.getHead();
902         Assert.assertNotNull(response1);
903         Assert.assertEquals(200, response1.getCode());
904         Assert.assertNotNull("All is well", result1.getBody());
905 
906         Assert.assertTrue(ioSession.isOpen());
907 
908         final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
909         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
910                 new BasicRequestProducer(request2, new MultiLineEntityProducer("0123456789abcdef", 5000)),
911                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
912         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
913         Assert.assertNotNull(result2);
914         final HttpResponse response2 = result2.getHead();
915         Assert.assertNotNull(response2);
916         Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response2.getCode());
917         Assert.assertNotNull("You shall not pass", result2.getBody());
918 
919         Assert.assertTrue(ioSession.isOpen());
920 
921         final HttpRequest request3 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
922         request3.addHeader("password", "secret");
923         final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
924                 new BasicRequestProducer(request3, new MultiLineEntityProducer("0123456789abcdef", 1000)),
925                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
926         final Message<HttpResponse, String> result3 = future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
927         Assert.assertNotNull(result3);
928         final HttpResponse response3 = result3.getHead();
929         Assert.assertNotNull(response3);
930         Assert.assertEquals(200, response3.getCode());
931         Assert.assertNotNull("All is well", result3.getBody());
932 
933         Assert.assertTrue(ioSession.isOpen());
934 
935         final HttpRequest request4 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
936         final Future<Message<HttpResponse, String>> future4 = streamEndpoint.execute(
937                 new BasicRequestProducer(request4, AsyncEntityProducers.create("blah")),
938                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
939         final Message<HttpResponse, String> result4 = future4.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
940         Assert.assertNotNull(result4);
941         final HttpResponse response4 = result4.getHead();
942         Assert.assertNotNull(response4);
943         Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response4.getCode());
944         Assert.assertNotNull("You shall not pass", result4.getBody());
945 
946         Assert.assertFalse(ioSession.isOpen());
947     }
948 
949     @Test
950     public void testExpectationFailedCloseConnection() throws Exception {
951         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
952 
953             @Override
954             public AsyncServerExchangeHandler get() {
955                 return new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
956 
957                     @Override
958                     protected void handle(
959                             final Message<HttpRequest, String> request,
960                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
961                             final HttpContext context) throws IOException, HttpException {
962                         responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
963 
964                     }
965                 };
966             }
967 
968         });
969         final InetSocketAddress serverEndpoint = server.start(null, new Decorator<AsyncServerExchangeHandler>() {
970 
971             @Override
972             public AsyncServerExchangeHandler decorate(final AsyncServerExchangeHandler handler) {
973 
974                 return new BasicAsyncServerExpectationDecorator(handler) {
975 
976                     @Override
977                     protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
978                         final Header h = request.getFirstHeader("password");
979                         if (h != null && "secret".equals(h.getValue())) {
980                             return null;
981                         } else {
982                             final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_UNAUTHORIZED);
983                             response.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
984                             return new BasicResponseProducer(response, "You shall not pass");
985                         }
986                     }
987                 };
988 
989             }
990         }, Http1Config.DEFAULT);
991 
992         client.start();
993         final Future<IOSession> sessionFuture = client.requestSession(
994                 new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
995         final IOSession ioSession = sessionFuture.get();
996         final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession);
997 
998         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
999         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1000                 new BasicRequestProducer(request1, new MultiBinEntityProducer(
1001                         new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'},
1002                         100000,
1003                         ContentType.TEXT_PLAIN)),
1004                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1005         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1006         Assert.assertNotNull(result1);
1007         final HttpResponse response1 = result1.getHead();
1008         Assert.assertNotNull(response1);
1009         Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1010         Assert.assertNotNull("You shall not pass", result1.getBody());
1011 
1012         Assert.assertFalse(streamEndpoint.isOpen());
1013     }
1014 
1015     @Test
1016     public void testDelayedExpectationVerification() throws Exception {
1017         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
1018 
1019             @Override
1020             public AsyncServerExchangeHandler get() {
1021                 return new AsyncServerExchangeHandler() {
1022 
1023                     private final Random random = new Random(System.currentTimeMillis());
1024                     private final AsyncEntityProducer entityProducer = AsyncEntityProducers.create(
1025                             "All is well");
1026 
1027                     @Override
1028                     public void handleRequest(
1029                             final HttpRequest request,
1030                             final EntityDetails entityDetails,
1031                             final ResponseChannel responseChannel,
1032                             final HttpContext context) throws HttpException, IOException {
1033 
1034                         Executors.newSingleThreadExecutor().execute(new Runnable() {
1035                             @Override
1036                             public void run() {
1037                                 try {
1038                                     if (entityDetails != null) {
1039                                         final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
1040                                         if (h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue())) {
1041                                             Thread.sleep(random.nextInt(1000));
1042                                             responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), context);
1043                                         }
1044                                         final HttpResponse response = new BasicHttpResponse(200);
1045                                         synchronized (entityProducer) {
1046                                             responseChannel.sendResponse(response, entityProducer, context);
1047                                         }
1048                                     }
1049                                 } catch (final Exception ignore) {
1050                                     // ignore
1051                                 }
1052                             }
1053                         });
1054 
1055                     }
1056 
1057                     @Override
1058                     public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1059                         capacityChannel.update(Integer.MAX_VALUE);
1060                     }
1061 
1062                     @Override
1063                     public void consume(final ByteBuffer src) throws IOException {
1064                     }
1065 
1066                     @Override
1067                     public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1068                     }
1069 
1070                     @Override
1071                     public int available() {
1072                         synchronized (entityProducer) {
1073                             return entityProducer.available();
1074                         }
1075                     }
1076 
1077                     @Override
1078                     public void produce(final DataStreamChannel channel) throws IOException {
1079                         synchronized (entityProducer) {
1080                             entityProducer.produce(channel);
1081                         }
1082                     }
1083 
1084                     @Override
1085                     public void failed(final Exception cause) {
1086                     }
1087 
1088                     @Override
1089                     public void releaseResources() {
1090                     }
1091 
1092                 };
1093 
1094             }
1095         });
1096         final InetSocketAddress serverEndpoint = server.start();
1097 
1098         client.start(Http1Config.custom().setWaitForContinueTimeout(Timeout.ofMilliseconds(100)).build());
1099         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1100                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1101         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1102 
1103         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
1104         for (int i = 0; i < 5; i++) {
1105             queue.add(streamEndpoint.execute(
1106                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/"),
1107                             AsyncEntityProducers.create("Some important message")),
1108                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
1109         }
1110         while (!queue.isEmpty()) {
1111             final Future<Message<HttpResponse, String>> future = queue.remove();
1112             final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1113             Assert.assertNotNull(result);
1114             final HttpResponse response = result.getHead();
1115             Assert.assertNotNull(response);
1116             Assert.assertEquals(200, response.getCode());
1117             Assert.assertNotNull("All is well", result.getBody());
1118         }
1119     }
1120 
1121     @Test
1122     public void testPrematureResponse() throws Exception {
1123         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
1124 
1125             @Override
1126             public AsyncServerExchangeHandler get() {
1127                 return new AsyncServerExchangeHandler() {
1128 
1129                     private final AtomicReference<AsyncResponseProducer> responseProducer = new AtomicReference<>(null);
1130 
1131                     @Override
1132                     public void handleRequest(
1133                             final HttpRequest request,
1134                             final EntityDetails entityDetails,
1135                             final ResponseChannel responseChannel,
1136                             final HttpContext context) throws HttpException, IOException {
1137                         final AsyncResponseProducer producer;
1138                         final Header h = request.getFirstHeader("password");
1139                         if (h != null && "secret".equals(h.getValue())) {
1140                             producer = new BasicResponseProducer(HttpStatus.SC_OK, "All is well");
1141                         } else {
1142                             producer = new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
1143                         }
1144                         responseProducer.set(producer);
1145                         producer.sendResponse(responseChannel, context);
1146                     }
1147 
1148                     @Override
1149                     public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1150                         capacityChannel.update(Integer.MAX_VALUE);
1151                     }
1152 
1153                     @Override
1154                     public void consume(final ByteBuffer src) throws IOException {
1155                     }
1156 
1157                     @Override
1158                     public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1159                     }
1160 
1161                     @Override
1162                     public int available() {
1163                         final AsyncResponseProducer producer = responseProducer.get();
1164                         return producer.available();
1165                     }
1166 
1167                     @Override
1168                     public void produce(final DataStreamChannel channel) throws IOException {
1169                         final AsyncResponseProducer producer = responseProducer.get();
1170                         producer.produce(channel);
1171                     }
1172 
1173                     @Override
1174                     public void failed(final Exception cause) {
1175                     }
1176 
1177                     @Override
1178                     public void releaseResources() {
1179                     }
1180                 };
1181             }
1182 
1183         });
1184         final InetSocketAddress serverEndpoint = server.start();
1185 
1186         client.start();
1187         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1188                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1189         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1190 
1191         for (int i = 0; i < 3; i++) {
1192             final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1193             final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1194                     new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 100000)),
1195                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1196             final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1197             Assert.assertNotNull(result1);
1198             final HttpResponse response1 = result1.getHead();
1199             Assert.assertNotNull(response1);
1200             Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1201             Assert.assertNotNull("You shall not pass", result1.getBody());
1202 
1203             Assert.assertTrue(streamEndpoint.isOpen());
1204         }
1205         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1206         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1207                 new BasicRequestProducer(request1, new MultiBinEntityProducer(
1208                         new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'},
1209                         100000,
1210                         ContentType.TEXT_PLAIN)),
1211                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1212         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1213         Assert.assertNotNull(result1);
1214         final HttpResponse response1 = result1.getHead();
1215         Assert.assertNotNull(response1);
1216         Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1217         Assert.assertNotNull("You shall not pass", result1.getBody());
1218 
1219         Assert.assertFalse(streamEndpoint.isOpen());
1220     }
1221 
1222     @Test
1223     public void testSlowResponseConsumer() throws Exception {
1224         server.register("/", new Supplier<AsyncServerExchangeHandler>() {
1225 
1226             @Override
1227             public AsyncServerExchangeHandler get() {
1228                 return new MultiLineResponseHandler("0123456789abcd", 100);
1229             }
1230 
1231         });
1232         final InetSocketAddress serverEndpoint = server.start();
1233 
1234         client.start(Http1Config.custom().setBufferSize(256).build());
1235 
1236         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1237                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1238         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1239 
1240         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/"));
1241         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1242                 new BasicRequestProducer(request1, null),
1243                 new BasicResponseConsumer<>(new AbstractClassicEntityConsumer<String>(16, Executors.newSingleThreadExecutor()) {
1244 
1245                     @Override
1246                     protected String consumeData(
1247                             final ContentType contentType, final InputStream inputStream) throws IOException {
1248                         Charset charset = contentType != null ? contentType.getCharset() : null;
1249                         if (charset == null) {
1250                             charset = StandardCharsets.US_ASCII;
1251                         }
1252 
1253                         final StringBuilder buffer = new StringBuilder();
1254                         try {
1255                             final byte[] tmp = new byte[16];
1256                             int l;
1257                             while ((l = inputStream.read(tmp)) != -1) {
1258                                 buffer.append(charset.decode(ByteBuffer.wrap(tmp, 0, l)));
1259                                 Thread.sleep(50);
1260                             }
1261                         } catch (final InterruptedException ex) {
1262                             Thread.currentThread().interrupt();
1263                             throw new InterruptedIOException(ex.getMessage());
1264                         }
1265                         return buffer.toString();
1266                     }
1267                 }),
1268                 null);
1269 
1270         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1271         Assert.assertNotNull(result1);
1272         final HttpResponse response1 = result1.getHead();
1273         Assert.assertNotNull(response1);
1274         Assert.assertEquals(200, response1.getCode());
1275         final String s1 = result1.getBody();
1276         Assert.assertNotNull(s1);
1277         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1278         while (t1.hasMoreTokens()) {
1279             Assert.assertEquals("0123456789abcd", t1.nextToken());
1280         }
1281     }
1282 
1283     @Test
1284     public void testSlowRequestProducer() throws Exception {
1285         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
1286 
1287             @Override
1288             public AsyncServerExchangeHandler get() {
1289                 return new EchoHandler(2048);
1290             }
1291 
1292         });
1293         final InetSocketAddress serverEndpoint = server.start();
1294 
1295         client.start();
1296         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1297                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1298         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1299 
1300         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1301         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1302                 new BasicRequestProducer(request1, new AbstractClassicEntityProducer(4096, ContentType.TEXT_PLAIN, Executors.newSingleThreadExecutor()) {
1303 
1304                     @Override
1305                     protected void produceData(final ContentType contentType, final OutputStream outputStream) throws IOException {
1306                         Charset charset = contentType.getCharset();
1307                         if (charset == null) {
1308                             charset = StandardCharsets.US_ASCII;
1309                         }
1310                         try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, charset))) {
1311                             for (int i = 0; i < 500; i++) {
1312                                 if (i % 100 == 0) {
1313                                     writer.flush();
1314                                     Thread.sleep(500);
1315                                 }
1316                                 writer.write("0123456789abcdef\r\n");
1317                             }
1318                         } catch (final InterruptedException ex) {
1319                             Thread.currentThread().interrupt();
1320                             throw new InterruptedIOException(ex.getMessage());
1321                         }
1322                     }
1323 
1324                 }),
1325                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1326         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1327         Assert.assertNotNull(result1);
1328         final HttpResponse response1 = result1.getHead();
1329         Assert.assertNotNull(response1);
1330         Assert.assertEquals(200, response1.getCode());
1331         final String s1 = result1.getBody();
1332         Assert.assertNotNull(s1);
1333         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1334         while (t1.hasMoreTokens()) {
1335             Assert.assertEquals("0123456789abcdef", t1.nextToken());
1336         }
1337     }
1338 
1339     @Test
1340     public void testSlowResponseProducer() throws Exception {
1341         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
1342 
1343             @Override
1344             public AsyncServerExchangeHandler get() {
1345                 return new AbstractClassicServerExchangeHandler(2048, Executors.newSingleThreadExecutor()) {
1346 
1347                     @Override
1348                     protected void handle(
1349                             final HttpRequest request,
1350                             final InputStream requestStream,
1351                             final HttpResponse response,
1352                             final OutputStream responseStream,
1353                             final HttpContext context) throws IOException, HttpException {
1354 
1355                         if (!"/hello".equals(request.getPath())) {
1356                             response.setCode(HttpStatus.SC_NOT_FOUND);
1357                             return;
1358                         }
1359                         if (!Method.POST.name().equalsIgnoreCase(request.getMethod())) {
1360                             response.setCode(HttpStatus.SC_NOT_IMPLEMENTED);
1361                             return;
1362                         }
1363                         if (requestStream == null) {
1364                             return;
1365                         }
1366                         final Header h1 = request.getFirstHeader(HttpHeaders.CONTENT_TYPE);
1367                         final ContentType contentType = h1 != null ? ContentType.parse(h1.getValue()) : null;
1368                         Charset charset = contentType != null ? contentType.getCharset() : null;
1369                         if (charset == null) {
1370                             charset = StandardCharsets.US_ASCII;
1371                         }
1372                         response.setCode(HttpStatus.SC_OK);
1373                         response.setHeader(h1);
1374                         try (final BufferedReader reader = new BufferedReader(new InputStreamReader(requestStream, charset));
1375                              final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(responseStream, charset))) {
1376                             try {
1377                                 String l;
1378                                 int count = 0;
1379                                 while ((l = reader.readLine()) != null) {
1380                                     writer.write(l);
1381                                     writer.write("\r\n");
1382                                     count++;
1383                                     if (count % 500 == 0) {
1384                                         Thread.sleep(500);
1385                                     }
1386                                 }
1387                                 writer.flush();
1388                             } catch (final InterruptedException ex) {
1389                                 Thread.currentThread().interrupt();
1390                                 throw new InterruptedIOException(ex.getMessage());
1391                             }
1392                         }
1393                     }
1394                 };
1395             }
1396 
1397         });
1398         final InetSocketAddress serverEndpoint = server.start();
1399 
1400         client.start(Http1Config.custom().setBufferSize(256).build());
1401 
1402         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1403                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1404         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1405 
1406         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
1407         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1408                 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcd", 2000)),
1409                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1410         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1411         Assert.assertNotNull(result1);
1412         final HttpResponse response1 = result1.getHead();
1413         Assert.assertNotNull(response1);
1414         Assert.assertEquals(200, response1.getCode());
1415         final String s1 = result1.getBody();
1416         Assert.assertNotNull(s1);
1417         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1418         while (t1.hasMoreTokens()) {
1419             Assert.assertEquals("0123456789abcd", t1.nextToken());
1420         }
1421     }
1422 
1423     @Test
1424     public void testPipelinedConnectionClose() throws Exception {
1425         server.register("/hello*", new Supplier<AsyncServerExchangeHandler>() {
1426 
1427             @Override
1428             public AsyncServerExchangeHandler get() {
1429                 return new SingleLineResponseHandler("Hi back");
1430             }
1431 
1432         });
1433         final InetSocketAddress serverEndpoint = server.start();
1434 
1435         client.start();
1436         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1437                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1438         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1439 
1440         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1441                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-1"),
1442                         AsyncEntityProducers.create("Hi there")),
1443                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1444         final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello-2"));
1445         request2.addHeader(HttpHeaders.CONNECTION, "close");
1446         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1447                 new BasicRequestProducer(request2,
1448                         AsyncEntityProducers.create("Hi there")),
1449                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1450         final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
1451                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1452                         AsyncEntityProducers.create("Hi there")),
1453                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1454 
1455         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1456         Assert.assertNotNull(result1);
1457         final HttpResponse response1 = result1.getHead();
1458         final String entity1 = result1.getBody();
1459         Assert.assertNotNull(response1);
1460         Assert.assertEquals(200, response1.getCode());
1461         Assert.assertEquals("Hi back", entity1);
1462 
1463         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1464         Assert.assertNotNull(result2);
1465         final HttpResponse response2 = result2.getHead();
1466         final String entity2 = result2.getBody();
1467         Assert.assertNotNull(response2);
1468         Assert.assertEquals(200, response2.getCode());
1469         Assert.assertEquals("Hi back", entity2);
1470 
1471         try {
1472             future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1473             Assert.fail("ExecutionException expected");
1474         } catch (final CancellationException | ExecutionException ignore) {
1475         }
1476 
1477         final Future<Message<HttpResponse, String>> future4 = streamEndpoint.execute(
1478                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1479                         AsyncEntityProducers.create("Hi there")),
1480                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1481         try {
1482             future4.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1483             Assert.fail("CancellationException or ExecutionException expected");
1484         } catch (final CancellationException ignore) {
1485             Assert.assertTrue(future4.isCancelled());
1486         } catch (final ExecutionException ignore) {
1487         }
1488     }
1489 
1490     @Test
1491     public void testPipelinedInvalidRequest() throws Exception {
1492         server.register("/hello*", new Supplier<AsyncServerExchangeHandler>() {
1493 
1494             @Override
1495             public AsyncServerExchangeHandler get() {
1496                 return new SingleLineResponseHandler("Hi back");
1497             }
1498 
1499         });
1500         final InetSocketAddress serverEndpoint = server.start();
1501 
1502         client.start();
1503         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1504                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1505         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1506 
1507         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1508                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-1"),
1509                         AsyncEntityProducers.create("Hi there")),
1510                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1511         final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello-2"));
1512         request2.addHeader(HttpHeaders.HOST, "blah:blah");
1513         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1514                 new BasicRequestProducer(request2,
1515                         AsyncEntityProducers.create("Hi there")),
1516                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1517         final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
1518                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1519                         AsyncEntityProducers.create("Hi there")),
1520                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1521 
1522         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1523         Assert.assertNotNull(result1);
1524         final HttpResponse response1 = result1.getHead();
1525         final String entity1 = result1.getBody();
1526         Assert.assertNotNull(response1);
1527         Assert.assertEquals(200, response1.getCode());
1528         Assert.assertEquals("Hi back", entity1);
1529 
1530         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1531         Assert.assertNotNull(result2);
1532         final HttpResponse response2 = result2.getHead();
1533         final String entity2 = result2.getBody();
1534         Assert.assertNotNull(response2);
1535         Assert.assertEquals(400, response2.getCode());
1536         Assert.assertTrue(entity2.length() > 0);
1537 
1538         try {
1539             future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1540             Assert.fail("ExecutionException expected");
1541         } catch (final CancellationException | ExecutionException ignore) {
1542         }
1543     }
1544 
1545     private static final byte[] GARBAGE = "garbage".getBytes(StandardCharsets.US_ASCII);
1546 
1547     private static class BrokenChunkEncoder extends AbstractContentEncoder {
1548 
1549         private final CharArrayBuffer lineBuffer;
1550         private boolean done;
1551 
1552         BrokenChunkEncoder(
1553                 final WritableByteChannel channel,
1554                 final SessionOutputBuffer buffer,
1555                 final BasicHttpTransportMetrics metrics) {
1556             super(channel, buffer, metrics);
1557             lineBuffer = new CharArrayBuffer(16);
1558         }
1559 
1560         @Override
1561         public void complete(final List<? extends Header> trailers) throws IOException {
1562             super.complete(trailers);
1563         }
1564 
1565         @Override
1566         public int write(final ByteBuffer src) throws IOException {
1567             final int chunk;
1568             if (!done) {
1569                 lineBuffer.clear();
1570                 lineBuffer.append(Integer.toHexString(GARBAGE.length * 10));
1571                 buffer().writeLine(lineBuffer);
1572                 buffer().write(ByteBuffer.wrap(GARBAGE));
1573                 done = true;
1574                 chunk = GARBAGE.length;
1575             } else {
1576                 chunk = 0;
1577             }
1578             final long bytesWritten = buffer().flush(channel());
1579             if (bytesWritten > 0) {
1580                 metrics().incrementBytesTransferred(bytesWritten);
1581             }
1582             if (!buffer().hasData()) {
1583                 channel().close();
1584             }
1585             return chunk;
1586         }
1587 
1588     }
1589 
1590     @Test
1591     public void testTruncatedChunk() throws Exception {
1592         final InetSocketAddress serverEndpoint = server.start(new InternalServerHttp1EventHandlerFactory(
1593                 HttpProcessors.server(),
1594                 new HandlerFactory<AsyncServerExchangeHandler>() {
1595 
1596                     @Override
1597                     public AsyncServerExchangeHandler create(
1598                             final HttpRequest request,
1599                             final HttpContext context) throws HttpException {
1600                         return new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
1601 
1602                             @Override
1603                             protected void handle(
1604                                     final Message<HttpRequest, String> request,
1605                                     final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1606                                     final HttpContext context) throws IOException, HttpException {
1607                                 responseTrigger.submitResponse(
1608                                         new BasicResponseProducer(new StringAsyncEntityProducer("useful stuff")), context);
1609                             }
1610 
1611                         };
1612                     }
1613 
1614                 },
1615                 Http1Config.DEFAULT,
1616                 CharCodingConfig.DEFAULT,
1617                 DefaultConnectionReuseStrategy.INSTANCE,
1618                 scheme == URIScheme.HTTPS ? SSLTestContexts.createServerSSLContext() : null, null, null) {
1619 
1620             @Override
1621             protected ServerHttp1StreamDuplexer createServerHttp1StreamDuplexer(
1622                     final ProtocolIOSession ioSession,
1623                     final HttpProcessor httpProcessor,
1624                     final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
1625                     final Http1Config http1Config,
1626                     final CharCodingConfig connectionConfig,
1627                     final ConnectionReuseStrategy connectionReuseStrategy,
1628                     final NHttpMessageParser<HttpRequest> incomingMessageParser,
1629                     final NHttpMessageWriter<HttpResponse> outgoingMessageWriter,
1630                     final ContentLengthStrategy incomingContentStrategy,
1631                     final ContentLengthStrategy outgoingContentStrategy,
1632                     final Http1StreamListener streamListener) {
1633                 return new ServerHttp1StreamDuplexer(ioSession, httpProcessor, exchangeHandlerFactory,
1634                         scheme.id,
1635                         http1Config, connectionConfig, connectionReuseStrategy,
1636                         incomingMessageParser, outgoingMessageWriter,
1637                         incomingContentStrategy, outgoingContentStrategy,
1638                         streamListener) {
1639 
1640                     @Override
1641                     protected ContentEncoder createContentEncoder(
1642                             final long len,
1643                             final WritableByteChannel channel,
1644                             final SessionOutputBuffer buffer,
1645                             final BasicHttpTransportMetrics metrics) throws HttpException {
1646                         if (len == ContentLengthStrategy.CHUNKED) {
1647                             return new BrokenChunkEncoder(channel, buffer, metrics);
1648                         } else {
1649                             return super.createContentEncoder(len, channel, buffer, metrics);
1650                         }
1651                     }
1652 
1653                 };
1654             }
1655 
1656         });
1657 
1658         client.start();
1659         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1660                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1661         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1662 
1663         final AsyncRequestProducer requestProducer = new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1664         final StringAsyncEntityConsumer entityConsumer = new StringAsyncEntityConsumer() {
1665 
1666             @Override
1667             public void releaseResources() {
1668                 // Do not clear internal content buffer
1669             }
1670 
1671         };
1672         final BasicResponseConsumer<String> responseConsumer = new BasicResponseConsumer<>(entityConsumer);
1673         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(requestProducer, responseConsumer, null);
1674         try {
1675             future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1676             Assert.fail("ExecutionException should have been thrown");
1677         } catch (final ExecutionException ex) {
1678             final Throwable cause = ex.getCause();
1679             Assert.assertTrue(cause instanceof MalformedChunkCodingException);
1680             Assert.assertEquals("garbage", entityConsumer.generateContent());
1681         }
1682     }
1683 
1684     @Test
1685     public void testExceptionInHandler() throws Exception {
1686         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1687 
1688             @Override
1689             public AsyncServerExchangeHandler get() {
1690                 return new SingleLineResponseHandler("Hi there") {
1691 
1692                     @Override
1693                     protected void handle(
1694                             final Message<HttpRequest, String> request,
1695                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1696                             final HttpContext context) throws IOException, HttpException {
1697                         throw new HttpException("Boom");
1698                     }
1699                 };
1700             }
1701 
1702         });
1703         final InetSocketAddress serverEndpoint = server.start();
1704 
1705         client.start();
1706         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1707                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1708         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1709 
1710         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1711                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1712                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1713         final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1714         Assert.assertNotNull(result);
1715         final HttpResponse response1 = result.getHead();
1716         final String entity1 = result.getBody();
1717         Assert.assertNotNull(response1);
1718         Assert.assertEquals(500, response1.getCode());
1719         Assert.assertEquals("Boom", entity1);
1720     }
1721 
1722     @Test
1723     public void testNoServiceHandler() throws Exception {
1724         final InetSocketAddress serverEndpoint = server.start();
1725 
1726         client.start();
1727         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1728                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1729         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1730 
1731         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1732                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1733                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1734         final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1735         Assert.assertNotNull(result);
1736         final HttpResponse response1 = result.getHead();
1737         final String entity1 = result.getBody();
1738         Assert.assertNotNull(response1);
1739         Assert.assertEquals(404, response1.getCode());
1740         Assert.assertEquals("Resource not found", entity1);
1741     }
1742 
1743     @Test
1744     public void testResponseNoContent() throws Exception {
1745         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1746 
1747             @Override
1748             public AsyncServerExchangeHandler get() {
1749                 return new SingleLineResponseHandler("Hi there") {
1750 
1751                     @Override
1752                     protected void handle(
1753                             final Message<HttpRequest, String> request,
1754                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1755                             final HttpContext context) throws IOException, HttpException {
1756                         final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_NO_CONTENT);
1757                         responseTrigger.submitResponse(new BasicResponseProducer(response), context);
1758                     }
1759                 };
1760             }
1761 
1762         });
1763         final InetSocketAddress serverEndpoint = server.start();
1764 
1765         client.start();
1766         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1767                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1768         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1769 
1770         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1771                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1772                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1773         final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1774         Assert.assertNotNull(result);
1775         final HttpResponse response1 = result.getHead();
1776         Assert.assertNotNull(response1);
1777         Assert.assertEquals(204, response1.getCode());
1778         Assert.assertNull(result.getBody());
1779     }
1780 
1781     @Test
1782     public void testAbsentHostHeader() throws Exception {
1783         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1784 
1785             @Override
1786             public AsyncServerExchangeHandler get() {
1787                 return new SingleLineResponseHandler("Hi there");
1788             }
1789 
1790         });
1791         final InetSocketAddress serverEndpoint = server.start();
1792 
1793         client.start(new DefaultHttpProcessor(new RequestContent(), new RequestConnControl()), Http1Config.DEFAULT);
1794 
1795         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1796                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1797         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1798 
1799         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1800         request1.setVersion(HttpVersion.HTTP_1_0);
1801         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1802                 new BasicRequestProducer(request1, null),
1803                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1804         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1805         Assert.assertNotNull(result1);
1806         final HttpResponse response1 = result1.getHead();
1807         Assert.assertNotNull(response1);
1808         Assert.assertEquals(200, response1.getCode());
1809         Assert.assertEquals("Hi there", result1.getBody());
1810 
1811         final HttpRequest request2 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1812         request2.setVersion(HttpVersion.HTTP_1_1);
1813         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1814                 new BasicRequestProducer(request2, null),
1815                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1816         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1817         Assert.assertNotNull(result2);
1818         final HttpResponse response2 = result2.getHead();
1819         Assert.assertNotNull(response2);
1820         Assert.assertEquals(400, response2.getCode());
1821         Assert.assertEquals("Host header is absent", result2.getBody());
1822     }
1823 
1824     @Test
1825     public void testMessageWithTrailers() throws Exception {
1826         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1827 
1828             @Override
1829             public AsyncServerExchangeHandler get() {
1830                 return new AbstractServerExchangeHandler<Message<HttpRequest, String>>() {
1831 
1832                     @Override
1833                     protected AsyncRequestConsumer<Message<HttpRequest, String>> supplyConsumer(
1834                             final HttpRequest request,
1835                             final EntityDetails entityDetails,
1836                             final HttpContext context) throws HttpException {
1837                         return new BasicRequestConsumer<>(entityDetails != null ? new StringAsyncEntityConsumer() : null);
1838                     }
1839 
1840                     @Override
1841                     protected void handle(
1842                             final Message<HttpRequest, String> requestMessage,
1843                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1844                             final HttpContext context) throws HttpException, IOException {
1845                         responseTrigger.submitResponse(new BasicResponseProducer(
1846                                 HttpStatus.SC_OK,
1847                                 new DigestingEntityProducer("MD5",
1848                                         new StringAsyncEntityProducer("Hello back with some trailers"))), context);
1849                     }
1850                 };
1851             }
1852 
1853         });
1854         final InetSocketAddress serverEndpoint = server.start();
1855 
1856         client.start();
1857 
1858         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1859                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1860         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1861 
1862         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1863         final DigestingEntityConsumer<String> entityConsumer = new DigestingEntityConsumer<>("MD5", new StringAsyncEntityConsumer());
1864         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1865                 new BasicRequestProducer(request1, null),
1866                 new BasicResponseConsumer<>(entityConsumer), null);
1867         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1868         Assert.assertNotNull(result1);
1869         final HttpResponse response1 = result1.getHead();
1870         Assert.assertNotNull(response1);
1871         Assert.assertEquals(200, response1.getCode());
1872         Assert.assertEquals("Hello back with some trailers", result1.getBody());
1873 
1874         final List<Header> trailers = entityConsumer.getTrailers();
1875         Assert.assertNotNull(trailers);
1876         Assert.assertEquals(2, trailers.size());
1877         final Map<String, String> map = new HashMap<>();
1878         for (final Header header: trailers) {
1879             map.put(header.getName().toLowerCase(Locale.ROOT), header.getValue());
1880         }
1881         final String digest = TextUtils.toHexString(entityConsumer.getDigest());
1882         Assert.assertEquals("MD5", map.get("digest-algo"));
1883         Assert.assertEquals(digest, map.get("digest"));
1884     }
1885 
1886     @Test
1887     public void testProtocolException() throws Exception {
1888         server.register("/boom", new Supplier<AsyncServerExchangeHandler>() {
1889 
1890             @Override
1891             public AsyncServerExchangeHandler get() {
1892                 return new AsyncServerExchangeHandler() {
1893 
1894                     private final StringAsyncEntityProducer entityProducer = new StringAsyncEntityProducer("Everyting is OK");
1895 
1896                     @Override
1897                     public void releaseResources() {
1898                         entityProducer.releaseResources();
1899                     }
1900 
1901                     @Override
1902                     public void handleRequest(
1903                             final HttpRequest request,
1904                             final EntityDetails entityDetails,
1905                             final ResponseChannel responseChannel,
1906                             final HttpContext context) throws HttpException, IOException {
1907                         final String requestUri = request.getRequestUri();
1908                         if (requestUri.endsWith("boom")) {
1909                             throw new ProtocolException("Boom!!!");
1910                         }
1911                         responseChannel.sendResponse(new BasicHttpResponse(200), entityProducer, context);
1912                     }
1913 
1914                     @Override
1915                     public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1916                         capacityChannel.update(Integer.MAX_VALUE);
1917                     }
1918 
1919                     @Override
1920                     public void consume(final ByteBuffer src) throws IOException {
1921                     }
1922 
1923                     @Override
1924                     public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1925                         // empty
1926                     }
1927 
1928                     @Override
1929                     public int available() {
1930                         return entityProducer.available();
1931                     }
1932 
1933                     @Override
1934                     public void produce(final DataStreamChannel channel) throws IOException {
1935                         entityProducer.produce(channel);
1936                     }
1937 
1938                     @Override
1939                     public void failed(final Exception cause) {
1940                         releaseResources();
1941                     }
1942 
1943                 };
1944             }
1945 
1946         });
1947 
1948         final InetSocketAddress serverEndpoint = server.start();
1949 
1950         client.start();
1951         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1952                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1953         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1954         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1955                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/boom")),
1956                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1957         final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1958         Assert.assertNotNull(result);
1959         final HttpResponse response1 = result.getHead();
1960         final String entity1 = result.getBody();
1961         Assert.assertNotNull(response1);
1962         Assert.assertEquals(HttpStatus.SC_BAD_REQUEST, response1.getCode());
1963         Assert.assertEquals("Boom!!!", entity1);
1964     }
1965 
1966     @Test
1967     public void testHeaderTooLarge() throws Exception {
1968         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1969 
1970             @Override
1971             public AsyncServerExchangeHandler get() {
1972                 return new SingleLineResponseHandler("Hi there");
1973             }
1974 
1975         });
1976         final InetSocketAddress serverEndpoint = server.start(null, Http1Config.custom()
1977                 .setMaxLineLength(100)
1978                 .build());
1979         client.start();
1980 
1981         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1982                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1983         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1984 
1985         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1986         request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1987                 "1234567890123456789012345678901234567890");
1988         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1989                 new BasicRequestProducer(request1, null),
1990                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1991         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1992         Assert.assertNotNull(result1);
1993         final HttpResponse response1 = result1.getHead();
1994         Assert.assertNotNull(response1);
1995         Assert.assertEquals(431, response1.getCode());
1996         Assert.assertEquals("Maximum line length limit exceeded", result1.getBody());
1997     }
1998 
1999     @Test
2000     public void testHeaderTooLargePost() throws Exception {
2001         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
2002 
2003             @Override
2004             public AsyncServerExchangeHandler get() {
2005                 return new SingleLineResponseHandler("Hi there");
2006             }
2007 
2008         });
2009         final InetSocketAddress serverEndpoint = server.start(null, Http1Config.custom()
2010                 .setMaxLineLength(100)
2011                 .build());
2012         client.start(
2013                 new DefaultHttpProcessor(new RequestContent(), new RequestTargetHost(), new RequestConnControl()), null);
2014 
2015         final Future<ClientSessionEndpoint> connectFuture = client.connect(
2016                 "localhost", serverEndpoint.getPort(), TIMEOUT);
2017         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
2018 
2019         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
2020         request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
2021                 "1234567890123456789012345678901234567890");
2022 
2023         final byte[] b = new byte[2048];
2024         for (int i = 0; i < b.length; i++) {
2025             b[i] = (byte) ('a' + i % 10);
2026         }
2027 
2028         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
2029                 new BasicRequestProducer(request1, AsyncEntityProducers.create(b, ContentType.TEXT_PLAIN)),
2030                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
2031         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
2032         Assert.assertNotNull(result1);
2033         final HttpResponse response1 = result1.getHead();
2034         Assert.assertNotNull(response1);
2035         Assert.assertEquals(431, response1.getCode());
2036         Assert.assertEquals("Maximum line length limit exceeded", result1.getBody());
2037     }
2038 
2039 }