1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.testing.nio;
29
30 import java.io.BufferedReader;
31 import java.io.BufferedWriter;
32 import java.io.IOException;
33 import java.io.InputStream;
34 import java.io.InputStreamReader;
35 import java.io.InterruptedIOException;
36 import java.io.OutputStream;
37 import java.io.OutputStreamWriter;
38 import java.net.InetSocketAddress;
39 import java.net.URI;
40 import java.net.URISyntaxException;
41 import java.nio.ByteBuffer;
42 import java.nio.channels.WritableByteChannel;
43 import java.nio.charset.Charset;
44 import java.nio.charset.StandardCharsets;
45 import java.util.Arrays;
46 import java.util.Collection;
47 import java.util.HashMap;
48 import java.util.LinkedList;
49 import java.util.List;
50 import java.util.Locale;
51 import java.util.Map;
52 import java.util.Queue;
53 import java.util.Random;
54 import java.util.StringTokenizer;
55 import java.util.concurrent.CancellationException;
56 import java.util.concurrent.ExecutionException;
57 import java.util.concurrent.Executors;
58 import java.util.concurrent.Future;
59 import java.util.concurrent.atomic.AtomicReference;
60
61 import org.apache.hc.core5.function.Decorator;
62 import org.apache.hc.core5.function.Supplier;
63 import org.apache.hc.core5.http.ConnectionReuseStrategy;
64 import org.apache.hc.core5.http.ContentLengthStrategy;
65 import org.apache.hc.core5.http.ContentType;
66 import org.apache.hc.core5.http.EntityDetails;
67 import org.apache.hc.core5.http.Header;
68 import org.apache.hc.core5.http.HeaderElements;
69 import org.apache.hc.core5.http.HttpException;
70 import org.apache.hc.core5.http.HttpHeaders;
71 import org.apache.hc.core5.http.HttpHost;
72 import org.apache.hc.core5.http.HttpRequest;
73 import org.apache.hc.core5.http.HttpResponse;
74 import org.apache.hc.core5.http.HttpStatus;
75 import org.apache.hc.core5.http.HttpVersion;
76 import org.apache.hc.core5.http.MalformedChunkCodingException;
77 import org.apache.hc.core5.http.Message;
78 import org.apache.hc.core5.http.Method;
79 import org.apache.hc.core5.http.ProtocolException;
80 import org.apache.hc.core5.http.URIScheme;
81 import org.apache.hc.core5.http.config.CharCodingConfig;
82 import org.apache.hc.core5.http.config.Http1Config;
83 import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
84 import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
85 import org.apache.hc.core5.http.impl.Http1StreamListener;
86 import org.apache.hc.core5.http.impl.HttpProcessors;
87 import org.apache.hc.core5.http.impl.nio.AbstractContentEncoder;
88 import org.apache.hc.core5.http.impl.nio.ServerHttp1StreamDuplexer;
89 import org.apache.hc.core5.http.message.BasicHttpRequest;
90 import org.apache.hc.core5.http.message.BasicHttpResponse;
91 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
92 import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
93 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
94 import org.apache.hc.core5.http.nio.AsyncResponseProducer;
95 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
96 import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
97 import org.apache.hc.core5.http.nio.CapacityChannel;
98 import org.apache.hc.core5.http.nio.ContentEncoder;
99 import org.apache.hc.core5.http.nio.DataStreamChannel;
100 import org.apache.hc.core5.http.nio.HandlerFactory;
101 import org.apache.hc.core5.http.nio.NHttpMessageParser;
102 import org.apache.hc.core5.http.nio.NHttpMessageWriter;
103 import org.apache.hc.core5.http.nio.ResponseChannel;
104 import org.apache.hc.core5.http.nio.SessionOutputBuffer;
105 import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
106 import org.apache.hc.core5.http.nio.entity.DigestingEntityConsumer;
107 import org.apache.hc.core5.http.nio.entity.DigestingEntityProducer;
108 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
109 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
110 import org.apache.hc.core5.http.nio.support.AbstractServerExchangeHandler;
111 import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
112 import org.apache.hc.core5.http.nio.support.BasicAsyncServerExpectationDecorator;
113 import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
114 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
115 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
116 import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
117 import org.apache.hc.core5.http.nio.support.ImmediateResponseExchangeHandler;
118 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityConsumer;
119 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityProducer;
120 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicServerExchangeHandler;
121 import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
122 import org.apache.hc.core5.http.protocol.HttpContext;
123 import org.apache.hc.core5.http.protocol.HttpProcessor;
124 import org.apache.hc.core5.http.protocol.RequestConnControl;
125 import org.apache.hc.core5.http.protocol.RequestContent;
126 import org.apache.hc.core5.http.protocol.RequestTargetHost;
127 import org.apache.hc.core5.http.protocol.RequestValidateHost;
128 import org.apache.hc.core5.reactor.IOReactorConfig;
129 import org.apache.hc.core5.reactor.IOSession;
130 import org.apache.hc.core5.reactor.ProtocolIOSession;
131 import org.apache.hc.core5.testing.SSLTestContexts;
132 import org.apache.hc.core5.util.CharArrayBuffer;
133 import org.apache.hc.core5.util.TextUtils;
134 import org.apache.hc.core5.util.TimeValue;
135 import org.apache.hc.core5.util.Timeout;
136 import org.junit.After;
137 import org.junit.Assert;
138 import org.junit.Before;
139 import org.junit.Test;
140 import org.junit.runner.RunWith;
141 import org.junit.runners.Parameterized;
142 import org.slf4j.Logger;
143 import org.slf4j.LoggerFactory;
144
145 @RunWith(Parameterized.class)
146 public class Http1IntegrationTest extends InternalHttp1ServerTestBase {
147
148 private final Logger log = LoggerFactory.getLogger(getClass());
149
150 @Parameterized.Parameters(name = "{0}")
151 public static Collection<Object[]> protocols() {
152 return Arrays.asList(new Object[][]{
153 { URIScheme.HTTP },
154 { URIScheme.HTTPS }
155 });
156 }
157
/**
 * Creates a test instance for one URI scheme and hands the scheme to the
 * base class.
 *
 * @param scheme the URI scheme this run is parameterized with.
 */
public Http1IntegrationTest(final URIScheme scheme) {
    super(scheme);
}
161
162 private static final Timeout TIMEOUT = Timeout.ofSeconds(30);
163 private static final Timeout LONG_TIMEOUT = Timeout.ofSeconds(60);
164
165 private Http1TestClient client;
166
167 @Before
168 public void setup() throws Exception {
169 log.debug("Starting up test client");
170 client = new Http1TestClient(
171 buildReactorConfig(),
172 scheme == URIScheme.HTTPS ? SSLTestContexts.createClientSSLContext() : null, null, null);
173 }
174
175 protected IOReactorConfig buildReactorConfig() {
176 return IOReactorConfig.DEFAULT;
177 }
178
179 @After
180 public void cleanup() throws Exception {
181 log.debug("Shutting down test client");
182 if (client != null) {
183 client.shutdown(TimeValue.ofSeconds(5));
184 }
185 }
186
187 private URI createRequestURI(final InetSocketAddress serverEndpoint, final String path) {
188 try {
189 return new URI(scheme.id, null, "localhost", serverEndpoint.getPort(), path, null, null);
190 } catch (final URISyntaxException e) {
191 throw new IllegalStateException();
192 }
193 }
194
195 @Test
196 public void testSimpleGet() throws Exception {
197 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
198
199 @Override
200 public AsyncServerExchangeHandler get() {
201 return new SingleLineResponseHandler("Hi there");
202 }
203
204 });
205 final InetSocketAddress serverEndpoint = server.start();
206
207 client.start();
208 final Future<ClientSessionEndpoint> connectFuture = client.connect(
209 "localhost", serverEndpoint.getPort(), TIMEOUT);
210 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
211
212 for (int i = 0; i < 5; i++) {
213 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
214 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
215 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
216 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
217 Assert.assertNotNull(result);
218 final HttpResponse response1 = result.getHead();
219 final String entity1 = result.getBody();
220 Assert.assertNotNull(response1);
221 Assert.assertEquals(200, response1.getCode());
222 Assert.assertEquals("Hi there", entity1);
223 }
224 }
225
226 @Test
227 public void testSimpleGetConnectionClose() throws Exception {
228 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
229
230 @Override
231 public AsyncServerExchangeHandler get() {
232 return new SingleLineResponseHandler("Hi there");
233 }
234
235 });
236 final InetSocketAddress serverEndpoint = server.start();
237
238 client.start();
239 final URI requestURI = createRequestURI(serverEndpoint, "/hello");
240 for (int i = 0; i < 5; i++) {
241 final Future<ClientSessionEndpoint> connectFuture = client.connect(
242 "localhost", serverEndpoint.getPort(), TIMEOUT);
243 try (final ClientSessionEndpoint streamEndpoint = connectFuture.get()) {
244 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
245 AsyncRequestBuilder.get(requestURI)
246 .addHeader(HttpHeaders.CONNECTION, "close")
247 .build(),
248 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
249 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
250 Assert.assertNotNull(result);
251 final HttpResponse response1 = result.getHead();
252 final String entity1 = result.getBody();
253 Assert.assertNotNull(response1);
254 Assert.assertEquals(200, response1.getCode());
255 Assert.assertEquals("Hi there", entity1);
256 }
257 }
258 }
259
260 @Test
261 public void testSimpleGetIdentityTransfer() throws Exception {
262 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
263
264 @Override
265 public AsyncServerExchangeHandler get() {
266 return new SingleLineResponseHandler("Hi there");
267 }
268
269 });
270 final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
271 final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
272
273 client.start();
274
275 final int reqNo = 5;
276
277 for (int i = 0; i < reqNo; i++) {
278 final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
279 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
280
281 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
282 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
283 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
284 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
285
286 streamEndpoint.close();
287
288 Assert.assertNotNull(result);
289 final HttpResponse response = result.getHead();
290 final String entity = result.getBody();
291 Assert.assertNotNull(response);
292 Assert.assertEquals(200, response.getCode());
293 Assert.assertEquals("Hi there", entity);
294 }
295
296 }
297
298 @Test
299 public void testPostIdentityTransfer() throws Exception {
300 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
301
302 @Override
303 public AsyncServerExchangeHandler get() {
304 return new SingleLineResponseHandler("Hi there");
305 }
306
307 });
308 final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
309 final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
310
311 client.start();
312
313 final int reqNo = 5;
314
315 for (int i = 0; i < reqNo; i++) {
316 final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
317 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
318
319 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
320 new BasicRequestProducer(Method.POST,
321 createRequestURI(serverEndpoint, "/hello"),
322 new MultiLineEntityProducer("Hello", 16 * i)),
323 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
324 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
325
326 streamEndpoint.close();
327
328 Assert.assertNotNull(result);
329 final HttpResponse response = result.getHead();
330 final String entity = result.getBody();
331 Assert.assertNotNull(response);
332 Assert.assertEquals(200, response.getCode());
333 Assert.assertEquals("Hi there", entity);
334 }
335 }
336
337 @Test
338 public void testPostIdentityTransferOutOfSequenceResponse() throws Exception {
339 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
340
341 @Override
342 public AsyncServerExchangeHandler get() {
343 return new ImmediateResponseExchangeHandler(500, "Go away");
344 }
345
346 });
347 final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
348 final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
349
350 client.start();
351
352 final int reqNo = 5;
353
354 for (int i = 0; i < reqNo; i++) {
355 final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
356 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
357
358 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
359 new BasicRequestProducer(Method.POST,
360 createRequestURI(serverEndpoint, "/hello"),
361 new MultiLineEntityProducer("Hello", 16 * i)),
362 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
363 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
364
365 streamEndpoint.close();
366
367 Assert.assertNotNull(result);
368 final HttpResponse response = result.getHead();
369 final String entity = result.getBody();
370 Assert.assertNotNull(response);
371 Assert.assertEquals(500, response.getCode());
372 Assert.assertEquals("Go away", entity);
373 }
374 }
375
376 @Test
377 public void testSimpleGetsPipelined() throws Exception {
378 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
379
380 @Override
381 public AsyncServerExchangeHandler get() {
382 return new SingleLineResponseHandler("Hi there");
383 }
384
385 });
386 final InetSocketAddress serverEndpoint = server.start();
387
388 client.start();
389 final Future<ClientSessionEndpoint> connectFuture = client.connect(
390 "localhost", serverEndpoint.getPort(), TIMEOUT);
391 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
392
393 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
394 for (int i = 0; i < 5; i++) {
395 queue.add(streamEndpoint.execute(
396 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
397 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
398 }
399 while (!queue.isEmpty()) {
400 final Future<Message<HttpResponse, String>> future = queue.remove();
401 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
402 Assert.assertNotNull(result);
403 final HttpResponse response = result.getHead();
404 final String entity = result.getBody();
405 Assert.assertNotNull(response);
406 Assert.assertEquals(200, response.getCode());
407 Assert.assertEquals("Hi there", entity);
408 }
409 }
410
411 @Test
412 public void testLargeGet() throws Exception {
413 server.register("/", new Supplier<AsyncServerExchangeHandler>() {
414
415 @Override
416 public AsyncServerExchangeHandler get() {
417 return new MultiLineResponseHandler("0123456789abcdef", 5000);
418 }
419
420 });
421 final InetSocketAddress serverEndpoint = server.start();
422
423 client.start();
424 final Future<ClientSessionEndpoint> connectFuture = client.connect(
425 "localhost", serverEndpoint.getPort(), TIMEOUT);
426 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
427
428 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
429 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
430 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
431
432 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
433 Assert.assertNotNull(result1);
434 final HttpResponse response1 = result1.getHead();
435 Assert.assertNotNull(response1);
436 Assert.assertEquals(200, response1.getCode());
437 final String s1 = result1.getBody();
438 Assert.assertNotNull(s1);
439 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
440 while (t1.hasMoreTokens()) {
441 Assert.assertEquals("0123456789abcdef", t1.nextToken());
442 }
443
444 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
445 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
446 new BasicResponseConsumer<>(new StringAsyncEntityConsumer(512)), null);
447
448 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
449 Assert.assertNotNull(result2);
450 final HttpResponse response2 = result2.getHead();
451 Assert.assertNotNull(response2);
452 Assert.assertEquals(200, response2.getCode());
453 final String s2 = result2.getBody();
454 Assert.assertNotNull(s2);
455 final StringTokenizer t2 = new StringTokenizer(s2, "\r\n");
456 while (t2.hasMoreTokens()) {
457 Assert.assertEquals("0123456789abcdef", t2.nextToken());
458 }
459 }
460
461 @Test
462 public void testLargeGetsPipelined() throws Exception {
463 server.register("/", new Supplier<AsyncServerExchangeHandler>() {
464
465 @Override
466 public AsyncServerExchangeHandler get() {
467 return new MultiLineResponseHandler("0123456789abcdef", 2000);
468 }
469
470 });
471 final InetSocketAddress serverEndpoint = server.start();
472
473 client.start();
474 final Future<ClientSessionEndpoint> connectFuture = client.connect(
475 "localhost", serverEndpoint.getPort(), TIMEOUT);
476 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
477
478 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
479 for (int i = 0; i < 5; i++) {
480 queue.add(streamEndpoint.execute(
481 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
482 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
483 }
484 while (!queue.isEmpty()) {
485 final Future<Message<HttpResponse, String>> future = queue.remove();
486 final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
487 Assert.assertNotNull(result);
488 final HttpResponse response = result.getHead();
489 Assert.assertNotNull(response);
490 Assert.assertEquals(200, response.getCode());
491 final String entity = result.getBody();
492 Assert.assertNotNull(entity);
493 final StringTokenizer t = new StringTokenizer(entity, "\r\n");
494 while (t.hasMoreTokens()) {
495 Assert.assertEquals("0123456789abcdef", t.nextToken());
496 }
497 }
498 }
499
500 @Test
501 public void testBasicPost() throws Exception {
502 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
503
504 @Override
505 public AsyncServerExchangeHandler get() {
506 return new SingleLineResponseHandler("Hi back");
507 }
508
509 });
510 final InetSocketAddress serverEndpoint = server.start();
511
512 client.start();
513 final Future<ClientSessionEndpoint> connectFuture = client.connect(
514 "localhost", serverEndpoint.getPort(), TIMEOUT);
515 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
516
517 for (int i = 0; i < 5; i++) {
518 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
519 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello"),
520 AsyncEntityProducers.create("Hi there")),
521 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
522 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
523 Assert.assertNotNull(result);
524 final HttpResponse response1 = result.getHead();
525 final String entity1 = result.getBody();
526 Assert.assertNotNull(response1);
527 Assert.assertEquals(200, response1.getCode());
528 Assert.assertEquals("Hi back", entity1);
529 }
530 }
531
532 @Test
533 public void testBasicPostPipelined() throws Exception {
534 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
535
536 @Override
537 public AsyncServerExchangeHandler get() {
538 return new SingleLineResponseHandler("Hi back");
539 }
540
541 });
542 final InetSocketAddress serverEndpoint = server.start();
543
544 client.start();
545 final Future<ClientSessionEndpoint> connectFuture = client.connect(
546 "localhost", serverEndpoint.getPort(), TIMEOUT);
547 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
548
549 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
550 for (int i = 0; i < 5; i++) {
551 queue.add(streamEndpoint.execute(
552 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello"),
553 AsyncEntityProducers.create("Hi there")),
554 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
555 }
556 while (!queue.isEmpty()) {
557 final Future<Message<HttpResponse, String>> future = queue.remove();
558 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
559 Assert.assertNotNull(result);
560 final HttpResponse response = result.getHead();
561 final String entity = result.getBody();
562 Assert.assertNotNull(response);
563 Assert.assertEquals(200, response.getCode());
564 Assert.assertEquals("Hi back", entity);
565 }
566 }
567
568 @Test
569 public void testHttp10Post() throws Exception {
570 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
571
572 @Override
573 public AsyncServerExchangeHandler get() {
574 return new SingleLineResponseHandler("Hi back");
575 }
576
577 });
578 final InetSocketAddress serverEndpoint = server.start();
579
580 client.start();
581 final Future<ClientSessionEndpoint> connectFuture = client.connect(
582 "localhost", serverEndpoint.getPort(), TIMEOUT);
583 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
584
585 for (int i = 0; i < 5; i++) {
586 final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
587 request.setVersion(HttpVersion.HTTP_1_0);
588 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
589 new BasicRequestProducer(request, AsyncEntityProducers.create("Hi there")),
590 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
591 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
592 Assert.assertNotNull(result);
593 final HttpResponse response1 = result.getHead();
594 final String entity1 = result.getBody();
595 Assert.assertNotNull(response1);
596 Assert.assertEquals(200, response1.getCode());
597 Assert.assertEquals("Hi back", entity1);
598 }
599 }
600
601 @Test
602 public void testNoEntityPost() throws Exception {
603 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
604
605 @Override
606 public AsyncServerExchangeHandler get() {
607 return new SingleLineResponseHandler("Hi back");
608 }
609
610 });
611 final InetSocketAddress serverEndpoint = server.start();
612
613 client.start();
614 final Future<ClientSessionEndpoint> connectFuture = client.connect(
615 "localhost", serverEndpoint.getPort(), TIMEOUT);
616 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
617
618 for (int i = 0; i < 5; i++) {
619 final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
620 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
621 new BasicRequestProducer(request, null),
622 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
623 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
624 Assert.assertNotNull(result);
625 final HttpResponse response1 = result.getHead();
626 final String entity1 = result.getBody();
627 Assert.assertNotNull(response1);
628 Assert.assertEquals(200, response1.getCode());
629 Assert.assertEquals("Hi back", entity1);
630 }
631 }
632
633 @Test
634 public void testLargePost() throws Exception {
635 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
636
637 @Override
638 public AsyncServerExchangeHandler get() {
639 return new EchoHandler(2048);
640 }
641
642 });
643 final InetSocketAddress serverEndpoint = server.start();
644
645 client.start();
646 final Future<ClientSessionEndpoint> connectFuture = client.connect(
647 "localhost", serverEndpoint.getPort(), TIMEOUT);
648 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
649
650 for (int i = 0; i < 5; i++) {
651 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
652 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
653 new MultiLineEntityProducer("0123456789abcdef", 5000)),
654 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
655 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
656 Assert.assertNotNull(result);
657 final HttpResponse response = result.getHead();
658 Assert.assertNotNull(response);
659 Assert.assertEquals(200, response.getCode());
660 final String entity = result.getBody();
661 Assert.assertNotNull(entity);
662 final StringTokenizer t = new StringTokenizer(entity, "\r\n");
663 while (t.hasMoreTokens()) {
664 Assert.assertEquals("0123456789abcdef", t.nextToken());
665 }
666 }
667 }
668
669 @Test
670 public void testPostsPipelinedLargeResponse() throws Exception {
671 server.register("/", new Supplier<AsyncServerExchangeHandler>() {
672
673 @Override
674 public AsyncServerExchangeHandler get() {
675 return new MultiLineResponseHandler("0123456789abcdef", 2000);
676 }
677
678 });
679 final InetSocketAddress serverEndpoint = server.start();
680
681 client.start();
682 final Future<ClientSessionEndpoint> connectFuture = client.connect(
683 "localhost", serverEndpoint.getPort(), TIMEOUT);
684 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
685
686 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
687 for (int i = 0; i < 2; i++) {
688 queue.add(streamEndpoint.execute(
689 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/"),
690 AsyncEntityProducers.create("Hi there")),
691 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
692 }
693 while (!queue.isEmpty()) {
694 final Future<Message<HttpResponse, String>> future = queue.remove();
695 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
696 Assert.assertNotNull(result);
697 final HttpResponse response = result.getHead();
698 Assert.assertNotNull(response);
699 Assert.assertEquals(200, response.getCode());
700 final String entity = result.getBody();
701 Assert.assertNotNull(entity);
702 final StringTokenizer t = new StringTokenizer(entity, "\r\n");
703 while (t.hasMoreTokens()) {
704 Assert.assertEquals("0123456789abcdef", t.nextToken());
705 }
706 }
707 }
708
709
710 @Test
711 public void testLargePostsPipelined() throws Exception {
712 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
713
714 @Override
715 public AsyncServerExchangeHandler get() {
716 return new EchoHandler(2048);
717 }
718
719 });
720 final InetSocketAddress serverEndpoint = server.start();
721
722 client.start();
723 final Future<ClientSessionEndpoint> connectFuture = client.connect(
724 "localhost", serverEndpoint.getPort(), TIMEOUT);
725 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
726
727 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
728 for (int i = 0; i < 5; i++) {
729 queue.add(streamEndpoint.execute(
730 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
731 new MultiLineEntityProducer("0123456789abcdef", 5000)),
732 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
733 }
734 while (!queue.isEmpty()) {
735 final Future<Message<HttpResponse, String>> future = queue.remove();
736 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
737 Assert.assertNotNull(result);
738 final HttpResponse response = result.getHead();
739 Assert.assertNotNull(response);
740 Assert.assertEquals(200, response.getCode());
741 final String entity = result.getBody();
742 Assert.assertNotNull(entity);
743 final StringTokenizer t = new StringTokenizer(entity, "\r\n");
744 while (t.hasMoreTokens()) {
745 Assert.assertEquals("0123456789abcdef", t.nextToken());
746 }
747 }
748 }
749
750 @Test
751 public void testSimpleHead() throws Exception {
752 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
753
754 @Override
755 public AsyncServerExchangeHandler get() {
756 return new SingleLineResponseHandler("Hi there");
757 }
758
759 });
760 final InetSocketAddress serverEndpoint = server.start();
761
762 client.start();
763 final Future<ClientSessionEndpoint> connectFuture = client.connect(
764 "localhost", serverEndpoint.getPort(), TIMEOUT);
765 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
766
767 for (int i = 0; i < 5; i++) {
768 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
769 new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
770 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
771 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
772 Assert.assertNotNull(result);
773 final HttpResponse response1 = result.getHead();
774 Assert.assertNotNull(response1);
775 Assert.assertEquals(200, response1.getCode());
776 Assert.assertNull(result.getBody());
777 }
778 }
779
780 @Test
781 public void testSimpleHeadConnectionClose() throws Exception {
782 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
783
784 @Override
785 public AsyncServerExchangeHandler get() {
786 return new SingleLineResponseHandler("Hi there");
787 }
788
789 });
790 final InetSocketAddress serverEndpoint = server.start();
791
792 client.start();
793 final URI requestURI = createRequestURI(serverEndpoint, "/hello");
794 for (int i = 0; i < 5; i++) {
795 final Future<ClientSessionEndpoint> connectFuture = client.connect(
796 "localhost", serverEndpoint.getPort(), TIMEOUT);
797 try (final ClientSessionEndpoint streamEndpoint = connectFuture.get()) {
798 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
799 AsyncRequestBuilder.head(requestURI)
800 .addHeader(HttpHeaders.CONNECTION, "close")
801 .build(),
802 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
803 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
804 Assert.assertNotNull(result);
805 final HttpResponse response1 = result.getHead();
806 Assert.assertNotNull(response1);
807 Assert.assertEquals(200, response1.getCode());
808 Assert.assertNull(result.getBody());
809 }
810 }
811 }
812
813 @Test
814 public void testHeadPipelined() throws Exception {
815 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
816
817 @Override
818 public AsyncServerExchangeHandler get() {
819 return new SingleLineResponseHandler("Hi there");
820 }
821
822 });
823 final InetSocketAddress serverEndpoint = server.start();
824
825 client.start();
826 final Future<ClientSessionEndpoint> connectFuture = client.connect(
827 "localhost", serverEndpoint.getPort(), TIMEOUT);
828 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
829
830 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
831 for (int i = 0; i < 5; i++) {
832 queue.add(streamEndpoint.execute(
833 new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
834 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
835 }
836 while (!queue.isEmpty()) {
837 final Future<Message<HttpResponse, String>> future = queue.remove();
838 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
839 Assert.assertNotNull(result);
840 final HttpResponse response1 = result.getHead();
841 Assert.assertNotNull(response1);
842 Assert.assertEquals(200, response1.getCode());
843 Assert.assertNull(result.getBody());
844 }
845 }
846
847 @Test
848 public void testExpectationFailed() throws Exception {
849 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
850
851 @Override
852 public AsyncServerExchangeHandler get() {
853 return new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
854
855 @Override
856 protected void handle(
857 final Message<HttpRequest, String> request,
858 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
859 final HttpContext context) throws IOException, HttpException {
860 responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
861
862 }
863 };
864 }
865
866 });
867 final InetSocketAddress serverEndpoint = server.start(null, new Decorator<AsyncServerExchangeHandler>() {
868
869 @Override
870 public AsyncServerExchangeHandler decorate(final AsyncServerExchangeHandler handler) {
871
872 return new BasicAsyncServerExpectationDecorator(handler) {
873
874 @Override
875 protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
876 final Header h = request.getFirstHeader("password");
877 if (h != null && "secret".equals(h.getValue())) {
878 return null;
879 } else {
880 return new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
881 }
882 }
883 };
884
885 }
886 }, Http1Config.DEFAULT);
887
888 client.start();
889 final Future<IOSession> sessionFuture = client.requestSession(
890 new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
891 final IOSession ioSession = sessionFuture.get();
892 final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession);
893
894 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
895 request1.addHeader("password", "secret");
896 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
897 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 1000)),
898 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
899 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
900 Assert.assertNotNull(result1);
901 final HttpResponse response1 = result1.getHead();
902 Assert.assertNotNull(response1);
903 Assert.assertEquals(200, response1.getCode());
904 Assert.assertNotNull("All is well", result1.getBody());
905
906 Assert.assertTrue(ioSession.isOpen());
907
908 final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
909 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
910 new BasicRequestProducer(request2, new MultiLineEntityProducer("0123456789abcdef", 5000)),
911 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
912 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
913 Assert.assertNotNull(result2);
914 final HttpResponse response2 = result2.getHead();
915 Assert.assertNotNull(response2);
916 Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response2.getCode());
917 Assert.assertNotNull("You shall not pass", result2.getBody());
918
919 Assert.assertTrue(ioSession.isOpen());
920
921 final HttpRequest request3 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
922 request3.addHeader("password", "secret");
923 final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
924 new BasicRequestProducer(request3, new MultiLineEntityProducer("0123456789abcdef", 1000)),
925 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
926 final Message<HttpResponse, String> result3 = future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
927 Assert.assertNotNull(result3);
928 final HttpResponse response3 = result3.getHead();
929 Assert.assertNotNull(response3);
930 Assert.assertEquals(200, response3.getCode());
931 Assert.assertNotNull("All is well", result3.getBody());
932
933 Assert.assertTrue(ioSession.isOpen());
934
935 final HttpRequest request4 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
936 final Future<Message<HttpResponse, String>> future4 = streamEndpoint.execute(
937 new BasicRequestProducer(request4, AsyncEntityProducers.create("blah")),
938 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
939 final Message<HttpResponse, String> result4 = future4.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
940 Assert.assertNotNull(result4);
941 final HttpResponse response4 = result4.getHead();
942 Assert.assertNotNull(response4);
943 Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response4.getCode());
944 Assert.assertNotNull("You shall not pass", result4.getBody());
945
946 Assert.assertFalse(ioSession.isOpen());
947 }
948
949 @Test
950 public void testExpectationFailedCloseConnection() throws Exception {
951 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
952
953 @Override
954 public AsyncServerExchangeHandler get() {
955 return new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
956
957 @Override
958 protected void handle(
959 final Message<HttpRequest, String> request,
960 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
961 final HttpContext context) throws IOException, HttpException {
962 responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
963
964 }
965 };
966 }
967
968 });
969 final InetSocketAddress serverEndpoint = server.start(null, new Decorator<AsyncServerExchangeHandler>() {
970
971 @Override
972 public AsyncServerExchangeHandler decorate(final AsyncServerExchangeHandler handler) {
973
974 return new BasicAsyncServerExpectationDecorator(handler) {
975
976 @Override
977 protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
978 final Header h = request.getFirstHeader("password");
979 if (h != null && "secret".equals(h.getValue())) {
980 return null;
981 } else {
982 final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_UNAUTHORIZED);
983 response.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
984 return new BasicResponseProducer(response, "You shall not pass");
985 }
986 }
987 };
988
989 }
990 }, Http1Config.DEFAULT);
991
992 client.start();
993 final Future<IOSession> sessionFuture = client.requestSession(
994 new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
995 final IOSession ioSession = sessionFuture.get();
996 final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession);
997
998 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
999 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1000 new BasicRequestProducer(request1, new MultiBinEntityProducer(
1001 new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'},
1002 100000,
1003 ContentType.TEXT_PLAIN)),
1004 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1005 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1006 Assert.assertNotNull(result1);
1007 final HttpResponse response1 = result1.getHead();
1008 Assert.assertNotNull(response1);
1009 Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1010 Assert.assertNotNull("You shall not pass", result1.getBody());
1011
1012 Assert.assertFalse(streamEndpoint.isOpen());
1013 }
1014
1015 @Test
1016 public void testDelayedExpectationVerification() throws Exception {
1017 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
1018
1019 @Override
1020 public AsyncServerExchangeHandler get() {
1021 return new AsyncServerExchangeHandler() {
1022
1023 private final Random random = new Random(System.currentTimeMillis());
1024 private final AsyncEntityProducer entityProducer = AsyncEntityProducers.create(
1025 "All is well");
1026
1027 @Override
1028 public void handleRequest(
1029 final HttpRequest request,
1030 final EntityDetails entityDetails,
1031 final ResponseChannel responseChannel,
1032 final HttpContext context) throws HttpException, IOException {
1033
1034 Executors.newSingleThreadExecutor().execute(new Runnable() {
1035 @Override
1036 public void run() {
1037 try {
1038 if (entityDetails != null) {
1039 final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
1040 if (h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue())) {
1041 Thread.sleep(random.nextInt(1000));
1042 responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), context);
1043 }
1044 final HttpResponse response = new BasicHttpResponse(200);
1045 synchronized (entityProducer) {
1046 responseChannel.sendResponse(response, entityProducer, context);
1047 }
1048 }
1049 } catch (final Exception ignore) {
1050
1051 }
1052 }
1053 });
1054
1055 }
1056
1057 @Override
1058 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1059 capacityChannel.update(Integer.MAX_VALUE);
1060 }
1061
1062 @Override
1063 public void consume(final ByteBuffer src) throws IOException {
1064 }
1065
1066 @Override
1067 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1068 }
1069
1070 @Override
1071 public int available() {
1072 synchronized (entityProducer) {
1073 return entityProducer.available();
1074 }
1075 }
1076
1077 @Override
1078 public void produce(final DataStreamChannel channel) throws IOException {
1079 synchronized (entityProducer) {
1080 entityProducer.produce(channel);
1081 }
1082 }
1083
1084 @Override
1085 public void failed(final Exception cause) {
1086 }
1087
1088 @Override
1089 public void releaseResources() {
1090 }
1091
1092 };
1093
1094 }
1095 });
1096 final InetSocketAddress serverEndpoint = server.start();
1097
1098 client.start(Http1Config.custom().setWaitForContinueTimeout(Timeout.ofMilliseconds(100)).build());
1099 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1100 "localhost", serverEndpoint.getPort(), TIMEOUT);
1101 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1102
1103 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
1104 for (int i = 0; i < 5; i++) {
1105 queue.add(streamEndpoint.execute(
1106 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/"),
1107 AsyncEntityProducers.create("Some important message")),
1108 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
1109 }
1110 while (!queue.isEmpty()) {
1111 final Future<Message<HttpResponse, String>> future = queue.remove();
1112 final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1113 Assert.assertNotNull(result);
1114 final HttpResponse response = result.getHead();
1115 Assert.assertNotNull(response);
1116 Assert.assertEquals(200, response.getCode());
1117 Assert.assertNotNull("All is well", result.getBody());
1118 }
1119 }
1120
1121 @Test
1122 public void testPrematureResponse() throws Exception {
1123 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
1124
1125 @Override
1126 public AsyncServerExchangeHandler get() {
1127 return new AsyncServerExchangeHandler() {
1128
1129 private final AtomicReference<AsyncResponseProducer> responseProducer = new AtomicReference<>();
1130
1131 @Override
1132 public void handleRequest(
1133 final HttpRequest request,
1134 final EntityDetails entityDetails,
1135 final ResponseChannel responseChannel,
1136 final HttpContext context) throws HttpException, IOException {
1137 final AsyncResponseProducer producer;
1138 final Header h = request.getFirstHeader("password");
1139 if (h != null && "secret".equals(h.getValue())) {
1140 producer = new BasicResponseProducer(HttpStatus.SC_OK, "All is well");
1141 } else {
1142 producer = new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
1143 }
1144 responseProducer.set(producer);
1145 producer.sendResponse(responseChannel, context);
1146 }
1147
1148 @Override
1149 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1150 capacityChannel.update(Integer.MAX_VALUE);
1151 }
1152
1153 @Override
1154 public void consume(final ByteBuffer src) throws IOException {
1155 }
1156
1157 @Override
1158 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1159 }
1160
1161 @Override
1162 public int available() {
1163 final AsyncResponseProducer producer = responseProducer.get();
1164 return producer.available();
1165 }
1166
1167 @Override
1168 public void produce(final DataStreamChannel channel) throws IOException {
1169 final AsyncResponseProducer producer = responseProducer.get();
1170 producer.produce(channel);
1171 }
1172
1173 @Override
1174 public void failed(final Exception cause) {
1175 }
1176
1177 @Override
1178 public void releaseResources() {
1179 }
1180 };
1181 }
1182
1183 });
1184 final InetSocketAddress serverEndpoint = server.start();
1185
1186 client.start();
1187 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1188 "localhost", serverEndpoint.getPort(), TIMEOUT);
1189 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1190
1191 for (int i = 0; i < 3; i++) {
1192 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1193 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1194 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 100000)),
1195 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1196 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1197 Assert.assertNotNull(result1);
1198 final HttpResponse response1 = result1.getHead();
1199 Assert.assertNotNull(response1);
1200 Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1201 Assert.assertNotNull("You shall not pass", result1.getBody());
1202
1203 Assert.assertTrue(streamEndpoint.isOpen());
1204 }
1205 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1206 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1207 new BasicRequestProducer(request1, new MultiBinEntityProducer(
1208 new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'},
1209 100000,
1210 ContentType.TEXT_PLAIN)),
1211 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1212 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1213 Assert.assertNotNull(result1);
1214 final HttpResponse response1 = result1.getHead();
1215 Assert.assertNotNull(response1);
1216 Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1217 Assert.assertNotNull("You shall not pass", result1.getBody());
1218
1219 Assert.assertFalse(streamEndpoint.isOpen());
1220 }
1221
1222 @Test
1223 public void testSlowResponseConsumer() throws Exception {
1224 server.register("/", new Supplier<AsyncServerExchangeHandler>() {
1225
1226 @Override
1227 public AsyncServerExchangeHandler get() {
1228 return new MultiLineResponseHandler("0123456789abcd", 100);
1229 }
1230
1231 });
1232 final InetSocketAddress serverEndpoint = server.start();
1233
1234 client.start(Http1Config.custom().setBufferSize(256).build());
1235
1236 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1237 "localhost", serverEndpoint.getPort(), TIMEOUT);
1238 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1239
1240 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/"));
1241 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1242 new BasicRequestProducer(request1, null),
1243 new BasicResponseConsumer<>(new AbstractClassicEntityConsumer<String>(16, Executors.newSingleThreadExecutor()) {
1244
1245 @Override
1246 protected String consumeData(
1247 final ContentType contentType, final InputStream inputStream) throws IOException {
1248 Charset charset = contentType != null ? contentType.getCharset() : null;
1249 if (charset == null) {
1250 charset = StandardCharsets.US_ASCII;
1251 }
1252
1253 final StringBuilder buffer = new StringBuilder();
1254 try {
1255 final byte[] tmp = new byte[16];
1256 int l;
1257 while ((l = inputStream.read(tmp)) != -1) {
1258 buffer.append(charset.decode(ByteBuffer.wrap(tmp, 0, l)));
1259 Thread.sleep(50);
1260 }
1261 } catch (final InterruptedException ex) {
1262 Thread.currentThread().interrupt();
1263 throw new InterruptedIOException(ex.getMessage());
1264 }
1265 return buffer.toString();
1266 }
1267 }),
1268 null);
1269
1270 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1271 Assert.assertNotNull(result1);
1272 final HttpResponse response1 = result1.getHead();
1273 Assert.assertNotNull(response1);
1274 Assert.assertEquals(200, response1.getCode());
1275 final String s1 = result1.getBody();
1276 Assert.assertNotNull(s1);
1277 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1278 while (t1.hasMoreTokens()) {
1279 Assert.assertEquals("0123456789abcd", t1.nextToken());
1280 }
1281 }
1282
1283 @Test
1284 public void testSlowRequestProducer() throws Exception {
1285 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
1286
1287 @Override
1288 public AsyncServerExchangeHandler get() {
1289 return new EchoHandler(2048);
1290 }
1291
1292 });
1293 final InetSocketAddress serverEndpoint = server.start();
1294
1295 client.start();
1296 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1297 "localhost", serverEndpoint.getPort(), TIMEOUT);
1298 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1299
1300 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1301 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1302 new BasicRequestProducer(request1, new AbstractClassicEntityProducer(4096, ContentType.TEXT_PLAIN, Executors.newSingleThreadExecutor()) {
1303
1304 @Override
1305 protected void produceData(final ContentType contentType, final OutputStream outputStream) throws IOException {
1306 Charset charset = contentType.getCharset();
1307 if (charset == null) {
1308 charset = StandardCharsets.US_ASCII;
1309 }
1310 try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, charset))) {
1311 for (int i = 0; i < 500; i++) {
1312 if (i % 100 == 0) {
1313 writer.flush();
1314 Thread.sleep(500);
1315 }
1316 writer.write("0123456789abcdef\r\n");
1317 }
1318 } catch (final InterruptedException ex) {
1319 Thread.currentThread().interrupt();
1320 throw new InterruptedIOException(ex.getMessage());
1321 }
1322 }
1323
1324 }),
1325 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1326 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1327 Assert.assertNotNull(result1);
1328 final HttpResponse response1 = result1.getHead();
1329 Assert.assertNotNull(response1);
1330 Assert.assertEquals(200, response1.getCode());
1331 final String s1 = result1.getBody();
1332 Assert.assertNotNull(s1);
1333 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1334 while (t1.hasMoreTokens()) {
1335 Assert.assertEquals("0123456789abcdef", t1.nextToken());
1336 }
1337 }
1338
1339 @Test
1340 public void testSlowResponseProducer() throws Exception {
1341 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
1342
1343 @Override
1344 public AsyncServerExchangeHandler get() {
1345 return new AbstractClassicServerExchangeHandler(2048, Executors.newSingleThreadExecutor()) {
1346
1347 @Override
1348 protected void handle(
1349 final HttpRequest request,
1350 final InputStream requestStream,
1351 final HttpResponse response,
1352 final OutputStream responseStream,
1353 final HttpContext context) throws IOException, HttpException {
1354
1355 if (!"/hello".equals(request.getPath())) {
1356 response.setCode(HttpStatus.SC_NOT_FOUND);
1357 return;
1358 }
1359 if (!Method.POST.name().equalsIgnoreCase(request.getMethod())) {
1360 response.setCode(HttpStatus.SC_NOT_IMPLEMENTED);
1361 return;
1362 }
1363 if (requestStream == null) {
1364 return;
1365 }
1366 final Header h1 = request.getFirstHeader(HttpHeaders.CONTENT_TYPE);
1367 final ContentType contentType = h1 != null ? ContentType.parse(h1.getValue()) : null;
1368 Charset charset = contentType != null ? contentType.getCharset() : null;
1369 if (charset == null) {
1370 charset = StandardCharsets.US_ASCII;
1371 }
1372 response.setCode(HttpStatus.SC_OK);
1373 response.setHeader(h1);
1374 try (final BufferedReader reader = new BufferedReader(new InputStreamReader(requestStream, charset));
1375 final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(responseStream, charset))) {
1376 try {
1377 String l;
1378 int count = 0;
1379 while ((l = reader.readLine()) != null) {
1380 writer.write(l);
1381 writer.write("\r\n");
1382 count++;
1383 if (count % 500 == 0) {
1384 Thread.sleep(500);
1385 }
1386 }
1387 writer.flush();
1388 } catch (final InterruptedException ex) {
1389 Thread.currentThread().interrupt();
1390 throw new InterruptedIOException(ex.getMessage());
1391 }
1392 }
1393 }
1394 };
1395 }
1396
1397 });
1398 final InetSocketAddress serverEndpoint = server.start();
1399
1400 client.start(Http1Config.custom().setBufferSize(256).build());
1401
1402 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1403 "localhost", serverEndpoint.getPort(), TIMEOUT);
1404 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1405
1406 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
1407 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1408 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcd", 2000)),
1409 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1410 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1411 Assert.assertNotNull(result1);
1412 final HttpResponse response1 = result1.getHead();
1413 Assert.assertNotNull(response1);
1414 Assert.assertEquals(200, response1.getCode());
1415 final String s1 = result1.getBody();
1416 Assert.assertNotNull(s1);
1417 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1418 while (t1.hasMoreTokens()) {
1419 Assert.assertEquals("0123456789abcd", t1.nextToken());
1420 }
1421 }
1422
1423 @Test
1424 public void testPipelinedConnectionClose() throws Exception {
1425 server.register("/hello*", new Supplier<AsyncServerExchangeHandler>() {
1426
1427 @Override
1428 public AsyncServerExchangeHandler get() {
1429 return new SingleLineResponseHandler("Hi back");
1430 }
1431
1432 });
1433 final InetSocketAddress serverEndpoint = server.start();
1434
1435 client.start();
1436 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1437 "localhost", serverEndpoint.getPort(), TIMEOUT);
1438 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1439
1440 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1441 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-1"),
1442 AsyncEntityProducers.create("Hi there")),
1443 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1444 final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello-2"));
1445 request2.addHeader(HttpHeaders.CONNECTION, "close");
1446 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1447 new BasicRequestProducer(request2,
1448 AsyncEntityProducers.create("Hi there")),
1449 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1450 final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
1451 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1452 AsyncEntityProducers.create("Hi there")),
1453 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1454
1455 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1456 Assert.assertNotNull(result1);
1457 final HttpResponse response1 = result1.getHead();
1458 final String entity1 = result1.getBody();
1459 Assert.assertNotNull(response1);
1460 Assert.assertEquals(200, response1.getCode());
1461 Assert.assertEquals("Hi back", entity1);
1462
1463 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1464 Assert.assertNotNull(result2);
1465 final HttpResponse response2 = result2.getHead();
1466 final String entity2 = result2.getBody();
1467 Assert.assertNotNull(response2);
1468 Assert.assertEquals(200, response2.getCode());
1469 Assert.assertEquals("Hi back", entity2);
1470
1471 try {
1472 future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1473 Assert.fail("ExecutionException expected");
1474 } catch (final CancellationException | ExecutionException ignore) {
1475 }
1476
1477 final Future<Message<HttpResponse, String>> future4 = streamEndpoint.execute(
1478 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1479 AsyncEntityProducers.create("Hi there")),
1480 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1481 try {
1482 future4.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1483 Assert.fail("CancellationException or ExecutionException expected");
1484 } catch (final CancellationException ignore) {
1485 Assert.assertTrue(future4.isCancelled());
1486 } catch (final ExecutionException ignore) {
1487 }
1488 }
1489
1490 @Test
1491 public void testPipelinedInvalidRequest() throws Exception {
1492 server.register("/hello*", new Supplier<AsyncServerExchangeHandler>() {
1493
1494 @Override
1495 public AsyncServerExchangeHandler get() {
1496 return new SingleLineResponseHandler("Hi back");
1497 }
1498
1499 });
1500 final InetSocketAddress serverEndpoint = server.start();
1501
1502 client.start();
1503 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1504 "localhost", serverEndpoint.getPort(), TIMEOUT);
1505 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1506
1507 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1508 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-1"),
1509 AsyncEntityProducers.create("Hi there")),
1510 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1511 final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello-2"));
1512 request2.addHeader(HttpHeaders.HOST, "blah:blah");
1513 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1514 new BasicRequestProducer(request2,
1515 AsyncEntityProducers.create("Hi there")),
1516 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1517 final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
1518 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1519 AsyncEntityProducers.create("Hi there")),
1520 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1521
1522 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1523 Assert.assertNotNull(result1);
1524 final HttpResponse response1 = result1.getHead();
1525 final String entity1 = result1.getBody();
1526 Assert.assertNotNull(response1);
1527 Assert.assertEquals(200, response1.getCode());
1528 Assert.assertEquals("Hi back", entity1);
1529
1530 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1531 Assert.assertNotNull(result2);
1532 final HttpResponse response2 = result2.getHead();
1533 final String entity2 = result2.getBody();
1534 Assert.assertNotNull(response2);
1535 Assert.assertEquals(400, response2.getCode());
1536 Assert.assertTrue(entity2.length() > 0);
1537
1538 try {
1539 future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1540 Assert.fail("ExecutionException expected");
1541 } catch (final CancellationException | ExecutionException ignore) {
1542 }
1543 }
1544
/** ASCII bytes of "garbage"; the payload that BrokenChunkEncoder writes in place of real content. */
private static final byte[] GARBAGE = "garbage".getBytes(StandardCharsets.US_ASCII);
1546
1547 private static class BrokenChunkEncoder extends AbstractContentEncoder {
1548
1549 private final CharArrayBuffer lineBuffer;
1550 private boolean done;
1551
1552 BrokenChunkEncoder(
1553 final WritableByteChannel channel,
1554 final SessionOutputBuffer buffer,
1555 final BasicHttpTransportMetrics metrics) {
1556 super(channel, buffer, metrics);
1557 lineBuffer = new CharArrayBuffer(16);
1558 }
1559
1560 @Override
1561 public void complete(final List<? extends Header> trailers) throws IOException {
1562 super.complete(trailers);
1563 }
1564
1565 @Override
1566 public int write(final ByteBuffer src) throws IOException {
1567 final int chunk;
1568 if (!done) {
1569 lineBuffer.clear();
1570 lineBuffer.append(Integer.toHexString(GARBAGE.length * 10));
1571 buffer().writeLine(lineBuffer);
1572 buffer().write(ByteBuffer.wrap(GARBAGE));
1573 done = true;
1574 chunk = GARBAGE.length;
1575 } else {
1576 chunk = 0;
1577 }
1578 final long bytesWritten = buffer().flush(channel());
1579 if (bytesWritten > 0) {
1580 metrics().incrementBytesTransferred(bytesWritten);
1581 }
1582 if (!buffer().hasData()) {
1583 channel().close();
1584 }
1585 return chunk;
1586 }
1587
1588 }
1589
1590 @Test
1591 public void testTruncatedChunk() throws Exception {
1592 final InetSocketAddress serverEndpoint = server.start(new InternalServerHttp1EventHandlerFactory(
1593 HttpProcessors.server(),
1594 new HandlerFactory<AsyncServerExchangeHandler>() {
1595
1596 @Override
1597 public AsyncServerExchangeHandler create(
1598 final HttpRequest request,
1599 final HttpContext context) throws HttpException {
1600 return new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
1601
1602 @Override
1603 protected void handle(
1604 final Message<HttpRequest, String> request,
1605 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1606 final HttpContext context) throws IOException, HttpException {
1607 responseTrigger.submitResponse(
1608 new BasicResponseProducer(new StringAsyncEntityProducer("useful stuff")), context);
1609 }
1610
1611 };
1612 }
1613
1614 },
1615 Http1Config.DEFAULT,
1616 CharCodingConfig.DEFAULT,
1617 DefaultConnectionReuseStrategy.INSTANCE,
1618 scheme == URIScheme.HTTPS ? SSLTestContexts.createServerSSLContext() : null, null, null) {
1619
1620 @Override
1621 protected ServerHttp1StreamDuplexer createServerHttp1StreamDuplexer(
1622 final ProtocolIOSession ioSession,
1623 final HttpProcessor httpProcessor,
1624 final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
1625 final Http1Config http1Config,
1626 final CharCodingConfig connectionConfig,
1627 final ConnectionReuseStrategy connectionReuseStrategy,
1628 final NHttpMessageParser<HttpRequest> incomingMessageParser,
1629 final NHttpMessageWriter<HttpResponse> outgoingMessageWriter,
1630 final ContentLengthStrategy incomingContentStrategy,
1631 final ContentLengthStrategy outgoingContentStrategy,
1632 final Http1StreamListener streamListener) {
1633 return new ServerHttp1StreamDuplexer(ioSession, httpProcessor, exchangeHandlerFactory,
1634 scheme.id,
1635 http1Config, connectionConfig, connectionReuseStrategy,
1636 incomingMessageParser, outgoingMessageWriter,
1637 incomingContentStrategy, outgoingContentStrategy,
1638 streamListener) {
1639
1640 @Override
1641 protected ContentEncoder createContentEncoder(
1642 final long len,
1643 final WritableByteChannel channel,
1644 final SessionOutputBuffer buffer,
1645 final BasicHttpTransportMetrics metrics) throws HttpException {
1646 if (len == ContentLengthStrategy.CHUNKED) {
1647 return new BrokenChunkEncoder(channel, buffer, metrics);
1648 } else {
1649 return super.createContentEncoder(len, channel, buffer, metrics);
1650 }
1651 }
1652
1653 };
1654 }
1655
1656 });
1657
1658 client.start();
1659 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1660 "localhost", serverEndpoint.getPort(), TIMEOUT);
1661 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1662
1663 final AsyncRequestProducer requestProducer = new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1664 final StringAsyncEntityConsumer entityConsumer = new StringAsyncEntityConsumer() {
1665
1666 @Override
1667 public void releaseResources() {
1668
1669 }
1670
1671 };
1672 final BasicResponseConsumer<String> responseConsumer = new BasicResponseConsumer<>(entityConsumer);
1673 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(requestProducer, responseConsumer, null);
1674 try {
1675 future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1676 Assert.fail("ExecutionException should have been thrown");
1677 } catch (final ExecutionException ex) {
1678 final Throwable cause = ex.getCause();
1679 Assert.assertTrue(cause instanceof MalformedChunkCodingException);
1680 Assert.assertEquals("garbage", entityConsumer.generateContent());
1681 }
1682 }
1683
1684 @Test
1685 public void testExceptionInHandler() throws Exception {
1686 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1687
1688 @Override
1689 public AsyncServerExchangeHandler get() {
1690 return new SingleLineResponseHandler("Hi there") {
1691
1692 @Override
1693 protected void handle(
1694 final Message<HttpRequest, String> request,
1695 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1696 final HttpContext context) throws IOException, HttpException {
1697 throw new HttpException("Boom");
1698 }
1699 };
1700 }
1701
1702 });
1703 final InetSocketAddress serverEndpoint = server.start();
1704
1705 client.start();
1706 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1707 "localhost", serverEndpoint.getPort(), TIMEOUT);
1708 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1709
1710 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1711 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1712 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1713 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1714 Assert.assertNotNull(result);
1715 final HttpResponse response1 = result.getHead();
1716 final String entity1 = result.getBody();
1717 Assert.assertNotNull(response1);
1718 Assert.assertEquals(500, response1.getCode());
1719 Assert.assertEquals("Boom", entity1);
1720 }
1721
1722 @Test
1723 public void testNoServiceHandler() throws Exception {
1724 final InetSocketAddress serverEndpoint = server.start();
1725
1726 client.start();
1727 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1728 "localhost", serverEndpoint.getPort(), TIMEOUT);
1729 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1730
1731 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1732 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1733 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1734 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1735 Assert.assertNotNull(result);
1736 final HttpResponse response1 = result.getHead();
1737 final String entity1 = result.getBody();
1738 Assert.assertNotNull(response1);
1739 Assert.assertEquals(404, response1.getCode());
1740 Assert.assertEquals("Resource not found", entity1);
1741 }
1742
1743 @Test
1744 public void testResponseNoContent() throws Exception {
1745 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1746
1747 @Override
1748 public AsyncServerExchangeHandler get() {
1749 return new SingleLineResponseHandler("Hi there") {
1750
1751 @Override
1752 protected void handle(
1753 final Message<HttpRequest, String> request,
1754 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1755 final HttpContext context) throws IOException, HttpException {
1756 final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_NO_CONTENT);
1757 responseTrigger.submitResponse(new BasicResponseProducer(response), context);
1758 }
1759 };
1760 }
1761
1762 });
1763 final InetSocketAddress serverEndpoint = server.start();
1764
1765 client.start();
1766 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1767 "localhost", serverEndpoint.getPort(), TIMEOUT);
1768 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1769
1770 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1771 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1772 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1773 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1774 Assert.assertNotNull(result);
1775 final HttpResponse response1 = result.getHead();
1776 Assert.assertNotNull(response1);
1777 Assert.assertEquals(204, response1.getCode());
1778 Assert.assertNull(result.getBody());
1779 }
1780
1781 @Test
1782 public void testAbsentHostHeader() throws Exception {
1783 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1784
1785 @Override
1786 public AsyncServerExchangeHandler get() {
1787 return new SingleLineResponseHandler("Hi there");
1788 }
1789
1790 });
1791 final InetSocketAddress serverEndpoint = server.start();
1792
1793 client.start(new DefaultHttpProcessor(new RequestContent(), new RequestConnControl()), Http1Config.DEFAULT);
1794
1795 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1796 "localhost", serverEndpoint.getPort(), TIMEOUT);
1797 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1798
1799 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1800 request1.setVersion(HttpVersion.HTTP_1_0);
1801 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1802 new BasicRequestProducer(request1, null),
1803 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1804 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1805 Assert.assertNotNull(result1);
1806 final HttpResponse response1 = result1.getHead();
1807 Assert.assertNotNull(response1);
1808 Assert.assertEquals(200, response1.getCode());
1809 Assert.assertEquals("Hi there", result1.getBody());
1810
1811 final HttpRequest request2 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1812 request2.setVersion(HttpVersion.HTTP_1_1);
1813 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1814 new BasicRequestProducer(request2, null),
1815 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1816 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1817 Assert.assertNotNull(result2);
1818 final HttpResponse response2 = result2.getHead();
1819 Assert.assertNotNull(response2);
1820 Assert.assertEquals(400, response2.getCode());
1821 Assert.assertEquals("Host header is absent", result2.getBody());
1822 }
1823
1824 @Test
1825 public void testMessageWithTrailers() throws Exception {
1826 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1827
1828 @Override
1829 public AsyncServerExchangeHandler get() {
1830 return new AbstractServerExchangeHandler<Message<HttpRequest, String>>() {
1831
1832 @Override
1833 protected AsyncRequestConsumer<Message<HttpRequest, String>> supplyConsumer(
1834 final HttpRequest request,
1835 final EntityDetails entityDetails,
1836 final HttpContext context) throws HttpException {
1837 return new BasicRequestConsumer<>(entityDetails != null ? new StringAsyncEntityConsumer() : null);
1838 }
1839
1840 @Override
1841 protected void handle(
1842 final Message<HttpRequest, String> requestMessage,
1843 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1844 final HttpContext context) throws HttpException, IOException {
1845 responseTrigger.submitResponse(new BasicResponseProducer(
1846 HttpStatus.SC_OK,
1847 new DigestingEntityProducer("MD5",
1848 new StringAsyncEntityProducer("Hello back with some trailers"))), context);
1849 }
1850 };
1851 }
1852
1853 });
1854 final InetSocketAddress serverEndpoint = server.start();
1855
1856 client.start();
1857
1858 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1859 "localhost", serverEndpoint.getPort(), TIMEOUT);
1860 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1861
1862 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1863 final DigestingEntityConsumer<String> entityConsumer = new DigestingEntityConsumer<>("MD5", new StringAsyncEntityConsumer());
1864 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1865 new BasicRequestProducer(request1, null),
1866 new BasicResponseConsumer<>(entityConsumer), null);
1867 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1868 Assert.assertNotNull(result1);
1869 final HttpResponse response1 = result1.getHead();
1870 Assert.assertNotNull(response1);
1871 Assert.assertEquals(200, response1.getCode());
1872 Assert.assertEquals("Hello back with some trailers", result1.getBody());
1873
1874 final List<Header> trailers = entityConsumer.getTrailers();
1875 Assert.assertNotNull(trailers);
1876 Assert.assertEquals(2, trailers.size());
1877 final Map<String, String> map = new HashMap<>();
1878 for (final Header header: trailers) {
1879 map.put(header.getName().toLowerCase(Locale.ROOT), header.getValue());
1880 }
1881 final String digest = TextUtils.toHexString(entityConsumer.getDigest());
1882 Assert.assertEquals("MD5", map.get("digest-algo"));
1883 Assert.assertEquals(digest, map.get("digest"));
1884 }
1885
1886 @Test
1887 public void testProtocolException() throws Exception {
1888 server.register("/boom", new Supplier<AsyncServerExchangeHandler>() {
1889
1890 @Override
1891 public AsyncServerExchangeHandler get() {
1892 return new AsyncServerExchangeHandler() {
1893
1894 private final StringAsyncEntityProducer entityProducer = new StringAsyncEntityProducer("Everyting is OK");
1895
1896 @Override
1897 public void releaseResources() {
1898 entityProducer.releaseResources();
1899 }
1900
1901 @Override
1902 public void handleRequest(
1903 final HttpRequest request,
1904 final EntityDetails entityDetails,
1905 final ResponseChannel responseChannel,
1906 final HttpContext context) throws HttpException, IOException {
1907 final String requestUri = request.getRequestUri();
1908 if (requestUri.endsWith("boom")) {
1909 throw new ProtocolException("Boom!!!");
1910 }
1911 responseChannel.sendResponse(new BasicHttpResponse(200), entityProducer, context);
1912 }
1913
1914 @Override
1915 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1916 capacityChannel.update(Integer.MAX_VALUE);
1917 }
1918
1919 @Override
1920 public void consume(final ByteBuffer src) throws IOException {
1921 }
1922
1923 @Override
1924 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1925
1926 }
1927
1928 @Override
1929 public int available() {
1930 return entityProducer.available();
1931 }
1932
1933 @Override
1934 public void produce(final DataStreamChannel channel) throws IOException {
1935 entityProducer.produce(channel);
1936 }
1937
1938 @Override
1939 public void failed(final Exception cause) {
1940 releaseResources();
1941 }
1942
1943 };
1944 }
1945
1946 });
1947
1948 final InetSocketAddress serverEndpoint = server.start();
1949
1950 client.start();
1951 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1952 "localhost", serverEndpoint.getPort(), TIMEOUT);
1953 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1954 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1955 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/boom")),
1956 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1957 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1958 Assert.assertNotNull(result);
1959 final HttpResponse response1 = result.getHead();
1960 final String entity1 = result.getBody();
1961 Assert.assertNotNull(response1);
1962 Assert.assertEquals(HttpStatus.SC_BAD_REQUEST, response1.getCode());
1963 Assert.assertEquals("Boom!!!", entity1);
1964 }
1965
1966 @Test
1967 public void testHeaderTooLarge() throws Exception {
1968 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1969
1970 @Override
1971 public AsyncServerExchangeHandler get() {
1972 return new SingleLineResponseHandler("Hi there");
1973 }
1974
1975 });
1976 final InetSocketAddress serverEndpoint = server.start(null, Http1Config.custom()
1977 .setMaxLineLength(100)
1978 .build());
1979 client.start();
1980
1981 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1982 "localhost", serverEndpoint.getPort(), TIMEOUT);
1983 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1984
1985 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1986 request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1987 "1234567890123456789012345678901234567890");
1988 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1989 new BasicRequestProducer(request1, null),
1990 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1991 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1992 Assert.assertNotNull(result1);
1993 final HttpResponse response1 = result1.getHead();
1994 Assert.assertNotNull(response1);
1995 Assert.assertEquals(431, response1.getCode());
1996 Assert.assertEquals("Maximum line length limit exceeded", result1.getBody());
1997 }
1998
1999 @Test
2000 public void testHeaderTooLargePost() throws Exception {
2001 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
2002
2003 @Override
2004 public AsyncServerExchangeHandler get() {
2005 return new SingleLineResponseHandler("Hi there");
2006 }
2007
2008 });
2009 final InetSocketAddress serverEndpoint = server.start(null, Http1Config.custom()
2010 .setMaxLineLength(100)
2011 .build());
2012 client.start(
2013 new DefaultHttpProcessor(new RequestContent(), new RequestTargetHost(), new RequestConnControl()), null);
2014
2015 final Future<ClientSessionEndpoint> connectFuture = client.connect(
2016 "localhost", serverEndpoint.getPort(), TIMEOUT);
2017 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
2018
2019 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
2020 request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
2021 "1234567890123456789012345678901234567890");
2022
2023 final byte[] b = new byte[2048];
2024 for (int i = 0; i < b.length; i++) {
2025 b[i] = (byte) ('a' + i % 10);
2026 }
2027
2028 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
2029 new BasicRequestProducer(request1, AsyncEntityProducers.create(b, ContentType.TEXT_PLAIN)),
2030 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
2031 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
2032 Assert.assertNotNull(result1);
2033 final HttpResponse response1 = result1.getHead();
2034 Assert.assertNotNull(response1);
2035 Assert.assertEquals(431, response1.getCode());
2036 Assert.assertEquals("Maximum line length limit exceeded", result1.getBody());
2037 }
2038
2039 }