1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.testing.nio;
29
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Queue;
import java.util.StringTokenizer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.function.Decorator;
import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.EndpointDetails;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HeaderElements;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.Message;
import org.apache.hc.core5.http.Method;
import org.apache.hc.core5.http.ProtocolException;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.message.BasicHttpRequest;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
import org.apache.hc.core5.http.nio.AsyncResponseProducer;
import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.nio.DataStreamChannel;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.ResponseChannel;
import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
import org.apache.hc.core5.http.nio.entity.DigestingEntityConsumer;
import org.apache.hc.core5.http.nio.entity.DigestingEntityProducer;
import org.apache.hc.core5.http.nio.entity.NoopEntityConsumer;
import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
import org.apache.hc.core5.http.nio.support.AbstractAsyncPushHandler;
import org.apache.hc.core5.http.nio.support.AbstractServerExchangeHandler;
import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder;
import org.apache.hc.core5.http.nio.support.BasicAsyncServerExpectationDecorator;
import org.apache.hc.core5.http.nio.support.BasicPushProducer;
import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityConsumer;
import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityProducer;
import org.apache.hc.core5.http.nio.support.classic.AbstractClassicServerExchangeHandler;
import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.http2.H2Error;
import org.apache.hc.core5.http2.H2StreamResetException;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.http2.nio.command.PingCommand;
import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
import org.apache.hc.core5.http2.protocol.H2RequestConnControl;
import org.apache.hc.core5.http2.protocol.H2RequestContent;
import org.apache.hc.core5.http2.protocol.H2RequestTargetHost;
import org.apache.hc.core5.reactor.Command;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.testing.SSLTestContexts;
import org.apache.hc.core5.util.TextUtils;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
137
138 @RunWith(Parameterized.class)
139 public class H2IntegrationTest extends InternalH2ServerTestBase {
140
141 private final Logger log = LoggerFactory.getLogger(getClass());
142
143 @Parameterized.Parameters(name = "{0}")
144 public static Collection<Object[]> protocols() {
145 return Arrays.asList(new Object[][]{
146 { URIScheme.HTTP },
147 { URIScheme.HTTPS }
148 });
149 }
150
/**
 * Creates a test instance for one of the rows returned by {@link #protocols()}.
 *
 * @param scheme the URI scheme, passed straight through to the base class.
 */
public H2IntegrationTest(final URIScheme scheme) {
    super(scheme);
}
154
155 private static final Timeout TIMEOUT = Timeout.ofSeconds(30);
156 private static final Timeout LONG_TIMEOUT = Timeout.ofSeconds(60);
157
158 private H2TestClient client;
159
160 @Before
161 public void setup() throws Exception {
162 log.debug("Starting up test client");
163 client = new H2TestClient(buildReactorConfig(),
164 scheme == URIScheme.HTTPS ? SSLTestContexts.createClientSSLContext() : null, null, null);
165 }
166
167 protected IOReactorConfig buildReactorConfig() {
168 return IOReactorConfig.DEFAULT;
169 }
170
171 @After
172 public void cleanup() throws Exception {
173 log.debug("Shutting down test client");
174 if (client != null) {
175 client.shutdown(TimeValue.ofSeconds(5));
176 }
177 }
178
179 private URI createRequestURI(final InetSocketAddress serverEndpoint, final String path) {
180 try {
181 return new URI("http", null, "localhost", serverEndpoint.getPort(), path, null, null);
182 } catch (final URISyntaxException e) {
183 throw new IllegalStateException();
184 }
185 }
186
187 @Test
188 public void testSimpleGet() throws Exception {
189 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
190
191 @Override
192 public AsyncServerExchangeHandler get() {
193 return new SingleLineResponseHandler("Hi there");
194 }
195
196 });
197 final InetSocketAddress serverEndpoint = server.start();
198
199 client.start();
200 final Future<ClientSessionEndpoint> connectFuture = client.connect(
201 "localhost", serverEndpoint.getPort(), TIMEOUT);
202 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
203
204 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
205 for (int i = 0; i < 10; i++) {
206 queue.add(streamEndpoint.execute(
207 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
208 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
209
210 }
211 while (!queue.isEmpty()) {
212 final Future<Message<HttpResponse, String>> future = queue.remove();
213 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
214 Assert.assertNotNull(result);
215 final HttpResponse response = result.getHead();
216 final String entity = result.getBody();
217 Assert.assertNotNull(response);
218 Assert.assertEquals(200, response.getCode());
219 Assert.assertEquals("Hi there", entity);
220 }
221 }
222
223 @Test
224 public void testSimpleHead() throws Exception {
225 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
226
227 @Override
228 public AsyncServerExchangeHandler get() {
229 return new SingleLineResponseHandler("Hi there");
230 }
231
232 });
233 final InetSocketAddress serverEndpoint = server.start();
234
235 client.start();
236 final Future<ClientSessionEndpoint> connectFuture = client.connect(
237 "localhost", serverEndpoint.getPort(), TIMEOUT);
238 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
239
240 for (int i = 0; i < 5; i++) {
241 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
242 new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
243 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
244 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
245 Assert.assertNotNull(result);
246 final HttpResponse response1 = result.getHead();
247 Assert.assertNotNull(response1);
248 Assert.assertEquals(200, response1.getCode());
249 Assert.assertNull(result.getBody());
250 }
251 }
252
253 @Test
254 public void testLargeGet() throws Exception {
255 server.register("/", new Supplier<AsyncServerExchangeHandler>() {
256
257 @Override
258 public AsyncServerExchangeHandler get() {
259 return new MultiLineResponseHandler("0123456789abcdef", 5000);
260 }
261
262 });
263 final InetSocketAddress serverEndpoint = server.start();
264
265 client.start();
266 final Future<ClientSessionEndpoint> connectFuture = client.connect(
267 "localhost", serverEndpoint.getPort(), TIMEOUT);
268 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
269
270 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
271 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/"), null),
272 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
273
274 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
275 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
276 new BasicResponseConsumer<>(new StringAsyncEntityConsumer(512)), null);
277
278 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
279 Assert.assertNotNull(result1);
280 final HttpResponse response1 = result1.getHead();
281 Assert.assertNotNull(response1);
282 Assert.assertEquals(200, response1.getCode());
283 final String s1 = result1.getBody();
284 Assert.assertNotNull(s1);
285 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
286 while (t1.hasMoreTokens()) {
287 Assert.assertEquals("0123456789abcdef", t1.nextToken());
288 }
289
290 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
291 Assert.assertNotNull(result2);
292 final HttpResponse response2 = result2.getHead();
293 Assert.assertNotNull(response2);
294 Assert.assertEquals(200, response2.getCode());
295 final String s2 = result2.getBody();
296 Assert.assertNotNull(s2);
297 final StringTokenizer t2 = new StringTokenizer(s2, "\r\n");
298 while (t2.hasMoreTokens()) {
299 Assert.assertEquals("0123456789abcdef", t2.nextToken());
300 }
301 }
302
303 @Test
304 public void testBasicPost() throws Exception {
305 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
306
307 @Override
308 public AsyncServerExchangeHandler get() {
309 return new SingleLineResponseHandler("Hi back");
310 }
311
312 });
313 final InetSocketAddress serverEndpoint = server.start();
314
315 client.start();
316 final Future<ClientSessionEndpoint> connectFuture = client.connect(
317 "localhost", serverEndpoint.getPort(), TIMEOUT);
318 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
319
320 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
321 for (int i = 0; i < 10; i++) {
322 final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
323 queue.add(streamEndpoint.execute(
324 new BasicRequestProducer(request, new StringAsyncEntityProducer("Hi there", ContentType.TEXT_PLAIN)),
325 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
326
327 }
328 while (!queue.isEmpty()) {
329 final Future<Message<HttpResponse, String>> future = queue.remove();
330 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
331 Assert.assertNotNull(result);
332 final HttpResponse response = result.getHead();
333 final String entity1 = result.getBody();
334 Assert.assertNotNull(response);
335 Assert.assertEquals(200, response.getCode());
336 Assert.assertEquals("Hi back", entity1);
337 }
338 }
339
340 @Test
341 public void testLargePost() throws Exception {
342 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
343
344 @Override
345 public AsyncServerExchangeHandler get() {
346 return new EchoHandler(2048);
347 }
348
349 });
350 final InetSocketAddress serverEndpoint = server.start();
351
352 client.start();
353 final Future<ClientSessionEndpoint> connectFuture = client.connect(
354 "localhost", serverEndpoint.getPort(), TIMEOUT);
355 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
356
357 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
358 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
359 new MultiLineEntityProducer("0123456789abcdef", 5000)),
360 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
361 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
362 Assert.assertNotNull(result1);
363 final HttpResponse response1 = result1.getHead();
364 Assert.assertNotNull(response1);
365 Assert.assertEquals(200, response1.getCode());
366 final String s1 = result1.getBody();
367 Assert.assertNotNull(s1);
368 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
369 while (t1.hasMoreTokens()) {
370 Assert.assertEquals("0123456789abcdef", t1.nextToken());
371 }
372 }
373
374 @Test
375 public void testSlowResponseConsumer() throws Exception {
376 server.register("/", new Supplier<AsyncServerExchangeHandler>() {
377
378 @Override
379 public AsyncServerExchangeHandler get() {
380 return new MultiLineResponseHandler("0123456789abcd", 3);
381 }
382
383 });
384 final InetSocketAddress serverEndpoint = server.start();
385
386 client.start(H2Config.custom().setInitialWindowSize(16).build());
387 final Future<ClientSessionEndpoint> connectFuture = client.connect(
388 "localhost", serverEndpoint.getPort(), TIMEOUT);
389 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
390
391 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
392 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/"), null),
393 new BasicResponseConsumer<>(new AbstractClassicEntityConsumer<String>(16, Executors.newSingleThreadExecutor()) {
394
395 @Override
396 protected String consumeData(
397 final ContentType contentType, final InputStream inputStream) throws IOException {
398 Charset charset = contentType != null ? contentType.getCharset() : null;
399 if (charset == null) {
400 charset = StandardCharsets.US_ASCII;
401 }
402
403 final StringBuilder buffer = new StringBuilder();
404 try {
405 final byte[] tmp = new byte[16];
406 int l;
407 while ((l = inputStream.read(tmp)) != -1) {
408 buffer.append(charset.decode(ByteBuffer.wrap(tmp, 0, l)));
409 Thread.sleep(500);
410 }
411 } catch (final InterruptedException ex) {
412 Thread.currentThread().interrupt();
413 throw new InterruptedIOException(ex.getMessage());
414 }
415 return buffer.toString();
416 }
417 }),
418 null);
419
420 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
421 Assert.assertNotNull(result1);
422 final HttpResponse response1 = result1.getHead();
423 Assert.assertNotNull(response1);
424 Assert.assertEquals(200, response1.getCode());
425 final String s1 = result1.getBody();
426 Assert.assertNotNull(s1);
427 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
428 while (t1.hasMoreTokens()) {
429 Assert.assertEquals("0123456789abcd", t1.nextToken());
430 }
431 }
432
433 @Test
434 public void testSlowRequestProducer() throws Exception {
435 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
436
437 @Override
438 public AsyncServerExchangeHandler get() {
439 return new EchoHandler(2048);
440 }
441
442 });
443 final InetSocketAddress serverEndpoint = server.start();
444
445 client.start();
446 final Future<ClientSessionEndpoint> connectFuture = client.connect(
447 "localhost", serverEndpoint.getPort(), TIMEOUT);
448 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
449
450 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
451 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
452 new BasicRequestProducer(request1, new AbstractClassicEntityProducer(4096, ContentType.TEXT_PLAIN, Executors.newSingleThreadExecutor()) {
453
454 @Override
455 protected void produceData(final ContentType contentType, final OutputStream outputStream) throws IOException {
456 Charset charset = contentType.getCharset();
457 if (charset == null) {
458 charset = StandardCharsets.US_ASCII;
459 }
460 try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, charset))) {
461 for (int i = 0; i < 500; i++) {
462 if (i % 100 == 0) {
463 writer.flush();
464 Thread.sleep(500);
465 }
466 writer.write("0123456789abcdef\r\n");
467 }
468 } catch (final InterruptedException ex) {
469 Thread.currentThread().interrupt();
470 throw new InterruptedIOException(ex.getMessage());
471 }
472 }
473
474 }),
475 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
476 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
477 Assert.assertNotNull(result1);
478 final HttpResponse response1 = result1.getHead();
479 Assert.assertNotNull(response1);
480 Assert.assertEquals(200, response1.getCode());
481 final String s1 = result1.getBody();
482 Assert.assertNotNull(s1);
483 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
484 while (t1.hasMoreTokens()) {
485 Assert.assertEquals("0123456789abcdef", t1.nextToken());
486 }
487 }
488
489 @Test
490 public void testSlowResponseProducer() throws Exception {
491 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
492
493 @Override
494 public AsyncServerExchangeHandler get() {
495 return new AbstractClassicServerExchangeHandler(2048, Executors.newSingleThreadExecutor()) {
496
497 @Override
498 protected void handle(
499 final HttpRequest request,
500 final InputStream requestStream,
501 final HttpResponse response,
502 final OutputStream responseStream,
503 final HttpContext context) throws IOException, HttpException {
504
505 if (!"/hello".equals(request.getPath())) {
506 response.setCode(HttpStatus.SC_NOT_FOUND);
507 return;
508 }
509 if (!Method.POST.name().equalsIgnoreCase(request.getMethod())) {
510 response.setCode(HttpStatus.SC_NOT_IMPLEMENTED);
511 return;
512 }
513 if (requestStream == null) {
514 return;
515 }
516 final Header h1 = request.getFirstHeader(HttpHeaders.CONTENT_TYPE);
517 final ContentType contentType = h1 != null ? ContentType.parse(h1.getValue()) : null;
518 Charset charset = contentType != null ? contentType.getCharset() : null;
519 if (charset == null) {
520 charset = StandardCharsets.US_ASCII;
521 }
522 response.setCode(HttpStatus.SC_OK);
523 response.setHeader(h1);
524 try (final BufferedReader reader = new BufferedReader(new InputStreamReader(requestStream, charset));
525 final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(responseStream, charset))) {
526 try {
527 String l;
528 int count = 0;
529 while ((l = reader.readLine()) != null) {
530 writer.write(l);
531 writer.write("\r\n");
532 count++;
533 if (count % 500 == 0) {
534 Thread.sleep(500);
535 }
536 }
537 writer.flush();
538 } catch (final InterruptedException ex) {
539 Thread.currentThread().interrupt();
540 throw new InterruptedIOException(ex.getMessage());
541 }
542 }
543 }
544 };
545 }
546
547 });
548 final InetSocketAddress serverEndpoint = server.start();
549
550 client.start(H2Config.custom()
551 .setInitialWindowSize(512)
552 .build());
553
554 final Future<ClientSessionEndpoint> connectFuture = client.connect(
555 "localhost", serverEndpoint.getPort(), TIMEOUT);
556 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
557
558 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
559 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
560 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcd", 2000)),
561 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
562 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
563 Assert.assertNotNull(result1);
564 final HttpResponse response1 = result1.getHead();
565 Assert.assertNotNull(response1);
566 Assert.assertEquals(200, response1.getCode());
567 final String s1 = result1.getBody();
568 Assert.assertNotNull(s1);
569 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
570 while (t1.hasMoreTokens()) {
571 Assert.assertEquals("0123456789abcd", t1.nextToken());
572 }
573 }
574
575 @Test
576 public void testPush() throws Exception {
577 final InetSocketAddress serverEndpoint = server.start();
578 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
579
580 @Override
581 public AsyncServerExchangeHandler get() {
582 return new MessageExchangeHandler<Void>(new NoopEntityConsumer()) {
583
584 @Override
585 protected void handle(
586 final Message<HttpRequest, Void> request,
587 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
588 final HttpContext context) throws IOException, HttpException {
589 responseTrigger.pushPromise(
590 new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/stuff")),
591 context,
592 new BasicPushProducer(new MultiLineEntityProducer("Pushing lots of stuff", 500)));
593 responseTrigger.submitResponse(
594 AsyncResponseBuilder.create(HttpStatus.SC_OK).setEntity("Hi there", ContentType.TEXT_PLAIN).build(),
595 context);
596 }
597 };
598 }
599
600 });
601
602 client.start(H2Config.custom().setPushEnabled(true).build());
603
604 final BlockingQueue<Message<HttpResponse, String>> pushMessageQueue = new LinkedBlockingDeque<>();
605
606 final Future<ClientSessionEndpoint> connectFuture = client.connect(
607 "localhost", serverEndpoint.getPort(), TIMEOUT);
608 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
609
610 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
611 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
612 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
613 new HandlerFactory<AsyncPushConsumer>() {
614
615 @Override
616 public AsyncPushConsumer create(
617 final HttpRequest request, final HttpContext context) throws HttpException {
618 return new AbstractAsyncPushHandler<Message<HttpResponse, String>>(new BasicResponseConsumer<>(new StringAsyncEntityConsumer())) {
619
620 @Override
621 protected void handleResponse(
622 final HttpRequest promise,
623 final Message<HttpResponse, String> responseMessage) throws IOException, HttpException {
624 try {
625 pushMessageQueue.put(responseMessage);
626 } catch (final InterruptedException ex) {
627 Thread.currentThread().interrupt();
628 throw new InterruptedIOException(ex.getMessage());
629 }
630 }
631
632 };
633 }
634 },
635 null,
636 null);
637 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
638 Assert.assertNotNull(result1);
639 final HttpResponse response1 = result1.getHead();
640 final String entity1 = result1.getBody();
641 Assert.assertNotNull(response1);
642 Assert.assertEquals(200, response1.getCode());
643 Assert.assertEquals("Hi there", entity1);
644
645 final Message<HttpResponse, String> result2 = pushMessageQueue.poll(5, TimeUnit.SECONDS);
646 Assert.assertNotNull(result2);
647 final HttpResponse response2 = result2.getHead();
648 final String entity2 = result2.getBody();
649 Assert.assertEquals(200, response2.getCode());
650 Assert.assertNotNull(entity2);
651 final StringTokenizer t1 = new StringTokenizer(entity2, "\r\n");
652 while (t1.hasMoreTokens()) {
653 Assert.assertEquals("Pushing lots of stuff", t1.nextToken());
654 }
655 }
656
657 @Test
658 public void testPushRefused() throws Exception {
659 final BlockingQueue<Exception> pushResultQueue = new LinkedBlockingDeque<>();
660 final InetSocketAddress serverEndpoint = server.start();
661 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
662
663 @Override
664 public AsyncServerExchangeHandler get() {
665 return new MessageExchangeHandler<Void>(new NoopEntityConsumer()) {
666
667 @Override
668 protected void handle(
669 final Message<HttpRequest, Void> request,
670 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
671 final HttpContext context) throws IOException, HttpException {
672
673 responseTrigger.pushPromise(
674 new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/stuff")),
675 context, new BasicPushProducer(AsyncEntityProducers.create("Pushing all sorts of stuff")) {
676
677 @Override
678 public void failed(final Exception cause) {
679 pushResultQueue.add(cause);
680 super.failed(cause);
681 }
682
683 });
684 responseTrigger.pushPromise(
685 new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/more-stuff")),
686 context, new BasicPushProducer(new MultiLineEntityProducer("Pushing lots of stuff", 500)) {
687
688 @Override
689 public void failed(final Exception cause) {
690 pushResultQueue.add(cause);
691 super.failed(cause);
692 }
693
694 });
695 responseTrigger.submitResponse(
696 new BasicResponseProducer(HttpStatus.SC_OK, AsyncEntityProducers.create("Hi there")),
697 context);
698 }
699 };
700 }
701
702 });
703
704 client.start(H2Config.custom().setPushEnabled(true).build());
705
706 final Future<ClientSessionEndpoint> connectFuture = client.connect(
707 "localhost", serverEndpoint.getPort(), TIMEOUT);
708 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
709
710 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
711 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
712 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
713 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
714 Assert.assertNotNull(result1);
715 final HttpResponse response1 = result1.getHead();
716 final String entity1 = result1.getBody();
717 Assert.assertNotNull(response1);
718 Assert.assertEquals(200, response1.getCode());
719 Assert.assertEquals("Hi there", entity1);
720
721 final Object result2 = pushResultQueue.poll(5, TimeUnit.SECONDS);
722 Assert.assertNotNull(result2);
723 Assert.assertTrue(result2 instanceof H2StreamResetException);
724 Assert.assertEquals(H2Error.REFUSED_STREAM.getCode(), ((H2StreamResetException) result2).getCode());
725
726 final Object result3 = pushResultQueue.poll(5, TimeUnit.SECONDS);
727 Assert.assertNotNull(result3);
728 Assert.assertTrue(result3 instanceof H2StreamResetException);
729 Assert.assertEquals(H2Error.REFUSED_STREAM.getCode(), ((H2StreamResetException) result3).getCode());
730 }
731
732 @Test
733 public void testExcessOfConcurrentStreams() throws Exception {
734 server.register("/", new Supplier<AsyncServerExchangeHandler>() {
735
736 @Override
737 public AsyncServerExchangeHandler get() {
738 return new MultiLineResponseHandler("0123456789abcdef", 2000);
739 }
740
741 });
742 final InetSocketAddress serverEndpoint = server.start(H2Config.custom().setMaxConcurrentStreams(20).build());
743
744 client.start(H2Config.custom().setMaxConcurrentStreams(20).build());
745 final Future<ClientSessionEndpoint> connectFuture = client.connect(
746 "localhost", serverEndpoint.getPort(), TIMEOUT);
747 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
748
749 final Queue<Future<Message<HttpResponse, Void>>> queue = new LinkedList<>();
750 for (int i = 0; i < 2000; i++) {
751 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/"));
752 final Future<Message<HttpResponse, Void>> future = streamEndpoint.execute(
753 new BasicRequestProducer(request1, null),
754 new BasicResponseConsumer<>(new NoopEntityConsumer()), null);
755 queue.add(future);
756 }
757
758 while (!queue.isEmpty()) {
759 final Future<Message<HttpResponse, Void>> future = queue.remove();
760 final Message<HttpResponse, Void> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
761 Assert.assertNotNull(result);
762 final HttpResponse response = result.getHead();
763 Assert.assertNotNull(response);
764 Assert.assertEquals(200, response.getCode());
765 }
766 }
767
768 @Test
769 public void testExpectationFailed() throws Exception {
770 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
771
772 @Override
773 public AsyncServerExchangeHandler get() {
774 return new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
775
776 @Override
777 protected void handle(
778 final Message<HttpRequest, String> request,
779 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
780 final HttpContext context) throws IOException, HttpException {
781 responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
782
783 }
784 };
785 }
786
787 });
788 final InetSocketAddress serverEndpoint = server.start(null, new Decorator<AsyncServerExchangeHandler>() {
789
790 @Override
791 public AsyncServerExchangeHandler decorate(final AsyncServerExchangeHandler handler) {
792
793 return new BasicAsyncServerExpectationDecorator(handler) {
794
795 @Override
796 protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
797 final Header h = request.getFirstHeader("password");
798 if (h != null && "secret".equals(h.getValue())) {
799 return null;
800 } else {
801 return new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
802 }
803 }
804 };
805
806 }
807 }, H2Config.DEFAULT);
808
809 client.start();
810 final Future<ClientSessionEndpoint> connectFuture = client.connect(
811 "localhost", serverEndpoint.getPort(), TIMEOUT);
812 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
813
814 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
815 request1.addHeader("password", "secret");
816 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
817 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 5000)),
818 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
819 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
820 Assert.assertNotNull(result1);
821 final HttpResponse response1 = result1.getHead();
822 Assert.assertNotNull(response1);
823 Assert.assertEquals(200, response1.getCode());
824 Assert.assertNotNull("All is well", result1.getBody());
825
826 final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
827 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
828 new BasicRequestProducer(request2, new MultiLineEntityProducer("0123456789abcdef", 5000)),
829 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
830 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
831 Assert.assertNotNull(result2);
832 final HttpResponse response2 = result2.getHead();
833 Assert.assertNotNull(response2);
834 Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response2.getCode());
835 Assert.assertNotNull("You shall not pass", result2.getBody());
836 }
837
838 @Test
839 public void testPrematureResponse() throws Exception {
840 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
841
842 @Override
843 public AsyncServerExchangeHandler get() {
844 return new AsyncServerExchangeHandler() {
845
846 private final AtomicReference<AsyncResponseProducer> responseProducer = new AtomicReference<>(null);
847
848 @Override
849 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
850 capacityChannel.update(Integer.MAX_VALUE);
851 }
852
853 @Override
854 public void consume(final ByteBuffer src) throws IOException {
855 }
856
857 @Override
858 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
859 }
860
861 @Override
862 public void handleRequest(
863 final HttpRequest request,
864 final EntityDetails entityDetails,
865 final ResponseChannel responseChannel,
866 final HttpContext context) throws HttpException, IOException {
867 final AsyncResponseProducer producer;
868 final Header h = request.getFirstHeader("password");
869 if (h != null && "secret".equals(h.getValue())) {
870 producer = new BasicResponseProducer(HttpStatus.SC_OK, "All is well");
871 } else {
872 producer = new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
873 }
874 responseProducer.set(producer);
875 producer.sendResponse(responseChannel, context);
876 }
877
878 @Override
879 public int available() {
880 final AsyncResponseProducer producer = this.responseProducer.get();
881 return producer.available();
882 }
883
884 @Override
885 public void produce(final DataStreamChannel channel) throws IOException {
886 final AsyncResponseProducer producer = this.responseProducer.get();
887 producer.produce(channel);
888 }
889
890 @Override
891 public void failed(final Exception cause) {
892 }
893
894 @Override
895 public void releaseResources() {
896 }
897 };
898 }
899
900 });
901 final InetSocketAddress serverEndpoint = server.start();
902
903 client.start();
904 final Future<ClientSessionEndpoint> connectFuture = client.connect(
905 "localhost", serverEndpoint.getPort(), TIMEOUT);
906 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
907
908 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
909 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
910 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 5000)),
911 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
912 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
913 Assert.assertNotNull(result1);
914 final HttpResponse response1 = result1.getHead();
915 Assert.assertNotNull(response1);
916 Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
917 Assert.assertNotNull("You shall not pass", result1.getBody());
918 }
919
920 @Test
921 public void testMessageWithTrailers() throws Exception {
922 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
923
924 @Override
925 public AsyncServerExchangeHandler get() {
926 return new AbstractServerExchangeHandler<Message<HttpRequest, String>>() {
927
928 @Override
929 protected AsyncRequestConsumer<Message<HttpRequest, String>> supplyConsumer(
930 final HttpRequest request,
931 final EntityDetails entityDetails,
932 final HttpContext context) throws HttpException {
933 return new BasicRequestConsumer<>(entityDetails != null ? new StringAsyncEntityConsumer() : null);
934 }
935
936 @Override
937 protected void handle(
938 final Message<HttpRequest, String> requestMessage,
939 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
940 final HttpContext context) throws HttpException, IOException {
941 responseTrigger.submitResponse(new BasicResponseProducer(
942 HttpStatus.SC_OK,
943 new DigestingEntityProducer("MD5",
944 new StringAsyncEntityProducer("Hello back with some trailers"))), context);
945 }
946 };
947 }
948
949 });
950 final InetSocketAddress serverEndpoint = server.start();
951
952 client.start();
953
954 final Future<ClientSessionEndpoint> connectFuture = client.connect(
955 "localhost", serverEndpoint.getPort(), TIMEOUT);
956 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
957
958 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
959 final DigestingEntityConsumer<String> entityConsumer = new DigestingEntityConsumer<>("MD5", new StringAsyncEntityConsumer());
960 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
961 new BasicRequestProducer(request1, null),
962 new BasicResponseConsumer<>(entityConsumer), null);
963 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
964 Assert.assertNotNull(result1);
965 final HttpResponse response1 = result1.getHead();
966 Assert.assertNotNull(response1);
967 Assert.assertEquals(200, response1.getCode());
968 Assert.assertEquals("Hello back with some trailers", result1.getBody());
969
970 final List<Header> trailers = entityConsumer.getTrailers();
971 Assert.assertNotNull(trailers);
972 Assert.assertEquals(2, trailers.size());
973 final Map<String, String> map = new HashMap<>();
974 for (final Header header: trailers) {
975 map.put(header.getName().toLowerCase(Locale.ROOT), header.getValue());
976 }
977 final String digest = TextUtils.toHexString(entityConsumer.getDigest());
978 Assert.assertEquals("MD5", map.get("digest-algo"));
979 Assert.assertEquals(digest, map.get("digest"));
980 }
981
982 @Test
983 public void testConnectionPing() throws Exception {
984 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
985
986 @Override
987 public AsyncServerExchangeHandler get() {
988 return new SingleLineResponseHandler("Hi there");
989 }
990
991 });
992 final InetSocketAddress serverEndpoint = server.start();
993
994 client.start();
995 final Future<ClientSessionEndpoint> connectFuture = client.connect(
996 "localhost", serverEndpoint.getPort(), TIMEOUT);
997 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
998
999 final int n = 10;
1000 final CountDownLatch latch = new CountDownLatch(n);
1001 final AtomicInteger count = new AtomicInteger(0);
1002 for (int i = 0; i < n; i++) {
1003 streamEndpoint.execute(
1004 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1005 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1006 streamEndpoint.execute(new PingCommand(new BasicPingHandler(new Callback<Boolean>() {
1007
1008 @Override
1009 public void execute(final Boolean result) {
1010 if (result) {
1011 count.incrementAndGet();
1012 }
1013 latch.countDown();
1014 }
1015
1016 })), Command.Priority.NORMAL);
1017
1018 }
1019 Assert.assertTrue(latch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
1020 Assert.assertEquals(n, count.get());
1021 }
1022
1023 @Test
1024 public void testRequestWithInvalidConnectionHeader() throws Exception {
1025 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1026
1027 @Override
1028 public AsyncServerExchangeHandler get() {
1029 return new SingleLineResponseHandler("Hi there");
1030 }
1031
1032 });
1033 final InetSocketAddress serverEndpoint = server.start();
1034
1035 client.start();
1036
1037 final Future<IOSession> sessionFuture = client.requestSession(new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
1038 final IOSession session = sessionFuture.get();
1039 final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(session);
1040
1041 final HttpRequest request = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1042 request.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
1043 final HttpCoreContext coreContext = HttpCoreContext.create();
1044 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1045 new BasicRequestProducer(request, null),
1046 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
1047 coreContext, null);
1048 try {
1049 future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1050 Assert.fail("ExecutionException is expected");
1051 } catch (final ExecutionException ex) {
1052 MatcherAssert.assertThat(ex.getCause(), CoreMatchers.instanceOf(ProtocolException.class));
1053 }
1054
1055 final EndpointDetails endpointDetails = coreContext.getEndpointDetails();
1056 MatcherAssert.assertThat(endpointDetails.getRequestCount(), CoreMatchers.equalTo(0L));
1057 }
1058
1059 @Test
1060 public void testHeaderTooLarge() throws Exception {
1061 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1062
1063 @Override
1064 public AsyncServerExchangeHandler get() {
1065 return new SingleLineResponseHandler("Hi there");
1066 }
1067
1068 });
1069 final InetSocketAddress serverEndpoint = server.start(H2Config.custom()
1070 .setMaxHeaderListSize(100)
1071 .build());
1072 client.start();
1073
1074 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1075 "localhost", serverEndpoint.getPort(), TIMEOUT);
1076 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1077
1078 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1079 request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1080 "1234567890123456789012345678901234567890");
1081 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1082 new BasicRequestProducer(request1, null),
1083 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1084 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1085 Assert.assertNotNull(result1);
1086 final HttpResponse response1 = result1.getHead();
1087 Assert.assertNotNull(response1);
1088 Assert.assertEquals(431, response1.getCode());
1089 Assert.assertEquals("Maximum header list size exceeded", result1.getBody());
1090 }
1091
1092 @Test
1093 public void testHeaderTooLargePost() throws Exception {
1094 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1095
1096 @Override
1097 public AsyncServerExchangeHandler get() {
1098 return new SingleLineResponseHandler("Hi there");
1099 }
1100
1101 });
1102 final InetSocketAddress serverEndpoint = server.start(H2Config.custom()
1103 .setMaxHeaderListSize(100)
1104 .build());
1105 client.start(
1106 new DefaultHttpProcessor(new H2RequestContent(), new H2RequestTargetHost(), new H2RequestConnControl()),
1107 H2Config.DEFAULT);
1108
1109 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1110 "localhost", serverEndpoint.getPort(), TIMEOUT);
1111 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1112
1113 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
1114 request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1115 "1234567890123456789012345678901234567890");
1116
1117 final byte[] b = new byte[2048];
1118 for (int i = 0; i < b.length; i++) {
1119 b[i] = (byte) ('a' + i % 10);
1120 }
1121
1122 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1123 new BasicRequestProducer(request1, AsyncEntityProducers.create(b, ContentType.TEXT_PLAIN)),
1124 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1125 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1126 Assert.assertNotNull(result1);
1127 final HttpResponse response1 = result1.getHead();
1128 Assert.assertNotNull(response1);
1129 Assert.assertEquals(431, response1.getCode());
1130 Assert.assertEquals("Maximum header list size exceeded", result1.getBody());
1131 }
1132
1133 }