View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.testing.nio;
29  
30  import static org.hamcrest.MatcherAssert.assertThat;
31  
32  import java.io.BufferedReader;
33  import java.io.BufferedWriter;
34  import java.io.IOException;
35  import java.io.InputStream;
36  import java.io.InputStreamReader;
37  import java.io.InterruptedIOException;
38  import java.io.OutputStream;
39  import java.io.OutputStreamWriter;
40  import java.net.InetSocketAddress;
41  import java.net.URI;
42  import java.net.URISyntaxException;
43  import java.nio.ByteBuffer;
44  import java.nio.charset.Charset;
45  import java.nio.charset.StandardCharsets;
46  import java.util.HashMap;
47  import java.util.LinkedList;
48  import java.util.List;
49  import java.util.Map;
50  import java.util.Queue;
51  import java.util.StringTokenizer;
52  import java.util.concurrent.BlockingQueue;
53  import java.util.concurrent.CountDownLatch;
54  import java.util.concurrent.ExecutionException;
55  import java.util.concurrent.Executors;
56  import java.util.concurrent.Future;
57  import java.util.concurrent.LinkedBlockingDeque;
58  import java.util.concurrent.TimeUnit;
59  import java.util.concurrent.atomic.AtomicInteger;
60  import java.util.concurrent.atomic.AtomicReference;
61  
62  import org.apache.hc.core5.function.Supplier;
63  import org.apache.hc.core5.http.ContentType;
64  import org.apache.hc.core5.http.EndpointDetails;
65  import org.apache.hc.core5.http.EntityDetails;
66  import org.apache.hc.core5.http.Header;
67  import org.apache.hc.core5.http.HeaderElements;
68  import org.apache.hc.core5.http.HttpException;
69  import org.apache.hc.core5.http.HttpHeaders;
70  import org.apache.hc.core5.http.HttpHost;
71  import org.apache.hc.core5.http.HttpRequest;
72  import org.apache.hc.core5.http.HttpResponse;
73  import org.apache.hc.core5.http.HttpStatus;
74  import org.apache.hc.core5.http.Message;
75  import org.apache.hc.core5.http.Method;
76  import org.apache.hc.core5.http.ProtocolException;
77  import org.apache.hc.core5.http.URIScheme;
78  import org.apache.hc.core5.http.message.BasicHttpRequest;
79  import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
80  import org.apache.hc.core5.http.nio.AsyncResponseProducer;
81  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
82  import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
83  import org.apache.hc.core5.http.nio.CapacityChannel;
84  import org.apache.hc.core5.http.nio.DataStreamChannel;
85  import org.apache.hc.core5.http.nio.ResponseChannel;
86  import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
87  import org.apache.hc.core5.http.nio.entity.DigestingEntityConsumer;
88  import org.apache.hc.core5.http.nio.entity.DigestingEntityProducer;
89  import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer;
90  import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
91  import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
92  import org.apache.hc.core5.http.nio.support.AbstractAsyncPushHandler;
93  import org.apache.hc.core5.http.nio.support.AbstractServerExchangeHandler;
94  import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder;
95  import org.apache.hc.core5.http.nio.support.BasicAsyncServerExpectationDecorator;
96  import org.apache.hc.core5.http.nio.support.BasicPushProducer;
97  import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
98  import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
99  import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
100 import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
101 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityConsumer;
102 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityProducer;
103 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicServerExchangeHandler;
104 import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
105 import org.apache.hc.core5.http.protocol.HttpContext;
106 import org.apache.hc.core5.http.protocol.HttpCoreContext;
107 import org.apache.hc.core5.http2.H2Error;
108 import org.apache.hc.core5.http2.H2StreamResetException;
109 import org.apache.hc.core5.http2.config.H2Config;
110 import org.apache.hc.core5.http2.nio.command.PingCommand;
111 import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
112 import org.apache.hc.core5.http2.protocol.H2RequestConnControl;
113 import org.apache.hc.core5.http2.protocol.H2RequestContent;
114 import org.apache.hc.core5.http2.protocol.H2RequestTargetHost;
115 import org.apache.hc.core5.reactor.Command;
116 import org.apache.hc.core5.reactor.IOSession;
117 import org.apache.hc.core5.testing.nio.extension.H2TestResources;
118 import org.apache.hc.core5.util.TextUtils;
119 import org.apache.hc.core5.util.Timeout;
120 import org.hamcrest.CoreMatchers;
121 import org.junit.jupiter.api.Assertions;
122 import org.junit.jupiter.api.Test;
123 import org.junit.jupiter.api.extension.RegisterExtension;
124 
125 public abstract class H2IntegrationTest {
126 
127     private static final Timeout TIMEOUT = Timeout.ofMinutes(1);
128     private static final Timeout LONG_TIMEOUT = Timeout.ofMinutes(2);
129 
130     private final URIScheme scheme;
131 
132     @RegisterExtension
133     private final H2TestResources resources;
134 
135     public H2IntegrationTest(final URIScheme scheme) {
136         this.scheme = scheme;
137         this.resources = new H2TestResources(scheme, TIMEOUT);
138     }
139 
140     private URI createRequestURI(final InetSocketAddress serverEndpoint, final String path) {
141         try {
142             return new URI(scheme.id, null, "localhost", serverEndpoint.getPort(), path, null, null);
143         } catch (final URISyntaxException e) {
144             throw new IllegalStateException();
145         }
146     }
147 
148     @Test
149     public void testSimpleGet() throws Exception {
150         final H2TestServer server = resources.server();
151         final H2TestClient client = resources.client();
152 
153         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
154         final InetSocketAddress serverEndpoint = server.start();
155 
156         client.start();
157         final Future<ClientSessionEndpoint> connectFuture = client.connect(
158                 "localhost", serverEndpoint.getPort(), TIMEOUT);
159         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
160 
161         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
162         for (int i = 0; i < 10; i++) {
163             queue.add(streamEndpoint.execute(
164                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
165                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
166 
167         }
168         while (!queue.isEmpty()) {
169             final Future<Message<HttpResponse, String>> future = queue.remove();
170             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
171             Assertions.assertNotNull(result);
172             final HttpResponse response = result.getHead();
173             final String entity = result.getBody();
174             Assertions.assertNotNull(response);
175             Assertions.assertEquals(200, response.getCode());
176             Assertions.assertEquals("Hi there", entity);
177         }
178     }
179 
180     @Test
181     public void testSimpleHead() throws Exception {
182         final H2TestServer server = resources.server();
183         final H2TestClient client = resources.client();
184 
185         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
186         final InetSocketAddress serverEndpoint = server.start();
187 
188         client.start();
189         final Future<ClientSessionEndpoint> connectFuture = client.connect(
190                 "localhost", serverEndpoint.getPort(), TIMEOUT);
191         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
192 
193         for (int i = 0; i < 5; i++) {
194             final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
195                     new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
196                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
197             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
198             Assertions.assertNotNull(result);
199             final HttpResponse response1 = result.getHead();
200             Assertions.assertNotNull(response1);
201             Assertions.assertEquals(200, response1.getCode());
202             Assertions.assertNull(result.getBody());
203         }
204     }
205 
206     @Test
207     public void testLargeGet() throws Exception {
208         final H2TestServer server = resources.server();
209         final H2TestClient client = resources.client();
210 
211         server.register("/", () -> new MultiLineResponseHandler("0123456789abcdef", 5000));
212         final InetSocketAddress serverEndpoint = server.start();
213 
214         client.start();
215         final Future<ClientSessionEndpoint> connectFuture = client.connect(
216                 "localhost", serverEndpoint.getPort(), TIMEOUT);
217         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
218 
219         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
220                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/"), null),
221                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
222 
223         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
224                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
225                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer(512)), null);
226 
227         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
228         Assertions.assertNotNull(result1);
229         final HttpResponse response1 = result1.getHead();
230         Assertions.assertNotNull(response1);
231         Assertions.assertEquals(200, response1.getCode());
232         final String s1 = result1.getBody();
233         Assertions.assertNotNull(s1);
234         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
235         while (t1.hasMoreTokens()) {
236             Assertions.assertEquals("0123456789abcdef", t1.nextToken());
237         }
238 
239         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
240         Assertions.assertNotNull(result2);
241         final HttpResponse response2 = result2.getHead();
242         Assertions.assertNotNull(response2);
243         Assertions.assertEquals(200, response2.getCode());
244         final String s2 = result2.getBody();
245         Assertions.assertNotNull(s2);
246         final StringTokenizer t2 = new StringTokenizer(s2, "\r\n");
247         while (t2.hasMoreTokens()) {
248             Assertions.assertEquals("0123456789abcdef", t2.nextToken());
249         }
250     }
251 
252     @Test
253     public void testBasicPost() throws Exception {
254         final H2TestServer server = resources.server();
255         final H2TestClient client = resources.client();
256 
257         server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
258         final InetSocketAddress serverEndpoint = server.start();
259 
260         client.start();
261         final Future<ClientSessionEndpoint> connectFuture = client.connect(
262                 "localhost", serverEndpoint.getPort(), TIMEOUT);
263         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
264 
265         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
266         for (int i = 0; i < 10; i++) {
267             final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
268             queue.add(streamEndpoint.execute(
269                     new BasicRequestProducer(request, new StringAsyncEntityProducer("Hi there", ContentType.TEXT_PLAIN)),
270                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
271 
272         }
273         while (!queue.isEmpty()) {
274             final Future<Message<HttpResponse, String>> future = queue.remove();
275             final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
276             Assertions.assertNotNull(result);
277             final HttpResponse response = result.getHead();
278             final String entity1 = result.getBody();
279             Assertions.assertNotNull(response);
280             Assertions.assertEquals(200, response.getCode());
281             Assertions.assertEquals("Hi back", entity1);
282         }
283     }
284 
285     @Test
286     public void testLargePost() throws Exception {
287         final H2TestServer server = resources.server();
288         final H2TestClient client = resources.client();
289 
290         server.register("*", () -> new EchoHandler(2048));
291         final InetSocketAddress serverEndpoint = server.start();
292 
293         client.start();
294         final Future<ClientSessionEndpoint> connectFuture = client.connect(
295                 "localhost", serverEndpoint.getPort(), TIMEOUT);
296         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
297 
298         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
299                 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
300                         new MultiLineEntityProducer("0123456789abcdef", 5000)),
301                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
302         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
303         Assertions.assertNotNull(result1);
304         final HttpResponse response1 = result1.getHead();
305         Assertions.assertNotNull(response1);
306         Assertions.assertEquals(200, response1.getCode());
307         final String s1 = result1.getBody();
308         Assertions.assertNotNull(s1);
309         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
310         while (t1.hasMoreTokens()) {
311             Assertions.assertEquals("0123456789abcdef", t1.nextToken());
312         }
313     }
314 
315     @Test
316     public void testSlowResponseConsumer() throws Exception {
317         final H2TestServer server = resources.server();
318         final H2TestClient client = resources.client();
319 
320         server.register("/", () -> new MultiLineResponseHandler("0123456789abcd", 3));
321         final InetSocketAddress serverEndpoint = server.start();
322 
323         client.start(H2Config.custom().setInitialWindowSize(16).build());
324         final Future<ClientSessionEndpoint> connectFuture = client.connect(
325                 "localhost", serverEndpoint.getPort(), TIMEOUT);
326         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
327 
328         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
329                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/"), null),
330                 new BasicResponseConsumer<>(new AbstractClassicEntityConsumer<String>(16, Executors.newSingleThreadExecutor()) {
331 
332                     @Override
333                     protected String consumeData(
334                             final ContentType contentType, final InputStream inputStream) throws IOException {
335                         final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII);
336 
337                         final StringBuilder buffer = new StringBuilder();
338                         try {
339                             final byte[] tmp = new byte[16];
340                             int l;
341                             while ((l = inputStream.read(tmp)) != -1) {
342                                 buffer.append(charset.decode(ByteBuffer.wrap(tmp, 0, l)));
343                                 Thread.sleep(500);
344                             }
345                         } catch (final InterruptedException ex) {
346                             Thread.currentThread().interrupt();
347                             throw new InterruptedIOException(ex.getMessage());
348                         }
349                         return buffer.toString();
350                     }
351                 }),
352                 null);
353 
354         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
355         Assertions.assertNotNull(result1);
356         final HttpResponse response1 = result1.getHead();
357         Assertions.assertNotNull(response1);
358         Assertions.assertEquals(200, response1.getCode());
359         final String s1 = result1.getBody();
360         Assertions.assertNotNull(s1);
361         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
362         while (t1.hasMoreTokens()) {
363             Assertions.assertEquals("0123456789abcd", t1.nextToken());
364         }
365     }
366 
367     @Test
368     public void testSlowRequestProducer() throws Exception {
369         final H2TestServer server = resources.server();
370         final H2TestClient client = resources.client();
371 
372         server.register("*", () -> new EchoHandler(2048));
373         final InetSocketAddress serverEndpoint = server.start();
374 
375         client.start();
376         final Future<ClientSessionEndpoint> connectFuture = client.connect(
377                 "localhost", serverEndpoint.getPort(), TIMEOUT);
378         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
379 
380         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
381         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
382                 new BasicRequestProducer(request1, new AbstractClassicEntityProducer(4096, ContentType.TEXT_PLAIN, Executors.newSingleThreadExecutor()) {
383 
384                     @Override
385                     protected void produceData(final ContentType contentType, final OutputStream outputStream) throws IOException {
386                         final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII);
387                         try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, charset))) {
388                             for (int i = 0; i < 500; i++) {
389                                 if (i % 100 == 0) {
390                                     writer.flush();
391                                     Thread.sleep(500);
392                                 }
393                                 writer.write("0123456789abcdef\r\n");
394                             }
395                         } catch (final InterruptedException ex) {
396                             Thread.currentThread().interrupt();
397                             throw new InterruptedIOException(ex.getMessage());
398                         }
399                     }
400 
401                 }),
402                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
403         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
404         Assertions.assertNotNull(result1);
405         final HttpResponse response1 = result1.getHead();
406         Assertions.assertNotNull(response1);
407         Assertions.assertEquals(200, response1.getCode());
408         final String s1 = result1.getBody();
409         Assertions.assertNotNull(s1);
410         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
411         while (t1.hasMoreTokens()) {
412             Assertions.assertEquals("0123456789abcdef", t1.nextToken());
413         }
414     }
415 
416     @Test
417     public void testSlowResponseProducer() throws Exception {
418         final H2TestServer server = resources.server();
419         final H2TestClient client = resources.client();
420 
421         server.register("*", () -> new AbstractClassicServerExchangeHandler(2048, Executors.newSingleThreadExecutor()) {
422 
423             @Override
424             protected void handle(
425                     final HttpRequest request,
426                     final InputStream requestStream,
427                     final HttpResponse response,
428                     final OutputStream responseStream,
429                     final HttpContext context) throws IOException, HttpException {
430 
431                 if (!"/hello".equals(request.getPath())) {
432                     response.setCode(HttpStatus.SC_NOT_FOUND);
433                     return;
434                 }
435                 if (!Method.POST.name().equalsIgnoreCase(request.getMethod())) {
436                     response.setCode(HttpStatus.SC_NOT_IMPLEMENTED);
437                     return;
438                 }
439                 if (requestStream == null) {
440                     return;
441                 }
442                 final Header h1 = request.getFirstHeader(HttpHeaders.CONTENT_TYPE);
443                 final ContentType contentType = h1 != null ? ContentType.parse(h1.getValue()) : null;
444                 final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII);
445                 response.setCode(HttpStatus.SC_OK);
446                 response.setHeader(h1);
447                 try (final BufferedReader reader = new BufferedReader(new InputStreamReader(requestStream, charset));
448                     final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(responseStream, charset))) {
449                     try {
450                         String l;
451                         int count = 0;
452                         while ((l = reader.readLine()) != null) {
453                             writer.write(l);
454                             writer.write("\r\n");
455                             count++;
456                             if (count % 500 == 0) {
457                                 Thread.sleep(500);
458                             }
459                         }
460                         writer.flush();
461                     } catch (final InterruptedException ex) {
462                         Thread.currentThread().interrupt();
463                         throw new InterruptedIOException(ex.getMessage());
464                     }
465                 }
466             }
467         });
468         final InetSocketAddress serverEndpoint = server.start();
469 
470         client.start(H2Config.custom()
471                 .setInitialWindowSize(512)
472                 .build());
473 
474         final Future<ClientSessionEndpoint> connectFuture = client.connect(
475                 "localhost", serverEndpoint.getPort(), TIMEOUT);
476         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
477 
478         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
479         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
480                 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcd", 2000)),
481                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
482         final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
483         Assertions.assertNotNull(result1);
484         final HttpResponse response1 = result1.getHead();
485         Assertions.assertNotNull(response1);
486         Assertions.assertEquals(200, response1.getCode());
487         final String s1 = result1.getBody();
488         Assertions.assertNotNull(s1);
489         final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
490         while (t1.hasMoreTokens()) {
491             Assertions.assertEquals("0123456789abcd", t1.nextToken());
492         }
493     }
494 
495     @Test
496     public void testPush() throws Exception {
497         final H2TestServer server = resources.server();
498         final H2TestClient client = resources.client();
499 
500         final InetSocketAddress serverEndpoint = server.start();
501         server.register("/hello", () -> new MessageExchangeHandler<Void>(new DiscardingEntityConsumer<>()) {
502 
503             @Override
504             protected void handle(
505                     final Message<HttpRequest, Void> request,
506                     final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
507                     final HttpContext context) throws IOException, HttpException {
508                 responseTrigger.pushPromise(
509                         new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/stuff")),
510                         context,
511                         new BasicPushProducer(new MultiLineEntityProducer("Pushing lots of stuff", 500)));
512                 responseTrigger.submitResponse(
513                         AsyncResponseBuilder.create(HttpStatus.SC_OK).setEntity("Hi there", ContentType.TEXT_PLAIN).build(),
514                         context);
515             }
516         });
517 
518         client.start(H2Config.custom().setPushEnabled(true).build());
519 
520         final BlockingQueue<Message<HttpResponse, String>> pushMessageQueue = new LinkedBlockingDeque<>();
521 
522         final Future<ClientSessionEndpoint> connectFuture = client.connect(
523                 "localhost", serverEndpoint.getPort(), TIMEOUT);
524         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
525 
526         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
527                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
528                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
529                 (request, context) -> new AbstractAsyncPushHandler<Message<HttpResponse, String>>(new BasicResponseConsumer<>(new StringAsyncEntityConsumer())) {
530 
531                     @Override
532                     protected void handleResponse(
533                             final HttpRequest promise,
534                             final Message<HttpResponse, String> responseMessage) throws IOException, HttpException {
535                         try {
536                             pushMessageQueue.put(responseMessage);
537                         } catch (final InterruptedException ex) {
538                             Thread.currentThread().interrupt();
539                             throw new InterruptedIOException(ex.getMessage());
540                         }
541                     }
542 
543                 },
544                 null,
545                 null);
546         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
547         Assertions.assertNotNull(result1);
548         final HttpResponse response1 = result1.getHead();
549         final String entity1 = result1.getBody();
550         Assertions.assertNotNull(response1);
551         Assertions.assertEquals(200, response1.getCode());
552         Assertions.assertEquals("Hi there", entity1);
553 
554         final Message<HttpResponse, String> result2 = pushMessageQueue.poll(5, TimeUnit.SECONDS);
555         Assertions.assertNotNull(result2);
556         final HttpResponse response2 = result2.getHead();
557         final String entity2 = result2.getBody();
558         Assertions.assertEquals(200, response2.getCode());
559         Assertions.assertNotNull(entity2);
560         final StringTokenizer t1 = new StringTokenizer(entity2, "\r\n");
561         while (t1.hasMoreTokens()) {
562             Assertions.assertEquals("Pushing lots of stuff", t1.nextToken());
563         }
564     }
565 
566     @Test
567     public void testPushRefused() throws Exception {
568         final H2TestServer server = resources.server();
569         final H2TestClient client = resources.client();
570 
571         final BlockingQueue<Exception> pushResultQueue = new LinkedBlockingDeque<>();
572         final InetSocketAddress serverEndpoint = server.start();
573         server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
574 
575             @Override
576             public AsyncServerExchangeHandler get() {
577                 return new MessageExchangeHandler<Void>(new DiscardingEntityConsumer<>()) {
578 
579                     @Override
580                     protected void handle(
581                             final Message<HttpRequest, Void> request,
582                             final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
583                             final HttpContext context) throws IOException, HttpException {
584 
585                         responseTrigger.pushPromise(
586                                 new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/stuff")),
587                                 context, new BasicPushProducer(AsyncEntityProducers.create("Pushing all sorts of stuff")) {
588 
589                             @Override
590                             public void failed(final Exception cause) {
591                                 pushResultQueue.add(cause);
592                                 super.failed(cause);
593                             }
594 
595                         });
596                         responseTrigger.pushPromise(
597                                 new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/more-stuff")),
598                                 context, new BasicPushProducer(new MultiLineEntityProducer("Pushing lots of stuff", 500)) {
599 
600                             @Override
601                             public void failed(final Exception cause) {
602                                 pushResultQueue.add(cause);
603                                 super.failed(cause);
604                             }
605 
606                         });
607                         responseTrigger.submitResponse(
608                                 new BasicResponseProducer(HttpStatus.SC_OK, AsyncEntityProducers.create("Hi there")),
609                                 context);
610                     }
611                 };
612             }
613 
614         });
615 
616         client.start(H2Config.custom().setPushEnabled(true).build());
617 
618         final Future<ClientSessionEndpoint> connectFuture = client.connect(
619                 "localhost", serverEndpoint.getPort(), TIMEOUT);
620         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
621 
622         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
623                 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
624                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
625         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
626         Assertions.assertNotNull(result1);
627         final HttpResponse response1 = result1.getHead();
628         final String entity1 = result1.getBody();
629         Assertions.assertNotNull(response1);
630         Assertions.assertEquals(200, response1.getCode());
631         Assertions.assertEquals("Hi there", entity1);
632 
633         final Object result2 = pushResultQueue.poll(5, TimeUnit.SECONDS);
634         Assertions.assertNotNull(result2);
635         Assertions.assertTrue(result2 instanceof H2StreamResetException);
636         Assertions.assertEquals(H2Error.REFUSED_STREAM.getCode(), ((H2StreamResetException) result2).getCode());
637 
638         final Object result3 = pushResultQueue.poll(5, TimeUnit.SECONDS);
639         Assertions.assertNotNull(result3);
640         Assertions.assertTrue(result3 instanceof H2StreamResetException);
641         Assertions.assertEquals(H2Error.REFUSED_STREAM.getCode(), ((H2StreamResetException) result3).getCode());
642     }
643 
644     @Test
645     public void testExcessOfConcurrentStreams() throws Exception {
646         final H2TestServer server = resources.server();
647         final H2TestClient client = resources.client();
648 
649         server.register("/", () -> new MultiLineResponseHandler("0123456789abcdef", 2000));
650         final InetSocketAddress serverEndpoint = server.start(H2Config.custom().setMaxConcurrentStreams(20).build());
651 
652         client.start(H2Config.custom().setMaxConcurrentStreams(20).build());
653         final Future<ClientSessionEndpoint> connectFuture = client.connect(
654                 "localhost", serverEndpoint.getPort(), TIMEOUT);
655         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
656 
657         final Queue<Future<Message<HttpResponse, Void>>> queue = new LinkedList<>();
658         for (int i = 0; i < 2000; i++) {
659             final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/"));
660             final Future<Message<HttpResponse, Void>> future = streamEndpoint.execute(
661                     new BasicRequestProducer(request1, null),
662                     new BasicResponseConsumer<>(new DiscardingEntityConsumer<>()), null);
663             queue.add(future);
664         }
665 
666         while (!queue.isEmpty()) {
667             final Future<Message<HttpResponse, Void>> future = queue.remove();
668             final Message<HttpResponse, Void> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
669             Assertions.assertNotNull(result);
670             final HttpResponse response = result.getHead();
671             Assertions.assertNotNull(response);
672             Assertions.assertEquals(200, response.getCode());
673         }
674     }
675 
676     @Test
677     public void testExpectationFailed() throws Exception {
678         final H2TestServer server = resources.server();
679         final H2TestClient client = resources.client();
680 
681         server.register("*", () -> new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
682 
683             @Override
684             protected void handle(
685                     final Message<HttpRequest, String> request,
686                     final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
687                     final HttpContext context) throws IOException, HttpException {
688                 responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
689 
690             }
691         });
692         final InetSocketAddress serverEndpoint = server.start(null, handler -> new BasicAsyncServerExpectationDecorator(handler) {
693 
694             @Override
695             protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
696                 final Header h = request.getFirstHeader("password");
697                 if (h != null && "secret".equals(h.getValue())) {
698                     return null;
699                 } else {
700                     return new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
701                 }
702             }
703         }, H2Config.DEFAULT);
704 
705         client.start();
706         final Future<ClientSessionEndpoint> connectFuture = client.connect(
707                 "localhost", serverEndpoint.getPort(), TIMEOUT);
708         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
709 
710         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
711         request1.addHeader("password", "secret");
712         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
713                 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 5000)),
714                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
715         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
716         Assertions.assertNotNull(result1);
717         final HttpResponse response1 = result1.getHead();
718         Assertions.assertNotNull(response1);
719         Assertions.assertEquals(200, response1.getCode());
720         Assertions.assertNotNull("All is well", result1.getBody());
721 
722         final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
723         final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
724                 new BasicRequestProducer(request2, new MultiLineEntityProducer("0123456789abcdef", 5000)),
725                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
726         final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
727         Assertions.assertNotNull(result2);
728         final HttpResponse response2 = result2.getHead();
729         Assertions.assertNotNull(response2);
730         Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response2.getCode());
731         Assertions.assertNotNull("You shall not pass", result2.getBody());
732     }
733 
734     @Test
735     public void testPrematureResponse() throws Exception {
736         final H2TestServer server = resources.server();
737         final H2TestClient client = resources.client();
738 
739         server.register("*", new Supplier<AsyncServerExchangeHandler>() {
740 
741             @Override
742             public AsyncServerExchangeHandler get() {
743                 return new AsyncServerExchangeHandler() {
744 
745                     private final AtomicReference<AsyncResponseProducer> responseProducer = new AtomicReference<>();
746 
747                     @Override
748                     public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
749                         capacityChannel.update(Integer.MAX_VALUE);
750                     }
751 
752                     @Override
753                     public void consume(final ByteBuffer src) throws IOException {
754                     }
755 
756                     @Override
757                     public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
758                     }
759 
760                     @Override
761                     public void handleRequest(
762                             final HttpRequest request,
763                             final EntityDetails entityDetails,
764                             final ResponseChannel responseChannel,
765                             final HttpContext context) throws HttpException, IOException {
766                         final AsyncResponseProducer producer;
767                         final Header h = request.getFirstHeader("password");
768                         if (h != null && "secret".equals(h.getValue())) {
769                             producer = new BasicResponseProducer(HttpStatus.SC_OK, "All is well");
770                         } else {
771                             producer = new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
772                         }
773                         responseProducer.set(producer);
774                         producer.sendResponse(responseChannel, context);
775                     }
776 
777                     @Override
778                     public int available() {
779                         final AsyncResponseProducer producer = this.responseProducer.get();
780                         return producer.available();
781                     }
782 
783                     @Override
784                     public void produce(final DataStreamChannel channel) throws IOException {
785                         final AsyncResponseProducer producer = this.responseProducer.get();
786                         producer.produce(channel);
787                     }
788 
789                     @Override
790                     public void failed(final Exception cause) {
791                     }
792 
793                     @Override
794                     public void releaseResources() {
795                     }
796                 };
797             }
798 
799         });
800         final InetSocketAddress serverEndpoint = server.start();
801 
802         client.start();
803         final Future<ClientSessionEndpoint> connectFuture = client.connect(
804                 "localhost", serverEndpoint.getPort(), TIMEOUT);
805         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
806 
807         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
808         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
809                 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 5000)),
810                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
811         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
812         Assertions.assertNotNull(result1);
813         final HttpResponse response1 = result1.getHead();
814         Assertions.assertNotNull(response1);
815         Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
816         Assertions.assertNotNull("You shall not pass", result1.getBody());
817     }
818 
819     @Test
820     public void testMessageWithTrailers() throws Exception {
821         final H2TestServer server = resources.server();
822         final H2TestClient client = resources.client();
823 
824         server.register("/hello", () -> new AbstractServerExchangeHandler<Message<HttpRequest, String>>() {
825 
826             @Override
827             protected AsyncRequestConsumer<Message<HttpRequest, String>> supplyConsumer(
828                     final HttpRequest request,
829                     final EntityDetails entityDetails,
830                     final HttpContext context) throws HttpException {
831                 return new BasicRequestConsumer<>(entityDetails != null ? new StringAsyncEntityConsumer() : null);
832             }
833 
834             @Override
835             protected void handle(
836                     final Message<HttpRequest, String> requestMessage,
837                     final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
838                     final HttpContext context) throws HttpException, IOException {
839                 responseTrigger.submitResponse(new BasicResponseProducer(
840                         HttpStatus.SC_OK,
841                         new DigestingEntityProducer("MD5",
842                                 new StringAsyncEntityProducer("Hello back with some trailers"))), context);
843             }
844         });
845         final InetSocketAddress serverEndpoint = server.start();
846 
847         client.start();
848 
849         final Future<ClientSessionEndpoint> connectFuture = client.connect(
850                 "localhost", serverEndpoint.getPort(), TIMEOUT);
851         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
852 
853         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
854         final DigestingEntityConsumer<String> entityConsumer = new DigestingEntityConsumer<>("MD5", new StringAsyncEntityConsumer());
855         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
856                 new BasicRequestProducer(request1, null),
857                 new BasicResponseConsumer<>(entityConsumer), null);
858         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
859         Assertions.assertNotNull(result1);
860         final HttpResponse response1 = result1.getHead();
861         Assertions.assertNotNull(response1);
862         Assertions.assertEquals(200, response1.getCode());
863         Assertions.assertEquals("Hello back with some trailers", result1.getBody());
864 
865         final List<Header> trailers = entityConsumer.getTrailers();
866         Assertions.assertNotNull(trailers);
867         Assertions.assertEquals(2, trailers.size());
868         final Map<String, String> map = new HashMap<>();
869         for (final Header header: trailers) {
870             map.put(TextUtils.toLowerCase(header.getName()), header.getValue());
871         }
872         final String digest = TextUtils.toHexString(entityConsumer.getDigest());
873         Assertions.assertEquals("MD5", map.get("digest-algo"));
874         Assertions.assertEquals(digest, map.get("digest"));
875     }
876 
877     @Test
878     public void testConnectionPing() throws Exception {
879         final H2TestServer server = resources.server();
880         final H2TestClient client = resources.client();
881 
882         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
883         final InetSocketAddress serverEndpoint = server.start();
884 
885         client.start();
886         final Future<ClientSessionEndpoint> connectFuture = client.connect(
887                 "localhost", serverEndpoint.getPort(), TIMEOUT);
888         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
889 
890         final int n = 10;
891         final CountDownLatch latch = new CountDownLatch(n);
892         final AtomicInteger count = new AtomicInteger(0);
893         for (int i = 0; i < n; i++) {
894             streamEndpoint.execute(
895                     new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
896                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
897             streamEndpoint.execute(new PingCommand(new BasicPingHandler(result -> {
898                 if (result) {
899                     count.incrementAndGet();
900                 }
901                 latch.countDown();
902             })), Command.Priority.NORMAL);
903 
904         }
905         Assertions.assertTrue(latch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
906         Assertions.assertEquals(n, count.get());
907     }
908 
909     @Test
910     public void testRequestWithInvalidConnectionHeader() throws Exception {
911         final H2TestServer server = resources.server();
912         final H2TestClient client = resources.client();
913 
914         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
915         final InetSocketAddress serverEndpoint = server.start();
916 
917         client.start();
918 
919         final Future<IOSession> sessionFuture = client.requestSession(new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
920         final IOSession session = sessionFuture.get();
921         final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(session);
922 
923         final HttpRequest request = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
924         request.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
925         final HttpCoreContext coreContext = HttpCoreContext.create();
926         final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
927                     new BasicRequestProducer(request, null),
928                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
929                     coreContext, null);
930         final ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () ->
931                 future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
932         assertThat(exception.getCause(), CoreMatchers.instanceOf(ProtocolException.class));
933 
934         final EndpointDetails endpointDetails = coreContext.getEndpointDetails();
935         assertThat(endpointDetails.getRequestCount(), CoreMatchers.equalTo(0L));
936     }
937 
938     @Test
939     public void testHeaderTooLarge() throws Exception {
940         final H2TestServer server = resources.server();
941         final H2TestClient client = resources.client();
942 
943         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
944         final InetSocketAddress serverEndpoint = server.start(H2Config.custom()
945                 .setMaxHeaderListSize(100)
946                 .build());
947         client.start();
948 
949         final Future<ClientSessionEndpoint> connectFuture = client.connect(
950                 "localhost", serverEndpoint.getPort(), TIMEOUT);
951         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
952 
953         final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
954         request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
955                 "1234567890123456789012345678901234567890");
956         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
957                 new BasicRequestProducer(request1, null),
958                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
959         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
960         Assertions.assertNotNull(result1);
961         final HttpResponse response1 = result1.getHead();
962         Assertions.assertNotNull(response1);
963         Assertions.assertEquals(431, response1.getCode());
964         Assertions.assertEquals("Maximum header list size exceeded", result1.getBody());
965     }
966 
967     @Test
968     public void testHeaderTooLargePost() throws Exception {
969         final H2TestServer server = resources.server();
970         final H2TestClient client = resources.client();
971 
972         server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
973         final InetSocketAddress serverEndpoint = server.start(H2Config.custom()
974                 .setMaxHeaderListSize(100)
975                 .build());
976         client.start(
977                 new DefaultHttpProcessor(H2RequestContent.INSTANCE, H2RequestTargetHost.INSTANCE, H2RequestConnControl.INSTANCE),
978                 H2Config.DEFAULT);
979 
980         final Future<ClientSessionEndpoint> connectFuture = client.connect(
981                 "localhost", serverEndpoint.getPort(), TIMEOUT);
982         final ClientSessionEndpoint streamEndpoint = connectFuture.get();
983 
984         final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
985         request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
986                 "1234567890123456789012345678901234567890");
987 
988         final byte[] b = new byte[2048];
989         for (int i = 0; i < b.length; i++) {
990             b[i] = (byte) ('a' + i % 10);
991         }
992 
993         final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
994                 new BasicRequestProducer(request1, AsyncEntityProducers.create(b, ContentType.TEXT_PLAIN)),
995                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
996         final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
997         Assertions.assertNotNull(result1);
998         final HttpResponse response1 = result1.getHead();
999         Assertions.assertNotNull(response1);
1000         Assertions.assertEquals(431, response1.getCode());
1001         Assertions.assertEquals("Maximum header list size exceeded", result1.getBody());
1002     }
1003 
1004 }