1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.testing.nio;
29
30 import java.io.BufferedReader;
31 import java.io.BufferedWriter;
32 import java.io.IOException;
33 import java.io.InputStream;
34 import java.io.InputStreamReader;
35 import java.io.InterruptedIOException;
36 import java.io.OutputStream;
37 import java.io.OutputStreamWriter;
38 import java.net.InetSocketAddress;
39 import java.net.URI;
40 import java.net.URISyntaxException;
41 import java.nio.ByteBuffer;
42 import java.nio.channels.WritableByteChannel;
43 import java.nio.charset.Charset;
44 import java.nio.charset.StandardCharsets;
45 import java.util.Arrays;
46 import java.util.Collection;
47 import java.util.HashMap;
48 import java.util.LinkedList;
49 import java.util.List;
50 import java.util.Locale;
51 import java.util.Map;
52 import java.util.Queue;
53 import java.util.Random;
54 import java.util.StringTokenizer;
55 import java.util.concurrent.CancellationException;
56 import java.util.concurrent.ExecutionException;
57 import java.util.concurrent.Executors;
58 import java.util.concurrent.Future;
59 import java.util.concurrent.atomic.AtomicReference;
60
61 import org.apache.hc.core5.function.Decorator;
62 import org.apache.hc.core5.function.Supplier;
63 import org.apache.hc.core5.http.ConnectionReuseStrategy;
64 import org.apache.hc.core5.http.ContentLengthStrategy;
65 import org.apache.hc.core5.http.ContentType;
66 import org.apache.hc.core5.http.EntityDetails;
67 import org.apache.hc.core5.http.Header;
68 import org.apache.hc.core5.http.HeaderElements;
69 import org.apache.hc.core5.http.HttpException;
70 import org.apache.hc.core5.http.HttpHeaders;
71 import org.apache.hc.core5.http.HttpHost;
72 import org.apache.hc.core5.http.HttpRequest;
73 import org.apache.hc.core5.http.HttpResponse;
74 import org.apache.hc.core5.http.HttpStatus;
75 import org.apache.hc.core5.http.HttpVersion;
76 import org.apache.hc.core5.http.MalformedChunkCodingException;
77 import org.apache.hc.core5.http.Message;
78 import org.apache.hc.core5.http.Method;
79 import org.apache.hc.core5.http.ProtocolException;
80 import org.apache.hc.core5.http.URIScheme;
81 import org.apache.hc.core5.http.config.CharCodingConfig;
82 import org.apache.hc.core5.http.config.Http1Config;
83 import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
84 import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
85 import org.apache.hc.core5.http.impl.Http1StreamListener;
86 import org.apache.hc.core5.http.impl.HttpProcessors;
87 import org.apache.hc.core5.http.impl.nio.AbstractContentEncoder;
88 import org.apache.hc.core5.http.impl.nio.ServerHttp1StreamDuplexer;
89 import org.apache.hc.core5.http.message.BasicHttpRequest;
90 import org.apache.hc.core5.http.message.BasicHttpResponse;
91 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
92 import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
93 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
94 import org.apache.hc.core5.http.nio.AsyncResponseProducer;
95 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
96 import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
97 import org.apache.hc.core5.http.nio.CapacityChannel;
98 import org.apache.hc.core5.http.nio.ContentEncoder;
99 import org.apache.hc.core5.http.nio.DataStreamChannel;
100 import org.apache.hc.core5.http.nio.HandlerFactory;
101 import org.apache.hc.core5.http.nio.NHttpMessageParser;
102 import org.apache.hc.core5.http.nio.NHttpMessageWriter;
103 import org.apache.hc.core5.http.nio.ResponseChannel;
104 import org.apache.hc.core5.http.nio.SessionOutputBuffer;
105 import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
106 import org.apache.hc.core5.http.nio.entity.DigestingEntityConsumer;
107 import org.apache.hc.core5.http.nio.entity.DigestingEntityProducer;
108 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
109 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
110 import org.apache.hc.core5.http.nio.support.AbstractServerExchangeHandler;
111 import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
112 import org.apache.hc.core5.http.nio.support.BasicAsyncServerExpectationDecorator;
113 import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
114 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
115 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
116 import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
117 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityConsumer;
118 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityProducer;
119 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicServerExchangeHandler;
120 import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
121 import org.apache.hc.core5.http.protocol.HttpContext;
122 import org.apache.hc.core5.http.protocol.HttpProcessor;
123 import org.apache.hc.core5.http.protocol.RequestConnControl;
124 import org.apache.hc.core5.http.protocol.RequestContent;
125 import org.apache.hc.core5.http.protocol.RequestTargetHost;
126 import org.apache.hc.core5.http.protocol.RequestValidateHost;
127 import org.apache.hc.core5.reactor.IOReactorConfig;
128 import org.apache.hc.core5.reactor.IOSession;
129 import org.apache.hc.core5.reactor.ProtocolIOSession;
130 import org.apache.hc.core5.testing.SSLTestContexts;
131 import org.apache.hc.core5.util.CharArrayBuffer;
132 import org.apache.hc.core5.util.TextUtils;
133 import org.apache.hc.core5.util.TimeValue;
134 import org.apache.hc.core5.util.Timeout;
135 import org.junit.After;
136 import org.junit.Assert;
137 import org.junit.Before;
138 import org.junit.Test;
139 import org.junit.runner.RunWith;
140 import org.junit.runners.Parameterized;
141 import org.slf4j.Logger;
142 import org.slf4j.LoggerFactory;
143
144 @RunWith(Parameterized.class)
145 public class Http1IntegrationTest extends InternalHttp1ServerTestBase {
146
147 private final Logger log = LoggerFactory.getLogger(getClass());
148
149 @Parameterized.Parameters(name = "{0}")
150 public static Collection<Object[]> protocols() {
151 return Arrays.asList(new Object[][]{
152 { URIScheme.HTTP },
153 { URIScheme.HTTPS }
154 });
155 }
156
/** Bound used when connecting and when awaiting the outcome of an ordinary exchange. */
private static final Timeout TIMEOUT = Timeout.ofSeconds(30);

/** Longer bound, used where several large responses are awaited on one connection. */
private static final Timeout LONG_TIMEOUT = Timeout.ofSeconds(60);

/** Client under test; instantiated in {@code setup()} and shut down in {@code cleanup()}. */
private Http1TestClient client;

/**
 * Creates a test instance bound to one of the schemes returned by {@code protocols()}.
 *
 * @param scheme URI scheme handed over to the server test base class
 */
public Http1IntegrationTest(final URIScheme scheme) {
    super(scheme);
}
165
166 @Before
167 public void setup() throws Exception {
168 log.debug("Starting up test client");
169 client = new Http1TestClient(
170 buildReactorConfig(),
171 scheme == URIScheme.HTTPS ? SSLTestContexts.createClientSSLContext() : null, null, null);
172 }
173
174 protected IOReactorConfig buildReactorConfig() {
175 return IOReactorConfig.DEFAULT;
176 }
177
178 @After
179 public void cleanup() throws Exception {
180 log.debug("Shutting down test client");
181 if (client != null) {
182 client.shutdown(TimeValue.ofSeconds(5));
183 }
184 }
185
186 private URI createRequestURI(final InetSocketAddress serverEndpoint, final String path) {
187 try {
188 return new URI(scheme.id, null, "localhost", serverEndpoint.getPort(), path, null, null);
189 } catch (final URISyntaxException e) {
190 throw new IllegalStateException();
191 }
192 }
193
194 @Test
195 public void testSimpleGet() throws Exception {
196 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
197
198 @Override
199 public AsyncServerExchangeHandler get() {
200 return new SingleLineResponseHandler("Hi there");
201 }
202
203 });
204 final InetSocketAddress serverEndpoint = server.start();
205
206 client.start();
207 final Future<ClientSessionEndpoint> connectFuture = client.connect(
208 "localhost", serverEndpoint.getPort(), TIMEOUT);
209 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
210
211 for (int i = 0; i < 5; i++) {
212 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
213 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
214 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
215 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
216 Assert.assertNotNull(result);
217 final HttpResponse response1 = result.getHead();
218 final String entity1 = result.getBody();
219 Assert.assertNotNull(response1);
220 Assert.assertEquals(200, response1.getCode());
221 Assert.assertEquals("Hi there", entity1);
222 }
223 }
224
225 @Test
226 public void testSimpleGetConnectionClose() throws Exception {
227 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
228
229 @Override
230 public AsyncServerExchangeHandler get() {
231 return new SingleLineResponseHandler("Hi there");
232 }
233
234 });
235 final InetSocketAddress serverEndpoint = server.start();
236
237 client.start();
238 final URI requestURI = createRequestURI(serverEndpoint, "/hello");
239 for (int i = 0; i < 5; i++) {
240 final Future<ClientSessionEndpoint> connectFuture = client.connect(
241 "localhost", serverEndpoint.getPort(), TIMEOUT);
242 try (final ClientSessionEndpoint streamEndpoint = connectFuture.get()) {
243 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
244 AsyncRequestBuilder.get(requestURI)
245 .addHeader(HttpHeaders.CONNECTION, "close")
246 .build(),
247 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
248 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
249 Assert.assertNotNull(result);
250 final HttpResponse response1 = result.getHead();
251 final String entity1 = result.getBody();
252 Assert.assertNotNull(response1);
253 Assert.assertEquals(200, response1.getCode());
254 Assert.assertEquals("Hi there", entity1);
255 }
256 }
257 }
258
259 @Test
260 public void testSimpleGetIdentityTransfer() throws Exception {
261 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
262
263 @Override
264 public AsyncServerExchangeHandler get() {
265 return new SingleLineResponseHandler("Hi there");
266 }
267
268 });
269 final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
270 final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
271
272 client.start();
273 final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
274 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
275
276 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
277 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
278 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
279 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
280 Assert.assertNotNull(result);
281 final HttpResponse response = result.getHead();
282 final String entity = result.getBody();
283 Assert.assertNotNull(response);
284 Assert.assertEquals(200, response.getCode());
285 Assert.assertEquals("Hi there", entity);
286 }
287
288 @Test
289 public void testSimpleGetsPipelined() throws Exception {
290 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
291
292 @Override
293 public AsyncServerExchangeHandler get() {
294 return new SingleLineResponseHandler("Hi there");
295 }
296
297 });
298 final InetSocketAddress serverEndpoint = server.start();
299
300 client.start();
301 final Future<ClientSessionEndpoint> connectFuture = client.connect(
302 "localhost", serverEndpoint.getPort(), TIMEOUT);
303 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
304
305 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
306 for (int i = 0; i < 5; i++) {
307 queue.add(streamEndpoint.execute(
308 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
309 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
310 }
311 while (!queue.isEmpty()) {
312 final Future<Message<HttpResponse, String>> future = queue.remove();
313 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
314 Assert.assertNotNull(result);
315 final HttpResponse response = result.getHead();
316 final String entity = result.getBody();
317 Assert.assertNotNull(response);
318 Assert.assertEquals(200, response.getCode());
319 Assert.assertEquals("Hi there", entity);
320 }
321 }
322
323 @Test
324 public void testLargeGet() throws Exception {
325 server.register("/", new Supplier<AsyncServerExchangeHandler>() {
326
327 @Override
328 public AsyncServerExchangeHandler get() {
329 return new MultiLineResponseHandler("0123456789abcdef", 5000);
330 }
331
332 });
333 final InetSocketAddress serverEndpoint = server.start();
334
335 client.start();
336 final Future<ClientSessionEndpoint> connectFuture = client.connect(
337 "localhost", serverEndpoint.getPort(), TIMEOUT);
338 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
339
340 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
341 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
342 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
343
344 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
345 Assert.assertNotNull(result1);
346 final HttpResponse response1 = result1.getHead();
347 Assert.assertNotNull(response1);
348 Assert.assertEquals(200, response1.getCode());
349 final String s1 = result1.getBody();
350 Assert.assertNotNull(s1);
351 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
352 while (t1.hasMoreTokens()) {
353 Assert.assertEquals("0123456789abcdef", t1.nextToken());
354 }
355
356 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
357 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
358 new BasicResponseConsumer<>(new StringAsyncEntityConsumer(512)), null);
359
360 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
361 Assert.assertNotNull(result2);
362 final HttpResponse response2 = result2.getHead();
363 Assert.assertNotNull(response2);
364 Assert.assertEquals(200, response2.getCode());
365 final String s2 = result2.getBody();
366 Assert.assertNotNull(s2);
367 final StringTokenizer t2 = new StringTokenizer(s2, "\r\n");
368 while (t2.hasMoreTokens()) {
369 Assert.assertEquals("0123456789abcdef", t2.nextToken());
370 }
371 }
372
373 @Test
374 public void testLargeGetsPipelined() throws Exception {
375 server.register("/", new Supplier<AsyncServerExchangeHandler>() {
376
377 @Override
378 public AsyncServerExchangeHandler get() {
379 return new MultiLineResponseHandler("0123456789abcdef", 2000);
380 }
381
382 });
383 final InetSocketAddress serverEndpoint = server.start();
384
385 client.start();
386 final Future<ClientSessionEndpoint> connectFuture = client.connect(
387 "localhost", serverEndpoint.getPort(), TIMEOUT);
388 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
389
390 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
391 for (int i = 0; i < 5; i++) {
392 queue.add(streamEndpoint.execute(
393 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
394 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
395 }
396 while (!queue.isEmpty()) {
397 final Future<Message<HttpResponse, String>> future = queue.remove();
398 final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
399 Assert.assertNotNull(result);
400 final HttpResponse response = result.getHead();
401 Assert.assertNotNull(response);
402 Assert.assertEquals(200, response.getCode());
403 final String entity = result.getBody();
404 Assert.assertNotNull(entity);
405 final StringTokenizer t = new StringTokenizer(entity, "\r\n");
406 while (t.hasMoreTokens()) {
407 Assert.assertEquals("0123456789abcdef", t.nextToken());
408 }
409 }
410 }
411
412 @Test
413 public void testBasicPost() throws Exception {
414 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
415
416 @Override
417 public AsyncServerExchangeHandler get() {
418 return new SingleLineResponseHandler("Hi back");
419 }
420
421 });
422 final InetSocketAddress serverEndpoint = server.start();
423
424 client.start();
425 final Future<ClientSessionEndpoint> connectFuture = client.connect(
426 "localhost", serverEndpoint.getPort(), TIMEOUT);
427 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
428
429 for (int i = 0; i < 5; i++) {
430 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
431 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello"),
432 AsyncEntityProducers.create("Hi there")),
433 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
434 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
435 Assert.assertNotNull(result);
436 final HttpResponse response1 = result.getHead();
437 final String entity1 = result.getBody();
438 Assert.assertNotNull(response1);
439 Assert.assertEquals(200, response1.getCode());
440 Assert.assertEquals("Hi back", entity1);
441 }
442 }
443
444 @Test
445 public void testBasicPostPipelined() throws Exception {
446 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
447
448 @Override
449 public AsyncServerExchangeHandler get() {
450 return new SingleLineResponseHandler("Hi back");
451 }
452
453 });
454 final InetSocketAddress serverEndpoint = server.start();
455
456 client.start();
457 final Future<ClientSessionEndpoint> connectFuture = client.connect(
458 "localhost", serverEndpoint.getPort(), TIMEOUT);
459 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
460
461 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
462 for (int i = 0; i < 5; i++) {
463 queue.add(streamEndpoint.execute(
464 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello"),
465 AsyncEntityProducers.create("Hi there")),
466 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
467 }
468 while (!queue.isEmpty()) {
469 final Future<Message<HttpResponse, String>> future = queue.remove();
470 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
471 Assert.assertNotNull(result);
472 final HttpResponse response = result.getHead();
473 final String entity = result.getBody();
474 Assert.assertNotNull(response);
475 Assert.assertEquals(200, response.getCode());
476 Assert.assertEquals("Hi back", entity);
477 }
478 }
479
480 @Test
481 public void testHttp10Post() throws Exception {
482 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
483
484 @Override
485 public AsyncServerExchangeHandler get() {
486 return new SingleLineResponseHandler("Hi back");
487 }
488
489 });
490 final InetSocketAddress serverEndpoint = server.start();
491
492 client.start();
493 final Future<ClientSessionEndpoint> connectFuture = client.connect(
494 "localhost", serverEndpoint.getPort(), TIMEOUT);
495 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
496
497 for (int i = 0; i < 5; i++) {
498 final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
499 request.setVersion(HttpVersion.HTTP_1_0);
500 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
501 new BasicRequestProducer(request, AsyncEntityProducers.create("Hi there")),
502 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
503 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
504 Assert.assertNotNull(result);
505 final HttpResponse response1 = result.getHead();
506 final String entity1 = result.getBody();
507 Assert.assertNotNull(response1);
508 Assert.assertEquals(200, response1.getCode());
509 Assert.assertEquals("Hi back", entity1);
510 }
511 }
512
513 @Test
514 public void testNoEntityPost() throws Exception {
515 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
516
517 @Override
518 public AsyncServerExchangeHandler get() {
519 return new SingleLineResponseHandler("Hi back");
520 }
521
522 });
523 final InetSocketAddress serverEndpoint = server.start();
524
525 client.start();
526 final Future<ClientSessionEndpoint> connectFuture = client.connect(
527 "localhost", serverEndpoint.getPort(), TIMEOUT);
528 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
529
530 for (int i = 0; i < 5; i++) {
531 final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
532 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
533 new BasicRequestProducer(request, null),
534 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
535 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
536 Assert.assertNotNull(result);
537 final HttpResponse response1 = result.getHead();
538 final String entity1 = result.getBody();
539 Assert.assertNotNull(response1);
540 Assert.assertEquals(200, response1.getCode());
541 Assert.assertEquals("Hi back", entity1);
542 }
543 }
544
545 @Test
546 public void testLargePost() throws Exception {
547 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
548
549 @Override
550 public AsyncServerExchangeHandler get() {
551 return new EchoHandler(2048);
552 }
553
554 });
555 final InetSocketAddress serverEndpoint = server.start();
556
557 client.start();
558 final Future<ClientSessionEndpoint> connectFuture = client.connect(
559 "localhost", serverEndpoint.getPort(), TIMEOUT);
560 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
561
562 for (int i = 0; i < 5; i++) {
563 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
564 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
565 new MultiLineEntityProducer("0123456789abcdef", 5000)),
566 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
567 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
568 Assert.assertNotNull(result);
569 final HttpResponse response = result.getHead();
570 Assert.assertNotNull(response);
571 Assert.assertEquals(200, response.getCode());
572 final String entity = result.getBody();
573 Assert.assertNotNull(entity);
574 final StringTokenizer t = new StringTokenizer(entity, "\r\n");
575 while (t.hasMoreTokens()) {
576 Assert.assertEquals("0123456789abcdef", t.nextToken());
577 }
578 }
579 }
580
581 @Test
582 public void testPostsPipelinedLargeResponse() throws Exception {
583 server.register("/", new Supplier<AsyncServerExchangeHandler>() {
584
585 @Override
586 public AsyncServerExchangeHandler get() {
587 return new MultiLineResponseHandler("0123456789abcdef", 2000);
588 }
589
590 });
591 final InetSocketAddress serverEndpoint = server.start();
592
593 client.start();
594 final Future<ClientSessionEndpoint> connectFuture = client.connect(
595 "localhost", serverEndpoint.getPort(), TIMEOUT);
596 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
597
598 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
599 for (int i = 0; i < 2; i++) {
600 queue.add(streamEndpoint.execute(
601 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/"),
602 AsyncEntityProducers.create("Hi there")),
603 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
604 }
605 while (!queue.isEmpty()) {
606 final Future<Message<HttpResponse, String>> future = queue.remove();
607 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
608 Assert.assertNotNull(result);
609 final HttpResponse response = result.getHead();
610 Assert.assertNotNull(response);
611 Assert.assertEquals(200, response.getCode());
612 final String entity = result.getBody();
613 Assert.assertNotNull(entity);
614 final StringTokenizer t = new StringTokenizer(entity, "\r\n");
615 while (t.hasMoreTokens()) {
616 Assert.assertEquals("0123456789abcdef", t.nextToken());
617 }
618 }
619 }
620
621
622 @Test
623 public void testLargePostsPipelined() throws Exception {
624 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
625
626 @Override
627 public AsyncServerExchangeHandler get() {
628 return new EchoHandler(2048);
629 }
630
631 });
632 final InetSocketAddress serverEndpoint = server.start();
633
634 client.start();
635 final Future<ClientSessionEndpoint> connectFuture = client.connect(
636 "localhost", serverEndpoint.getPort(), TIMEOUT);
637 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
638
639 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
640 for (int i = 0; i < 5; i++) {
641 queue.add(streamEndpoint.execute(
642 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
643 new MultiLineEntityProducer("0123456789abcdef", 5000)),
644 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
645 }
646 while (!queue.isEmpty()) {
647 final Future<Message<HttpResponse, String>> future = queue.remove();
648 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
649 Assert.assertNotNull(result);
650 final HttpResponse response = result.getHead();
651 Assert.assertNotNull(response);
652 Assert.assertEquals(200, response.getCode());
653 final String entity = result.getBody();
654 Assert.assertNotNull(entity);
655 final StringTokenizer t = new StringTokenizer(entity, "\r\n");
656 while (t.hasMoreTokens()) {
657 Assert.assertEquals("0123456789abcdef", t.nextToken());
658 }
659 }
660 }
661
662 @Test
663 public void testSimpleHead() throws Exception {
664 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
665
666 @Override
667 public AsyncServerExchangeHandler get() {
668 return new SingleLineResponseHandler("Hi there");
669 }
670
671 });
672 final InetSocketAddress serverEndpoint = server.start();
673
674 client.start();
675 final Future<ClientSessionEndpoint> connectFuture = client.connect(
676 "localhost", serverEndpoint.getPort(), TIMEOUT);
677 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
678
679 for (int i = 0; i < 5; i++) {
680 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
681 new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
682 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
683 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
684 Assert.assertNotNull(result);
685 final HttpResponse response1 = result.getHead();
686 Assert.assertNotNull(response1);
687 Assert.assertEquals(200, response1.getCode());
688 Assert.assertNull(result.getBody());
689 }
690 }
691
692 @Test
693 public void testSimpleHeadConnectionClose() throws Exception {
694 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
695
696 @Override
697 public AsyncServerExchangeHandler get() {
698 return new SingleLineResponseHandler("Hi there");
699 }
700
701 });
702 final InetSocketAddress serverEndpoint = server.start();
703
704 client.start();
705 final URI requestURI = createRequestURI(serverEndpoint, "/hello");
706 for (int i = 0; i < 5; i++) {
707 final Future<ClientSessionEndpoint> connectFuture = client.connect(
708 "localhost", serverEndpoint.getPort(), TIMEOUT);
709 try (final ClientSessionEndpoint streamEndpoint = connectFuture.get()) {
710 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
711 AsyncRequestBuilder.head(requestURI)
712 .addHeader(HttpHeaders.CONNECTION, "close")
713 .build(),
714 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
715 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
716 Assert.assertNotNull(result);
717 final HttpResponse response1 = result.getHead();
718 Assert.assertNotNull(response1);
719 Assert.assertEquals(200, response1.getCode());
720 Assert.assertNull(result.getBody());
721 }
722 }
723 }
724
725 @Test
726 public void testHeadPipelined() throws Exception {
727 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
728
729 @Override
730 public AsyncServerExchangeHandler get() {
731 return new SingleLineResponseHandler("Hi there");
732 }
733
734 });
735 final InetSocketAddress serverEndpoint = server.start();
736
737 client.start();
738 final Future<ClientSessionEndpoint> connectFuture = client.connect(
739 "localhost", serverEndpoint.getPort(), TIMEOUT);
740 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
741
742 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
743 for (int i = 0; i < 5; i++) {
744 queue.add(streamEndpoint.execute(
745 new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
746 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
747 }
748 while (!queue.isEmpty()) {
749 final Future<Message<HttpResponse, String>> future = queue.remove();
750 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
751 Assert.assertNotNull(result);
752 final HttpResponse response1 = result.getHead();
753 Assert.assertNotNull(response1);
754 Assert.assertEquals(200, response1.getCode());
755 Assert.assertNull(result.getBody());
756 }
757 }
758
759 @Test
760 public void testExpectationFailed() throws Exception {
761 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
762
763 @Override
764 public AsyncServerExchangeHandler get() {
765 return new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
766
767 @Override
768 protected void handle(
769 final Message<HttpRequest, String> request,
770 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
771 final HttpContext context) throws IOException, HttpException {
772 responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
773
774 }
775 };
776 }
777
778 });
779 final InetSocketAddress serverEndpoint = server.start(null, new Decorator<AsyncServerExchangeHandler>() {
780
781 @Override
782 public AsyncServerExchangeHandler decorate(final AsyncServerExchangeHandler handler) {
783
784 return new BasicAsyncServerExpectationDecorator(handler) {
785
786 @Override
787 protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
788 final Header h = request.getFirstHeader("password");
789 if (h != null && "secret".equals(h.getValue())) {
790 return null;
791 } else {
792 return new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
793 }
794 }
795 };
796
797 }
798 }, Http1Config.DEFAULT);
799
800 client.start();
801 final Future<IOSession> sessionFuture = client.requestSession(
802 new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
803 final IOSession ioSession = sessionFuture.get();
804 final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession);
805
806 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
807 request1.addHeader("password", "secret");
808 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
809 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 1000)),
810 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
811 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
812 Assert.assertNotNull(result1);
813 final HttpResponse response1 = result1.getHead();
814 Assert.assertNotNull(response1);
815 Assert.assertEquals(200, response1.getCode());
816 Assert.assertNotNull("All is well", result1.getBody());
817
818 Assert.assertTrue(ioSession.isOpen());
819
820 final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
821 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
822 new BasicRequestProducer(request2, new MultiLineEntityProducer("0123456789abcdef", 5000)),
823 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
824 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
825 Assert.assertNotNull(result2);
826 final HttpResponse response2 = result2.getHead();
827 Assert.assertNotNull(response2);
828 Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response2.getCode());
829 Assert.assertNotNull("You shall not pass", result2.getBody());
830
831 Assert.assertTrue(ioSession.isOpen());
832
833 final HttpRequest request3 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
834 request3.addHeader("password", "secret");
835 final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
836 new BasicRequestProducer(request3, new MultiLineEntityProducer("0123456789abcdef", 1000)),
837 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
838 final Message<HttpResponse, String> result3 = future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
839 Assert.assertNotNull(result3);
840 final HttpResponse response3 = result3.getHead();
841 Assert.assertNotNull(response3);
842 Assert.assertEquals(200, response3.getCode());
843 Assert.assertNotNull("All is well", result3.getBody());
844
845 Assert.assertTrue(ioSession.isOpen());
846
847 final HttpRequest request4 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
848 final Future<Message<HttpResponse, String>> future4 = streamEndpoint.execute(
849 new BasicRequestProducer(request4, AsyncEntityProducers.create("blah")),
850 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
851 final Message<HttpResponse, String> result4 = future4.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
852 Assert.assertNotNull(result4);
853 final HttpResponse response4 = result4.getHead();
854 Assert.assertNotNull(response4);
855 Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response4.getCode());
856 Assert.assertNotNull("You shall not pass", result4.getBody());
857
858 Assert.assertFalse(ioSession.isOpen());
859 }
860
861 @Test
862 public void testExpectationFailedCloseConnection() throws Exception {
863 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
864
865 @Override
866 public AsyncServerExchangeHandler get() {
867 return new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
868
869 @Override
870 protected void handle(
871 final Message<HttpRequest, String> request,
872 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
873 final HttpContext context) throws IOException, HttpException {
874 responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
875
876 }
877 };
878 }
879
880 });
881 final InetSocketAddress serverEndpoint = server.start(null, new Decorator<AsyncServerExchangeHandler>() {
882
883 @Override
884 public AsyncServerExchangeHandler decorate(final AsyncServerExchangeHandler handler) {
885
886 return new BasicAsyncServerExpectationDecorator(handler) {
887
888 @Override
889 protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
890 final Header h = request.getFirstHeader("password");
891 if (h != null && "secret".equals(h.getValue())) {
892 return null;
893 } else {
894 final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_UNAUTHORIZED);
895 response.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
896 return new BasicResponseProducer(response, "You shall not pass");
897 }
898 }
899 };
900
901 }
902 }, Http1Config.DEFAULT);
903
904 client.start();
905 final Future<IOSession> sessionFuture = client.requestSession(
906 new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
907 final IOSession ioSession = sessionFuture.get();
908 final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession);
909
910 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
911 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
912 new BasicRequestProducer(request1, new MultiBinEntityProducer(
913 new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'},
914 100000,
915 ContentType.TEXT_PLAIN)),
916 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
917 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
918 Assert.assertNotNull(result1);
919 final HttpResponse response1 = result1.getHead();
920 Assert.assertNotNull(response1);
921 Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
922 Assert.assertNotNull("You shall not pass", result1.getBody());
923
924 Assert.assertFalse(streamEndpoint.isOpen());
925 }
926
927 @Test
928 public void testDelayedExpectationVerification() throws Exception {
929 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
930
931 @Override
932 public AsyncServerExchangeHandler get() {
933 return new AsyncServerExchangeHandler() {
934
935 private final Random random = new Random(System.currentTimeMillis());
936 private final AsyncEntityProducer entityProducer = AsyncEntityProducers.create(
937 "All is well");
938
939 @Override
940 public void handleRequest(
941 final HttpRequest request,
942 final EntityDetails entityDetails,
943 final ResponseChannel responseChannel,
944 final HttpContext context) throws HttpException, IOException {
945
946 Executors.newSingleThreadExecutor().execute(new Runnable() {
947 @Override
948 public void run() {
949 try {
950 if (entityDetails != null) {
951 final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
952 if (h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue())) {
953 Thread.sleep(random.nextInt(1000));
954 responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), context);
955 }
956 final HttpResponse response = new BasicHttpResponse(200);
957 synchronized (entityProducer) {
958 responseChannel.sendResponse(response, entityProducer, context);
959 }
960 }
961 } catch (final Exception ignore) {
962
963 }
964 }
965 });
966
967 }
968
969 @Override
970 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
971 capacityChannel.update(Integer.MAX_VALUE);
972 }
973
974 @Override
975 public void consume(final ByteBuffer src) throws IOException {
976 }
977
978 @Override
979 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
980 }
981
982 @Override
983 public int available() {
984 synchronized (entityProducer) {
985 return entityProducer.available();
986 }
987 }
988
989 @Override
990 public void produce(final DataStreamChannel channel) throws IOException {
991 synchronized (entityProducer) {
992 entityProducer.produce(channel);
993 }
994 }
995
996 @Override
997 public void failed(final Exception cause) {
998 }
999
1000 @Override
1001 public void releaseResources() {
1002 }
1003
1004 };
1005
1006 }
1007 });
1008 final InetSocketAddress serverEndpoint = server.start();
1009
1010 client.start(Http1Config.custom().setWaitForContinueTimeout(Timeout.ofMilliseconds(100)).build());
1011 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1012 "localhost", serverEndpoint.getPort(), TIMEOUT);
1013 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1014
1015 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
1016 for (int i = 0; i < 5; i++) {
1017 queue.add(streamEndpoint.execute(
1018 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/"),
1019 AsyncEntityProducers.create("Some important message")),
1020 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
1021 }
1022 while (!queue.isEmpty()) {
1023 final Future<Message<HttpResponse, String>> future = queue.remove();
1024 final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1025 Assert.assertNotNull(result);
1026 final HttpResponse response = result.getHead();
1027 Assert.assertNotNull(response);
1028 Assert.assertEquals(200, response.getCode());
1029 Assert.assertNotNull("All is well", result.getBody());
1030 }
1031 }
1032
1033 @Test
1034 public void testPrematureResponse() throws Exception {
1035 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
1036
1037 @Override
1038 public AsyncServerExchangeHandler get() {
1039 return new AsyncServerExchangeHandler() {
1040
1041 private final AtomicReference<AsyncResponseProducer> responseProducer = new AtomicReference<>(null);
1042
1043 @Override
1044 public void handleRequest(
1045 final HttpRequest request,
1046 final EntityDetails entityDetails,
1047 final ResponseChannel responseChannel,
1048 final HttpContext context) throws HttpException, IOException {
1049 final AsyncResponseProducer producer;
1050 final Header h = request.getFirstHeader("password");
1051 if (h != null && "secret".equals(h.getValue())) {
1052 producer = new BasicResponseProducer(HttpStatus.SC_OK, "All is well");
1053 } else {
1054 producer = new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
1055 }
1056 responseProducer.set(producer);
1057 producer.sendResponse(responseChannel, context);
1058 }
1059
1060 @Override
1061 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1062 capacityChannel.update(Integer.MAX_VALUE);
1063 }
1064
1065 @Override
1066 public void consume(final ByteBuffer src) throws IOException {
1067 }
1068
1069 @Override
1070 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1071 }
1072
1073 @Override
1074 public int available() {
1075 final AsyncResponseProducer producer = responseProducer.get();
1076 return producer.available();
1077 }
1078
1079 @Override
1080 public void produce(final DataStreamChannel channel) throws IOException {
1081 final AsyncResponseProducer producer = responseProducer.get();
1082 producer.produce(channel);
1083 }
1084
1085 @Override
1086 public void failed(final Exception cause) {
1087 }
1088
1089 @Override
1090 public void releaseResources() {
1091 }
1092 };
1093 }
1094
1095 });
1096 final InetSocketAddress serverEndpoint = server.start();
1097
1098 client.start();
1099 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1100 "localhost", serverEndpoint.getPort(), TIMEOUT);
1101 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1102
1103 for (int i = 0; i < 3; i++) {
1104 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1105 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1106 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 100000)),
1107 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1108 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1109 Assert.assertNotNull(result1);
1110 final HttpResponse response1 = result1.getHead();
1111 Assert.assertNotNull(response1);
1112 Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1113 Assert.assertNotNull("You shall not pass", result1.getBody());
1114
1115 Assert.assertTrue(streamEndpoint.isOpen());
1116 }
1117 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1118 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1119 new BasicRequestProducer(request1, new MultiBinEntityProducer(
1120 new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'},
1121 100000,
1122 ContentType.TEXT_PLAIN)),
1123 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1124 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1125 Assert.assertNotNull(result1);
1126 final HttpResponse response1 = result1.getHead();
1127 Assert.assertNotNull(response1);
1128 Assert.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1129 Assert.assertNotNull("You shall not pass", result1.getBody());
1130
1131 Assert.assertFalse(streamEndpoint.isOpen());
1132 }
1133
1134 @Test
1135 public void testSlowResponseConsumer() throws Exception {
1136 server.register("/", new Supplier<AsyncServerExchangeHandler>() {
1137
1138 @Override
1139 public AsyncServerExchangeHandler get() {
1140 return new MultiLineResponseHandler("0123456789abcd", 100);
1141 }
1142
1143 });
1144 final InetSocketAddress serverEndpoint = server.start();
1145
1146 client.start(Http1Config.custom().setBufferSize(256).build());
1147
1148 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1149 "localhost", serverEndpoint.getPort(), TIMEOUT);
1150 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1151
1152 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/"));
1153 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1154 new BasicRequestProducer(request1, null),
1155 new BasicResponseConsumer<>(new AbstractClassicEntityConsumer<String>(16, Executors.newSingleThreadExecutor()) {
1156
1157 @Override
1158 protected String consumeData(
1159 final ContentType contentType, final InputStream inputStream) throws IOException {
1160 Charset charset = contentType != null ? contentType.getCharset() : null;
1161 if (charset == null) {
1162 charset = StandardCharsets.US_ASCII;
1163 }
1164
1165 final StringBuilder buffer = new StringBuilder();
1166 try {
1167 final byte[] tmp = new byte[16];
1168 int l;
1169 while ((l = inputStream.read(tmp)) != -1) {
1170 buffer.append(charset.decode(ByteBuffer.wrap(tmp, 0, l)));
1171 Thread.sleep(50);
1172 }
1173 } catch (final InterruptedException ex) {
1174 Thread.currentThread().interrupt();
1175 throw new InterruptedIOException(ex.getMessage());
1176 }
1177 return buffer.toString();
1178 }
1179 }),
1180 null);
1181
1182 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1183 Assert.assertNotNull(result1);
1184 final HttpResponse response1 = result1.getHead();
1185 Assert.assertNotNull(response1);
1186 Assert.assertEquals(200, response1.getCode());
1187 final String s1 = result1.getBody();
1188 Assert.assertNotNull(s1);
1189 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1190 while (t1.hasMoreTokens()) {
1191 Assert.assertEquals("0123456789abcd", t1.nextToken());
1192 }
1193 }
1194
1195 @Test
1196 public void testSlowRequestProducer() throws Exception {
1197 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
1198
1199 @Override
1200 public AsyncServerExchangeHandler get() {
1201 return new EchoHandler(2048);
1202 }
1203
1204 });
1205 final InetSocketAddress serverEndpoint = server.start();
1206
1207 client.start();
1208 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1209 "localhost", serverEndpoint.getPort(), TIMEOUT);
1210 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1211
1212 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1213 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1214 new BasicRequestProducer(request1, new AbstractClassicEntityProducer(4096, ContentType.TEXT_PLAIN, Executors.newSingleThreadExecutor()) {
1215
1216 @Override
1217 protected void produceData(final ContentType contentType, final OutputStream outputStream) throws IOException {
1218 Charset charset = contentType.getCharset();
1219 if (charset == null) {
1220 charset = StandardCharsets.US_ASCII;
1221 }
1222 try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, charset))) {
1223 for (int i = 0; i < 500; i++) {
1224 if (i % 100 == 0) {
1225 writer.flush();
1226 Thread.sleep(500);
1227 }
1228 writer.write("0123456789abcdef\r\n");
1229 }
1230 } catch (final InterruptedException ex) {
1231 Thread.currentThread().interrupt();
1232 throw new InterruptedIOException(ex.getMessage());
1233 }
1234 }
1235
1236 }),
1237 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1238 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1239 Assert.assertNotNull(result1);
1240 final HttpResponse response1 = result1.getHead();
1241 Assert.assertNotNull(response1);
1242 Assert.assertEquals(200, response1.getCode());
1243 final String s1 = result1.getBody();
1244 Assert.assertNotNull(s1);
1245 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1246 while (t1.hasMoreTokens()) {
1247 Assert.assertEquals("0123456789abcdef", t1.nextToken());
1248 }
1249 }
1250
1251 @Test
1252 public void testSlowResponseProducer() throws Exception {
1253 server.register("*", new Supplier<AsyncServerExchangeHandler>() {
1254
1255 @Override
1256 public AsyncServerExchangeHandler get() {
1257 return new AbstractClassicServerExchangeHandler(2048, Executors.newSingleThreadExecutor()) {
1258
1259 @Override
1260 protected void handle(
1261 final HttpRequest request,
1262 final InputStream requestStream,
1263 final HttpResponse response,
1264 final OutputStream responseStream,
1265 final HttpContext context) throws IOException, HttpException {
1266
1267 if (!"/hello".equals(request.getPath())) {
1268 response.setCode(HttpStatus.SC_NOT_FOUND);
1269 return;
1270 }
1271 if (!Method.POST.name().equalsIgnoreCase(request.getMethod())) {
1272 response.setCode(HttpStatus.SC_NOT_IMPLEMENTED);
1273 return;
1274 }
1275 if (requestStream == null) {
1276 return;
1277 }
1278 final Header h1 = request.getFirstHeader(HttpHeaders.CONTENT_TYPE);
1279 final ContentType contentType = h1 != null ? ContentType.parse(h1.getValue()) : null;
1280 Charset charset = contentType != null ? contentType.getCharset() : null;
1281 if (charset == null) {
1282 charset = StandardCharsets.US_ASCII;
1283 }
1284 response.setCode(HttpStatus.SC_OK);
1285 response.setHeader(h1);
1286 try (final BufferedReader reader = new BufferedReader(new InputStreamReader(requestStream, charset));
1287 final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(responseStream, charset))) {
1288 try {
1289 String l;
1290 int count = 0;
1291 while ((l = reader.readLine()) != null) {
1292 writer.write(l);
1293 writer.write("\r\n");
1294 count++;
1295 if (count % 500 == 0) {
1296 Thread.sleep(500);
1297 }
1298 }
1299 writer.flush();
1300 } catch (final InterruptedException ex) {
1301 Thread.currentThread().interrupt();
1302 throw new InterruptedIOException(ex.getMessage());
1303 }
1304 }
1305 }
1306 };
1307 }
1308
1309 });
1310 final InetSocketAddress serverEndpoint = server.start();
1311
1312 client.start(Http1Config.custom().setBufferSize(256).build());
1313
1314 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1315 "localhost", serverEndpoint.getPort(), TIMEOUT);
1316 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1317
1318 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
1319 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1320 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcd", 2000)),
1321 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1322 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1323 Assert.assertNotNull(result1);
1324 final HttpResponse response1 = result1.getHead();
1325 Assert.assertNotNull(response1);
1326 Assert.assertEquals(200, response1.getCode());
1327 final String s1 = result1.getBody();
1328 Assert.assertNotNull(s1);
1329 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1330 while (t1.hasMoreTokens()) {
1331 Assert.assertEquals("0123456789abcd", t1.nextToken());
1332 }
1333 }
1334
1335 @Test
1336 public void testPipelinedConnectionClose() throws Exception {
1337 server.register("/hello*", new Supplier<AsyncServerExchangeHandler>() {
1338
1339 @Override
1340 public AsyncServerExchangeHandler get() {
1341 return new SingleLineResponseHandler("Hi back");
1342 }
1343
1344 });
1345 final InetSocketAddress serverEndpoint = server.start();
1346
1347 client.start();
1348 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1349 "localhost", serverEndpoint.getPort(), TIMEOUT);
1350 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1351
1352 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1353 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-1"),
1354 AsyncEntityProducers.create("Hi there")),
1355 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1356 final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello-2"));
1357 request2.addHeader(HttpHeaders.CONNECTION, "close");
1358 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1359 new BasicRequestProducer(request2,
1360 AsyncEntityProducers.create("Hi there")),
1361 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1362 final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
1363 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1364 AsyncEntityProducers.create("Hi there")),
1365 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1366
1367 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1368 Assert.assertNotNull(result1);
1369 final HttpResponse response1 = result1.getHead();
1370 final String entity1 = result1.getBody();
1371 Assert.assertNotNull(response1);
1372 Assert.assertEquals(200, response1.getCode());
1373 Assert.assertEquals("Hi back", entity1);
1374
1375 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1376 Assert.assertNotNull(result2);
1377 final HttpResponse response2 = result2.getHead();
1378 final String entity2 = result2.getBody();
1379 Assert.assertNotNull(response2);
1380 Assert.assertEquals(200, response2.getCode());
1381 Assert.assertEquals("Hi back", entity2);
1382
1383 try {
1384 future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1385 Assert.fail("ExecutionException expected");
1386 } catch (final CancellationException | ExecutionException ignore) {
1387 }
1388
1389 final Future<Message<HttpResponse, String>> future4 = streamEndpoint.execute(
1390 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1391 AsyncEntityProducers.create("Hi there")),
1392 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1393 try {
1394 future4.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1395 Assert.fail("CancellationException or ExecutionException expected");
1396 } catch (final CancellationException ignore) {
1397 Assert.assertTrue(future4.isCancelled());
1398 } catch (final ExecutionException ignore) {
1399 }
1400 }
1401
1402 @Test
1403 public void testPipelinedInvalidRequest() throws Exception {
1404 server.register("/hello*", new Supplier<AsyncServerExchangeHandler>() {
1405
1406 @Override
1407 public AsyncServerExchangeHandler get() {
1408 return new SingleLineResponseHandler("Hi back");
1409 }
1410
1411 });
1412 final InetSocketAddress serverEndpoint = server.start();
1413
1414 client.start();
1415 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1416 "localhost", serverEndpoint.getPort(), TIMEOUT);
1417 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1418
1419 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1420 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-1"),
1421 AsyncEntityProducers.create("Hi there")),
1422 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1423 final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello-2"));
1424 request2.addHeader(HttpHeaders.HOST, "blah:blah");
1425 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1426 new BasicRequestProducer(request2,
1427 AsyncEntityProducers.create("Hi there")),
1428 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1429 final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
1430 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1431 AsyncEntityProducers.create("Hi there")),
1432 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1433
1434 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1435 Assert.assertNotNull(result1);
1436 final HttpResponse response1 = result1.getHead();
1437 final String entity1 = result1.getBody();
1438 Assert.assertNotNull(response1);
1439 Assert.assertEquals(200, response1.getCode());
1440 Assert.assertEquals("Hi back", entity1);
1441
1442 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1443 Assert.assertNotNull(result2);
1444 final HttpResponse response2 = result2.getHead();
1445 final String entity2 = result2.getBody();
1446 Assert.assertNotNull(response2);
1447 Assert.assertEquals(400, response2.getCode());
1448 Assert.assertTrue(entity2.length() > 0);
1449
1450 try {
1451 future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1452 Assert.fail("ExecutionException expected");
1453 } catch (final CancellationException | ExecutionException ignore) {
1454 }
1455 }
1456
1457 private static final byte[] GARBAGE = "garbage".getBytes(StandardCharsets.US_ASCII);
1458
1459 private static class BrokenChunkEncoder extends AbstractContentEncoder {
1460
1461 private final CharArrayBuffer lineBuffer;
1462 private boolean done;
1463
1464 BrokenChunkEncoder(
1465 final WritableByteChannel channel,
1466 final SessionOutputBuffer buffer,
1467 final BasicHttpTransportMetrics metrics) {
1468 super(channel, buffer, metrics);
1469 lineBuffer = new CharArrayBuffer(16);
1470 }
1471
1472 @Override
1473 public void complete(final List<? extends Header> trailers) throws IOException {
1474 super.complete(trailers);
1475 }
1476
1477 @Override
1478 public int write(final ByteBuffer src) throws IOException {
1479 final int chunk;
1480 if (!done) {
1481 lineBuffer.clear();
1482 lineBuffer.append(Integer.toHexString(GARBAGE.length * 10));
1483 buffer().writeLine(lineBuffer);
1484 buffer().write(ByteBuffer.wrap(GARBAGE));
1485 done = true;
1486 chunk = GARBAGE.length;
1487 } else {
1488 chunk = 0;
1489 }
1490 final long bytesWritten = buffer().flush(channel());
1491 if (bytesWritten > 0) {
1492 metrics().incrementBytesTransferred(bytesWritten);
1493 }
1494 if (!buffer().hasData()) {
1495 channel().close();
1496 }
1497 return chunk;
1498 }
1499
1500 }
1501
1502 @Test
1503 public void testTruncatedChunk() throws Exception {
1504 final InetSocketAddress serverEndpoint = server.start(new InternalServerHttp1EventHandlerFactory(
1505 HttpProcessors.server(),
1506 new HandlerFactory<AsyncServerExchangeHandler>() {
1507
1508 @Override
1509 public AsyncServerExchangeHandler create(
1510 final HttpRequest request,
1511 final HttpContext context) throws HttpException {
1512 return new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
1513
1514 @Override
1515 protected void handle(
1516 final Message<HttpRequest, String> request,
1517 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1518 final HttpContext context) throws IOException, HttpException {
1519 responseTrigger.submitResponse(
1520 new BasicResponseProducer(new StringAsyncEntityProducer("useful stuff")), context);
1521 }
1522
1523 };
1524 }
1525
1526 },
1527 Http1Config.DEFAULT,
1528 CharCodingConfig.DEFAULT,
1529 DefaultConnectionReuseStrategy.INSTANCE,
1530 scheme == URIScheme.HTTPS ? SSLTestContexts.createServerSSLContext() : null, null, null) {
1531
1532 @Override
1533 protected ServerHttp1StreamDuplexer createServerHttp1StreamDuplexer(
1534 final ProtocolIOSession ioSession,
1535 final HttpProcessor httpProcessor,
1536 final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
1537 final Http1Config http1Config,
1538 final CharCodingConfig connectionConfig,
1539 final ConnectionReuseStrategy connectionReuseStrategy,
1540 final NHttpMessageParser<HttpRequest> incomingMessageParser,
1541 final NHttpMessageWriter<HttpResponse> outgoingMessageWriter,
1542 final ContentLengthStrategy incomingContentStrategy,
1543 final ContentLengthStrategy outgoingContentStrategy,
1544 final Http1StreamListener streamListener) {
1545 return new ServerHttp1StreamDuplexer(ioSession, httpProcessor, exchangeHandlerFactory,
1546 scheme.id,
1547 http1Config, connectionConfig, connectionReuseStrategy,
1548 incomingMessageParser, outgoingMessageWriter,
1549 incomingContentStrategy, outgoingContentStrategy,
1550 streamListener) {
1551
1552 @Override
1553 protected ContentEncoder createContentEncoder(
1554 final long len,
1555 final WritableByteChannel channel,
1556 final SessionOutputBuffer buffer,
1557 final BasicHttpTransportMetrics metrics) throws HttpException {
1558 if (len == ContentLengthStrategy.CHUNKED) {
1559 return new BrokenChunkEncoder(channel, buffer, metrics);
1560 } else {
1561 return super.createContentEncoder(len, channel, buffer, metrics);
1562 }
1563 }
1564
1565 };
1566 }
1567
1568 });
1569
1570 client.start();
1571 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1572 "localhost", serverEndpoint.getPort(), TIMEOUT);
1573 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1574
1575 final AsyncRequestProducer requestProducer = new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1576 final StringAsyncEntityConsumer entityConsumer = new StringAsyncEntityConsumer() {
1577
1578 @Override
1579 public void releaseResources() {
1580
1581 }
1582
1583 };
1584 final BasicResponseConsumer<String> responseConsumer = new BasicResponseConsumer<>(entityConsumer);
1585 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(requestProducer, responseConsumer, null);
1586 try {
1587 future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1588 Assert.fail("ExecutionException should have been thrown");
1589 } catch (final ExecutionException ex) {
1590 final Throwable cause = ex.getCause();
1591 Assert.assertTrue(cause instanceof MalformedChunkCodingException);
1592 Assert.assertEquals("garbage", entityConsumer.generateContent());
1593 }
1594 }
1595
1596 @Test
1597 public void testExceptionInHandler() throws Exception {
1598 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1599
1600 @Override
1601 public AsyncServerExchangeHandler get() {
1602 return new SingleLineResponseHandler("Hi there") {
1603
1604 @Override
1605 protected void handle(
1606 final Message<HttpRequest, String> request,
1607 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1608 final HttpContext context) throws IOException, HttpException {
1609 throw new HttpException("Boom");
1610 }
1611 };
1612 }
1613
1614 });
1615 final InetSocketAddress serverEndpoint = server.start();
1616
1617 client.start();
1618 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1619 "localhost", serverEndpoint.getPort(), TIMEOUT);
1620 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1621
1622 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1623 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1624 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1625 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1626 Assert.assertNotNull(result);
1627 final HttpResponse response1 = result.getHead();
1628 final String entity1 = result.getBody();
1629 Assert.assertNotNull(response1);
1630 Assert.assertEquals(500, response1.getCode());
1631 Assert.assertEquals("Boom", entity1);
1632 }
1633
1634 @Test
1635 public void testNoServiceHandler() throws Exception {
1636 final InetSocketAddress serverEndpoint = server.start();
1637
1638 client.start();
1639 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1640 "localhost", serverEndpoint.getPort(), TIMEOUT);
1641 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1642
1643 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1644 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1645 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1646 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1647 Assert.assertNotNull(result);
1648 final HttpResponse response1 = result.getHead();
1649 final String entity1 = result.getBody();
1650 Assert.assertNotNull(response1);
1651 Assert.assertEquals(404, response1.getCode());
1652 Assert.assertEquals("Resource not found", entity1);
1653 }
1654
1655 @Test
1656 public void testResponseNoContent() throws Exception {
1657 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1658
1659 @Override
1660 public AsyncServerExchangeHandler get() {
1661 return new SingleLineResponseHandler("Hi there") {
1662
1663 @Override
1664 protected void handle(
1665 final Message<HttpRequest, String> request,
1666 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1667 final HttpContext context) throws IOException, HttpException {
1668 final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_NO_CONTENT);
1669 responseTrigger.submitResponse(new BasicResponseProducer(response), context);
1670 }
1671 };
1672 }
1673
1674 });
1675 final InetSocketAddress serverEndpoint = server.start();
1676
1677 client.start();
1678 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1679 "localhost", serverEndpoint.getPort(), TIMEOUT);
1680 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1681
1682 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1683 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1684 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1685 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1686 Assert.assertNotNull(result);
1687 final HttpResponse response1 = result.getHead();
1688 Assert.assertNotNull(response1);
1689 Assert.assertEquals(204, response1.getCode());
1690 Assert.assertNull(result.getBody());
1691 }
1692
1693 @Test
1694 public void testAbsentHostHeader() throws Exception {
1695 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1696
1697 @Override
1698 public AsyncServerExchangeHandler get() {
1699 return new SingleLineResponseHandler("Hi there");
1700 }
1701
1702 });
1703 final InetSocketAddress serverEndpoint = server.start();
1704
1705 client.start(new DefaultHttpProcessor(new RequestContent(), new RequestConnControl()), Http1Config.DEFAULT);
1706
1707 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1708 "localhost", serverEndpoint.getPort(), TIMEOUT);
1709 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1710
1711 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1712 request1.setVersion(HttpVersion.HTTP_1_0);
1713 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1714 new BasicRequestProducer(request1, null),
1715 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1716 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1717 Assert.assertNotNull(result1);
1718 final HttpResponse response1 = result1.getHead();
1719 Assert.assertNotNull(response1);
1720 Assert.assertEquals(200, response1.getCode());
1721 Assert.assertEquals("Hi there", result1.getBody());
1722
1723 final HttpRequest request2 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1724 request2.setVersion(HttpVersion.HTTP_1_1);
1725 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1726 new BasicRequestProducer(request2, null),
1727 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1728 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1729 Assert.assertNotNull(result2);
1730 final HttpResponse response2 = result2.getHead();
1731 Assert.assertNotNull(response2);
1732 Assert.assertEquals(400, response2.getCode());
1733 Assert.assertEquals("Host header is absent", result2.getBody());
1734 }
1735
1736 @Test
1737 public void testMessageWithTrailers() throws Exception {
1738 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1739
1740 @Override
1741 public AsyncServerExchangeHandler get() {
1742 return new AbstractServerExchangeHandler<Message<HttpRequest, String>>() {
1743
1744 @Override
1745 protected AsyncRequestConsumer<Message<HttpRequest, String>> supplyConsumer(
1746 final HttpRequest request,
1747 final EntityDetails entityDetails,
1748 final HttpContext context) throws HttpException {
1749 return new BasicRequestConsumer<>(entityDetails != null ? new StringAsyncEntityConsumer() : null);
1750 }
1751
1752 @Override
1753 protected void handle(
1754 final Message<HttpRequest, String> requestMessage,
1755 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1756 final HttpContext context) throws HttpException, IOException {
1757 responseTrigger.submitResponse(new BasicResponseProducer(
1758 HttpStatus.SC_OK,
1759 new DigestingEntityProducer("MD5",
1760 new StringAsyncEntityProducer("Hello back with some trailers"))), context);
1761 }
1762 };
1763 }
1764
1765 });
1766 final InetSocketAddress serverEndpoint = server.start();
1767
1768 client.start();
1769
1770 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1771 "localhost", serverEndpoint.getPort(), TIMEOUT);
1772 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1773
1774 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1775 final DigestingEntityConsumer<String> entityConsumer = new DigestingEntityConsumer<>("MD5", new StringAsyncEntityConsumer());
1776 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1777 new BasicRequestProducer(request1, null),
1778 new BasicResponseConsumer<>(entityConsumer), null);
1779 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1780 Assert.assertNotNull(result1);
1781 final HttpResponse response1 = result1.getHead();
1782 Assert.assertNotNull(response1);
1783 Assert.assertEquals(200, response1.getCode());
1784 Assert.assertEquals("Hello back with some trailers", result1.getBody());
1785
1786 final List<Header> trailers = entityConsumer.getTrailers();
1787 Assert.assertNotNull(trailers);
1788 Assert.assertEquals(2, trailers.size());
1789 final Map<String, String> map = new HashMap<>();
1790 for (final Header header: trailers) {
1791 map.put(header.getName().toLowerCase(Locale.ROOT), header.getValue());
1792 }
1793 final String digest = TextUtils.toHexString(entityConsumer.getDigest());
1794 Assert.assertEquals("MD5", map.get("digest-algo"));
1795 Assert.assertEquals(digest, map.get("digest"));
1796 }
1797
1798 @Test
1799 public void testProtocolException() throws Exception {
1800 server.register("/boom", new Supplier<AsyncServerExchangeHandler>() {
1801
1802 @Override
1803 public AsyncServerExchangeHandler get() {
1804 return new AsyncServerExchangeHandler() {
1805
1806 private final StringAsyncEntityProducer entityProducer = new StringAsyncEntityProducer("Everyting is OK");
1807
1808 @Override
1809 public void releaseResources() {
1810 entityProducer.releaseResources();
1811 }
1812
1813 @Override
1814 public void handleRequest(
1815 final HttpRequest request,
1816 final EntityDetails entityDetails,
1817 final ResponseChannel responseChannel,
1818 final HttpContext context) throws HttpException, IOException {
1819 final String requestUri = request.getRequestUri();
1820 if (requestUri.endsWith("boom")) {
1821 throw new ProtocolException("Boom!!!");
1822 }
1823 responseChannel.sendResponse(new BasicHttpResponse(200), entityProducer, context);
1824 }
1825
1826 @Override
1827 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1828 capacityChannel.update(Integer.MAX_VALUE);
1829 }
1830
1831 @Override
1832 public void consume(final ByteBuffer src) throws IOException {
1833 }
1834
1835 @Override
1836 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1837
1838 }
1839
1840 @Override
1841 public int available() {
1842 return entityProducer.available();
1843 }
1844
1845 @Override
1846 public void produce(final DataStreamChannel channel) throws IOException {
1847 entityProducer.produce(channel);
1848 }
1849
1850 @Override
1851 public void failed(final Exception cause) {
1852 releaseResources();
1853 }
1854
1855 };
1856 }
1857
1858 });
1859
1860 final InetSocketAddress serverEndpoint = server.start();
1861
1862 client.start();
1863 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1864 "localhost", serverEndpoint.getPort(), TIMEOUT);
1865 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1866 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1867 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/boom")),
1868 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1869 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1870 Assert.assertNotNull(result);
1871 final HttpResponse response1 = result.getHead();
1872 final String entity1 = result.getBody();
1873 Assert.assertNotNull(response1);
1874 Assert.assertEquals(HttpStatus.SC_BAD_REQUEST, response1.getCode());
1875 Assert.assertEquals("Boom!!!", entity1);
1876 }
1877
1878 @Test
1879 public void testHeaderTooLarge() throws Exception {
1880 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1881
1882 @Override
1883 public AsyncServerExchangeHandler get() {
1884 return new SingleLineResponseHandler("Hi there");
1885 }
1886
1887 });
1888 final InetSocketAddress serverEndpoint = server.start(null, Http1Config.custom()
1889 .setMaxLineLength(100)
1890 .build());
1891 client.start();
1892
1893 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1894 "localhost", serverEndpoint.getPort(), TIMEOUT);
1895 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1896
1897 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1898 request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1899 "1234567890123456789012345678901234567890");
1900 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1901 new BasicRequestProducer(request1, null),
1902 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1903 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1904 Assert.assertNotNull(result1);
1905 final HttpResponse response1 = result1.getHead();
1906 Assert.assertNotNull(response1);
1907 Assert.assertEquals(431, response1.getCode());
1908 Assert.assertEquals("Maximum line length limit exceeded", result1.getBody());
1909 }
1910
1911 @Test
1912 public void testHeaderTooLargePost() throws Exception {
1913 server.register("/hello", new Supplier<AsyncServerExchangeHandler>() {
1914
1915 @Override
1916 public AsyncServerExchangeHandler get() {
1917 return new SingleLineResponseHandler("Hi there");
1918 }
1919
1920 });
1921 final InetSocketAddress serverEndpoint = server.start(null, Http1Config.custom()
1922 .setMaxLineLength(100)
1923 .build());
1924 client.start(
1925 new DefaultHttpProcessor(new RequestContent(), new RequestTargetHost(), new RequestConnControl()), null);
1926
1927 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1928 "localhost", serverEndpoint.getPort(), TIMEOUT);
1929 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1930
1931 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
1932 request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1933 "1234567890123456789012345678901234567890");
1934
1935 final byte[] b = new byte[2048];
1936 for (int i = 0; i < b.length; i++) {
1937 b[i] = (byte) ('a' + i % 10);
1938 }
1939
1940 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1941 new BasicRequestProducer(request1, AsyncEntityProducers.create(b, ContentType.TEXT_PLAIN)),
1942 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1943 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1944 Assert.assertNotNull(result1);
1945 final HttpResponse response1 = result1.getHead();
1946 Assert.assertNotNull(response1);
1947 Assert.assertEquals(431, response1.getCode());
1948 Assert.assertEquals("Maximum line length limit exceeded", result1.getBody());
1949 }
1950
1951 }