View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.testing.nio;
29  
30  import static org.hamcrest.MatcherAssert.assertThat;
31  
32  import java.io.BufferedReader;
33  import java.io.BufferedWriter;
34  import java.io.IOException;
35  import java.io.InputStream;
36  import java.io.InputStreamReader;
37  import java.io.InterruptedIOException;
38  import java.io.OutputStream;
39  import java.io.OutputStreamWriter;
40  import java.net.InetSocketAddress;
41  import java.net.URI;
42  import java.net.URISyntaxException;
43  import java.nio.ByteBuffer;
44  import java.nio.channels.WritableByteChannel;
45  import java.nio.charset.Charset;
46  import java.nio.charset.StandardCharsets;
47  import java.util.HashMap;
48  import java.util.LinkedList;
49  import java.util.List;
50  import java.util.Map;
51  import java.util.Queue;
52  import java.util.Random;
53  import java.util.StringTokenizer;
54  import java.util.concurrent.CancellationException;
55  import java.util.concurrent.ExecutionException;
56  import java.util.concurrent.Executors;
57  import java.util.concurrent.Future;
58  import java.util.concurrent.atomic.AtomicReference;
59  
60  import org.apache.hc.core5.http.ConnectionReuseStrategy;
61  import org.apache.hc.core5.http.ContentLengthStrategy;
62  import org.apache.hc.core5.http.ContentType;
63  import org.apache.hc.core5.http.EntityDetails;
64  import org.apache.hc.core5.http.Header;
65  import org.apache.hc.core5.http.HeaderElements;
66  import org.apache.hc.core5.http.HttpException;
67  import org.apache.hc.core5.http.HttpHeaders;
68  import org.apache.hc.core5.http.HttpHost;
69  import org.apache.hc.core5.http.HttpRequest;
70  import org.apache.hc.core5.http.HttpResponse;
71  import org.apache.hc.core5.http.HttpStatus;
72  import org.apache.hc.core5.http.HttpVersion;
73  import org.apache.hc.core5.http.MalformedChunkCodingException;
74  import org.apache.hc.core5.http.Message;
75  import org.apache.hc.core5.http.Method;
76  import org.apache.hc.core5.http.ProtocolException;
77  import org.apache.hc.core5.http.URIScheme;
78  import org.apache.hc.core5.http.config.CharCodingConfig;
79  import org.apache.hc.core5.http.config.Http1Config;
80  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
81  import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
82  import org.apache.hc.core5.http.impl.Http1StreamListener;
83  import org.apache.hc.core5.http.impl.HttpProcessors;
84  import org.apache.hc.core5.http.impl.nio.AbstractContentEncoder;
85  import org.apache.hc.core5.http.impl.nio.ServerHttp1StreamDuplexer;
86  import org.apache.hc.core5.http.message.BasicHttpRequest;
87  import org.apache.hc.core5.http.message.BasicHttpResponse;
88  import org.apache.hc.core5.http.nio.AsyncEntityProducer;
89  import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
90  import org.apache.hc.core5.http.nio.AsyncRequestProducer;
91  import org.apache.hc.core5.http.nio.AsyncResponseProducer;
92  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
93  import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
94  import org.apache.hc.core5.http.nio.CapacityChannel;
95  import org.apache.hc.core5.http.nio.ContentEncoder;
96  import org.apache.hc.core5.http.nio.DataStreamChannel;
97  import org.apache.hc.core5.http.nio.HandlerFactory;
98  import org.apache.hc.core5.http.nio.NHttpMessageParser;
99  import org.apache.hc.core5.http.nio.NHttpMessageWriter;
100 import org.apache.hc.core5.http.nio.ResponseChannel;
101 import org.apache.hc.core5.http.nio.SessionOutputBuffer;
102 import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
103 import org.apache.hc.core5.http.nio.entity.DigestingEntityConsumer;
104 import org.apache.hc.core5.http.nio.entity.DigestingEntityProducer;
105 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
106 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
107 import org.apache.hc.core5.http.nio.support.AbstractServerExchangeHandler;
108 import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
109 import org.apache.hc.core5.http.nio.support.BasicAsyncServerExpectationDecorator;
110 import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
111 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
112 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
113 import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
114 import org.apache.hc.core5.http.nio.support.ImmediateResponseExchangeHandler;
115 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityConsumer;
116 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityProducer;
117 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicServerExchangeHandler;
118 import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
119 import org.apache.hc.core5.http.protocol.HttpContext;
120 import org.apache.hc.core5.http.protocol.HttpProcessor;
121 import org.apache.hc.core5.http.protocol.RequestConnControl;
122 import org.apache.hc.core5.http.protocol.RequestContent;
123 import org.apache.hc.core5.http.protocol.RequestTargetHost;
124 import org.apache.hc.core5.http.protocol.RequestValidateHost;
125 import org.apache.hc.core5.reactor.IOSession;
126 import org.apache.hc.core5.reactor.ProtocolIOSession;
127 import org.apache.hc.core5.testing.SSLTestContexts;
128 import org.apache.hc.core5.testing.nio.extension.Http1TestResources;
129 import org.apache.hc.core5.util.CharArrayBuffer;
130 import org.apache.hc.core5.util.TextUtils;
131 import org.apache.hc.core5.util.Timeout;
132 import org.hamcrest.CoreMatchers;
133 import org.junit.jupiter.api.Assertions;
134 import org.junit.jupiter.api.Test;
135 import org.junit.jupiter.api.extension.RegisterExtension;
136 
137 public abstract class Http1IntegrationTest {
138 
139     private static final Timeout TIMEOUT = Timeout.ofMinutes(1);
140     private static final Timeout LONG_TIMEOUT = Timeout.ofMinutes(2);
141 
142     private final URIScheme scheme;
143 
144     @RegisterExtension
145     private final Http1TestResources resources;
146 
147     public Http1IntegrationTest(final URIScheme scheme) {
148         this.scheme = scheme;
149         this.resources = new Http1TestResources(scheme, TIMEOUT);
150     }
151 
152     private URI createRequestURI(final InetSocketAddress serverEndpoint, final String path) {
153         try {
154             return new URI(scheme.id, null, "localhost", serverEndpoint.getPort(), path, null, null);
155         } catch (final URISyntaxException e) {
156             throw new IllegalStateException();
157         }
158     }
159 
160     @Test
161     public void testSimpleGet() throws Exception {
162         final Http1TestServer server = resources.server();
163         final Http1TestClient client = resources.client();
164 
165         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
166         final InetSocketAddress serverEndpoint = server.start();
167 
168         client.start();
169         final Future<ClientSessionEndpoint> connectFuture = client.connect(
170                 "localhost", serverEndpoint.getPort(), TIMEOUT);
171         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
172 
173         for (int i = 0; i < 5; i++) {
174             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
175                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
176                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
177             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
178             Assertions.assertNotNull(result);
179             final HttpResponse response1 = result.getHead();
180             final String entity1 = result.getBody();
181             Assertions.assertNotNull(response1);
182             Assertions.assertEquals(200, response1.getCode());
183             Assertions.assertEquals("Hi there", entity1);
184         }
185     }
186 
187     @Test
188     public void testSimpleGetConnectionClose() throws Exception {
189         final Http1TestServer server = resources.server();
190         final Http1TestClient client = resources.client();
191 
192         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
193         final InetSocketAddress serverEndpoint = server.start();
194 
195         client.start();
196         final URI requestURI = createRequestURI(serverEndpoint, "/hello");
197         for (int i = 0; i < 5; i++) {
198             final Future<ClientSessionEndpoint> connectFuture = client.connect(
199                     "localhost", serverEndpoint.getPort(), TIMEOUT);
200             try (final ClientSessionEndpoint streamEndpoint = connectFuture.get()) {
201                 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
202                         AsyncRequestBuilder.get(requestURI)
203                                 .addHeader(HttpHeaders.CONNECTION, "close")
204                                 .build(),
205                         new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
206                 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
207                 Assertions.assertNotNull(result);
208                 final HttpResponse response1 = result.getHead();
209                 final String entity1 = result.getBody();
210                 Assertions.assertNotNull(response1);
211                 Assertions.assertEquals(200, response1.getCode());
212                 Assertions.assertEquals("Hi there", entity1);
213             }
214         }
215     }
216 
217     @Test
218     public void testSimpleGetIdentityTransfer() throws Exception {
219         final Http1TestServer server = resources.server();
220         final Http1TestClient client = resources.client();
221 
222         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
223         final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
224         final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
225 
226         client.start();
227 
228         final int reqNo = 5;
229 
230         for (int i = 0; i < reqNo; i++) {
231             final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
232             final ClientSessionEndpoint streamEndpoint = connectFuture.get();
233 
234             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
235                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
236                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
237             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
238 
239             streamEndpoint.close();
240 
241             Assertions.assertNotNull(result);
242             final HttpResponse response = result.getHead();
243             final String entity = result.getBody();
244             Assertions.assertNotNull(response);
245             Assertions.assertEquals(200, response.getCode());
246             Assertions.assertEquals("Hi there", entity);
247         }
248 
249     }
250 
251     @Test
252     public void testPostIdentityTransfer() throws Exception {
253         final Http1TestServer server = resources.server();
254         final Http1TestClient client = resources.client();
255 
256         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
257         final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
258         final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
259 
260         client.start();
261 
262         final int reqNo = 5;
263 
264         for (int i = 0; i < reqNo; i++) {
265             final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
266             final ClientSessionEndpoint streamEndpoint = connectFuture.get();
267 
268             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
269                     new BasicRequestProducer(Method.POST,
270                             createRequestURI(serverEndpoint, "/hello"),
271                             new MultiLineEntityProducer("Hello", 16 * i)),
272                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
273             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
274 
275             streamEndpoint.close();
276 
277             Assertions.assertNotNull(result);
278             final HttpResponse response = result.getHead();
279             final String entity = result.getBody();
280             Assertions.assertNotNull(response);
281             Assertions.assertEquals(200, response.getCode());
282             Assertions.assertEquals("Hi there", entity);
283         }
284     }
285 
286     @Test
287     public void testPostIdentityTransferOutOfSequenceResponse() throws Exception {
288         final Http1TestServer server = resources.server();
289         final Http1TestClient client = resources.client();
290 
291         server.register("/hello", () -> new ImmediateResponseExchangeHandler(500, "Go away"));
292         final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
293         final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
294 
295         client.start();
296 
297         final int reqNo = 5;
298 
299         for (int i = 0; i < reqNo; i++) {
300             final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
301             final ClientSessionEndpoint streamEndpoint = connectFuture.get();
302 
303             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
304                     new BasicRequestProducer(Method.POST,
305                             createRequestURI(serverEndpoint, "/hello"),
306                             new MultiLineEntityProducer("Hello", 16 * i)),
307                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
308             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
309 
310             streamEndpoint.close();
311 
312             Assertions.assertNotNull(result);
313             final HttpResponse response = result.getHead();
314             final String entity = result.getBody();
315             Assertions.assertNotNull(response);
316             Assertions.assertEquals(500, response.getCode());
317             Assertions.assertEquals("Go away", entity);
318         }
319     }
320 
321     @Test
322     public void testSimpleGetsPipelined() throws Exception {
323         final Http1TestServer server = resources.server();
324         final Http1TestClient client = resources.client();
325 
326         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
327         final InetSocketAddress serverEndpoint = server.start();
328 
329         client.start();
330         final Future<ClientSessionEndpoint> connectFuture = client.connect(
331                 "localhost", serverEndpoint.getPort(), TIMEOUT);
332         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
333 
334         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
335         for (int i = 0; i < 5; i++) {
336             queue.add(streamEndpoint.execute(
337                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
338                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
339         }
340         while (!queue.isEmpty()) {
341             final Future<Message<HttpResponse, String>> future = queue.remove();
342             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
343             Assertions.assertNotNull(result);
344             final HttpResponse response = result.getHead();
345             final String entity = result.getBody();
346             Assertions.assertNotNull(response);
347             Assertions.assertEquals(200, response.getCode());
348             Assertions.assertEquals("Hi there", entity);
349         }
350     }
351 
352     @Test
353     public void testLargeGet() throws Exception {
354         final Http1TestServer server = resources.server();
355         final Http1TestClient client = resources.client();
356 
357         server.register("/", () -> new MultiLineResponseHandler("0123456789abcdef", 5000));
358         final InetSocketAddress serverEndpoint = server.start();
359 
360         client.start();
361         final Future<ClientSessionEndpoint> connectFuture = client.connect(
362                 "localhost", serverEndpoint.getPort(), TIMEOUT);
363         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
364 
365         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
366                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
367                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
368 
369         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
370         Assertions.assertNotNull(result1);
371         final HttpResponse response1 = result1.getHead();
372         Assertions.assertNotNull(response1);
373         Assertions.assertEquals(200, response1.getCode());
374         final String s1 = result1.getBody();
375         Assertions.assertNotNull(s1);
376         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
377         while (t1.hasMoreTokens()) {
378             Assertions.assertEquals("0123456789abcdef", t1.nextToken());
379         }
380 
381         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
382                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
383                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer(512)), null);
384 
385         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
386         Assertions.assertNotNull(result2);
387         final HttpResponse response2 = result2.getHead();
388         Assertions.assertNotNull(response2);
389         Assertions.assertEquals(200, response2.getCode());
390         final String s2 = result2.getBody();
391         Assertions.assertNotNull(s2);
392         final StringTokenizer t2 = new StringTokenizer(s2, "\r\n");
393         while (t2.hasMoreTokens()) {
394             Assertions.assertEquals("0123456789abcdef", t2.nextToken());
395         }
396     }
397 
398     @Test
399     public void testLargeGetsPipelined() throws Exception {
400         final Http1TestServer server = resources.server();
401         final Http1TestClient client = resources.client();
402 
403         server.register("/", () -> new MultiLineResponseHandler("0123456789abcdef", 2000));
404         final InetSocketAddress serverEndpoint = server.start();
405 
406         client.start();
407         final Future<ClientSessionEndpoint> connectFuture = client.connect(
408                 "localhost", serverEndpoint.getPort(), TIMEOUT);
409         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
410 
411         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
412         for (int i = 0; i < 5; i++) {
413             queue.add(streamEndpoint.execute(
414                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
415                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
416         }
417         while (!queue.isEmpty()) {
418             final Future<Message<HttpResponse, String>> future = queue.remove();
419             final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
420             Assertions.assertNotNull(result);
421             final HttpResponse response = result.getHead();
422             Assertions.assertNotNull(response);
423             Assertions.assertEquals(200, response.getCode());
424             final String entity = result.getBody();
425             Assertions.assertNotNull(entity);
426             final StringTokenizer t = new StringTokenizer(entity, "\r\n");
427             while (t.hasMoreTokens()) {
428                 Assertions.assertEquals("0123456789abcdef", t.nextToken());
429             }
430         }
431     }
432 
433     @Test
434     public void testBasicPost() throws Exception {
435         final Http1TestServer server = resources.server();
436         final Http1TestClient client = resources.client();
437 
438         server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
439         final InetSocketAddress serverEndpoint = server.start();
440 
441         client.start();
442         final Future<ClientSessionEndpoint> connectFuture = client.connect(
443                 "localhost", serverEndpoint.getPort(), TIMEOUT);
444         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
445 
446         for (int i = 0; i < 5; i++) {
447             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
448                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello"),
449                             AsyncEntityProducers.create("Hi there")),
450                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
451             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
452             Assertions.assertNotNull(result);
453             final HttpResponse response1 = result.getHead();
454             final String entity1 = result.getBody();
455             Assertions.assertNotNull(response1);
456             Assertions.assertEquals(200, response1.getCode());
457             Assertions.assertEquals("Hi back", entity1);
458         }
459     }
460 
461     @Test
462     public void testBasicPostPipelined() throws Exception {
463         final Http1TestServer server = resources.server();
464         final Http1TestClient client = resources.client();
465 
466         server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
467         final InetSocketAddress serverEndpoint = server.start();
468 
469         client.start();
470         final Future<ClientSessionEndpoint> connectFuture = client.connect(
471                 "localhost", serverEndpoint.getPort(), TIMEOUT);
472         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
473 
474         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
475         for (int i = 0; i < 5; i++) {
476             queue.add(streamEndpoint.execute(
477                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello"),
478                             AsyncEntityProducers.create("Hi there")),
479                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
480         }
481         while (!queue.isEmpty()) {
482             final Future<Message<HttpResponse, String>> future = queue.remove();
483             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
484             Assertions.assertNotNull(result);
485             final HttpResponse response = result.getHead();
486             final String entity = result.getBody();
487             Assertions.assertNotNull(response);
488             Assertions.assertEquals(200, response.getCode());
489             Assertions.assertEquals("Hi back", entity);
490         }
491     }
492 
493     @Test
494     public void testHttp10Post() throws Exception {
495         final Http1TestServer server = resources.server();
496         final Http1TestClient client = resources.client();
497 
498         server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
499         final InetSocketAddress serverEndpoint = server.start();
500 
501         client.start();
502         final Future<ClientSessionEndpoint> connectFuture = client.connect(
503                 "localhost", serverEndpoint.getPort(), TIMEOUT);
504         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
505 
506         for (int i = 0; i < 5; i++) {
507             final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
508             request.setVersion(HttpVersion.HTTP_1_0);
509             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
510                     new BasicRequestProducer(request, AsyncEntityProducers.create("Hi there")),
511                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
512             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
513             Assertions.assertNotNull(result);
514             final HttpResponse response1 = result.getHead();
515             final String entity1 = result.getBody();
516             Assertions.assertNotNull(response1);
517             Assertions.assertEquals(200, response1.getCode());
518             Assertions.assertEquals("Hi back", entity1);
519         }
520     }
521 
522     @Test
523     public void testNoEntityPost() throws Exception {
524         final Http1TestServer server = resources.server();
525         final Http1TestClient client = resources.client();
526 
527         server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
528         final InetSocketAddress serverEndpoint = server.start();
529 
530         client.start();
531         final Future<ClientSessionEndpoint> connectFuture = client.connect(
532                 "localhost", serverEndpoint.getPort(), TIMEOUT);
533         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
534 
535         for (int i = 0; i < 5; i++) {
536             final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
537             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
538                     new BasicRequestProducer(request, null),
539                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
540             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
541             Assertions.assertNotNull(result);
542             final HttpResponse response1 = result.getHead();
543             final String entity1 = result.getBody();
544             Assertions.assertNotNull(response1);
545             Assertions.assertEquals(200, response1.getCode());
546             Assertions.assertEquals("Hi back", entity1);
547         }
548     }
549 
550     @Test
551     public void testLargePost() throws Exception {
552         final Http1TestServer server = resources.server();
553         final Http1TestClient client = resources.client();
554 
555         server.register("*", () -> new EchoHandler(2048));
556         final InetSocketAddress serverEndpoint = server.start();
557 
558         client.start();
559         final Future<ClientSessionEndpoint> connectFuture = client.connect(
560                 "localhost", serverEndpoint.getPort(), TIMEOUT);
561         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
562 
563         for (int i = 0; i < 5; i++) {
564             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
565                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
566                             new MultiLineEntityProducer("0123456789abcdef", 5000)),
567                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
568             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
569             Assertions.assertNotNull(result);
570             final HttpResponse response = result.getHead();
571             Assertions.assertNotNull(response);
572             Assertions.assertEquals(200, response.getCode());
573             final String entity = result.getBody();
574             Assertions.assertNotNull(entity);
575             final StringTokenizer t = new StringTokenizer(entity, "\r\n");
576             while (t.hasMoreTokens()) {
577                 Assertions.assertEquals("0123456789abcdef", t.nextToken());
578             }
579         }
580     }
581 
582     @Test
583     public void testPostsPipelinedLargeResponse() throws Exception {
584         final Http1TestServer server = resources.server();
585         final Http1TestClient client = resources.client();
586 
587         server.register("/", () -> new MultiLineResponseHandler("0123456789abcdef", 2000));
588         final InetSocketAddress serverEndpoint = server.start();
589 
590         client.start();
591         final Future<ClientSessionEndpoint> connectFuture = client.connect(
592                 "localhost", serverEndpoint.getPort(), TIMEOUT);
593         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
594 
595         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
596         for (int i = 0; i < 2; i++) {
597             queue.add(streamEndpoint.execute(
598                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/"),
599                             AsyncEntityProducers.create("Hi there")),
600                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
601         }
602         while (!queue.isEmpty()) {
603             final Future<Message<HttpResponse, String>> future = queue.remove();
604             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
605             Assertions.assertNotNull(result);
606             final HttpResponse response = result.getHead();
607             Assertions.assertNotNull(response);
608             Assertions.assertEquals(200, response.getCode());
609             final String entity = result.getBody();
610             Assertions.assertNotNull(entity);
611             final StringTokenizer t = new StringTokenizer(entity, "\r\n");
612             while (t.hasMoreTokens()) {
613                 Assertions.assertEquals("0123456789abcdef", t.nextToken());
614             }
615         }
616     }
617 
618 
619     @Test
620     public void testLargePostsPipelined() throws Exception {
621         final Http1TestServer server = resources.server();
622         final Http1TestClient client = resources.client();
623 
624         server.register("*", () -> new EchoHandler(2048));
625         final InetSocketAddress serverEndpoint = server.start();
626 
627         client.start();
628         final Future<ClientSessionEndpoint> connectFuture = client.connect(
629                 "localhost", serverEndpoint.getPort(), TIMEOUT);
630         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
631 
632         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
633         for (int i = 0; i < 5; i++) {
634             queue.add(streamEndpoint.execute(
635                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
636                             new MultiLineEntityProducer("0123456789abcdef", 5000)),
637                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
638         }
639         while (!queue.isEmpty()) {
640             final Future<Message<HttpResponse, String>> future = queue.remove();
641             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
642             Assertions.assertNotNull(result);
643             final HttpResponse response = result.getHead();
644             Assertions.assertNotNull(response);
645             Assertions.assertEquals(200, response.getCode());
646             final String entity = result.getBody();
647             Assertions.assertNotNull(entity);
648             final StringTokenizer t = new StringTokenizer(entity, "\r\n");
649             while (t.hasMoreTokens()) {
650                 Assertions.assertEquals("0123456789abcdef", t.nextToken());
651             }
652         }
653     }
654 
655     @Test
656     public void testSimpleHead() throws Exception {
657         final Http1TestServer server = resources.server();
658         final Http1TestClient client = resources.client();
659 
660         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
661         final InetSocketAddress serverEndpoint = server.start();
662 
663         client.start();
664         final Future<ClientSessionEndpoint> connectFuture = client.connect(
665                 "localhost", serverEndpoint.getPort(), TIMEOUT);
666         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
667 
668         for (int i = 0; i < 5; i++) {
669             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
670                     new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
671                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
672             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
673             Assertions.assertNotNull(result);
674             final HttpResponse response1 = result.getHead();
675             Assertions.assertNotNull(response1);
676             Assertions.assertEquals(200, response1.getCode());
677             Assertions.assertNull(result.getBody());
678         }
679     }
680 
681     @Test
682     public void testSimpleHeadConnectionClose() throws Exception {
683         final Http1TestServer server = resources.server();
684         final Http1TestClient client = resources.client();
685 
686         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
687         final InetSocketAddress serverEndpoint = server.start();
688 
689         client.start();
690         final URI requestURI = createRequestURI(serverEndpoint, "/hello");
691         for (int i = 0; i < 5; i++) {
692             final Future<ClientSessionEndpoint> connectFuture = client.connect(
693                     "localhost", serverEndpoint.getPort(), TIMEOUT);
694             try (final ClientSessionEndpoint streamEndpoint = connectFuture.get()) {
695                 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
696                         AsyncRequestBuilder.head(requestURI)
697                                 .addHeader(HttpHeaders.CONNECTION, "close")
698                                 .build(),
699                         new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
700                 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
701                 Assertions.assertNotNull(result);
702                 final HttpResponse response1 = result.getHead();
703                 Assertions.assertNotNull(response1);
704                 Assertions.assertEquals(200, response1.getCode());
705                 Assertions.assertNull(result.getBody());
706             }
707         }
708     }
709 
710     @Test
711     public void testHeadPipelined() throws Exception {
712         final Http1TestServer server = resources.server();
713         final Http1TestClient client = resources.client();
714 
715         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
716         final InetSocketAddress serverEndpoint = server.start();
717 
718         client.start();
719         final Future<ClientSessionEndpoint> connectFuture = client.connect(
720                 "localhost", serverEndpoint.getPort(), TIMEOUT);
721         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
722 
723         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
724         for (int i = 0; i < 5; i++) {
725             queue.add(streamEndpoint.execute(
726                     new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
727                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
728         }
729         while (!queue.isEmpty()) {
730             final Future<Message<HttpResponse, String>> future = queue.remove();
731             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
732             Assertions.assertNotNull(result);
733             final HttpResponse response1 = result.getHead();
734             Assertions.assertNotNull(response1);
735             Assertions.assertEquals(200, response1.getCode());
736             Assertions.assertNull(result.getBody());
737         }
738     }
739 
740     @Test
741     public void testExpectationFailed() throws Exception {
742         final Http1TestServer server = resources.server();
743         final Http1TestClient client = resources.client();
744 
745         server.register("*", () -> new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
746 
747             @Override
748             protected void handle(
749                     final Message<HttpRequest, String> request,
750                     final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
751                     final HttpContext context) throws IOException, HttpException {
752                 responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
753 
754             }
755         });
756         final InetSocketAddress serverEndpoint = server.start(null, handler -> new BasicAsyncServerExpectationDecorator(handler) {
757 
758             @Override
759             protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
760                 final Header h = request.getFirstHeader("password");
761                 if (h != null && "secret".equals(h.getValue())) {
762                     return null;
763                 } else {
764                     return new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
765                 }
766             }
767         }, Http1Config.DEFAULT);
768 
769         client.start();
770         final Future<IOSession> sessionFuture = client.requestSession(
771                 new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
772         final IOSession ioSession = sessionFuture.get();
773         final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession);
774 
775         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
776         request1.addHeader("password", "secret");
777         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
778                 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 1000)),
779                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
780         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
781         Assertions.assertNotNull(result1);
782         final HttpResponse response1 = result1.getHead();
783         Assertions.assertNotNull(response1);
784         Assertions.assertEquals(200, response1.getCode());
785         Assertions.assertEquals("All is well", result1.getBody());
786 
787         Assertions.assertTrue(ioSession.isOpen());
788 
789         final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
790         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
791                 new BasicRequestProducer(request2, new MultiLineEntityProducer("0123456789abcdef", 5000)),
792                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
793         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
794         Assertions.assertNotNull(result2);
795         final HttpResponse response2 = result2.getHead();
796         Assertions.assertNotNull(response2);
797         Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response2.getCode());
798         Assertions.assertEquals("You shall not pass", result2.getBody());
799 
800         Assertions.assertTrue(ioSession.isOpen());
801 
802         final HttpRequest request3 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
803         request3.addHeader("password", "secret");
804         final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
805                 new BasicRequestProducer(request3, new MultiLineEntityProducer("0123456789abcdef", 1000)),
806                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
807         final Message<HttpResponse, String> result3 = future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
808         Assertions.assertNotNull(result3);
809         final HttpResponse response3 = result3.getHead();
810         Assertions.assertNotNull(response3);
811         Assertions.assertEquals(200, response3.getCode());
812         Assertions.assertEquals("All is well", result3.getBody());
813 
814         Assertions.assertTrue(ioSession.isOpen());
815 
816         final HttpRequest request4 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
817         final Future<Message<HttpResponse, String>> future4 = streamEndpoint.execute(
818                 new BasicRequestProducer(request4, AsyncEntityProducers.create("blah")),
819                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
820         final Message<HttpResponse, String> result4 = future4.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
821         Assertions.assertNotNull(result4);
822         final HttpResponse response4 = result4.getHead();
823         Assertions.assertNotNull(response4);
824         Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response4.getCode());
825         Assertions.assertEquals("You shall not pass", result4.getBody());
826 
827         Assertions.assertFalse(ioSession.isOpen());
828     }
829 
830     @Test
831     public void testExpectationFailedCloseConnection() throws Exception {
832         final Http1TestServer server = resources.server();
833         final Http1TestClient client = resources.client();
834 
835         server.register("*", () -> new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
836 
837             @Override
838             protected void handle(
839                     final Message<HttpRequest, String> request,
840                     final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
841                     final HttpContext context) throws IOException, HttpException {
842                 responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
843 
844             }
845         });
846         final InetSocketAddress serverEndpoint = server.start(null, handler -> new BasicAsyncServerExpectationDecorator(handler) {
847 
848             @Override
849             protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
850                 final Header h = request.getFirstHeader("password");
851                 if (h != null && "secret".equals(h.getValue())) {
852                     return null;
853                 } else {
854                     final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_UNAUTHORIZED);
855                     response.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
856                     return new BasicResponseProducer(response, "You shall not pass");
857                 }
858             }
859         }, Http1Config.DEFAULT);
860 
861         client.start();
862         final Future<IOSession> sessionFuture = client.requestSession(
863                 new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
864         final IOSession ioSession = sessionFuture.get();
865         final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession);
866 
867         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
868         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
869                 new BasicRequestProducer(request1, new MultiBinEntityProducer(
870                         new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'},
871                         100000,
872                         ContentType.TEXT_PLAIN)),
873                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
874         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
875         Assertions.assertNotNull(result1);
876         final HttpResponse response1 = result1.getHead();
877         Assertions.assertNotNull(response1);
878         Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
879         Assertions.assertNotNull("You shall not pass", result1.getBody());
880 
881         Assertions.assertFalse(streamEndpoint.isOpen());
882     }
883 
884     @Test
885     public void testDelayedExpectationVerification() throws Exception {
886         final Http1TestServer server = resources.server();
887         final Http1TestClient client = resources.client();
888 
889         server.register("*", () -> new AsyncServerExchangeHandler() {
890 
891             private final Random random = new Random(System.currentTimeMillis());
892             private final AsyncEntityProducer entityProducer = AsyncEntityProducers.create(
893                     "All is well");
894 
895             @Override
896             public void handleRequest(
897                     final HttpRequest request,
898                     final EntityDetails entityDetails,
899                     final ResponseChannel responseChannel,
900                     final HttpContext context) throws HttpException, IOException {
901 
902                 Executors.newSingleThreadExecutor().execute(() -> {
903                     try {
904                         if (entityDetails != null) {
905                             final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
906                             if (h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue())) {
907                                 Thread.sleep(random.nextInt(1000));
908                                 responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), context);
909                             }
910                             final HttpResponse response = new BasicHttpResponse(200);
911                             synchronized (entityProducer) {
912                                 responseChannel.sendResponse(response, entityProducer, context);
913                             }
914                         }
915                     } catch (final Exception ignore) {
916                         // ignore
917                     }
918                 });
919 
920             }
921 
922             @Override
923             public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
924                 capacityChannel.update(Integer.MAX_VALUE);
925             }
926 
927             @Override
928             public void consume(final ByteBuffer src) throws IOException {
929             }
930 
931             @Override
932             public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
933             }
934 
935             @Override
936             public int available() {
937                 synchronized (entityProducer) {
938                     return entityProducer.available();
939                 }
940             }
941 
942             @Override
943             public void produce(final DataStreamChannel channel) throws IOException {
944                 synchronized (entityProducer) {
945                     entityProducer.produce(channel);
946                 }
947             }
948 
949             @Override
950             public void failed(final Exception cause) {
951             }
952 
953             @Override
954             public void releaseResources() {
955             }
956 
957         });
958         final InetSocketAddress serverEndpoint = server.start();
959 
960         client.start(Http1Config.custom().setWaitForContinueTimeout(Timeout.ofMilliseconds(100)).build());
961         final Future<ClientSessionEndpoint> connectFuture = client.connect(
962                 "localhost", serverEndpoint.getPort(), TIMEOUT);
963         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
964 
965         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
966         for (int i = 0; i < 5; i++) {
967             queue.add(streamEndpoint.execute(
968                     new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/"),
969                             AsyncEntityProducers.create("Some important message")),
970                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
971         }
972         while (!queue.isEmpty()) {
973             final Future<Message<HttpResponse, String>> future = queue.remove();
974             final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
975             Assertions.assertNotNull(result);
976             final HttpResponse response = result.getHead();
977             Assertions.assertNotNull(response);
978             Assertions.assertEquals(200, response.getCode());
979             Assertions.assertNotNull("All is well", result.getBody());
980         }
981     }
982 
983     @Test
984     public void testPrematureResponse() throws Exception {
985         final Http1TestServer server = resources.server();
986         final Http1TestClient client = resources.client();
987 
988         server.register("*", () -> new AsyncServerExchangeHandler() {
989 
990             private final AtomicReference<AsyncResponseProducer> responseProducer = new AtomicReference<>();
991 
992             @Override
993             public void handleRequest(
994                     final HttpRequest request,
995                     final EntityDetails entityDetails,
996                     final ResponseChannel responseChannel,
997                     final HttpContext context) throws HttpException, IOException {
998                 final AsyncResponseProducer producer;
999                 final Header h = request.getFirstHeader("password");
1000                 if (h != null && "secret".equals(h.getValue())) {
1001                     producer = new BasicResponseProducer(HttpStatus.SC_OK, "All is well");
1002                 } else {
1003                     producer = new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
1004                 }
1005                 responseProducer.set(producer);
1006                 producer.sendResponse(responseChannel, context);
1007             }
1008 
1009             @Override
1010             public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1011                 capacityChannel.update(Integer.MAX_VALUE);
1012             }
1013 
1014             @Override
1015             public void consume(final ByteBuffer src) throws IOException {
1016             }
1017 
1018             @Override
1019             public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1020             }
1021 
1022             @Override
1023             public int available() {
1024                 final AsyncResponseProducer producer = responseProducer.get();
1025                 return producer.available();
1026             }
1027 
1028             @Override
1029             public void produce(final DataStreamChannel channel) throws IOException {
1030                 final AsyncResponseProducer producer = responseProducer.get();
1031                 producer.produce(channel);
1032             }
1033 
1034             @Override
1035             public void failed(final Exception cause) {
1036             }
1037 
1038             @Override
1039             public void releaseResources() {
1040             }
1041         });
1042         final InetSocketAddress serverEndpoint = server.start();
1043 
1044         client.start();
1045         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1046                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1047         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1048 
1049         for (int i = 0; i < 3; i++) {
1050             final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1051             final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1052                     new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 100000)),
1053                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1054             final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1055             Assertions.assertNotNull(result1);
1056             final HttpResponse response1 = result1.getHead();
1057             Assertions.assertNotNull(response1);
1058             Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1059             Assertions.assertNotNull("You shall not pass", result1.getBody());
1060 
1061             Assertions.assertTrue(streamEndpoint.isOpen());
1062         }
1063         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1064         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1065                 new BasicRequestProducer(request1, new MultiBinEntityProducer(
1066                         new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'},
1067                         100000,
1068                         ContentType.TEXT_PLAIN)),
1069                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1070         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1071         Assertions.assertNotNull(result1);
1072         final HttpResponse response1 = result1.getHead();
1073         Assertions.assertNotNull(response1);
1074         Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1075         Assertions.assertNotNull("You shall not pass", result1.getBody());
1076     }
1077 
1078     @Test
1079     public void testSlowResponseConsumer() throws Exception {
1080         final Http1TestServer server = resources.server();
1081         final Http1TestClient client = resources.client();
1082 
1083         server.register("/", () -> new MultiLineResponseHandler("0123456789abcd", 100));
1084         final InetSocketAddress serverEndpoint = server.start();
1085 
1086         client.start(Http1Config.custom().setBufferSize(256).build());
1087 
1088         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1089                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1090         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1091 
1092         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/"));
1093         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1094                 new BasicRequestProducer(request1, null),
1095                 new BasicResponseConsumer<>(new AbstractClassicEntityConsumer<String>(16, Executors.newSingleThreadExecutor()) {
1096 
1097                     @Override
1098                     protected String consumeData(
1099                             final ContentType contentType, final InputStream inputStream) throws IOException {
1100                         final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII);
1101 
1102                         final StringBuilder buffer = new StringBuilder();
1103                         try {
1104                             final byte[] tmp = new byte[16];
1105                             int l;
1106                             while ((l = inputStream.read(tmp)) != -1) {
1107                                 buffer.append(charset.decode(ByteBuffer.wrap(tmp, 0, l)));
1108                                 Thread.sleep(50);
1109                             }
1110                         } catch (final InterruptedException ex) {
1111                             Thread.currentThread().interrupt();
1112                             throw new InterruptedIOException(ex.getMessage());
1113                         }
1114                         return buffer.toString();
1115                     }
1116                 }),
1117                 null);
1118 
1119         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1120         Assertions.assertNotNull(result1);
1121         final HttpResponse response1 = result1.getHead();
1122         Assertions.assertNotNull(response1);
1123         Assertions.assertEquals(200, response1.getCode());
1124         final String s1 = result1.getBody();
1125         Assertions.assertNotNull(s1);
1126         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1127         while (t1.hasMoreTokens()) {
1128             Assertions.assertEquals("0123456789abcd", t1.nextToken());
1129         }
1130     }
1131 
1132     @Test
1133     public void testSlowRequestProducer() throws Exception {
1134         final Http1TestServer server = resources.server();
1135         final Http1TestClient client = resources.client();
1136 
1137         server.register("*", () -> new EchoHandler(2048));
1138         final InetSocketAddress serverEndpoint = server.start();
1139 
1140         client.start();
1141         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1142                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1143         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1144 
1145         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1146         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1147                 new BasicRequestProducer(request1, new AbstractClassicEntityProducer(4096, ContentType.TEXT_PLAIN, Executors.newSingleThreadExecutor()) {
1148 
1149                     @Override
1150                     protected void produceData(final ContentType contentType, final OutputStream outputStream) throws IOException {
1151                         final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII);
1152                         try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, charset))) {
1153                             for (int i = 0; i < 500; i++) {
1154                                 if (i % 100 == 0) {
1155                                     writer.flush();
1156                                     Thread.sleep(500);
1157                                 }
1158                                 writer.write("0123456789abcdef\r\n");
1159                             }
1160                         } catch (final InterruptedException ex) {
1161                             Thread.currentThread().interrupt();
1162                             throw new InterruptedIOException(ex.getMessage());
1163                         }
1164                     }
1165 
1166                 }),
1167                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1168         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1169         Assertions.assertNotNull(result1);
1170         final HttpResponse response1 = result1.getHead();
1171         Assertions.assertNotNull(response1);
1172         Assertions.assertEquals(200, response1.getCode());
1173         final String s1 = result1.getBody();
1174         Assertions.assertNotNull(s1);
1175         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1176         while (t1.hasMoreTokens()) {
1177             Assertions.assertEquals("0123456789abcdef", t1.nextToken());
1178         }
1179     }
1180 
1181     @Test
1182     public void testSlowResponseProducer() throws Exception {
1183         final Http1TestServer server = resources.server();
1184         final Http1TestClient client = resources.client();
1185 
1186         server.register("*", () -> new AbstractClassicServerExchangeHandler(2048, Executors.newSingleThreadExecutor()) {
1187 
1188             @Override
1189             protected void handle(
1190                     final HttpRequest request,
1191                     final InputStream requestStream,
1192                     final HttpResponse response,
1193                     final OutputStream responseStream,
1194                     final HttpContext context) throws IOException, HttpException {
1195 
1196                 if (!"/hello".equals(request.getPath())) {
1197                     response.setCode(HttpStatus.SC_NOT_FOUND);
1198                     return;
1199                 }
1200                 if (!Method.POST.name().equalsIgnoreCase(request.getMethod())) {
1201                     response.setCode(HttpStatus.SC_NOT_IMPLEMENTED);
1202                     return;
1203                 }
1204                 if (requestStream == null) {
1205                     return;
1206                 }
1207                 final Header h1 = request.getFirstHeader(HttpHeaders.CONTENT_TYPE);
1208                 final ContentType contentType = h1 != null ? ContentType.parse(h1.getValue()) : null;
1209                 final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII);
1210                 response.setCode(HttpStatus.SC_OK);
1211                 response.setHeader(h1);
1212                 try (final BufferedReader reader = new BufferedReader(new InputStreamReader(requestStream, charset));
1213                      final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(responseStream, charset))) {
1214                     try {
1215                         String l;
1216                         int count = 0;
1217                         while ((l = reader.readLine()) != null) {
1218                             writer.write(l);
1219                             writer.write("\r\n");
1220                             count++;
1221                             if (count % 500 == 0) {
1222                                 Thread.sleep(500);
1223                             }
1224                         }
1225                         writer.flush();
1226                     } catch (final InterruptedException ex) {
1227                         Thread.currentThread().interrupt();
1228                         throw new InterruptedIOException(ex.getMessage());
1229                     }
1230                 }
1231             }
1232         });
1233         final InetSocketAddress serverEndpoint = server.start();
1234 
1235         client.start(Http1Config.custom().setBufferSize(256).build());
1236 
1237         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1238                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1239         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1240 
1241         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
1242         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1243                 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcd", 2000)),
1244                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1245         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1246         Assertions.assertNotNull(result1);
1247         final HttpResponse response1 = result1.getHead();
1248         Assertions.assertNotNull(response1);
1249         Assertions.assertEquals(200, response1.getCode());
1250         final String s1 = result1.getBody();
1251         Assertions.assertNotNull(s1);
1252         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1253         while (t1.hasMoreTokens()) {
1254             Assertions.assertEquals("0123456789abcd", t1.nextToken());
1255         }
1256     }
1257 
1258     @Test
1259     public void testPipelinedConnectionClose() throws Exception {
1260         final Http1TestServer server = resources.server();
1261         final Http1TestClient client = resources.client();
1262 
1263         server.register("/hello*", () -> new SingleLineResponseHandler("Hi back"));
1264         final InetSocketAddress serverEndpoint = server.start();
1265 
1266         client.start();
1267         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1268                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1269         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1270 
1271         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1272                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-1"),
1273                         AsyncEntityProducers.create("Hi there")),
1274                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1275         final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello-2"));
1276         request2.addHeader(HttpHeaders.CONNECTION, "close");
1277         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1278                 new BasicRequestProducer(request2,
1279                         AsyncEntityProducers.create("Hi there")),
1280                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1281         final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
1282                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1283                         AsyncEntityProducers.create("Hi there")),
1284                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1285 
1286         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1287         Assertions.assertNotNull(result1);
1288         final HttpResponse response1 = result1.getHead();
1289         final String entity1 = result1.getBody();
1290         Assertions.assertNotNull(response1);
1291         Assertions.assertEquals(200, response1.getCode());
1292         Assertions.assertEquals("Hi back", entity1);
1293 
1294         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1295         Assertions.assertNotNull(result2);
1296         final HttpResponse response2 = result2.getHead();
1297         final String entity2 = result2.getBody();
1298         Assertions.assertNotNull(response2);
1299         Assertions.assertEquals(200, response2.getCode());
1300         Assertions.assertEquals("Hi back", entity2);
1301 
1302         final Exception exception = Assertions.assertThrows(Exception.class, () ->
1303                 future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
1304         assertThat(exception, CoreMatchers.anyOf(
1305                 CoreMatchers.instanceOf(CancellationException.class),
1306                 CoreMatchers.instanceOf(ExecutionException.class)));
1307 
1308         final Future<Message<HttpResponse, String>> future4 = streamEndpoint.execute(
1309                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1310                         AsyncEntityProducers.create("Hi there")),
1311                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1312         final Exception exception2 = Assertions.assertThrows(Exception.class, () ->
1313                 future4.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
1314         assertThat(exception2, CoreMatchers.anyOf(
1315                 CoreMatchers.instanceOf(CancellationException.class),
1316                 CoreMatchers.instanceOf(ExecutionException.class)));
1317     }
1318 
1319     @Test
1320     public void testPipelinedInvalidRequest() throws Exception {
1321         final Http1TestServer server = resources.server();
1322         final Http1TestClient client = resources.client();
1323 
1324         server.register("/hello*", () -> new SingleLineResponseHandler("Hi back"));
1325         final InetSocketAddress serverEndpoint = server.start();
1326 
1327         client.start();
1328         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1329                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1330         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1331 
1332         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1333                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-1"),
1334                         AsyncEntityProducers.create("Hi there")),
1335                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1336         final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello-2"));
1337         request2.addHeader(HttpHeaders.HOST, "blah:blah");
1338         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1339                 new BasicRequestProducer(request2,
1340                         AsyncEntityProducers.create("Hi there")),
1341                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1342         final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
1343                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1344                         AsyncEntityProducers.create("Hi there")),
1345                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1346 
1347         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1348         Assertions.assertNotNull(result1);
1349         final HttpResponse response1 = result1.getHead();
1350         final String entity1 = result1.getBody();
1351         Assertions.assertNotNull(response1);
1352         Assertions.assertEquals(200, response1.getCode());
1353         Assertions.assertEquals("Hi back", entity1);
1354 
1355         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1356         Assertions.assertNotNull(result2);
1357         final HttpResponse response2 = result2.getHead();
1358         final String entity2 = result2.getBody();
1359         Assertions.assertNotNull(response2);
1360         Assertions.assertEquals(400, response2.getCode());
1361         Assertions.assertTrue(entity2.length() > 0);
1362 
1363 
1364         final Exception exception = Assertions.assertThrows(Exception.class, () ->
1365                 future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
1366         assertThat(exception, CoreMatchers.anyOf(
1367                 CoreMatchers.instanceOf(CancellationException.class),
1368                 CoreMatchers.instanceOf(ExecutionException.class)));
1369     }
1370 
1371     private static final byte[] GARBAGE = "garbage".getBytes(StandardCharsets.US_ASCII);
1372 
1373     private static class BrokenChunkEncoder extends AbstractContentEncoder {
1374 
1375         private final CharArrayBuffer lineBuffer;
1376         private boolean done;
1377 
1378         BrokenChunkEncoder(
1379                 final WritableByteChannel channel,
1380                 final SessionOutputBuffer buffer,
1381                 final BasicHttpTransportMetrics metrics) {
1382             super(channel, buffer, metrics);
1383             lineBuffer = new CharArrayBuffer(16);
1384         }
1385 
1386         @Override
1387         public void complete(final List<? extends Header> trailers) throws IOException {
1388             super.complete(trailers);
1389         }
1390 
1391         @Override
1392         public int write(final ByteBuffer src) throws IOException {
1393             final int chunk;
1394             if (!done) {
1395                 lineBuffer.clear();
1396                 lineBuffer.append(Integer.toHexString(GARBAGE.length * 10));
1397                 buffer().writeLine(lineBuffer);
1398                 buffer().write(ByteBuffer.wrap(GARBAGE));
1399                 done = true;
1400                 chunk = GARBAGE.length;
1401             } else {
1402                 chunk = 0;
1403             }
1404             final long bytesWritten = buffer().flush(channel());
1405             if (bytesWritten > 0) {
1406                 metrics().incrementBytesTransferred(bytesWritten);
1407             }
1408             if (!buffer().hasData()) {
1409                 channel().close();
1410             }
1411             return chunk;
1412         }
1413 
1414     }
1415 
1416     @Test
1417     public void testTruncatedChunk() throws Exception {
1418         final Http1TestServer server = resources.server();
1419         final Http1TestClient client = resources.client();
1420 
1421         final InetSocketAddress serverEndpoint = server.start(new InternalServerHttp1EventHandlerFactory(
1422                 HttpProcessors.server(),
1423                 (request, context) -> new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
1424 
1425                     @Override
1426                     protected void handle(
1427                             final Message<HttpRequest, String> request,
1428                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1429                             final HttpContext context) throws IOException, HttpException {
1430                         responseTrigger.submitResponse(
1431                                 new BasicResponseProducer(new StringAsyncEntityProducer("useful stuff")), context);
1432                     }
1433 
1434                 },
1435                 Http1Config.DEFAULT,
1436                 CharCodingConfig.DEFAULT,
1437                 DefaultConnectionReuseStrategy.INSTANCE,
1438                 scheme == URIScheme.HTTPS ? SSLTestContexts.createServerSSLContext() : null, null, null) {
1439 
1440             @Override
1441             protected ServerHttp1StreamDuplexer createServerHttp1StreamDuplexer(
1442                     final ProtocolIOSession ioSession,
1443                     final HttpProcessor httpProcessor,
1444                     final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
1445                     final Http1Config http1Config,
1446                     final CharCodingConfig connectionConfig,
1447                     final ConnectionReuseStrategy connectionReuseStrategy,
1448                     final NHttpMessageParser<HttpRequest> incomingMessageParser,
1449                     final NHttpMessageWriter<HttpResponse> outgoingMessageWriter,
1450                     final ContentLengthStrategy incomingContentStrategy,
1451                     final ContentLengthStrategy outgoingContentStrategy,
1452                     final Http1StreamListener streamListener) {
1453                 return new ServerHttp1StreamDuplexer(ioSession, httpProcessor, exchangeHandlerFactory,
1454                         scheme.id,
1455                         http1Config, connectionConfig, connectionReuseStrategy,
1456                         incomingMessageParser, outgoingMessageWriter,
1457                         incomingContentStrategy, outgoingContentStrategy,
1458                         streamListener) {
1459 
1460                     @Override
1461                     protected ContentEncoder createContentEncoder(
1462                             final long len,
1463                             final WritableByteChannel channel,
1464                             final SessionOutputBuffer buffer,
1465                             final BasicHttpTransportMetrics metrics) throws HttpException {
1466                         if (len == ContentLengthStrategy.CHUNKED) {
1467                             return new BrokenChunkEncoder(channel, buffer, metrics);
1468                         } else {
1469                             return super.createContentEncoder(len, channel, buffer, metrics);
1470                         }
1471                     }
1472 
1473                 };
1474             }
1475 
1476         });
1477 
1478         client.start();
1479         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1480                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1481         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1482 
1483         final AsyncRequestProducer requestProducer = new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1484         final StringAsyncEntityConsumer entityConsumer = new StringAsyncEntityConsumer() {
1485 
1486             @Override
1487             public void releaseResources() {
1488                 // Do not clear internal content buffer
1489             }
1490 
1491         };
1492         final BasicResponseConsumer<String> responseConsumer = new BasicResponseConsumer<>(entityConsumer);
1493         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(requestProducer, responseConsumer, null);
1494         final ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () ->
1495                 future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
1496         final Throwable cause = exception.getCause();
1497         Assertions.assertTrue(cause instanceof MalformedChunkCodingException);
1498         Assertions.assertEquals("garbage", entityConsumer.generateContent());
1499     }
1500 
1501     @Test
1502     public void testExceptionInHandler() throws Exception {
1503         final Http1TestServer server = resources.server();
1504         final Http1TestClient client = resources.client();
1505 
1506         server.register("/hello", () -> new SingleLineResponseHandler("Hi there") {
1507 
1508             @Override
1509             protected void handle(
1510                     final Message<HttpRequest, String> request,
1511                     final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1512                     final HttpContext context) throws IOException, HttpException {
1513                 throw new HttpException("Boom");
1514             }
1515         });
1516         final InetSocketAddress serverEndpoint = server.start();
1517 
1518         client.start();
1519         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1520                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1521         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1522 
1523         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1524                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1525                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1526         final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1527         Assertions.assertNotNull(result);
1528         final HttpResponse response1 = result.getHead();
1529         final String entity1 = result.getBody();
1530         Assertions.assertNotNull(response1);
1531         Assertions.assertEquals(500, response1.getCode());
1532         Assertions.assertEquals("Boom", entity1);
1533     }
1534 
1535     @Test
1536     public void testNoServiceHandler() throws Exception {
1537         final Http1TestServer server = resources.server();
1538         final Http1TestClient client = resources.client();
1539 
1540         final InetSocketAddress serverEndpoint = server.start();
1541 
1542         client.start();
1543         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1544                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1545         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1546 
1547         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1548                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1549                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1550         final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1551         Assertions.assertNotNull(result);
1552         final HttpResponse response1 = result.getHead();
1553         final String entity1 = result.getBody();
1554         Assertions.assertNotNull(response1);
1555         Assertions.assertEquals(404, response1.getCode());
1556         Assertions.assertEquals("Resource not found", entity1);
1557     }
1558 
1559     @Test
1560     public void testResponseNoContent() throws Exception {
1561         final Http1TestServer server = resources.server();
1562         final Http1TestClient client = resources.client();
1563 
1564         server.register("/hello", () -> new SingleLineResponseHandler("Hi there") {
1565 
1566             @Override
1567             protected void handle(
1568                     final Message<HttpRequest, String> request,
1569                     final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1570                     final HttpContext context) throws IOException, HttpException {
1571                 final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_NO_CONTENT);
1572                 responseTrigger.submitResponse(new BasicResponseProducer(response), context);
1573             }
1574         });
1575         final InetSocketAddress serverEndpoint = server.start();
1576 
1577         client.start();
1578         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1579                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1580         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1581 
1582         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1583                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1584                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1585         final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1586         Assertions.assertNotNull(result);
1587         final HttpResponse response1 = result.getHead();
1588         Assertions.assertNotNull(response1);
1589         Assertions.assertEquals(204, response1.getCode());
1590         Assertions.assertNull(result.getBody());
1591     }
1592 
1593     @Test
1594     public void testAbsentHostHeader() throws Exception {
1595         final Http1TestServer server = resources.server();
1596         final Http1TestClient client = resources.client();
1597 
1598         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
1599         final InetSocketAddress serverEndpoint = server.start();
1600 
1601         client.start(new DefaultHttpProcessor(RequestContent.INSTANCE, RequestConnControl.INSTANCE), Http1Config.DEFAULT);
1602 
1603         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1604                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1605         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1606 
1607         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1608         request1.setVersion(HttpVersion.HTTP_1_0);
1609         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1610                 new BasicRequestProducer(request1, null),
1611                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1612         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1613         Assertions.assertNotNull(result1);
1614         final HttpResponse response1 = result1.getHead();
1615         Assertions.assertNotNull(response1);
1616         Assertions.assertEquals(200, response1.getCode());
1617         Assertions.assertEquals("Hi there", result1.getBody());
1618 
1619         final HttpRequest request2 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1620         request2.setVersion(HttpVersion.HTTP_1_1);
1621         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1622                 new BasicRequestProducer(request2, null),
1623                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1624         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1625         Assertions.assertNotNull(result2);
1626         final HttpResponse response2 = result2.getHead();
1627         Assertions.assertNotNull(response2);
1628         Assertions.assertEquals(400, response2.getCode());
1629         Assertions.assertEquals("Host header is absent", result2.getBody());
1630     }
1631 
1632     @Test
1633     public void testMessageWithTrailers() throws Exception {
1634         final Http1TestServer server = resources.server();
1635         final Http1TestClient client = resources.client();
1636 
1637         server.register("/hello", () -> new AbstractServerExchangeHandler<Message<HttpRequest, String>>() {
1638 
1639             @Override
1640             protected AsyncRequestConsumer<Message<HttpRequest, String>> supplyConsumer(
1641                     final HttpRequest request,
1642                     final EntityDetails entityDetails,
1643                     final HttpContext context) throws HttpException {
1644                 return new BasicRequestConsumer<>(entityDetails != null ? new StringAsyncEntityConsumer() : null);
1645             }
1646 
1647             @Override
1648             protected void handle(
1649                     final Message<HttpRequest, String> requestMessage,
1650                     final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1651                     final HttpContext context) throws HttpException, IOException {
1652                 responseTrigger.submitResponse(new BasicResponseProducer(
1653                         HttpStatus.SC_OK,
1654                         new DigestingEntityProducer("MD5",
1655                                 new StringAsyncEntityProducer("Hello back with some trailers"))), context);
1656             }
1657         });
1658         final InetSocketAddress serverEndpoint = server.start();
1659 
1660         client.start();
1661 
1662         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1663                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1664         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1665 
1666         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1667         final DigestingEntityConsumer<String> entityConsumer = new DigestingEntityConsumer<>("MD5", new StringAsyncEntityConsumer());
1668         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1669                 new BasicRequestProducer(request1, null),
1670                 new BasicResponseConsumer<>(entityConsumer), null);
1671         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1672         Assertions.assertNotNull(result1);
1673         final HttpResponse response1 = result1.getHead();
1674         Assertions.assertNotNull(response1);
1675         Assertions.assertEquals(200, response1.getCode());
1676         Assertions.assertEquals("Hello back with some trailers", result1.getBody());
1677 
1678         final List<Header> trailers = entityConsumer.getTrailers();
1679         Assertions.assertNotNull(trailers);
1680         Assertions.assertEquals(2, trailers.size());
1681         final Map<String, String> map = new HashMap<>();
1682         for (final Header header: trailers) {
1683             map.put(TextUtils.toLowerCase(header.getName()), header.getValue());
1684         }
1685         final String digest = TextUtils.toHexString(entityConsumer.getDigest());
1686         Assertions.assertEquals("MD5", map.get("digest-algo"));
1687         Assertions.assertEquals(digest, map.get("digest"));
1688     }
1689 
1690     @Test
1691     public void testProtocolException() throws Exception {
1692         final Http1TestServer server = resources.server();
1693         final Http1TestClient client = resources.client();
1694 
1695         server.register("/boom", () -> new AsyncServerExchangeHandler() {
1696 
1697             private final StringAsyncEntityProducer entityProducer = new StringAsyncEntityProducer("Everyting is OK");
1698 
1699             @Override
1700             public void releaseResources() {
1701                 entityProducer.releaseResources();
1702             }
1703 
1704             @Override
1705             public void handleRequest(
1706                     final HttpRequest request,
1707                     final EntityDetails entityDetails,
1708                     final ResponseChannel responseChannel,
1709                     final HttpContext context) throws HttpException, IOException {
1710                 final String requestUri = request.getRequestUri();
1711                 if (requestUri.endsWith("boom")) {
1712                     throw new ProtocolException("Boom!!!");
1713                 }
1714                 responseChannel.sendResponse(new BasicHttpResponse(200), entityProducer, context);
1715             }
1716 
1717             @Override
1718             public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1719                 capacityChannel.update(Integer.MAX_VALUE);
1720             }
1721 
1722             @Override
1723             public void consume(final ByteBuffer src) throws IOException {
1724             }
1725 
1726             @Override
1727             public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1728                 // empty
1729             }
1730 
1731             @Override
1732             public int available() {
1733                 return entityProducer.available();
1734             }
1735 
1736             @Override
1737             public void produce(final DataStreamChannel channel) throws IOException {
1738                 entityProducer.produce(channel);
1739             }
1740 
1741             @Override
1742             public void failed(final Exception cause) {
1743                 releaseResources();
1744             }
1745 
1746         });
1747 
1748         final InetSocketAddress serverEndpoint = server.start();
1749 
1750         client.start();
1751         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1752                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1753         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1754         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1755                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/boom")),
1756                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1757         final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1758         Assertions.assertNotNull(result);
1759         final HttpResponse response1 = result.getHead();
1760         final String entity1 = result.getBody();
1761         Assertions.assertNotNull(response1);
1762         Assertions.assertEquals(HttpStatus.SC_BAD_REQUEST, response1.getCode());
1763         Assertions.assertEquals("Boom!!!", entity1);
1764     }
1765 
1766     @Test
1767     public void testHeaderTooLarge() throws Exception {
1768         final Http1TestServer server = resources.server();
1769         final Http1TestClient client = resources.client();
1770 
1771         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
1772         final InetSocketAddress serverEndpoint = server.start(null, Http1Config.custom()
1773                 .setMaxLineLength(100)
1774                 .build());
1775         client.start();
1776 
1777         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1778                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1779         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1780 
1781         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1782         request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1783                 "1234567890123456789012345678901234567890");
1784         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1785                 new BasicRequestProducer(request1, null),
1786                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1787         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1788         Assertions.assertNotNull(result1);
1789         final HttpResponse response1 = result1.getHead();
1790         Assertions.assertNotNull(response1);
1791         Assertions.assertEquals(431, response1.getCode());
1792         Assertions.assertEquals("Maximum line length limit exceeded", result1.getBody());
1793     }
1794 
1795     @Test
1796     public void testHeaderTooLargePost() throws Exception {
1797         final Http1TestServer server = resources.server();
1798         final Http1TestClient client = resources.client();
1799 
1800         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
1801         final InetSocketAddress serverEndpoint = server.start(null, Http1Config.custom()
1802                 .setMaxLineLength(100)
1803                 .build());
1804         client.start(
1805                 new DefaultHttpProcessor(RequestContent.INSTANCE, RequestTargetHost.INSTANCE, RequestConnControl.INSTANCE), null);
1806 
1807         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1808                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1809         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1810 
1811         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
1812         request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1813                 "1234567890123456789012345678901234567890");
1814 
1815         final byte[] b = new byte[2048];
1816         for (int i = 0; i < b.length; i++) {
1817             b[i] = (byte) ('a' + i % 10);
1818         }
1819 
1820         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1821                 new BasicRequestProducer(request1, AsyncEntityProducers.create(b, ContentType.TEXT_PLAIN)),
1822                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1823         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1824         Assertions.assertNotNull(result1);
1825         final HttpResponse response1 = result1.getHead();
1826         Assertions.assertNotNull(response1);
1827         Assertions.assertEquals(431, response1.getCode());
1828         Assertions.assertEquals("Maximum line length limit exceeded", result1.getBody());
1829     }
1830 
1831 }