View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.testing.nio;
29  
30  import java.io.BufferedReader;
31  import java.io.BufferedWriter;
32  import java.io.IOException;
33  import java.io.InputStream;
34  import java.io.InputStreamReader;
35  import java.io.InterruptedIOException;
36  import java.io.OutputStream;
37  import java.io.OutputStreamWriter;
38  import java.net.InetSocketAddress;
39  import java.net.URI;
40  import java.net.URISyntaxException;
41  import java.nio.ByteBuffer;
42  import java.nio.charset.Charset;
43  import java.nio.charset.StandardCharsets;
44  import java.util.Arrays;
45  import java.util.Collection;
46  import java.util.HashMap;
47  import java.util.LinkedList;
48  import java.util.List;
49  import java.util.Locale;
50  import java.util.Map;
51  import java.util.Queue;
52  import java.util.StringTokenizer;
53  import java.util.concurrent.BlockingQueue;
54  import java.util.concurrent.CountDownLatch;
55  import java.util.concurrent.ExecutionException;
56  import java.util.concurrent.Executors;
57  import java.util.concurrent.Future;
58  import java.util.concurrent.LinkedBlockingDeque;
59  import java.util.concurrent.TimeUnit;
60  import java.util.concurrent.atomic.AtomicInteger;
61  import java.util.concurrent.atomic.AtomicReference;
62  
63  import org.apache.hc.core5.function.Callback;
64  import org.apache.hc.core5.function.Decorator;
65  import org.apache.hc.core5.function.Supplier;
66  import org.apache.hc.core5.http.ContentType;
67  import org.apache.hc.core5.http.EndpointDetails;
68  import org.apache.hc.core5.http.EntityDetails;
69  import org.apache.hc.core5.http.Header;
70  import org.apache.hc.core5.http.HeaderElements;
71  import org.apache.hc.core5.http.HttpException;
72  import org.apache.hc.core5.http.HttpHeaders;
73  import org.apache.hc.core5.http.HttpHost;
74  import org.apache.hc.core5.http.HttpRequest;
75  import org.apache.hc.core5.http.HttpResponse;
76  import org.apache.hc.core5.http.HttpStatus;
77  import org.apache.hc.core5.http.Message;
78  import org.apache.hc.core5.http.Method;
79  import org.apache.hc.core5.http.ProtocolException;
80  import org.apache.hc.core5.http.URIScheme;
81  import org.apache.hc.core5.http.message.BasicHttpRequest;
82  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
83  import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
84  import org.apache.hc.core5.http.nio.AsyncResponseProducer;
85  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
86  import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
87  import org.apache.hc.core5.http.nio.CapacityChannel;
88  import org.apache.hc.core5.http.nio.DataStreamChannel;
89  import org.apache.hc.core5.http.nio.HandlerFactory;
90  import org.apache.hc.core5.http.nio.ResponseChannel;
91  import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
92  import org.apache.hc.core5.http.nio.entity.DigestingEntityConsumer;
93  import org.apache.hc.core5.http.nio.entity.DigestingEntityProducer;
94  import org.apache.hc.core5.http.nio.entity.NoopEntityConsumer;
95  import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
96  import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
97  import org.apache.hc.core5.http.nio.support.AbstractAsyncPushHandler;
98  import org.apache.hc.core5.http.nio.support.AbstractServerExchangeHandler;
99  import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder;
100 import org.apache.hc.core5.http.nio.support.BasicAsyncServerExpectationDecorator;
101 import org.apache.hc.core5.http.nio.support.BasicPushProducer;
102 import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
103 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
104 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
105 import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
106 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityConsumer;
107 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityProducer;
108 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicServerExchangeHandler;
109 import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
110 import org.apache.hc.core5.http.protocol.HttpContext;
111 import org.apache.hc.core5.http.protocol.HttpCoreContext;
112 import org.apache.hc.core5.http2.H2Error;
113 import org.apache.hc.core5.http2.H2StreamResetException;
114 import org.apache.hc.core5.http2.config.H2Config;
115 import org.apache.hc.core5.http2.nio.command.PingCommand;
116 import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
117 import org.apache.hc.core5.http2.protocol.H2RequestConnControl;
118 import org.apache.hc.core5.http2.protocol.H2RequestContent;
119 import org.apache.hc.core5.http2.protocol.H2RequestTargetHost;
120 import org.apache.hc.core5.reactor.Command;
121 import org.apache.hc.core5.reactor.IOReactorConfig;
122 import org.apache.hc.core5.reactor.IOSession;
123 import org.apache.hc.core5.testing.SSLTestContexts;
124 import org.apache.hc.core5.util.TextUtils;
125 import org.apache.hc.core5.util.TimeValue;
126 import org.apache.hc.core5.util.Timeout;
127 import org.hamcrest.CoreMatchers;
128 import org.hamcrest.MatcherAssert;
129 import org.junit.After;
130 import org.junit.Assert;
131 import org.junit.Before;
132 import org.junit.Test;
133 import org.junit.runner.RunWith;
134 import org.junit.runners.Parameterized;
135 import org.slf4j.Logger;
136 import org.slf4j.LoggerFactory;
137 
138 @RunWith(Parameterized.class)
139 public class H2IntegrationTest extends InternalH2ServerTestBase {
140 
141     private final Logger log = LoggerFactory.getLogger(getClass());
142 
143     @Parameterized.Parameters(name = "{0}")
144     public static Collection<Object[]> protocols() {
145         return Arrays.asList(new Object[][]{
146                 { URIScheme.HTTP },
147                 { URIScheme.HTTPS }
148         });
149     }
150 
151     public H2IntegrationTest(final URIScheme scheme) {
152         super(scheme);
153     }
154 
155     private static final Timeout TIMEOUT = Timeout.ofSeconds(30);
156     private static final Timeout LONG_TIMEOUT = Timeout.ofSeconds(60);
157 
158     private H2TestClient client;
159 
160     @Before
161     public void setup() throws Exception {
162         log.debug("Starting up test client");
163         client = new H2TestClient(buildReactorConfig(),
164                 scheme == URIScheme.HTTPS ? SSLTestContexts.createClientSSLContext() : null, null, null);
165     }
166 
167     protected IOReactorConfig buildReactorConfig() {
168         return IOReactorConfig.DEFAULT;
169     }
170 
171     @After
172     public void cleanup() throws Exception {
173         log.debug("Shutting down test client");
174         if (client != null) {
175             client.shutdown(TimeValue.ofSeconds(5));
176         }
177     }
178 
179     private URI createRequestURI(final InetSocketAddress serverEndpoint, final String path) {
180         try {
181             return new URI("http", null, "localhost", serverEndpoint.getPort(), path, null, null);
182         } catch (final URISyntaxException e) {
183             throw new IllegalStateException();
184         }
185     }
186 
187     @Test
188     public void testSimpleGet() throws Exception {
189         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
190 
191             @Override
192             public AsyncServerExchangeHandler get() {
193                 return new SingleLineResponseHandler("Hi there");
194             }
195 
196         });
197         final InetSocketAddress serverEndpoint = server.start();
198 
199         client.start();
200         final Future<ClientSessionEndpoint> connectFuture = client.connect(
201                 "localhost", serverEndpoint.getPort(), TIMEOUT);
202         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
203 
204         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
205         for (int i = 0; i < 10; i++) {
206             queue.add(streamEndpoint.execute(
207                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
208                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
209 
210         }
211         while (!queue.isEmpty()) {
212             final Future<Message<HttpResponse, String>> future = queue.remove();
213             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
214             Assert.assertNotNull(result);
215             final HttpResponse response = result.getHead();
216             final String entity = result.getBody();
217             Assert.assertNotNull(response);
218             Assert.assertEquals(200, response.getCode());
219             Assert.assertEquals("Hi there", entity);
220         }
221     }
222 
223     @Test
224     public void testSimpleHead() throws Exception {
225         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
226 
227             @Override
228             public AsyncServerExchangeHandler get() {
229                 return new SingleLineResponseHandler("Hi there");
230             }
231 
232         });
233         final InetSocketAddress serverEndpoint = server.start();
234 
235         client.start();
236         final Future<ClientSessionEndpoint> connectFuture = client.connect(
237                 "localhost", serverEndpoint.getPort(), TIMEOUT);
238         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
239 
240         for (int i = 0; i < 5; i++) {
241             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
242                     new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
243                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
244             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
245             Assert.assertNotNull(result);
246             final HttpResponse response1 = result.getHead();
247             Assert.assertNotNull(response1);
248             Assert.assertEquals(200, response1.getCode());
249             Assert.assertNull(result.getBody());
250         }
251     }
252 
253     @Test
254     public void testLargeGet() throws Exception {
255         server.register("/", new Supplier<AsyncServerExchangeHandler>() {
256 
257             @Override
258             public AsyncServerExchangeHandler get() {
259                 return new MultiLineResponseHandler("0123456789abcdef", 5000);
260             }
261 
262         });
263         final InetSocketAddress serverEndpoint = server.start();
264 
265         client.start();
266         final Future<ClientSessionEndpoint> connectFuture = client.connect(
267                 "localhost", serverEndpoint.getPort(), TIMEOUT);
268         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
269 
270         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
271                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/"), null),
272                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
273 
274         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
275                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
276                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer(512)), null);
277 
278         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
279         Assert.assertNotNull(result1);
280         final HttpResponse response1 = result1.getHead();
281         Assert.assertNotNull(response1);
282         Assert.assertEquals(200, response1.getCode());
283         final String s1 = result1.getBody();
284         Assert.assertNotNull(s1);
285         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
286         while (t1.hasMoreTokens()) {
287             Assert.assertEquals("0123456789abcdef", t1.nextToken());
288         }
289 
290         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
291         Assert.assertNotNull(result2);
292         final HttpResponse response2 = result2.getHead();
293         Assert.assertNotNull(response2);
294         Assert.assertEquals(200, response2.getCode());
295         final String s2 = result2.getBody();
296         Assert.assertNotNull(s2);
297         final StringTokenizer t2 = new StringTokenizer(s2, "\r\n");
298         while (t2.hasMoreTokens()) {
299             Assert.assertEquals("0123456789abcdef", t2.nextToken());
300         }
301     }
302 
303     @Test
304     public void testBasicPost() throws Exception {
305         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
306 
307             @Override
308             public AsyncServerExchangeHandler get() {
309                 return new SingleLineResponseHandler("Hi back");
310             }
311 
312         });
313         final InetSocketAddress serverEndpoint = server.start();
314 
315         client.start();
316         final Future<ClientSessionEndpoint> connectFuture = client.connect(
317                 "localhost", serverEndpoint.getPort(), TIMEOUT);
318         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
319 
320         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
321         for (int i = 0; i < 10; i++) {
322             final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
323             queue.add(streamEndpoint.execute(
324                     new BasicRequestProducer(request, new StringAsyncEntityProducer("Hi there", ContentType.TEXT_PLAIN)),
325                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
326 
327         }
328         while (!queue.isEmpty()) {
329             final Future<Message<HttpResponse, String>> future = queue.remove();
330             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
331             Assert.assertNotNull(result);
332             final HttpResponse response = result.getHead();
333             final String entity1 = result.getBody();
334             Assert.assertNotNull(response);
335             Assert.assertEquals(200, response.getCode());
336             Assert.assertEquals("Hi back", entity1);
337         }
338     }
339 
340     @Test
341     public void testLargePost() throws Exception {
342         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
343 
344             @Override
345             public AsyncServerExchangeHandler get() {
346                 return new EchoHandler(2048);
347             }
348 
349         });
350         final InetSocketAddress serverEndpoint = server.start();
351 
352         client.start();
353         final Future<ClientSessionEndpoint> connectFuture = client.connect(
354                 "localhost", serverEndpoint.getPort(), TIMEOUT);
355         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
356 
357         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
358                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
359                         new MultiLineEntityProducer("0123456789abcdef", 5000)),
360                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
361         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
362         Assert.assertNotNull(result1);
363         final HttpResponse response1 = result1.getHead();
364         Assert.assertNotNull(response1);
365         Assert.assertEquals(200, response1.getCode());
366         final String s1 = result1.getBody();
367         Assert.assertNotNull(s1);
368         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
369         while (t1.hasMoreTokens()) {
370             Assert.assertEquals("0123456789abcdef", t1.nextToken());
371         }
372     }
373 
374     @Test
375     public void testSlowResponseConsumer() throws Exception {
376         server.register("/", new Supplier<AsyncServerExchangeHandler>() {
377 
378             @Override
379             public AsyncServerExchangeHandler get() {
380                 return new MultiLineResponseHandler("0123456789abcd", 3);
381             }
382 
383         });
384         final InetSocketAddress serverEndpoint = server.start();
385 
386         client.start(H2Config.custom().setInitialWindowSize(16).build());
387         final Future<ClientSessionEndpoint> connectFuture = client.connect(
388                 "localhost", serverEndpoint.getPort(), TIMEOUT);
389         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
390 
391         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
392                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/"), null),
393                 new BasicResponseConsumer<>(new AbstractClassicEntityConsumer<String>(16, Executors.newSingleThreadExecutor()) {
394 
395                     @Override
396                     protected String consumeData(
397                             final ContentType contentType, final InputStream inputStream) throws IOException {
398                         Charset charset = contentType != null ? contentType.getCharset() : null;
399                         if (charset == null) {
400                             charset = StandardCharsets.US_ASCII;
401                         }
402 
403                         final StringBuilder buffer = new StringBuilder();
404                         try {
405                             final byte[] tmp = new byte[16];
406                             int l;
407                             while ((l = inputStream.read(tmp)) != -1) {
408                                 buffer.append(charset.decode(ByteBuffer.wrap(tmp, 0, l)));
409                                 Thread.sleep(500);
410                             }
411                         } catch (final InterruptedException ex) {
412                             Thread.currentThread().interrupt();
413                             throw new InterruptedIOException(ex.getMessage());
414                         }
415                         return buffer.toString();
416                     }
417                 }),
418                 null);
419 
420         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
421         Assert.assertNotNull(result1);
422         final HttpResponse response1 = result1.getHead();
423         Assert.assertNotNull(response1);
424         Assert.assertEquals(200, response1.getCode());
425         final String s1 = result1.getBody();
426         Assert.assertNotNull(s1);
427         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
428         while (t1.hasMoreTokens()) {
429             Assert.assertEquals("0123456789abcd", t1.nextToken());
430         }
431     }
432 
433     @Test
434     public void testSlowRequestProducer() throws Exception {
435         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
436 
437             @Override
438             public AsyncServerExchangeHandler get() {
439                 return new EchoHandler(2048);
440             }
441 
442         });
443         final InetSocketAddress serverEndpoint = server.start();
444 
445         client.start();
446         final Future<ClientSessionEndpoint> connectFuture = client.connect(
447                 "localhost", serverEndpoint.getPort(), TIMEOUT);
448         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
449 
450         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
451         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
452                 new BasicRequestProducer(request1, new AbstractClassicEntityProducer(4096, ContentType.TEXT_PLAIN, Executors.newSingleThreadExecutor()) {
453 
454                     @Override
455                     protected void produceData(final ContentType contentType, final OutputStream outputStream) throws IOException {
456                         Charset charset = contentType.getCharset();
457                         if (charset == null) {
458                             charset = StandardCharsets.US_ASCII;
459                         }
460                         try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, charset))) {
461                             for (int i = 0; i < 500; i++) {
462                                 if (i % 100 == 0) {
463                                     writer.flush();
464                                     Thread.sleep(500);
465                                 }
466                                 writer.write("0123456789abcdef\r\n");
467                             }
468                         } catch (final InterruptedException ex) {
469                             Thread.currentThread().interrupt();
470                             throw new InterruptedIOException(ex.getMessage());
471                         }
472                     }
473 
474                 }),
475                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
476         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
477         Assert.assertNotNull(result1);
478         final HttpResponse response1 = result1.getHead();
479         Assert.assertNotNull(response1);
480         Assert.assertEquals(200, response1.getCode());
481         final String s1 = result1.getBody();
482         Assert.assertNotNull(s1);
483         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
484         while (t1.hasMoreTokens()) {
485             Assert.assertEquals("0123456789abcdef", t1.nextToken());
486         }
487     }
488 
489     @Test
490     public void testSlowResponseProducer() throws Exception {
491         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
492 
493             @Override
494             public AsyncServerExchangeHandler get() {
495                 return new AbstractClassicServerExchangeHandler(2048, Executors.newSingleThreadExecutor()) {
496 
497                     @Override
498                     protected void handle(
499                             final HttpRequest request,
500                             final InputStream requestStream,
501                             final HttpResponse response,
502                             final OutputStream responseStream,
503                             final HttpContext context) throws IOException, HttpException {
504 
505                         if (!"/hello".equals(request.getPath())) {
506                             response.setCode(HttpStatus.SC_NOT_FOUND);
507                             return;
508                         }
509                         if (!Method.POST.name().equalsIgnoreCase(request.getMethod())) {
510                             response.setCode(HttpStatus.SC_NOT_IMPLEMENTED);
511                             return;
512                         }
513                         if (requestStream == null) {
514                             return;
515                         }
516                         final Header h1 = request.getFirstHeader(HttpHeaders.CONTENT_TYPE);
517                         final ContentType contentType = h1 != null ? ContentType.parse(h1.getValue()) : null;
518                         Charset charset = contentType != null ? contentType.getCharset() : null;
519                         if (charset == null) {
520                             charset = StandardCharsets.US_ASCII;
521                         }
522                         response.setCode(HttpStatus.SC_OK);
523                         response.setHeader(h1);
524                         try (final BufferedReader reader = new BufferedReader(new InputStreamReader(requestStream, charset));
525                             final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(responseStream, charset))) {
526                             try {
527                                 String l;
528                                 int count = 0;
529                                 while ((l = reader.readLine()) != null) {
530                                     writer.write(l);
531                                     writer.write("\r\n");
532                                     count++;
533                                     if (count % 500 == 0) {
534                                         Thread.sleep(500);
535                                     }
536                                 }
537                                 writer.flush();
538                             } catch (final InterruptedException ex) {
539                                 Thread.currentThread().interrupt();
540                                 throw new InterruptedIOException(ex.getMessage());
541                             }
542                         }
543                     }
544                 };
545             }
546 
547         });
548         final InetSocketAddress serverEndpoint = server.start();
549 
550         client.start(H2Config.custom()
551                 .setInitialWindowSize(512)
552                 .build());
553 
554         final Future<ClientSessionEndpoint> connectFuture = client.connect(
555                 "localhost", serverEndpoint.getPort(), TIMEOUT);
556         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
557 
558         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
559         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
560                 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcd", 2000)),
561                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
562         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
563         Assert.assertNotNull(result1);
564         final HttpResponse response1 = result1.getHead();
565         Assert.assertNotNull(response1);
566         Assert.assertEquals(200, response1.getCode());
567         final String s1 = result1.getBody();
568         Assert.assertNotNull(s1);
569         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
570         while (t1.hasMoreTokens()) {
571             Assert.assertEquals("0123456789abcd", t1.nextToken());
572         }
573     }
574 
575     @Test
576     public void testPush() throws Exception {
577         final InetSocketAddress serverEndpoint = server.start();
578         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
579 
580             @Override
581             public AsyncServerExchangeHandler get() {
582                 return new MessageExchangeHandler<Void>(new NoopEntityConsumer()) {
583 
584                     @Override
585                     protected void handle(
586                             final Message<HttpRequest, Void> request,
587                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
588                             final HttpContext context) throws IOException, HttpException {
589                         responseTrigger.pushPromise(
590                                 new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/stuff")),
591                                 context,
592                                 new BasicPushProducer(new MultiLineEntityProducer("Pushing lots of stuff", 500)));
593                         responseTrigger.submitResponse(
594                                 AsyncResponseBuilder.create(HttpStatus.SC_OK).setEntity("Hi there", ContentType.TEXT_PLAIN).build(),
595                                 context);
596                     }
597                 };
598             }
599 
600         });
601 
602         client.start(H2Config.custom().setPushEnabled(true).build());
603 
604         final BlockingQueue<Message<HttpResponse, String>> pushMessageQueue = new LinkedBlockingDeque<>();
605 
606         final Future<ClientSessionEndpoint> connectFuture = client.connect(
607                 "localhost", serverEndpoint.getPort(), TIMEOUT);
608         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
609 
610         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
611                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
612                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
613                 new HandlerFactory<AsyncPushConsumer>() {
614 
615                     @Override
616                     public AsyncPushConsumer create(
617                             final HttpRequest request, final HttpContext context) throws HttpException {
618                         return new AbstractAsyncPushHandler<Message<HttpResponse, String>>(new BasicResponseConsumer<>(new StringAsyncEntityConsumer())) {
619 
620                             @Override
621                             protected void handleResponse(
622                                     final HttpRequest promise,
623                                     final Message<HttpResponse, String> responseMessage) throws IOException, HttpException {
624                                 try {
625                                     pushMessageQueue.put(responseMessage);
626                                 } catch (final InterruptedException ex) {
627                                     Thread.currentThread().interrupt();
628                                     throw new InterruptedIOException(ex.getMessage());
629                                 }
630                             }
631 
632                         };
633                     }
634                 },
635                 null,
636                 null);
637         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
638         Assert.assertNotNull(result1);
639         final HttpResponse response1 = result1.getHead();
640         final String entity1 = result1.getBody();
641         Assert.assertNotNull(response1);
642         Assert.assertEquals(200, response1.getCode());
643         Assert.assertEquals("Hi there", entity1);
644 
645         final Message<HttpResponse, String> result2 = pushMessageQueue.poll(5, TimeUnit.SECONDS);
646         Assert.assertNotNull(result2);
647         final HttpResponse response2 = result2.getHead();
648         final String entity2 = result2.getBody();
649         Assert.assertEquals(200, response2.getCode());
650         Assert.assertNotNull(entity2);
651         final StringTokenizer t1 = new StringTokenizer(entity2, "\r\n");
652         while (t1.hasMoreTokens()) {
653             Assert.assertEquals("Pushing lots of stuff", t1.nextToken());
654         }
655     }
656 
657     @Test
658     public void testPushRefused() throws Exception {
659         final BlockingQueue<Exception> pushResultQueue = new LinkedBlockingDeque<>();
660         final InetSocketAddress serverEndpoint = server.start();
661         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
662 
663             @Override
664             public AsyncServerExchangeHandler get() {
665                 return new MessageExchangeHandler<Void>(new NoopEntityConsumer()) {
666 
667                     @Override
668                     protected void handle(
669                             final Message<HttpRequest, Void> request,
670                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
671                             final HttpContext context) throws IOException, HttpException {
672 
673                         responseTrigger.pushPromise(
674                                 new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/stuff")),
675                                 context, new BasicPushProducer(AsyncEntityProducers.create("Pushing all sorts of stuff")) {
676 
677                             @Override
678                             public void failed(final Exception cause) {
679                                 pushResultQueue.add(cause);
680                                 super.failed(cause);
681                             }
682 
683                         });
684                         responseTrigger.pushPromise(
685                                 new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/more-stuff")),
686                                 context, new BasicPushProducer(new MultiLineEntityProducer("Pushing lots of stuff", 500)) {
687 
688                             @Override
689                             public void failed(final Exception cause) {
690                                 pushResultQueue.add(cause);
691                                 super.failed(cause);
692                             }
693 
694                         });
695                         responseTrigger.submitResponse(
696                                 new BasicResponseProducer(HttpStatus.SC_OK, AsyncEntityProducers.create("Hi there")),
697                                 context);
698                     }
699                 };
700             }
701 
702         });
703 
704         client.start(H2Config.custom().setPushEnabled(true).build());
705 
706         final Future<ClientSessionEndpoint> connectFuture = client.connect(
707                 "localhost", serverEndpoint.getPort(), TIMEOUT);
708         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
709 
710         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
711                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
712                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
713         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
714         Assert.assertNotNull(result1);
715         final HttpResponse response1 = result1.getHead();
716         final String entity1 = result1.getBody();
717         Assert.assertNotNull(response1);
718         Assert.assertEquals(200, response1.getCode());
719         Assert.assertEquals("Hi there", entity1);
720 
721         final Object result2 = pushResultQueue.poll(5, TimeUnit.SECONDS);
722         Assert.assertNotNull(result2);
723         Assert.assertTrue(result2 instanceof H2StreamResetException);
724         Assert.assertEquals(H2Error.REFUSED_STREAM.getCode(), ((H2StreamResetException) result2).getCode());
725 
726         final Object result3 = pushResultQueue.poll(5, TimeUnit.SECONDS);
727         Assert.assertNotNull(result3);
728         Assert.assertTrue(result3 instanceof H2StreamResetException);
729         Assert.assertEquals(H2Error.REFUSED_STREAM.getCode(), ((H2StreamResetException) result3).getCode());
730     }
731 
732     @Test
733     public void testExcessOfConcurrentStreams() throws Exception {
734         server.register("/", new Supplier<AsyncServerExchangeHandler>() {
735 
736             @Override
737             public AsyncServerExchangeHandler get() {
738                 return new MultiLineResponseHandler("0123456789abcdef", 2000);
739             }
740 
741         });
742         final InetSocketAddress serverEndpoint = server.start(H2Config.custom().setMaxConcurrentStreams(20).build());
743 
744         client.start(H2Config.custom().setMaxConcurrentStreams(20).build());
745         final Future<ClientSessionEndpoint> connectFuture = client.connect(
746                 "localhost", serverEndpoint.getPort(), TIMEOUT);
747         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
748 
749         final Queue<Future<Message<HttpResponse, Void>>> queue = new LinkedList<>();
750         for (int i = 0; i < 2000; i++) {
751             final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/"));
752             final Future<Message<HttpResponse, Void>> future = streamEndpoint.execute(
753                     new BasicRequestProducer(request1, null),
754                     new BasicResponseConsumer<>(new NoopEntityConsumer()), null);
755             queue.add(future);
756         }
757 
758         while (!queue.isEmpty()) {
759             final Future<Message<HttpResponse, Void>> future = queue.remove();
760             final Message<HttpResponse, Void> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
761             Assert.assertNotNull(result);
762             final HttpResponse response = result.getHead();
763             Assert.assertNotNull(response);
764             Assert.assertEquals(200, response.getCode());
765         }
766     }
767 
768     @Test
769     public void testExpectationFailed() throws Exception {
770         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
771 
772             @Override
773             public AsyncServerExchangeHandler get() {
774                 return new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
775 
776                     @Override
777                     protected void handle(
778                             final Message<HttpRequest, String> request,
779                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
780                             final HttpContext context) throws IOException, HttpException {
781                         responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
782 
783                     }
784                 };
785             }
786 
787         });
788         final InetSocketAddress serverEndpoint = server.start(null, new Decorator<AsyncServerExchangeHandler>() {
789 
790             @Override
791             public AsyncServerExchangeHandler decorate(final AsyncServerExchangeHandler handler) {
792 
793                 return new BasicAsyncServerExpectationDecorator(handler) {
794 
795                     @Override
796                     protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
797                         final Header h = request.getFirstHeader("password");
798                         if (h != null && "secret".equals(h.getValue())) {
799                             return null;
800                         } else {
801                             return new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
802                         }
803                     }
804                 };
805 
806             }
807         }, H2Config.DEFAULT);
808 
809         client.start();
810         final Future<ClientSessionEndpoint> connectFuture = client.connect(
811                 "localhost", serverEndpoint.getPort(), TIMEOUT);
812         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
813 
814         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
815         request1.addHeader("password", "secret");
816         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
817                 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 5000)),
818                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
819         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
820         Assert.assertNotNull(result1);
821         final HttpResponse response1 = result1.getHead();
822         Assert.assertNotNull(response1);
823         Assert.assertEquals(200, response1.getCode());
824         Assert.assertNotNull("All is well", result1.getBody());
825 
826         final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
827         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
828                 new BasicRequestProducer(request2, new MultiLineEntityProducer("0123456789abcdef", 5000)),
829                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
830         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
831         Assert.assertNotNull(result2);
832         final HttpResponse response2 = result2.getHead();
833         Assert.assertNotNull(response2);
834         Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response2.getCode());
835         Assert.assertNotNull("You shall not pass", result2.getBody());
836     }
837 
838     @Test
839     public void testPrematureResponse() throws Exception {
840         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
841 
842             @Override
843             public AsyncServerExchangeHandler get() {
844                 return new AsyncServerExchangeHandler() {
845 
846                     private final AtomicReference<AsyncResponseProducer> responseProducer = new AtomicReference<>(null);
847 
848                     @Override
849                     public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
850                         capacityChannel.update(Integer.MAX_VALUE);
851                     }
852 
853                     @Override
854                     public void consume(final ByteBuffer src) throws IOException {
855                     }
856 
857                     @Override
858                     public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
859                     }
860 
861                     @Override
862                     public void handleRequest(
863                             final HttpRequest request,
864                             final EntityDetails entityDetails,
865                             final ResponseChannel responseChannel,
866                             final HttpContext context) throws HttpException, IOException {
867                         final AsyncResponseProducer producer;
868                         final Header h = request.getFirstHeader("password");
869                         if (h != null && "secret".equals(h.getValue())) {
870                             producer = new BasicResponseProducer(HttpStatus.SC_OK, "All is well");
871                         } else {
872                             producer = new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
873                         }
874                         responseProducer.set(producer);
875                         producer.sendResponse(responseChannel, context);
876                     }
877 
878                     @Override
879                     public int available() {
880                         final AsyncResponseProducer producer = this.responseProducer.get();
881                         return producer.available();
882                     }
883 
884                     @Override
885                     public void produce(final DataStreamChannel channel) throws IOException {
886                         final AsyncResponseProducer producer = this.responseProducer.get();
887                         producer.produce(channel);
888                     }
889 
890                     @Override
891                     public void failed(final Exception cause) {
892                     }
893 
894                     @Override
895                     public void releaseResources() {
896                     }
897                 };
898             }
899 
900         });
901         final InetSocketAddress serverEndpoint = server.start();
902 
903         client.start();
904         final Future<ClientSessionEndpoint> connectFuture = client.connect(
905                 "localhost", serverEndpoint.getPort(), TIMEOUT);
906         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
907 
908         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
909         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
910                 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 5000)),
911                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
912         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
913         Assert.assertNotNull(result1);
914         final HttpResponse response1 = result1.getHead();
915         Assert.assertNotNull(response1);
916         Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
917         Assert.assertNotNull("You shall not pass", result1.getBody());
918     }
919 
920     @Test
921     public void testMessageWithTrailers() throws Exception {
922         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
923 
924             @Override
925             public AsyncServerExchangeHandler get() {
926                 return new AbstractServerExchangeHandler<Message<HttpRequest, String>>() {
927 
928                     @Override
929                     protected AsyncRequestConsumer<Message<HttpRequest, String>> supplyConsumer(
930                             final HttpRequest request,
931                             final EntityDetails entityDetails,
932                             final HttpContext context) throws HttpException {
933                         return new BasicRequestConsumer<>(entityDetails != null ? new StringAsyncEntityConsumer() : null);
934                     }
935 
936                     @Override
937                     protected void handle(
938                             final Message<HttpRequest, String> requestMessage,
939                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
940                             final HttpContext context) throws HttpException, IOException {
941                         responseTrigger.submitResponse(new BasicResponseProducer(
942                                 HttpStatus.SC_OK,
943                                 new DigestingEntityProducer("MD5",
944                                         new StringAsyncEntityProducer("Hello back with some trailers"))), context);
945                     }
946                 };
947             }
948 
949         });
950         final InetSocketAddress serverEndpoint = server.start();
951 
952         client.start();
953 
954         final Future<ClientSessionEndpoint> connectFuture = client.connect(
955                 "localhost", serverEndpoint.getPort(), TIMEOUT);
956         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
957 
958         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
959         final DigestingEntityConsumer<String> entityConsumer = new DigestingEntityConsumer<>("MD5", new StringAsyncEntityConsumer());
960         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
961                 new BasicRequestProducer(request1, null),
962                 new BasicResponseConsumer<>(entityConsumer), null);
963         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
964         Assert.assertNotNull(result1);
965         final HttpResponse response1 = result1.getHead();
966         Assert.assertNotNull(response1);
967         Assert.assertEquals(200, response1.getCode());
968         Assert.assertEquals("Hello back with some trailers", result1.getBody());
969 
970         final List<Header> trailers = entityConsumer.getTrailers();
971         Assert.assertNotNull(trailers);
972         Assert.assertEquals(2, trailers.size());
973         final Map<String, String> map = new HashMap<>();
974         for (final Header header: trailers) {
975             map.put(header.getName().toLowerCase(Locale.ROOT), header.getValue());
976         }
977         final String digest = TextUtils.toHexString(entityConsumer.getDigest());
978         Assert.assertEquals("MD5", map.get("digest-algo"));
979         Assert.assertEquals(digest, map.get("digest"));
980     }
981 
982     @Test
983     public void testConnectionPing() throws Exception {
984         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
985 
986             @Override
987             public AsyncServerExchangeHandler get() {
988                 return new SingleLineResponseHandler("Hi there");
989             }
990 
991         });
992         final InetSocketAddress serverEndpoint = server.start();
993 
994         client.start();
995         final Future<ClientSessionEndpoint> connectFuture = client.connect(
996                 "localhost", serverEndpoint.getPort(), TIMEOUT);
997         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
998 
999         final int n = 10;
1000         final CountDownLatch latch = new CountDownLatch(n);
1001         final AtomicInteger count = new AtomicInteger(0);
1002         for (int i = 0; i < n; i++) {
1003             streamEndpoint.execute(
1004                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1005                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1006             streamEndpoint.execute(new PingCommand(new BasicPingHandler(new Callback<Boolean>() {
1007 
1008                 @Override
1009                 public void execute(final Boolean result) {
1010                     if (result) {
1011                         count.incrementAndGet();
1012                     }
1013                     latch.countDown();
1014                 }
1015 
1016             })), Command.Priority.NORMAL);
1017 
1018         }
1019         Assert.assertTrue(latch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
1020         Assert.assertEquals(n, count.get());
1021     }
1022 
1023     @Test
1024     public void testRequestWithInvalidConnectionHeader() throws Exception {
1025         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1026 
1027             @Override
1028             public AsyncServerExchangeHandler get() {
1029                 return new SingleLineResponseHandler("Hi there");
1030             }
1031 
1032         });
1033         final InetSocketAddress serverEndpoint = server.start();
1034 
1035         client.start();
1036 
1037         final Future<IOSession> sessionFuture = client.requestSession(new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
1038         final IOSession session = sessionFuture.get();
1039         final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(session);
1040 
1041         final HttpRequest request = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1042         request.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
1043         final HttpCoreContext coreContext = HttpCoreContext.create();
1044         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1045                     new BasicRequestProducer(request, null),
1046                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
1047                     coreContext, null);
1048         try {
1049             future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1050             Assert.fail("ExecutionException is expected");
1051         } catch (final ExecutionException ex) {
1052             MatcherAssert.assertThat(ex.getCause(), CoreMatchers.instanceOf(ProtocolException.class));
1053         }
1054 
1055         final EndpointDetails endpointDetails = coreContext.getEndpointDetails();
1056         MatcherAssert.assertThat(endpointDetails.getRequestCount(), CoreMatchers.equalTo(0L));
1057     }
1058 
1059     @Test
1060     public void testHeaderTooLarge() throws Exception {
1061         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1062 
1063             @Override
1064             public AsyncServerExchangeHandler get() {
1065                 return new SingleLineResponseHandler("Hi there");
1066             }
1067 
1068         });
1069         final InetSocketAddress serverEndpoint = server.start(H2Config.custom()
1070                 .setMaxHeaderListSize(100)
1071                 .build());
1072         client.start();
1073 
1074         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1075                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1076         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1077 
1078         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1079         request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1080                 "1234567890123456789012345678901234567890");
1081         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1082                 new BasicRequestProducer(request1, null),
1083                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1084         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1085         Assert.assertNotNull(result1);
1086         final HttpResponse response1 = result1.getHead();
1087         Assert.assertNotNull(response1);
1088         Assert.assertEquals(431, response1.getCode());
1089         Assert.assertEquals("Maximum header list size exceeded", result1.getBody());
1090     }
1091 
1092     @Test
1093     public void testHeaderTooLargePost() throws Exception {
1094         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1095 
1096             @Override
1097             public AsyncServerExchangeHandler get() {
1098                 return new SingleLineResponseHandler("Hi there");
1099             }
1100 
1101         });
1102         final InetSocketAddress serverEndpoint = server.start(H2Config.custom()
1103                 .setMaxHeaderListSize(100)
1104                 .build());
1105         client.start(
1106                 new DefaultHttpProcessor(new H2RequestContent(), new H2RequestTargetHost(), new H2RequestConnControl()),
1107                 H2Config.DEFAULT);
1108 
1109         final Future<ClientSessionEndpoint> connectFuture = client.connect(
1110                 "localhost", serverEndpoint.getPort(), TIMEOUT);
1111         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1112 
1113         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
1114         request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1115                 "1234567890123456789012345678901234567890");
1116 
1117         final byte[] b = new byte[2048];
1118         for (int i = 0; i < b.length; i++) {
1119             b[i] = (byte) ('a' + i % 10);
1120         }
1121 
1122         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1123                 new BasicRequestProducer(request1, AsyncEntityProducers.create(b, ContentType.TEXT_PLAIN)),
1124                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1125         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1126         Assert.assertNotNull(result1);
1127         final HttpResponse response1 = result1.getHead();
1128         Assert.assertNotNull(response1);
1129         Assert.assertEquals(431, response1.getCode());
1130         Assert.assertEquals("Maximum header list size exceeded", result1.getBody());
1131     }
1132 
1133 }