View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.testing.nio;
29  
30  import java.io.BufferedReader;
31  import java.io.BufferedWriter;
32  import java.io.IOException;
33  import java.io.InputStream;
34  import java.io.InputStreamReader;
35  import java.io.InterruptedIOException;
36  import java.io.OutputStream;
37  import java.io.OutputStreamWriter;
38  import java.net.InetSocketAddress;
39  import java.net.URI;
40  import java.net.URISyntaxException;
41  import java.nio.ByteBuffer;
42  import java.nio.channels.WritableByteChannel;
43  import java.nio.charset.Charset;
44  import java.nio.charset.StandardCharsets;
45  import java.util.Arrays;
46  import java.util.Collection;
47  import java.util.HashMap;
48  import java.util.LinkedList;
49  import java.util.List;
50  import java.util.Locale;
51  import java.util.Map;
52  import java.util.Queue;
53  import java.util.Random;
54  import java.util.StringTokenizer;
55  import java.util.concurrent.CancellationException;
56  import java.util.concurrent.ExecutionException;
57  import java.util.concurrent.Executors;
58  import java.util.concurrent.Future;
59  import java.util.concurrent.atomic.AtomicReference;
60  
61  import org.apache.hc.core5.function.Decorator;
62  import org.apache.hc.core5.function.Supplier;
63  import org.apache.hc.core5.http.ConnectionReuseStrategy;
64  import org.apache.hc.core5.http.ContentLengthStrategy;
65  import org.apache.hc.core5.http.ContentType;
66  import org.apache.hc.core5.http.EntityDetails;
67  import org.apache.hc.core5.http.Header;
68  import org.apache.hc.core5.http.HeaderElements;
69  import org.apache.hc.core5.http.HttpException;
70  import org.apache.hc.core5.http.HttpHeaders;
71  import org.apache.hc.core5.http.HttpHost;
72  import org.apache.hc.core5.http.HttpRequest;
73  import org.apache.hc.core5.http.HttpResponse;
74  import org.apache.hc.core5.http.HttpStatus;
75  import org.apache.hc.core5.http.HttpVersion;
76  import org.apache.hc.core5.http.MalformedChunkCodingException;
77  import org.apache.hc.core5.http.Message;
78  import org.apache.hc.core5.http.Method;
79  import org.apache.hc.core5.http.ProtocolException;
80  import org.apache.hc.core5.http.URIScheme;
81  import org.apache.hc.core5.http.config.CharCodingConfig;
82  import org.apache.hc.core5.http.config.Http1Config;
83  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
84  import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
85  import org.apache.hc.core5.http.impl.Http1StreamListener;
86  import org.apache.hc.core5.http.impl.HttpProcessors;
87  import org.apache.hc.core5.http.impl.nio.AbstractContentEncoder;
88  import org.apache.hc.core5.http.impl.nio.ServerHttp1StreamDuplexer;
89  import org.apache.hc.core5.http.message.BasicHttpRequest;
90  import org.apache.hc.core5.http.message.BasicHttpResponse;
91  import org.apache.hc.core5.http.nio.AsyncEntityProducer;
92  import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
93  import org.apache.hc.core5.http.nio.AsyncRequestProducer;
94  import org.apache.hc.core5.http.nio.AsyncResponseProducer;
95  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
96  import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
97  import org.apache.hc.core5.http.nio.CapacityChannel;
98  import org.apache.hc.core5.http.nio.ContentEncoder;
99  import org.apache.hc.core5.http.nio.DataStreamChannel;
100 import org.apache.hc.core5.http.nio.HandlerFactory;
101 import org.apache.hc.core5.http.nio.NHttpMessageParser;
102 import org.apache.hc.core5.http.nio.NHttpMessageWriter;
103 import org.apache.hc.core5.http.nio.ResponseChannel;
104 import org.apache.hc.core5.http.nio.SessionOutputBuffer;
105 import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
106 import org.apache.hc.core5.http.nio.entity.DigestingEntityConsumer;
107 import org.apache.hc.core5.http.nio.entity.DigestingEntityProducer;
108 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
109 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
110 import org.apache.hc.core5.http.nio.support.AbstractServerExchangeHandler;
111 import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
112 import org.apache.hc.core5.http.nio.support.BasicAsyncServerExpectationDecorator;
113 import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
114 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
115 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
116 import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
117 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityConsumer;
118 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityProducer;
119 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicServerExchangeHandler;
120 import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
121 import org.apache.hc.core5.http.protocol.HttpContext;
122 import org.apache.hc.core5.http.protocol.HttpProcessor;
123 import org.apache.hc.core5.http.protocol.RequestConnControl;
124 import org.apache.hc.core5.http.protocol.RequestContent;
125 import org.apache.hc.core5.http.protocol.RequestTargetHost;
126 import org.apache.hc.core5.http.protocol.RequestValidateHost;
127 import org.apache.hc.core5.reactor.IOReactorConfig;
128 import org.apache.hc.core5.reactor.IOSession;
129 import org.apache.hc.core5.reactor.ProtocolIOSession;
130 import org.apache.hc.core5.testing.SSLTestContexts;
131 import org.apache.hc.core5.util.CharArrayBuffer;
132 import org.apache.hc.core5.util.TextUtils;
133 import org.apache.hc.core5.util.TimeValue;
134 import org.apache.hc.core5.util.Timeout;
135 import org.junit.After;
136 import org.junit.Assert;
137 import org.junit.Before;
138 import org.junit.Test;
139 import org.junit.runner.RunWith;
140 import org.junit.runners.Parameterized;
141 import org.slf4j.Logger;
142 import org.slf4j.LoggerFactory;
143 
144 @RunWith(Parameterized.class)
145 public class Http1IntegrationTest extends InternalHttp1ServerTestBase {
146 
147     private final Logger log = LoggerFactory.getLogger(getClass());
148 
149     @Parameterized.Parameters(name = "{0}")
150     public static Collection<Object[]> protocols() {
151         return Arrays.asList(new Object[][]{
152                 { URIScheme.HTTP },
153                 { URIScheme.HTTPS }
154         });
155     }
156 
157     public Http1IntegrationTest(final URIScheme scheme) {
158         super(scheme);
159     }
160 
161     private static final Timeout TIMEOUT = Timeout.ofSeconds(30);
162     private static final Timeout LONG_TIMEOUT = Timeout.ofSeconds(60);
163 
164     private Http1TestClient client;
165 
166     @Before
167     public void setup() throws Exception {
168         log.debug("Starting up test client");
169         client = new Http1TestClient(
170                 buildReactorConfig(),
171                 scheme == URIScheme.HTTPS ? SSLTestContexts.createClientSSLContext() : null, null, null);
172     }
173 
174     protected IOReactorConfig buildReactorConfig() {
175         return IOReactorConfig.DEFAULT;
176     }
177 
178     @After
179     public void cleanup() throws Exception {
180         log.debug("Shutting down test client");
181         if (client != null) {
182             client.shutdown(TimeValue.ofSeconds(5));
183         }
184     }
185 
186     private URI createRequestURI(final InetSocketAddress serverEndpoint, final String path) {
187         try {
188             return new URI(scheme.id, null, "localhost", serverEndpoint.getPort(), path, null, null);
189         } catch (final URISyntaxException e) {
190             throw new IllegalStateException();
191         }
192     }
193 
194     @Test
195     public void testSimpleGet() throws Exception {
196         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
197 
198             @Override
199             public AsyncServerExchangeHandler get() {
200                 return new SingleLineResponseHandler("Hi there");
201             }
202 
203         });
204         final InetSocketAddress serverEndpoint = server.start();
205 
206         client.start();
207         final Future<ClientSessionEndpoint> connectFuture = client.connect(
208                 "localhost", serverEndpoint.getPort(), TIMEOUT);
209         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
210 
211         for (int i = 0; i < 5; i++) {
212             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
213                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
214                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
215             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
216             Assert.assertNotNull(result);
217             final HttpResponse response1 = result.getHead();
218             final String entity1 = result.getBody();
219             Assert.assertNotNull(response1);
220             Assert.assertEquals(200, response1.getCode());
221             Assert.assertEquals("Hi there", entity1);
222         }
223     }
224 
225     @Test
226     public void testSimpleGetConnectionClose() throws Exception {
227         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
228 
229             @Override
230             public AsyncServerExchangeHandler get() {
231                 return new SingleLineResponseHandler("Hi there");
232             }
233 
234         });
235         final InetSocketAddress serverEndpoint = server.start();
236 
237         client.start();
238         final URI requestURI = createRequestURI(serverEndpoint, "/hello");
239         for (int i = 0; i < 5; i++) {
240             final Future<ClientSessionEndpoint> connectFuture = client.connect(
241                     "localhost", serverEndpoint.getPort(), TIMEOUT);
242             try (final ClientSessionEndpoint streamEndpoint = connectFuture.get()) {
243                 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
244                         AsyncRequestBuilder.get(requestURI)
245                                 .addHeader(HttpHeaders.CONNECTION, "close")
246                                 .build(),
247                         new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
248                 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
249                 Assert.assertNotNull(result);
250                 final HttpResponse response1 = result.getHead();
251                 final String entity1 = result.getBody();
252                 Assert.assertNotNull(response1);
253                 Assert.assertEquals(200, response1.getCode());
254                 Assert.assertEquals("Hi there", entity1);
255             }
256         }
257     }
258 
259     @Test
260     public void testSimpleGetIdentityTransfer() throws Exception {
261         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
262 
263             @Override
264             public AsyncServerExchangeHandler get() {
265                 return new SingleLineResponseHandler("Hi there");
266             }
267 
268         });
269         final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
270         final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
271 
272         client.start();
273         final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
274         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
275 
276         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
277                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
278                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
279         final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
280         Assert.assertNotNull(result);
281         final HttpResponse response = result.getHead();
282         final String entity = result.getBody();
283         Assert.assertNotNull(response);
284         Assert.assertEquals(200, response.getCode());
285         Assert.assertEquals("Hi there", entity);
286     }
287 
288     @Test
289     public void testSimpleGetsPipelined() throws Exception {
290         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
291 
292             @Override
293             public AsyncServerExchangeHandler get() {
294                 return new SingleLineResponseHandler("Hi there");
295             }
296 
297         });
298         final InetSocketAddress serverEndpoint = server.start();
299 
300         client.start();
301         final Future<ClientSessionEndpoint> connectFuture = client.connect(
302                 "localhost", serverEndpoint.getPort(), TIMEOUT);
303         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
304 
305         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
306         for (int i = 0; i < 5; i++) {
307             queue.add(streamEndpoint.execute(
308                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
309                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
310         }
311         while (!queue.isEmpty()) {
312             final Future<Message<HttpResponse, String>> future = queue.remove();
313             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
314             Assert.assertNotNull(result);
315             final HttpResponse response = result.getHead();
316             final String entity = result.getBody();
317             Assert.assertNotNull(response);
318             Assert.assertEquals(200, response.getCode());
319             Assert.assertEquals("Hi there", entity);
320         }
321     }
322 
323     @Test
324     public void testLargeGet() throws Exception {
325         server.register("/", new Supplier<AsyncServerExchangeHandler>() {
326 
327             @Override
328             public AsyncServerExchangeHandler get() {
329                 return new MultiLineResponseHandler("0123456789abcdef", 5000);
330             }
331 
332         });
333         final InetSocketAddress serverEndpoint = server.start();
334 
335         client.start();
336         final Future<ClientSessionEndpoint> connectFuture = client.connect(
337                 "localhost", serverEndpoint.getPort(), TIMEOUT);
338         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
339 
340         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
341                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
342                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
343 
344         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
345         Assert.assertNotNull(result1);
346         final HttpResponse response1 = result1.getHead();
347         Assert.assertNotNull(response1);
348         Assert.assertEquals(200, response1.getCode());
349         final String s1 = result1.getBody();
350         Assert.assertNotNull(s1);
351         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
352         while (t1.hasMoreTokens()) {
353             Assert.assertEquals("0123456789abcdef", t1.nextToken());
354         }
355 
356         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
357                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
358                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer(512)), null);
359 
360         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
361         Assert.assertNotNull(result2);
362         final HttpResponse response2 = result2.getHead();
363         Assert.assertNotNull(response2);
364         Assert.assertEquals(200, response2.getCode());
365         final String s2 = result2.getBody();
366         Assert.assertNotNull(s2);
367         final StringTokenizer t2 = new StringTokenizer(s2, "\r\n");
368         while (t2.hasMoreTokens()) {
369             Assert.assertEquals("0123456789abcdef", t2.nextToken());
370         }
371     }
372 
373     @Test
374     public void testLargeGetsPipelined() throws Exception {
375         server.register("/", new Supplier<AsyncServerExchangeHandler>() {
376 
377             @Override
378             public AsyncServerExchangeHandler get() {
379                 return new MultiLineResponseHandler("0123456789abcdef", 2000);
380             }
381 
382         });
383         final InetSocketAddress serverEndpoint = server.start();
384 
385         client.start();
386         final Future<ClientSessionEndpoint> connectFuture = client.connect(
387                 "localhost", serverEndpoint.getPort(), TIMEOUT);
388         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
389 
390         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
391         for (int i = 0; i < 5; i++) {
392             queue.add(streamEndpoint.execute(
393                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
394                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
395         }
396         while (!queue.isEmpty()) {
397             final Future<Message<HttpResponse, String>> future = queue.remove();
398             final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
399             Assert.assertNotNull(result);
400             final HttpResponse response = result.getHead();
401             Assert.assertNotNull(response);
402             Assert.assertEquals(200, response.getCode());
403             final String entity = result.getBody();
404             Assert.assertNotNull(entity);
405             final StringTokenizer t = new StringTokenizer(entity, "\r\n");
406             while (t.hasMoreTokens()) {
407                 Assert.assertEquals("0123456789abcdef", t.nextToken());
408             }
409         }
410     }
411 
412     @Test
413     public void testBasicPost() throws Exception {
414         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
415 
416             @Override
417             public AsyncServerExchangeHandler get() {
418                 return new SingleLineResponseHandler("Hi back");
419             }
420 
421         });
422         final InetSocketAddress serverEndpoint = server.start();
423 
424         client.start();
425         final Future<ClientSessionEndpoint> connectFuture = client.connect(
426                 "localhost", serverEndpoint.getPort(), TIMEOUT);
427         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
428 
429         for (int i = 0; i < 5; i++) {
430             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
431                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello"),
432                             AsyncEntityProducers.create("Hi there")),
433                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
434             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
435             Assert.assertNotNull(result);
436             final HttpResponse response1 = result.getHead();
437             final String entity1 = result.getBody();
438             Assert.assertNotNull(response1);
439             Assert.assertEquals(200, response1.getCode());
440             Assert.assertEquals("Hi back", entity1);
441         }
442     }
443 
444     @Test
445     public void testBasicPostPipelined() throws Exception {
446         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
447 
448             @Override
449             public AsyncServerExchangeHandler get() {
450                 return new SingleLineResponseHandler("Hi back");
451             }
452 
453         });
454         final InetSocketAddress serverEndpoint = server.start();
455 
456         client.start();
457         final Future<ClientSessionEndpoint> connectFuture = client.connect(
458                 "localhost", serverEndpoint.getPort(), TIMEOUT);
459         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
460 
461         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
462         for (int i = 0; i < 5; i++) {
463             queue.add(streamEndpoint.execute(
464                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello"),
465                             AsyncEntityProducers.create("Hi there")),
466                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
467         }
468         while (!queue.isEmpty()) {
469             final Future<Message<HttpResponse, String>> future = queue.remove();
470             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
471             Assert.assertNotNull(result);
472             final HttpResponse response = result.getHead();
473             final String entity = result.getBody();
474             Assert.assertNotNull(response);
475             Assert.assertEquals(200, response.getCode());
476             Assert.assertEquals("Hi back", entity);
477         }
478     }
479 
480     @Test
481     public void testHttp10Post() throws Exception {
482         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
483 
484             @Override
485             public AsyncServerExchangeHandler get() {
486                 return new SingleLineResponseHandler("Hi back");
487             }
488 
489         });
490         final InetSocketAddress serverEndpoint = server.start();
491 
492         client.start();
493         final Future<ClientSessionEndpoint> connectFuture = client.connect(
494                 "localhost", serverEndpoint.getPort(), TIMEOUT);
495         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
496 
497         for (int i = 0; i < 5; i++) {
498             final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
499             request.setVersion(HttpVersion.HTTP_1_0);
500             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
501                     new BasicRequestProducer(request, AsyncEntityProducers.create("Hi there")),
502                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
503             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
504             Assert.assertNotNull(result);
505             final HttpResponse response1 = result.getHead();
506             final String entity1 = result.getBody();
507             Assert.assertNotNull(response1);
508             Assert.assertEquals(200, response1.getCode());
509             Assert.assertEquals("Hi back", entity1);
510         }
511     }
512 
513     @Test
514     public void testNoEntityPost() throws Exception {
515         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
516 
517             @Override
518             public AsyncServerExchangeHandler get() {
519                 return new SingleLineResponseHandler("Hi back");
520             }
521 
522         });
523         final InetSocketAddress serverEndpoint = server.start();
524 
525         client.start();
526         final Future<ClientSessionEndpoint> connectFuture = client.connect(
527                 "localhost", serverEndpoint.getPort(), TIMEOUT);
528         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
529 
530         for (int i = 0; i < 5; i++) {
531             final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
532             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
533                     new BasicRequestProducer(request, null),
534                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
535             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
536             Assert.assertNotNull(result);
537             final HttpResponse response1 = result.getHead();
538             final String entity1 = result.getBody();
539             Assert.assertNotNull(response1);
540             Assert.assertEquals(200, response1.getCode());
541             Assert.assertEquals("Hi back", entity1);
542         }
543     }
544 
545     @Test
546     public void testLargePost() throws Exception {
547         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
548 
549             @Override
550             public AsyncServerExchangeHandler get() {
551                 return new EchoHandler(2048);
552             }
553 
554         });
555         final InetSocketAddress serverEndpoint = server.start();
556 
557         client.start();
558         final Future<ClientSessionEndpoint> connectFuture = client.connect(
559                 "localhost", serverEndpoint.getPort(), TIMEOUT);
560         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
561 
562         for (int i = 0; i < 5; i++) {
563             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
564                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
565                             new MultiLineEntityProducer("0123456789abcdef", 5000)),
566                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
567             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
568             Assert.assertNotNull(result);
569             final HttpResponse response = result.getHead();
570             Assert.assertNotNull(response);
571             Assert.assertEquals(200, response.getCode());
572             final String entity = result.getBody();
573             Assert.assertNotNull(entity);
574             final StringTokenizer t = new StringTokenizer(entity, "\r\n");
575             while (t.hasMoreTokens()) {
576                 Assert.assertEquals("0123456789abcdef", t.nextToken());
577             }
578         }
579     }
580 
581     @Test
582     public void testPostsPipelinedLargeResponse() throws Exception {
583         server.register("/", new Supplier<AsyncServerExchangeHandler>() {
584 
585             @Override
586             public AsyncServerExchangeHandler get() {
587                 return new MultiLineResponseHandler("0123456789abcdef", 2000);
588             }
589 
590         });
591         final InetSocketAddress serverEndpoint = server.start();
592 
593         client.start();
594         final Future<ClientSessionEndpoint> connectFuture = client.connect(
595                 "localhost", serverEndpoint.getPort(), TIMEOUT);
596         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
597 
598         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
599         for (int i = 0; i < 2; i++) {
600             queue.add(streamEndpoint.execute(
601                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/"),
602                             AsyncEntityProducers.create("Hi there")),
603                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
604         }
605         while (!queue.isEmpty()) {
606             final Future<Message<HttpResponse, String>> future = queue.remove();
607             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
608             Assert.assertNotNull(result);
609             final HttpResponse response = result.getHead();
610             Assert.assertNotNull(response);
611             Assert.assertEquals(200, response.getCode());
612             final String entity = result.getBody();
613             Assert.assertNotNull(entity);
614             final StringTokenizer t = new StringTokenizer(entity, "\r\n");
615             while (t.hasMoreTokens()) {
616                 Assert.assertEquals("0123456789abcdef", t.nextToken());
617             }
618         }
619     }
620 
621 
622     @Test
623     public void testLargePostsPipelined() throws Exception {
624         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
625 
626             @Override
627             public AsyncServerExchangeHandler get() {
628                 return new EchoHandler(2048);
629             }
630 
631         });
632         final InetSocketAddress serverEndpoint = server.start();
633 
634         client.start();
635         final Future<ClientSessionEndpoint> connectFuture = client.connect(
636                 "localhost", serverEndpoint.getPort(), TIMEOUT);
637         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
638 
639         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
640         for (int i = 0; i < 5; i++) {
641             queue.add(streamEndpoint.execute(
642                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
643                             new MultiLineEntityProducer("0123456789abcdef", 5000)),
644                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
645         }
646         while (!queue.isEmpty()) {
647             final Future<Message<HttpResponse, String>> future = queue.remove();
648             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
649             Assert.assertNotNull(result);
650             final HttpResponse response = result.getHead();
651             Assert.assertNotNull(response);
652             Assert.assertEquals(200, response.getCode());
653             final String entity = result.getBody();
654             Assert.assertNotNull(entity);
655             final StringTokenizer t = new StringTokenizer(entity, "\r\n");
656             while (t.hasMoreTokens()) {
657                 Assert.assertEquals("0123456789abcdef", t.nextToken());
658             }
659         }
660     }
661 
662     @Test
663     public void testSimpleHead() throws Exception {
664         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
665 
666             @Override
667             public AsyncServerExchangeHandler get() {
668                 return new SingleLineResponseHandler("Hi there");
669             }
670 
671         });
672         final InetSocketAddress serverEndpoint = server.start();
673 
674         client.start();
675         final Future<ClientSessionEndpoint> connectFuture = client.connect(
676                 "localhost", serverEndpoint.getPort(), TIMEOUT);
677         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
678 
679         for (int i = 0; i < 5; i++) {
680             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
681                     new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
682                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
683             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
684             Assert.assertNotNull(result);
685             final HttpResponse response1 = result.getHead();
686             Assert.assertNotNull(response1);
687             Assert.assertEquals(200, response1.getCode());
688             Assert.assertNull(result.getBody());
689         }
690     }
691 
692     @Test
693     public void testSimpleHeadConnectionClose() throws Exception {
694         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
695 
696             @Override
697             public AsyncServerExchangeHandler get() {
698                 return new SingleLineResponseHandler("Hi there");
699             }
700 
701         });
702         final InetSocketAddress serverEndpoint = server.start();
703 
704         client.start();
705         final URI requestURI = createRequestURI(serverEndpoint, "/hello");
706         for (int i = 0; i < 5; i++) {
707             final Future<ClientSessionEndpoint> connectFuture = client.connect(
708                     "localhost", serverEndpoint.getPort(), TIMEOUT);
709             try (final ClientSessionEndpoint streamEndpoint = connectFuture.get()) {
710                 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
711                         AsyncRequestBuilder.head(requestURI)
712                                 .addHeader(HttpHeaders.CONNECTION, "close")
713                                 .build(),
714                         new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
715                 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
716                 Assert.assertNotNull(result);
717                 final HttpResponse response1 = result.getHead();
718                 Assert.assertNotNull(response1);
719                 Assert.assertEquals(200, response1.getCode());
720                 Assert.assertNull(result.getBody());
721             }
722         }
723     }
724 
725     @Test
726     public void testHeadPipelined() throws Exception {
727         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
728 
729             @Override
730             public AsyncServerExchangeHandler get() {
731                 return new SingleLineResponseHandler("Hi there");
732             }
733 
734         });
735         final InetSocketAddress serverEndpoint = server.start();
736 
737         client.start();
738         final Future<ClientSessionEndpoint> connectFuture = client.connect(
739                 "localhost", serverEndpoint.getPort(), TIMEOUT);
740         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
741 
742         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
743         for (int i = 0; i < 5; i++) {
744             queue.add(streamEndpoint.execute(
745                     new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
746                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
747         }
748         while (!queue.isEmpty()) {
749             final Future<Message<HttpResponse, String>> future = queue.remove();
750             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
751             Assert.assertNotNull(result);
752             final HttpResponse response1 = result.getHead();
753             Assert.assertNotNull(response1);
754             Assert.assertEquals(200, response1.getCode());
755             Assert.assertNull(result.getBody());
756         }
757     }
758 
759     @Test
760     public void testExpectationFailed() throws Exception {
761         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
762 
763             @Override
764             public AsyncServerExchangeHandler get() {
765                 return new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
766 
767                     @Override
768                     protected void handle(
769                             final Message<HttpRequest, String> request,
770                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
771                             final HttpContext context) throws IOException, HttpException {
772                         responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
773 
774                     }
775                 };
776             }
777 
778         });
779         final InetSocketAddress serverEndpoint = server.start(null, new Decorator<AsyncServerExchangeHandler>() {
780 
781             @Override
782             public AsyncServerExchangeHandler decorate(final AsyncServerExchangeHandler handler) {
783 
784                 return new BasicAsyncServerExpectationDecorator(handler) {
785 
786                     @Override
787                     protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
788                         final Header h = request.getFirstHeader("password");
789                         if (h != null && "secret".equals(h.getValue())) {
790                             return null;
791                         } else {
792                             return new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
793                         }
794                     }
795                 };
796 
797             }
798         }, Http1Config.DEFAULT);
799 
800         client.start();
801         final Future<IOSession> sessionFuture = client.requestSession(
802                 new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
803         final IOSession ioSession = sessionFuture.get();
804         final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession);
805 
806         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
807         request1.addHeader("password", "secret");
808         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
809                 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 1000)),
810                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
811         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
812         Assert.assertNotNull(result1);
813         final HttpResponse response1 = result1.getHead();
814         Assert.assertNotNull(response1);
815         Assert.assertEquals(200, response1.getCode());
816         Assert.assertNotNull("All is well", result1.getBody());
817 
818         Assert.assertTrue(ioSession.isOpen());
819 
820         final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
821         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
822                 new BasicRequestProducer(request2, new MultiLineEntityProducer("0123456789abcdef", 5000)),
823                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
824         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
825         Assert.assertNotNull(result2);
826         final HttpResponse response2 = result2.getHead();
827         Assert.assertNotNull(response2);
828         Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response2.getCode());
829         Assert.assertNotNull("You shall not pass", result2.getBody());
830 
831         Assert.assertTrue(ioSession.isOpen());
832 
833         final HttpRequest request3 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
834         request3.addHeader("password", "secret");
835         final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
836                 new BasicRequestProducer(request3, new MultiLineEntityProducer("0123456789abcdef", 1000)),
837                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
838         final Message<HttpResponse, String> result3 = future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
839         Assert.assertNotNull(result3);
840         final HttpResponse response3 = result3.getHead();
841         Assert.assertNotNull(response3);
842         Assert.assertEquals(200, response3.getCode());
843         Assert.assertNotNull("All is well", result3.getBody());
844 
845         Assert.assertTrue(ioSession.isOpen());
846 
847         final HttpRequest request4 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
848         final Future<Message<HttpResponse, String>> future4 = streamEndpoint.execute(
849                 new BasicRequestProducer(request4, AsyncEntityProducers.create("blah")),
850                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
851         final Message<HttpResponse, String> result4 = future4.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
852         Assert.assertNotNull(result4);
853         final HttpResponse response4 = result4.getHead();
854         Assert.assertNotNull(response4);
855         Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response4.getCode());
856         Assert.assertNotNull("You shall not pass", result4.getBody());
857 
858         Assert.assertFalse(ioSession.isOpen());
859     }
860 
861     @Test
862     public void testExpectationFailedCloseConnection() throws Exception {
863         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
864 
865             @Override
866             public AsyncServerExchangeHandler get() {
867                 return new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
868 
869                     @Override
870                     protected void handle(
871                             final Message<HttpRequest, String> request,
872                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
873                             final HttpContext context) throws IOException, HttpException {
874                         responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
875 
876                     }
877                 };
878             }
879 
880         });
881         final InetSocketAddress serverEndpoint = server.start(null, new Decorator<AsyncServerExchangeHandler>() {
882 
883             @Override
884             public AsyncServerExchangeHandler decorate(final AsyncServerExchangeHandler handler) {
885 
886                 return new BasicAsyncServerExpectationDecorator(handler) {
887 
888                     @Override
889                     protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
890                         final Header h = request.getFirstHeader("password");
891                         if (h != null && "secret".equals(h.getValue())) {
892                             return null;
893                         } else {
894                             final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_UNAUTHORIZED);
895                             response.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
896                             return new BasicResponseProducer(response, "You shall not pass");
897                         }
898                     }
899                 };
900 
901             }
902         }, Http1Config.DEFAULT);
903 
904         client.start();
905         final Future<IOSession> sessionFuture = client.requestSession(
906                 new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
907         final IOSession ioSession = sessionFuture.get();
908         final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession);
909 
910         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
911         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
912                 new BasicRequestProducer(request1, new MultiBinEntityProducer(
913                         new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'},
914                         100000,
915                         ContentType.TEXT_PLAIN)),
916                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
917         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
918         Assert.assertNotNull(result1);
919         final HttpResponse response1 = result1.getHead();
920         Assert.assertNotNull(response1);
921         Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
922         Assert.assertNotNull("You shall not pass", result1.getBody());
923 
924         Assert.assertFalse(streamEndpoint.isOpen());
925     }
926 
927     @Test
928     public void testDelayedExpectationVerification() throws Exception {
929         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
930 
931             @Override
932             public AsyncServerExchangeHandler get() {
933                 return new AsyncServerExchangeHandler() {
934 
935                     private final Random random = new Random(System.currentTimeMillis());
936                     private final AsyncEntityProducer entityProducer = AsyncEntityProducers.create(
937                             "All is well");
938 
939                     @Override
940                     public void handleRequest(
941                             final HttpRequest request,
942                             final EntityDetails entityDetails,
943                             final ResponseChannel responseChannel,
944                             final HttpContext context) throws HttpException, IOException {
945 
946                         Executors.newSingleThreadExecutor().execute(new Runnable() {
947                             @Override
948                             public void run() {
949                                 try {
950                                     if (entityDetails != null) {
951                                         final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
952                                         if (h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue())) {
953                                             Thread.sleep(random.nextInt(1000));
954                                             responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), context);
955                                         }
956                                         final HttpResponse response = new BasicHttpResponse(200);
957                                         synchronized (entityProducer) {
958                                             responseChannel.sendResponse(response, entityProducer, context);
959                                         }
960                                     }
961                                 } catch (final Exception ignore) {
962                                     // ignore
963                                 }
964                             }
965                         });
966 
967                     }
968 
969                     @Override
970                     public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
971                         capacityChannel.update(Integer.MAX_VALUE);
972                     }
973 
974                     @Override
975                     public void consume(final ByteBuffer src) throws IOException {
976                     }
977 
978                     @Override
979                     public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
980                     }
981 
982                     @Override
983                     public int available() {
984                         synchronized (entityProducer) {
985                             return entityProducer.available();
986                         }
987                     }
988 
989                     @Override
990                     public void produce(final DataStreamChannel channel) throws IOException {
991                         synchronized (entityProducer) {
992                             entityProducer.produce(channel);
993                         }
994                     }
995 
996                     @Override
997                     public void failed(final Exception cause) {
998                     }
999 
1000                     @Override
1001                     public void releaseResources() {
1002                     }
1003 
1004                 };
1005 
1006             }
1007         });
1008         final InetSocketAddress serverEndpoint = server.start();
1009 
1010         client.start(Http1Config.custom().setWaitForContinueTimeout(Timeout.ofMilliseconds(100)).build());
1011         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1012                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1013         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1014 
1015         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
1016         for (int i = 0; i < 5; i++) {
1017             queue.add(streamEndpoint.execute(
1018                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/"),
1019                             AsyncEntityProducers.create("Some important message")),
1020                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
1021         }
1022         while (!queue.isEmpty()) {
1023             final Future<Message<HttpResponse, String>> future = queue.remove();
1024             final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1025             Assert.assertNotNull(result);
1026             final HttpResponse response = result.getHead();
1027             Assert.assertNotNull(response);
1028             Assert.assertEquals(200, response.getCode());
1029             Assert.assertNotNull("All is well", result.getBody());
1030         }
1031     }
1032 
1033     @Test
1034     public void testPrematureResponse() throws Exception {
1035         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
1036 
1037             @Override
1038             public AsyncServerExchangeHandler get() {
1039                 return new AsyncServerExchangeHandler() {
1040 
1041                     private final AtomicReference<AsyncResponseProducer> responseProducer = new AtomicReference<>(null);
1042 
1043                     @Override
1044                     public void handleRequest(
1045                             final HttpRequest request,
1046                             final EntityDetails entityDetails,
1047                             final ResponseChannel responseChannel,
1048                             final HttpContext context) throws HttpException, IOException {
1049                         final AsyncResponseProducer producer;
1050                         final Header h = request.getFirstHeader("password");
1051                         if (h != null && "secret".equals(h.getValue())) {
1052                             producer = new BasicResponseProducer(HttpStatus.SC_OK, "All is well");
1053                         } else {
1054                             producer = new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
1055                         }
1056                         responseProducer.set(producer);
1057                         producer.sendResponse(responseChannel, context);
1058                     }
1059 
1060                     @Override
1061                     public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1062                         capacityChannel.update(Integer.MAX_VALUE);
1063                     }
1064 
1065                     @Override
1066                     public void consume(final ByteBuffer src) throws IOException {
1067                     }
1068 
1069                     @Override
1070                     public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1071                     }
1072 
1073                     @Override
1074                     public int available() {
1075                         final AsyncResponseProducer producer = responseProducer.get();
1076                         return producer.available();
1077                     }
1078 
1079                     @Override
1080                     public void produce(final DataStreamChannel channel) throws IOException {
1081                         final AsyncResponseProducer producer = responseProducer.get();
1082                         producer.produce(channel);
1083                     }
1084 
1085                     @Override
1086                     public void failed(final Exception cause) {
1087                     }
1088 
1089                     @Override
1090                     public void releaseResources() {
1091                     }
1092                 };
1093             }
1094 
1095         });
1096         final InetSocketAddress serverEndpoint = server.start();
1097 
1098         client.start();
1099         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1100                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1101         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1102 
1103         for (int i = 0; i < 3; i++) {
1104             final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1105             final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1106                     new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 100000)),
1107                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1108             final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1109             Assert.assertNotNull(result1);
1110             final HttpResponse response1 = result1.getHead();
1111             Assert.assertNotNull(response1);
1112             Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1113             Assert.assertNotNull("You shall not pass", result1.getBody());
1114 
1115             Assert.assertTrue(streamEndpoint.isOpen());
1116         }
1117         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1118         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1119                 new BasicRequestProducer(request1, new MultiBinEntityProducer(
1120                         new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'},
1121                         100000,
1122                         ContentType.TEXT_PLAIN)),
1123                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1124         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1125         Assert.assertNotNull(result1);
1126         final HttpResponse response1 = result1.getHead();
1127         Assert.assertNotNull(response1);
1128         Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1129         Assert.assertNotNull("You shall not pass", result1.getBody());
1130 
1131         Assert.assertFalse(streamEndpoint.isOpen());
1132     }
1133 
1134     @Test
1135     public void testSlowResponseConsumer() throws Exception {
1136         server.register("/", new Supplier<AsyncServerExchangeHandler>() {
1137 
1138             @Override
1139             public AsyncServerExchangeHandler get() {
1140                 return new MultiLineResponseHandler("0123456789abcd", 100);
1141             }
1142 
1143         });
1144         final InetSocketAddress serverEndpoint = server.start();
1145 
1146         client.start(Http1Config.custom().setBufferSize(256).build());
1147 
1148         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1149                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1150         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1151 
1152         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/"));
1153         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1154                 new BasicRequestProducer(request1, null),
1155                 new BasicResponseConsumer<>(new AbstractClassicEntityConsumer<String>(16, Executors.newSingleThreadExecutor()) {
1156 
1157                     @Override
1158                     protected String consumeData(
1159                             final ContentType contentType, final InputStream inputStream) throws IOException {
1160                         Charset charset = contentType != null ? contentType.getCharset() : null;
1161                         if (charset == null) {
1162                             charset = StandardCharsets.US_ASCII;
1163                         }
1164 
1165                         final StringBuilder buffer = new StringBuilder();
1166                         try {
1167                             final byte[] tmp = new byte[16];
1168                             int l;
1169                             while ((l = inputStream.read(tmp)) != -1) {
1170                                 buffer.append(charset.decode(ByteBuffer.wrap(tmp, 0, l)));
1171                                 Thread.sleep(50);
1172                             }
1173                         } catch (final InterruptedException ex) {
1174                             Thread.currentThread().interrupt();
1175                             throw new InterruptedIOException(ex.getMessage());
1176                         }
1177                         return buffer.toString();
1178                     }
1179                 }),
1180                 null);
1181 
1182         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1183         Assert.assertNotNull(result1);
1184         final HttpResponse response1 = result1.getHead();
1185         Assert.assertNotNull(response1);
1186         Assert.assertEquals(200, response1.getCode());
1187         final String s1 = result1.getBody();
1188         Assert.assertNotNull(s1);
1189         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1190         while (t1.hasMoreTokens()) {
1191             Assert.assertEquals("0123456789abcd", t1.nextToken());
1192         }
1193     }
1194 
1195     @Test
1196     public void testSlowRequestProducer() throws Exception {
1197         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
1198 
1199             @Override
1200             public AsyncServerExchangeHandler get() {
1201                 return new EchoHandler(2048);
1202             }
1203 
1204         });
1205         final InetSocketAddress serverEndpoint = server.start();
1206 
1207         client.start();
1208         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1209                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1210         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1211 
1212         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1213         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1214                 new BasicRequestProducer(request1, new AbstractClassicEntityProducer(4096, ContentType.TEXT_PLAIN, Executors.newSingleThreadExecutor()) {
1215 
1216                     @Override
1217                     protected void produceData(final ContentType contentType, final OutputStream outputStream) throws IOException {
1218                         Charset charset = contentType.getCharset();
1219                         if (charset == null) {
1220                             charset = StandardCharsets.US_ASCII;
1221                         }
1222                         try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, charset))) {
1223                             for (int i = 0; i < 500; i++) {
1224                                 if (i % 100 == 0) {
1225                                     writer.flush();
1226                                     Thread.sleep(500);
1227                                 }
1228                                 writer.write("0123456789abcdef\r\n");
1229                             }
1230                         } catch (final InterruptedException ex) {
1231                             Thread.currentThread().interrupt();
1232                             throw new InterruptedIOException(ex.getMessage());
1233                         }
1234                     }
1235 
1236                 }),
1237                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1238         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1239         Assert.assertNotNull(result1);
1240         final HttpResponse response1 = result1.getHead();
1241         Assert.assertNotNull(response1);
1242         Assert.assertEquals(200, response1.getCode());
1243         final String s1 = result1.getBody();
1244         Assert.assertNotNull(s1);
1245         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1246         while (t1.hasMoreTokens()) {
1247             Assert.assertEquals("0123456789abcdef", t1.nextToken());
1248         }
1249     }
1250 
1251     @Test
1252     public void testSlowResponseProducer() throws Exception {
1253         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
1254 
1255             @Override
1256             public AsyncServerExchangeHandler get() {
1257                 return new AbstractClassicServerExchangeHandler(2048, Executors.newSingleThreadExecutor()) {
1258 
1259                     @Override
1260                     protected void handle(
1261                             final HttpRequest request,
1262                             final InputStream requestStream,
1263                             final HttpResponse response,
1264                             final OutputStream responseStream,
1265                             final HttpContext context) throws IOException, HttpException {
1266 
1267                         if (!"/hello".equals(request.getPath())) {
1268                             response.setCode(HttpStatus.SC_NOT_FOUND);
1269                             return;
1270                         }
1271                         if (!Method.POST.name().equalsIgnoreCase(request.getMethod())) {
1272                             response.setCode(HttpStatus.SC_NOT_IMPLEMENTED);
1273                             return;
1274                         }
1275                         if (requestStream == null) {
1276                             return;
1277                         }
1278                         final Header h1 = request.getFirstHeader(HttpHeaders.CONTENT_TYPE);
1279                         final ContentType contentType = h1 != null ? ContentType.parse(h1.getValue()) : null;
1280                         Charset charset = contentType != null ? contentType.getCharset() : null;
1281                         if (charset == null) {
1282                             charset = StandardCharsets.US_ASCII;
1283                         }
1284                         response.setCode(HttpStatus.SC_OK);
1285                         response.setHeader(h1);
1286                         try (final BufferedReader reader = new BufferedReader(new InputStreamReader(requestStream, charset));
1287                              final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(responseStream, charset))) {
1288                             try {
1289                                 String l;
1290                                 int count = 0;
1291                                 while ((l = reader.readLine()) != null) {
1292                                     writer.write(l);
1293                                     writer.write("\r\n");
1294                                     count++;
1295                                     if (count % 500 == 0) {
1296                                         Thread.sleep(500);
1297                                     }
1298                                 }
1299                                 writer.flush();
1300                             } catch (final InterruptedException ex) {
1301                                 Thread.currentThread().interrupt();
1302                                 throw new InterruptedIOException(ex.getMessage());
1303                             }
1304                         }
1305                     }
1306                 };
1307             }
1308 
1309         });
1310         final InetSocketAddress serverEndpoint = server.start();
1311 
1312         client.start(Http1Config.custom().setBufferSize(256).build());
1313 
1314         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1315                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1316         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1317 
1318         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
1319         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1320                 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcd", 2000)),
1321                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1322         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1323         Assert.assertNotNull(result1);
1324         final HttpResponse response1 = result1.getHead();
1325         Assert.assertNotNull(response1);
1326         Assert.assertEquals(200, response1.getCode());
1327         final String s1 = result1.getBody();
1328         Assert.assertNotNull(s1);
1329         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1330         while (t1.hasMoreTokens()) {
1331             Assert.assertEquals("0123456789abcd", t1.nextToken());
1332         }
1333     }
1334 
1335     @Test
1336     public void testPipelinedConnectionClose() throws Exception {
1337         server.register("/hello*", new Supplier<AsyncServerExchangeHandler>() {
1338 
1339             @Override
1340             public AsyncServerExchangeHandler get() {
1341                 return new SingleLineResponseHandler("Hi back");
1342             }
1343 
1344         });
1345         final InetSocketAddress serverEndpoint = server.start();
1346 
1347         client.start();
1348         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1349                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1350         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1351 
1352         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1353                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-1"),
1354                         AsyncEntityProducers.create("Hi there")),
1355                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1356         final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello-2"));
1357         request2.addHeader(HttpHeaders.CONNECTION, "close");
1358         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1359                 new BasicRequestProducer(request2,
1360                         AsyncEntityProducers.create("Hi there")),
1361                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1362         final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
1363                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1364                         AsyncEntityProducers.create("Hi there")),
1365                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1366 
1367         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1368         Assert.assertNotNull(result1);
1369         final HttpResponse response1 = result1.getHead();
1370         final String entity1 = result1.getBody();
1371         Assert.assertNotNull(response1);
1372         Assert.assertEquals(200, response1.getCode());
1373         Assert.assertEquals("Hi back", entity1);
1374 
1375         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1376         Assert.assertNotNull(result2);
1377         final HttpResponse response2 = result2.getHead();
1378         final String entity2 = result2.getBody();
1379         Assert.assertNotNull(response2);
1380         Assert.assertEquals(200, response2.getCode());
1381         Assert.assertEquals("Hi back", entity2);
1382 
1383         try {
1384             future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1385             Assert.fail("ExecutionException expected");
1386         } catch (final CancellationException | ExecutionException ignore) {
1387         }
1388 
1389         final Future<Message<HttpResponse, String>> future4 = streamEndpoint.execute(
1390                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1391                         AsyncEntityProducers.create("Hi there")),
1392                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1393         try {
1394             future4.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1395             Assert.fail("CancellationException or ExecutionException expected");
1396         } catch (final CancellationException ignore) {
1397             Assert.assertTrue(future4.isCancelled());
1398         } catch (final ExecutionException ignore) {
1399         }
1400     }
1401 
1402     @Test
1403     public void testPipelinedInvalidRequest() throws Exception {
1404         server.register("/hello*", new Supplier<AsyncServerExchangeHandler>() {
1405 
1406             @Override
1407             public AsyncServerExchangeHandler get() {
1408                 return new SingleLineResponseHandler("Hi back");
1409             }
1410 
1411         });
1412         final InetSocketAddress serverEndpoint = server.start();
1413 
1414         client.start();
1415         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1416                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1417         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1418 
1419         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1420                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-1"),
1421                         AsyncEntityProducers.create("Hi there")),
1422                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1423         final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello-2"));
1424         request2.addHeader(HttpHeaders.HOST, "blah:blah");
1425         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1426                 new BasicRequestProducer(request2,
1427                         AsyncEntityProducers.create("Hi there")),
1428                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1429         final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
1430                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1431                         AsyncEntityProducers.create("Hi there")),
1432                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1433 
1434         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1435         Assert.assertNotNull(result1);
1436         final HttpResponse response1 = result1.getHead();
1437         final String entity1 = result1.getBody();
1438         Assert.assertNotNull(response1);
1439         Assert.assertEquals(200, response1.getCode());
1440         Assert.assertEquals("Hi back", entity1);
1441 
1442         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1443         Assert.assertNotNull(result2);
1444         final HttpResponse response2 = result2.getHead();
1445         final String entity2 = result2.getBody();
1446         Assert.assertNotNull(response2);
1447         Assert.assertEquals(400, response2.getCode());
1448         Assert.assertTrue(entity2.length() > 0);
1449 
1450         try {
1451             future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1452             Assert.fail("ExecutionException expected");
1453         } catch (final CancellationException | ExecutionException ignore) {
1454         }
1455     }
1456 
1457     private static final byte[] GARBAGE = "garbage".getBytes(StandardCharsets.US_ASCII);
1458 
1459     private static class BrokenChunkEncoder extends AbstractContentEncoder {
1460 
1461         private final CharArrayBuffer lineBuffer;
1462         private boolean done;
1463 
1464         BrokenChunkEncoder(
1465                 final WritableByteChannel channel,
1466                 final SessionOutputBuffer buffer,
1467                 final BasicHttpTransportMetrics metrics) {
1468             super(channel, buffer, metrics);
1469             lineBuffer = new CharArrayBuffer(16);
1470         }
1471 
1472         @Override
1473         public void complete(final List<? extends Header> trailers) throws IOException {
1474             super.complete(trailers);
1475         }
1476 
1477         @Override
1478         public int write(final ByteBuffer src) throws IOException {
1479             final int chunk;
1480             if (!done) {
1481                 lineBuffer.clear();
1482                 lineBuffer.append(Integer.toHexString(GARBAGE.length * 10));
1483                 buffer().writeLine(lineBuffer);
1484                 buffer().write(ByteBuffer.wrap(GARBAGE));
1485                 done = true;
1486                 chunk = GARBAGE.length;
1487             } else {
1488                 chunk = 0;
1489             }
1490             final long bytesWritten = buffer().flush(channel());
1491             if (bytesWritten > 0) {
1492                 metrics().incrementBytesTransferred(bytesWritten);
1493             }
1494             if (!buffer().hasData()) {
1495                 channel().close();
1496             }
1497             return chunk;
1498         }
1499 
1500     }
1501 
1502     @Test
1503     public void testTruncatedChunk() throws Exception {
1504         final InetSocketAddress serverEndpoint = server.start(new InternalServerHttp1EventHandlerFactory(
1505                 HttpProcessors.server(),
1506                 new HandlerFactory<AsyncServerExchangeHandler>() {
1507 
1508                     @Override
1509                     public AsyncServerExchangeHandler create(
1510                             final HttpRequest request,
1511                             final HttpContext context) throws HttpException {
1512                         return new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
1513 
1514                             @Override
1515                             protected void handle(
1516                                     final Message<HttpRequest, String> request,
1517                                     final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1518                                     final HttpContext context) throws IOException, HttpException {
1519                                 responseTrigger.submitResponse(
1520                                         new BasicResponseProducer(new StringAsyncEntityProducer("useful stuff")), context);
1521                             }
1522 
1523                         };
1524                     }
1525 
1526                 },
1527                 Http1Config.DEFAULT,
1528                 CharCodingConfig.DEFAULT,
1529                 DefaultConnectionReuseStrategy.INSTANCE,
1530                 scheme == URIScheme.HTTPS ? SSLTestContexts.createServerSSLContext() : null, null, null) {
1531 
1532             @Override
1533             protected ServerHttp1StreamDuplexer createServerHttp1StreamDuplexer(
1534                     final ProtocolIOSession ioSession,
1535                     final HttpProcessor httpProcessor,
1536                     final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
1537                     final Http1Config http1Config,
1538                     final CharCodingConfig connectionConfig,
1539                     final ConnectionReuseStrategy connectionReuseStrategy,
1540                     final NHttpMessageParser<HttpRequest> incomingMessageParser,
1541                     final NHttpMessageWriter<HttpResponse> outgoingMessageWriter,
1542                     final ContentLengthStrategy incomingContentStrategy,
1543                     final ContentLengthStrategy outgoingContentStrategy,
1544                     final Http1StreamListener streamListener) {
1545                 return new ServerHttp1StreamDuplexer(ioSession, httpProcessor, exchangeHandlerFactory,
1546                         scheme.id,
1547                         http1Config, connectionConfig, connectionReuseStrategy,
1548                         incomingMessageParser, outgoingMessageWriter,
1549                         incomingContentStrategy, outgoingContentStrategy,
1550                         streamListener) {
1551 
1552                     @Override
1553                     protected ContentEncoder createContentEncoder(
1554                             final long len,
1555                             final WritableByteChannel channel,
1556                             final SessionOutputBuffer buffer,
1557                             final BasicHttpTransportMetrics metrics) throws HttpException {
1558                         if (len == ContentLengthStrategy.CHUNKED) {
1559                             return new BrokenChunkEncoder(channel, buffer, metrics);
1560                         } else {
1561                             return super.createContentEncoder(len, channel, buffer, metrics);
1562                         }
1563                     }
1564 
1565                 };
1566             }
1567 
1568         });
1569 
1570         client.start();
1571         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1572                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1573         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1574 
1575         final AsyncRequestProducer requestProducer = new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1576         final StringAsyncEntityConsumer entityConsumer = new StringAsyncEntityConsumer() {
1577 
1578             @Override
1579             public void releaseResources() {
1580                 // Do not clear internal content buffer
1581             }
1582 
1583         };
1584         final BasicResponseConsumer<String> responseConsumer = new BasicResponseConsumer<>(entityConsumer);
1585         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(requestProducer, responseConsumer, null);
1586         try {
1587             future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1588             Assert.fail("ExecutionException should have been thrown");
1589         } catch (final ExecutionException ex) {
1590             final Throwable cause = ex.getCause();
1591             Assert.assertTrue(cause instanceof MalformedChunkCodingException);
1592             Assert.assertEquals("garbage", entityConsumer.generateContent());
1593         }
1594     }
1595 
1596     @Test
1597     public void testExceptionInHandler() throws Exception {
1598         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1599 
1600             @Override
1601             public AsyncServerExchangeHandler get() {
1602                 return new SingleLineResponseHandler("Hi there") {
1603 
1604                     @Override
1605                     protected void handle(
1606                             final Message<HttpRequest, String> request,
1607                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1608                             final HttpContext context) throws IOException, HttpException {
1609                         throw new HttpException("Boom");
1610                     }
1611                 };
1612             }
1613 
1614         });
1615         final InetSocketAddress serverEndpoint = server.start();
1616 
1617         client.start();
1618         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1619                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1620         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1621 
1622         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1623                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1624                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1625         final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1626         Assert.assertNotNull(result);
1627         final HttpResponse response1 = result.getHead();
1628         final String entity1 = result.getBody();
1629         Assert.assertNotNull(response1);
1630         Assert.assertEquals(500, response1.getCode());
1631         Assert.assertEquals("Boom", entity1);
1632     }
1633 
1634     @Test
1635     public void testNoServiceHandler() throws Exception {
1636         final InetSocketAddress serverEndpoint = server.start();
1637 
1638         client.start();
1639         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1640                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1641         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1642 
1643         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1644                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1645                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1646         final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1647         Assert.assertNotNull(result);
1648         final HttpResponse response1 = result.getHead();
1649         final String entity1 = result.getBody();
1650         Assert.assertNotNull(response1);
1651         Assert.assertEquals(404, response1.getCode());
1652         Assert.assertEquals("Resource not found", entity1);
1653     }
1654 
1655     @Test
1656     public void testResponseNoContent() throws Exception {
1657         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1658 
1659             @Override
1660             public AsyncServerExchangeHandler get() {
1661                 return new SingleLineResponseHandler("Hi there") {
1662 
1663                     @Override
1664                     protected void handle(
1665                             final Message<HttpRequest, String> request,
1666                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1667                             final HttpContext context) throws IOException, HttpException {
1668                         final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_NO_CONTENT);
1669                         responseTrigger.submitResponse(new BasicResponseProducer(response), context);
1670                     }
1671                 };
1672             }
1673 
1674         });
1675         final InetSocketAddress serverEndpoint = server.start();
1676 
1677         client.start();
1678         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1679                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1680         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1681 
1682         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1683                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1684                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1685         final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1686         Assert.assertNotNull(result);
1687         final HttpResponse response1 = result.getHead();
1688         Assert.assertNotNull(response1);
1689         Assert.assertEquals(204, response1.getCode());
1690         Assert.assertNull(result.getBody());
1691     }
1692 
1693     @Test
1694     public void testAbsentHostHeader() throws Exception {
1695         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1696 
1697             @Override
1698             public AsyncServerExchangeHandler get() {
1699                 return new SingleLineResponseHandler("Hi there");
1700             }
1701 
1702         });
1703         final InetSocketAddress serverEndpoint = server.start();
1704 
1705         client.start(new DefaultHttpProcessor(new RequestContent(), new RequestConnControl()), Http1Config.DEFAULT);
1706 
1707         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1708                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1709         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1710 
1711         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1712         request1.setVersion(HttpVersion.HTTP_1_0);
1713         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1714                 new BasicRequestProducer(request1, null),
1715                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1716         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1717         Assert.assertNotNull(result1);
1718         final HttpResponse response1 = result1.getHead();
1719         Assert.assertNotNull(response1);
1720         Assert.assertEquals(200, response1.getCode());
1721         Assert.assertEquals("Hi there", result1.getBody());
1722 
1723         final HttpRequest request2 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1724         request2.setVersion(HttpVersion.HTTP_1_1);
1725         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1726                 new BasicRequestProducer(request2, null),
1727                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1728         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1729         Assert.assertNotNull(result2);
1730         final HttpResponse response2 = result2.getHead();
1731         Assert.assertNotNull(response2);
1732         Assert.assertEquals(400, response2.getCode());
1733         Assert.assertEquals("Host header is absent", result2.getBody());
1734     }
1735 
1736     @Test
1737     public void testMessageWithTrailers() throws Exception {
1738         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1739 
1740             @Override
1741             public AsyncServerExchangeHandler get() {
1742                 return new AbstractServerExchangeHandler<Message<HttpRequest, String>>() {
1743 
1744                     @Override
1745                     protected AsyncRequestConsumer<Message<HttpRequest, String>> supplyConsumer(
1746                             final HttpRequest request,
1747                             final EntityDetails entityDetails,
1748                             final HttpContext context) throws HttpException {
1749                         return new BasicRequestConsumer<>(entityDetails != null ? new StringAsyncEntityConsumer() : null);
1750                     }
1751 
1752                     @Override
1753                     protected void handle(
1754                             final Message<HttpRequest, String> requestMessage,
1755                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1756                             final HttpContext context) throws HttpException, IOException {
1757                         responseTrigger.submitResponse(new BasicResponseProducer(
1758                                 HttpStatus.SC_OK,
1759                                 new DigestingEntityProducer("MD5",
1760                                         new StringAsyncEntityProducer("Hello back with some trailers"))), context);
1761                     }
1762                 };
1763             }
1764 
1765         });
1766         final InetSocketAddress serverEndpoint = server.start();
1767 
1768         client.start();
1769 
1770         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1771                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1772         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1773 
1774         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1775         final DigestingEntityConsumer<String> entityConsumer = new DigestingEntityConsumer<>("MD5", new StringAsyncEntityConsumer());
1776         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1777                 new BasicRequestProducer(request1, null),
1778                 new BasicResponseConsumer<>(entityConsumer), null);
1779         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1780         Assert.assertNotNull(result1);
1781         final HttpResponse response1 = result1.getHead();
1782         Assert.assertNotNull(response1);
1783         Assert.assertEquals(200, response1.getCode());
1784         Assert.assertEquals("Hello back with some trailers", result1.getBody());
1785 
1786         final List<Header> trailers = entityConsumer.getTrailers();
1787         Assert.assertNotNull(trailers);
1788         Assert.assertEquals(2, trailers.size());
1789         final Map<String, String> map = new HashMap<>();
1790         for (final Header header: trailers) {
1791             map.put(header.getName().toLowerCase(Locale.ROOT), header.getValue());
1792         }
1793         final String digest = TextUtils.toHexString(entityConsumer.getDigest());
1794         Assert.assertEquals("MD5", map.get("digest-algo"));
1795         Assert.assertEquals(digest, map.get("digest"));
1796     }
1797 
1798     @Test
1799     public void testProtocolException() throws Exception {
1800         server.register("/boom", new Supplier<AsyncServerExchangeHandler>() {
1801 
1802             @Override
1803             public AsyncServerExchangeHandler get() {
1804                 return new AsyncServerExchangeHandler() {
1805 
1806                     private final StringAsyncEntityProducer entityProducer = new StringAsyncEntityProducer("Everyting is OK");
1807 
1808                     @Override
1809                     public void releaseResources() {
1810                         entityProducer.releaseResources();
1811                     }
1812 
1813                     @Override
1814                     public void handleRequest(
1815                             final HttpRequest request,
1816                             final EntityDetails entityDetails,
1817                             final ResponseChannel responseChannel,
1818                             final HttpContext context) throws HttpException, IOException {
1819                         final String requestUri = request.getRequestUri();
1820                         if (requestUri.endsWith("boom")) {
1821                             throw new ProtocolException("Boom!!!");
1822                         }
1823                         responseChannel.sendResponse(new BasicHttpResponse(200), entityProducer, context);
1824                     }
1825 
1826                     @Override
1827                     public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1828                         capacityChannel.update(Integer.MAX_VALUE);
1829                     }
1830 
1831                     @Override
1832                     public void consume(final ByteBuffer src) throws IOException {
1833                     }
1834 
1835                     @Override
1836                     public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1837                         // empty
1838                     }
1839 
1840                     @Override
1841                     public int available() {
1842                         return entityProducer.available();
1843                     }
1844 
1845                     @Override
1846                     public void produce(final DataStreamChannel channel) throws IOException {
1847                         entityProducer.produce(channel);
1848                     }
1849 
1850                     @Override
1851                     public void failed(final Exception cause) {
1852                         releaseResources();
1853                     }
1854 
1855                 };
1856             }
1857 
1858         });
1859 
1860         final InetSocketAddress serverEndpoint = server.start();
1861 
1862         client.start();
1863         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1864                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1865         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1866         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1867                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/boom")),
1868                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1869         final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1870         Assert.assertNotNull(result);
1871         final HttpResponse response1 = result.getHead();
1872         final String entity1 = result.getBody();
1873         Assert.assertNotNull(response1);
1874         Assert.assertEquals(HttpStatus.SC_BAD_REQUEST, response1.getCode());
1875         Assert.assertEquals("Boom!!!", entity1);
1876     }
1877 
1878     @Test
1879     public void testHeaderTooLarge() throws Exception {
1880         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1881 
1882             @Override
1883             public AsyncServerExchangeHandler get() {
1884                 return new SingleLineResponseHandler("Hi there");
1885             }
1886 
1887         });
1888         final InetSocketAddress serverEndpoint = server.start(null, Http1Config.custom()
1889                 .setMaxLineLength(100)
1890                 .build());
1891         client.start();
1892 
1893         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1894                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1895         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1896 
1897         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1898         request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1899                 "1234567890123456789012345678901234567890");
1900         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1901                 new BasicRequestProducer(request1, null),
1902                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1903         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1904         Assert.assertNotNull(result1);
1905         final HttpResponse response1 = result1.getHead();
1906         Assert.assertNotNull(response1);
1907         Assert.assertEquals(431, response1.getCode());
1908         Assert.assertEquals("Maximum line length limit exceeded", result1.getBody());
1909     }
1910 
1911     @Test
1912     public void testHeaderTooLargePost() throws Exception {
1913         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1914 
1915             @Override
1916             public AsyncServerExchangeHandler get() {
1917                 return new SingleLineResponseHandler("Hi there");
1918             }
1919 
1920         });
1921         final InetSocketAddress serverEndpoint = server.start(null, Http1Config.custom()
1922                 .setMaxLineLength(100)
1923                 .build());
1924         client.start(
1925                 new DefaultHttpProcessor(new RequestContent(), new RequestTargetHost(), new RequestConnControl()), null);
1926 
1927         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1928                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1929         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1930 
1931         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
1932         request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1933                 "1234567890123456789012345678901234567890");
1934 
1935         final byte[] b = new byte[2048];
1936         for (int i = 0; i < b.length; i++) {
1937             b[i] = (byte) ('a' + i % 10);
1938         }
1939 
1940         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1941                 new BasicRequestProducer(request1, AsyncEntityProducers.create(b, ContentType.TEXT_PLAIN)),
1942                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1943         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1944         Assert.assertNotNull(result1);
1945         final HttpResponse response1 = result1.getHead();
1946         Assert.assertNotNull(response1);
1947         Assert.assertEquals(431, response1.getCode());
1948         Assert.assertEquals("Maximum line length limit exceeded", result1.getBody());
1949     }
1950 
1951 }