1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.testing.nio;
29
30 import static org.hamcrest.MatcherAssert.assertThat;
31
32 import java.io.BufferedReader;
33 import java.io.BufferedWriter;
34 import java.io.IOException;
35 import java.io.InputStream;
36 import java.io.InputStreamReader;
37 import java.io.InterruptedIOException;
38 import java.io.OutputStream;
39 import java.io.OutputStreamWriter;
40 import java.net.InetSocketAddress;
41 import java.net.URI;
42 import java.net.URISyntaxException;
43 import java.nio.ByteBuffer;
44 import java.nio.channels.WritableByteChannel;
45 import java.nio.charset.Charset;
46 import java.nio.charset.StandardCharsets;
47 import java.util.HashMap;
48 import java.util.LinkedList;
49 import java.util.List;
50 import java.util.Map;
51 import java.util.Queue;
52 import java.util.Random;
53 import java.util.StringTokenizer;
54 import java.util.concurrent.CancellationException;
55 import java.util.concurrent.ExecutionException;
56 import java.util.concurrent.Executors;
57 import java.util.concurrent.Future;
58 import java.util.concurrent.atomic.AtomicReference;
59 import java.util.concurrent.locks.ReentrantLock;
60
61 import org.apache.hc.core5.http.ConnectionReuseStrategy;
62 import org.apache.hc.core5.http.ContentLengthStrategy;
63 import org.apache.hc.core5.http.ContentType;
64 import org.apache.hc.core5.http.EntityDetails;
65 import org.apache.hc.core5.http.Header;
66 import org.apache.hc.core5.http.HeaderElements;
67 import org.apache.hc.core5.http.HttpException;
68 import org.apache.hc.core5.http.HttpHeaders;
69 import org.apache.hc.core5.http.HttpHost;
70 import org.apache.hc.core5.http.HttpRequest;
71 import org.apache.hc.core5.http.HttpResponse;
72 import org.apache.hc.core5.http.HttpStatus;
73 import org.apache.hc.core5.http.HttpVersion;
74 import org.apache.hc.core5.http.MalformedChunkCodingException;
75 import org.apache.hc.core5.http.Message;
76 import org.apache.hc.core5.http.Method;
77 import org.apache.hc.core5.http.ProtocolException;
78 import org.apache.hc.core5.http.URIScheme;
79 import org.apache.hc.core5.http.config.CharCodingConfig;
80 import org.apache.hc.core5.http.config.Http1Config;
81 import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
82 import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
83 import org.apache.hc.core5.http.impl.Http1StreamListener;
84 import org.apache.hc.core5.http.impl.HttpProcessors;
85 import org.apache.hc.core5.http.impl.nio.AbstractContentEncoder;
86 import org.apache.hc.core5.http.impl.nio.ServerHttp1StreamDuplexer;
87 import org.apache.hc.core5.http.message.BasicHttpRequest;
88 import org.apache.hc.core5.http.message.BasicHttpResponse;
89 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
90 import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
91 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
92 import org.apache.hc.core5.http.nio.AsyncResponseProducer;
93 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
94 import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
95 import org.apache.hc.core5.http.nio.CapacityChannel;
96 import org.apache.hc.core5.http.nio.ContentEncoder;
97 import org.apache.hc.core5.http.nio.DataStreamChannel;
98 import org.apache.hc.core5.http.nio.HandlerFactory;
99 import org.apache.hc.core5.http.nio.NHttpMessageParser;
100 import org.apache.hc.core5.http.nio.NHttpMessageWriter;
101 import org.apache.hc.core5.http.nio.ResponseChannel;
102 import org.apache.hc.core5.http.nio.SessionOutputBuffer;
103 import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
104 import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer;
105 import org.apache.hc.core5.http.nio.entity.DigestingEntityConsumer;
106 import org.apache.hc.core5.http.nio.entity.DigestingEntityProducer;
107 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
108 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
109 import org.apache.hc.core5.http.nio.support.AbstractServerExchangeHandler;
110 import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
111 import org.apache.hc.core5.http.nio.support.BasicAsyncServerExpectationDecorator;
112 import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
113 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
114 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
115 import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
116 import org.apache.hc.core5.http.nio.support.ImmediateResponseExchangeHandler;
117 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityConsumer;
118 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicEntityProducer;
119 import org.apache.hc.core5.http.nio.support.classic.AbstractClassicServerExchangeHandler;
120 import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
121 import org.apache.hc.core5.http.protocol.HttpContext;
122 import org.apache.hc.core5.http.protocol.HttpProcessor;
123 import org.apache.hc.core5.http.protocol.RequestConnControl;
124 import org.apache.hc.core5.http.protocol.RequestContent;
125 import org.apache.hc.core5.http.protocol.RequestTargetHost;
126 import org.apache.hc.core5.http.protocol.RequestValidateHost;
127 import org.apache.hc.core5.reactor.IOSession;
128 import org.apache.hc.core5.reactor.ProtocolIOSession;
129 import org.apache.hc.core5.testing.SSLTestContexts;
130 import org.apache.hc.core5.testing.nio.extension.Http1TestResources;
131 import org.apache.hc.core5.util.CharArrayBuffer;
132 import org.apache.hc.core5.util.TextUtils;
133 import org.apache.hc.core5.util.Timeout;
134 import org.hamcrest.CoreMatchers;
135 import org.junit.jupiter.api.Assertions;
136 import org.junit.jupiter.api.Test;
137 import org.junit.jupiter.api.extension.RegisterExtension;
138
139 public abstract class Http1IntegrationTest {
140
141 private static final Timeout TIMEOUT = Timeout.ofMinutes(1);
142 private static final Timeout LONG_TIMEOUT = Timeout.ofMinutes(2);
143
144 private final URIScheme scheme;
145
146 private final ReentrantLock lock = new ReentrantLock();
147
148 @RegisterExtension
149 private final Http1TestResources resources;
150
151 public Http1IntegrationTest(final URIScheme scheme) {
152 this.scheme = scheme;
153 this.resources = new Http1TestResources(scheme, TIMEOUT);
154 }
155
156 private URI createRequestURI(final InetSocketAddress serverEndpoint, final String path) {
157 try {
158 return new URI(scheme.id, null, "localhost", serverEndpoint.getPort(), path, null, null);
159 } catch (final URISyntaxException e) {
160 throw new IllegalStateException();
161 }
162 }
163
164 @Test
165 public void testSimpleGet() throws Exception {
166 final Http1TestServer server = resources.server();
167 final Http1TestClient client = resources.client();
168
169 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
170 final InetSocketAddress serverEndpoint = server.start();
171
172 client.start();
173 final Future<ClientSessionEndpoint> connectFuture = client.connect(
174 "localhost", serverEndpoint.getPort(), TIMEOUT);
175 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
176
177 for (int i = 0; i < 5; i++) {
178 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
179 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
180 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
181 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
182 Assertions.assertNotNull(result);
183 final HttpResponse response1 = result.getHead();
184 final String entity1 = result.getBody();
185 Assertions.assertNotNull(response1);
186 Assertions.assertEquals(200, response1.getCode());
187 Assertions.assertEquals("Hi there", entity1);
188 }
189 }
190
191 @Test
192 public void testSimpleGetConnectionClose() throws Exception {
193 final Http1TestServer server = resources.server();
194 final Http1TestClient client = resources.client();
195
196 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
197 final InetSocketAddress serverEndpoint = server.start();
198
199 client.start();
200 final URI requestURI = createRequestURI(serverEndpoint, "/hello");
201 for (int i = 0; i < 5; i++) {
202 final Future<ClientSessionEndpoint> connectFuture = client.connect(
203 "localhost", serverEndpoint.getPort(), TIMEOUT);
204 try (final ClientSessionEndpoint streamEndpoint = connectFuture.get()) {
205 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
206 AsyncRequestBuilder.get(requestURI)
207 .addHeader(HttpHeaders.CONNECTION, "close")
208 .build(),
209 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
210 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
211 Assertions.assertNotNull(result);
212 final HttpResponse response1 = result.getHead();
213 final String entity1 = result.getBody();
214 Assertions.assertNotNull(response1);
215 Assertions.assertEquals(200, response1.getCode());
216 Assertions.assertEquals("Hi there", entity1);
217 }
218 }
219 }
220
221 @Test
222 public void testSimpleGetIdentityTransfer() throws Exception {
223 final Http1TestServer server = resources.server();
224 final Http1TestClient client = resources.client();
225
226 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
227 final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
228 final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
229
230 client.start();
231
232 final int reqNo = 5;
233
234 for (int i = 0; i < reqNo; i++) {
235 final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
236 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
237
238 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
239 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
240 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
241 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
242
243 streamEndpoint.close();
244
245 Assertions.assertNotNull(result);
246 final HttpResponse response = result.getHead();
247 final String entity = result.getBody();
248 Assertions.assertNotNull(response);
249 Assertions.assertEquals(200, response.getCode());
250 Assertions.assertEquals("Hi there", entity);
251 }
252
253 }
254
255 @Test
256 public void testPostIdentityTransfer() throws Exception {
257 final Http1TestServer server = resources.server();
258 final Http1TestClient client = resources.client();
259
260 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
261 final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
262 final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
263
264 client.start();
265
266 final int reqNo = 5;
267
268 for (int i = 0; i < reqNo; i++) {
269 final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
270 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
271
272 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
273 new BasicRequestProducer(Method.POST,
274 createRequestURI(serverEndpoint, "/hello"),
275 new MultiLineEntityProducer("Hello", 16 * i)),
276 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
277 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
278
279 streamEndpoint.close();
280
281 Assertions.assertNotNull(result);
282 final HttpResponse response = result.getHead();
283 final String entity = result.getBody();
284 Assertions.assertNotNull(response);
285 Assertions.assertEquals(200, response.getCode());
286 Assertions.assertEquals("Hi there", entity);
287 }
288 }
289
290 @Test
291 public void testPostIdentityTransferOutOfSequenceResponse() throws Exception {
292 final Http1TestServer server = resources.server();
293 final Http1TestClient client = resources.client();
294
295 server.register("/hello", () -> new ImmediateResponseExchangeHandler(500, "Go away"));
296 final HttpProcessor httpProcessor = new DefaultHttpProcessor(new RequestValidateHost());
297 final InetSocketAddress serverEndpoint = server.start(httpProcessor, Http1Config.DEFAULT);
298
299 client.start();
300
301 final int reqNo = 5;
302
303 for (int i = 0; i < reqNo; i++) {
304 final Future<ClientSessionEndpoint> connectFuture = client.connect("localhost", serverEndpoint.getPort(), TIMEOUT);
305 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
306
307 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
308 new BasicRequestProducer(Method.POST,
309 createRequestURI(serverEndpoint, "/hello"),
310 new MultiLineEntityProducer("Hello", 16 * i)),
311 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
312 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
313
314 streamEndpoint.close();
315
316 Assertions.assertNotNull(result);
317 final HttpResponse response = result.getHead();
318 final String entity = result.getBody();
319 Assertions.assertNotNull(response);
320 Assertions.assertEquals(500, response.getCode());
321 Assertions.assertEquals("Go away", entity);
322 }
323 }
324
325 @Test
326 public void testSimpleGetsPipelined() throws Exception {
327 final Http1TestServer server = resources.server();
328 final Http1TestClient client = resources.client();
329
330 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
331 final InetSocketAddress serverEndpoint = server.start();
332
333 client.start();
334 final Future<ClientSessionEndpoint> connectFuture = client.connect(
335 "localhost", serverEndpoint.getPort(), TIMEOUT);
336 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
337
338 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
339 for (int i = 0; i < 5; i++) {
340 queue.add(streamEndpoint.execute(
341 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
342 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
343 }
344 while (!queue.isEmpty()) {
345 final Future<Message<HttpResponse, String>> future = queue.remove();
346 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
347 Assertions.assertNotNull(result);
348 final HttpResponse response = result.getHead();
349 final String entity = result.getBody();
350 Assertions.assertNotNull(response);
351 Assertions.assertEquals(200, response.getCode());
352 Assertions.assertEquals("Hi there", entity);
353 }
354 }
355
356 @Test
357 public void testLargeGet() throws Exception {
358 final Http1TestServer server = resources.server();
359 final Http1TestClient client = resources.client();
360
361 server.register("/", () -> new MultiLineResponseHandler("0123456789abcdef", 5000));
362 final InetSocketAddress serverEndpoint = server.start();
363
364 client.start();
365 final Future<ClientSessionEndpoint> connectFuture = client.connect(
366 "localhost", serverEndpoint.getPort(), TIMEOUT);
367 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
368
369 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
370 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
371 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
372
373 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
374 Assertions.assertNotNull(result1);
375 final HttpResponse response1 = result1.getHead();
376 Assertions.assertNotNull(response1);
377 Assertions.assertEquals(200, response1.getCode());
378 final String s1 = result1.getBody();
379 Assertions.assertNotNull(s1);
380 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
381 while (t1.hasMoreTokens()) {
382 Assertions.assertEquals("0123456789abcdef", t1.nextToken());
383 }
384
385 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
386 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
387 new BasicResponseConsumer<>(new StringAsyncEntityConsumer(512)), null);
388
389 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
390 Assertions.assertNotNull(result2);
391 final HttpResponse response2 = result2.getHead();
392 Assertions.assertNotNull(response2);
393 Assertions.assertEquals(200, response2.getCode());
394 final String s2 = result2.getBody();
395 Assertions.assertNotNull(s2);
396 final StringTokenizer t2 = new StringTokenizer(s2, "\r\n");
397 while (t2.hasMoreTokens()) {
398 Assertions.assertEquals("0123456789abcdef", t2.nextToken());
399 }
400 }
401
402 @Test
403 public void testLargeGetsPipelined() throws Exception {
404 final Http1TestServer server = resources.server();
405 final Http1TestClient client = resources.client();
406
407 server.register("/", () -> new MultiLineResponseHandler("0123456789abcdef", 2000));
408 final InetSocketAddress serverEndpoint = server.start();
409
410 client.start();
411 final Future<ClientSessionEndpoint> connectFuture = client.connect(
412 "localhost", serverEndpoint.getPort(), TIMEOUT);
413 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
414
415 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
416 for (int i = 0; i < 5; i++) {
417 queue.add(streamEndpoint.execute(
418 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/")),
419 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
420 }
421 while (!queue.isEmpty()) {
422 final Future<Message<HttpResponse, String>> future = queue.remove();
423 final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
424 Assertions.assertNotNull(result);
425 final HttpResponse response = result.getHead();
426 Assertions.assertNotNull(response);
427 Assertions.assertEquals(200, response.getCode());
428 final String entity = result.getBody();
429 Assertions.assertNotNull(entity);
430 final StringTokenizer t = new StringTokenizer(entity, "\r\n");
431 while (t.hasMoreTokens()) {
432 Assertions.assertEquals("0123456789abcdef", t.nextToken());
433 }
434 }
435 }
436
437 @Test
438 public void testBasicPost() throws Exception {
439 final Http1TestServer server = resources.server();
440 final Http1TestClient client = resources.client();
441
442 server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
443 final InetSocketAddress serverEndpoint = server.start();
444
445 client.start();
446 final Future<ClientSessionEndpoint> connectFuture = client.connect(
447 "localhost", serverEndpoint.getPort(), TIMEOUT);
448 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
449
450 for (int i = 0; i < 5; i++) {
451 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
452 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello"),
453 AsyncEntityProducers.create("Hi there")),
454 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
455 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
456 Assertions.assertNotNull(result);
457 final HttpResponse response1 = result.getHead();
458 final String entity1 = result.getBody();
459 Assertions.assertNotNull(response1);
460 Assertions.assertEquals(200, response1.getCode());
461 Assertions.assertEquals("Hi back", entity1);
462 }
463 }
464
465 @Test
466 public void testBasicPostPipelined() throws Exception {
467 final Http1TestServer server = resources.server();
468 final Http1TestClient client = resources.client();
469
470 server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
471 final InetSocketAddress serverEndpoint = server.start();
472
473 client.start();
474 final Future<ClientSessionEndpoint> connectFuture = client.connect(
475 "localhost", serverEndpoint.getPort(), TIMEOUT);
476 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
477
478 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
479 for (int i = 0; i < 5; i++) {
480 queue.add(streamEndpoint.execute(
481 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello"),
482 AsyncEntityProducers.create("Hi there")),
483 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
484 }
485 while (!queue.isEmpty()) {
486 final Future<Message<HttpResponse, String>> future = queue.remove();
487 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
488 Assertions.assertNotNull(result);
489 final HttpResponse response = result.getHead();
490 final String entity = result.getBody();
491 Assertions.assertNotNull(response);
492 Assertions.assertEquals(200, response.getCode());
493 Assertions.assertEquals("Hi back", entity);
494 }
495 }
496
497 @Test
498 public void testHttp10Post() throws Exception {
499 final Http1TestServer server = resources.server();
500 final Http1TestClient client = resources.client();
501
502 server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
503 final InetSocketAddress serverEndpoint = server.start();
504
505 client.start();
506 final Future<ClientSessionEndpoint> connectFuture = client.connect(
507 "localhost", serverEndpoint.getPort(), TIMEOUT);
508 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
509
510 for (int i = 0; i < 5; i++) {
511 final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
512 request.setVersion(HttpVersion.HTTP_1_0);
513 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
514 new BasicRequestProducer(request, AsyncEntityProducers.create("Hi there")),
515 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
516 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
517 Assertions.assertNotNull(result);
518 final HttpResponse response1 = result.getHead();
519 final String entity1 = result.getBody();
520 Assertions.assertNotNull(response1);
521 Assertions.assertEquals(200, response1.getCode());
522 Assertions.assertEquals("Hi back", entity1);
523 }
524 }
525
526 @Test
527 public void testHTTP11FeaturesDisabledWithHTTP10Requests() throws Exception {
528 final Http1TestServer server = resources.server();
529 final Http1TestClient client = resources.client();
530
531 server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
532 final InetSocketAddress serverEndpoint = server.start();
533
534 client.start();
535 final Future<ClientSessionEndpoint> connectFuture = client.connect(
536 "localhost", serverEndpoint.getPort(), TIMEOUT);
537 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
538
539 final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
540 request.setVersion(HttpVersion.HTTP_1_0);
541 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
542 new BasicRequestProducer(request, new BasicAsyncEntityProducer(new byte[] {'a', 'b', 'c'}, null, true)),
543 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
544 final ExecutionException exception = Assertions.assertThrows(ExecutionException.class, future::get);
545 Assertions.assertInstanceOf(ProtocolException.class, exception.getCause());
546 }
547
548 @Test
549 public void testNoEntityPost() throws Exception {
550 final Http1TestServer server = resources.server();
551 final Http1TestClient client = resources.client();
552
553 server.register("/hello", () -> new SingleLineResponseHandler("Hi back"));
554 final InetSocketAddress serverEndpoint = server.start();
555
556 client.start();
557 final Future<ClientSessionEndpoint> connectFuture = client.connect(
558 "localhost", serverEndpoint.getPort(), TIMEOUT);
559 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
560
561 for (int i = 0; i < 5; i++) {
562 final HttpRequest request = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
563 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
564 new BasicRequestProducer(request, null),
565 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
566 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
567 Assertions.assertNotNull(result);
568 final HttpResponse response1 = result.getHead();
569 final String entity1 = result.getBody();
570 Assertions.assertNotNull(response1);
571 Assertions.assertEquals(200, response1.getCode());
572 Assertions.assertEquals("Hi back", entity1);
573 }
574 }
575
576 @Test
577 public void testLargePost() throws Exception {
578 final Http1TestServer server = resources.server();
579 final Http1TestClient client = resources.client();
580
581 server.register("*", () -> new EchoHandler(2048));
582 final InetSocketAddress serverEndpoint = server.start();
583
584 client.start();
585 final Future<ClientSessionEndpoint> connectFuture = client.connect(
586 "localhost", serverEndpoint.getPort(), TIMEOUT);
587 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
588
589 for (int i = 0; i < 5; i++) {
590 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
591 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
592 new MultiLineEntityProducer("0123456789abcdef", 5000)),
593 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
594 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
595 Assertions.assertNotNull(result);
596 final HttpResponse response = result.getHead();
597 Assertions.assertNotNull(response);
598 Assertions.assertEquals(200, response.getCode());
599 final String entity = result.getBody();
600 Assertions.assertNotNull(entity);
601 final StringTokenizer t = new StringTokenizer(entity, "\r\n");
602 while (t.hasMoreTokens()) {
603 Assertions.assertEquals("0123456789abcdef", t.nextToken());
604 }
605 }
606 }
607
608 @Test
609 public void testPostsPipelinedLargeResponse() throws Exception {
610 final Http1TestServer server = resources.server();
611 final Http1TestClient client = resources.client();
612
613 server.register("/", () -> new MultiLineResponseHandler("0123456789abcdef", 2000));
614 final InetSocketAddress serverEndpoint = server.start();
615
616 client.start();
617 final Future<ClientSessionEndpoint> connectFuture = client.connect(
618 "localhost", serverEndpoint.getPort(), TIMEOUT);
619 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
620
621 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
622 for (int i = 0; i < 2; i++) {
623 queue.add(streamEndpoint.execute(
624 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/"),
625 AsyncEntityProducers.create("Hi there")),
626 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
627 }
628 while (!queue.isEmpty()) {
629 final Future<Message<HttpResponse, String>> future = queue.remove();
630 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
631 Assertions.assertNotNull(result);
632 final HttpResponse response = result.getHead();
633 Assertions.assertNotNull(response);
634 Assertions.assertEquals(200, response.getCode());
635 final String entity = result.getBody();
636 Assertions.assertNotNull(entity);
637 final StringTokenizer t = new StringTokenizer(entity, "\r\n");
638 while (t.hasMoreTokens()) {
639 Assertions.assertEquals("0123456789abcdef", t.nextToken());
640 }
641 }
642 }
643
644
645 @Test
646 public void testLargePostsPipelined() throws Exception {
647 final Http1TestServer server = resources.server();
648 final Http1TestClient client = resources.client();
649
650 server.register("*", () -> new EchoHandler(2048));
651 final InetSocketAddress serverEndpoint = server.start();
652
653 client.start();
654 final Future<ClientSessionEndpoint> connectFuture = client.connect(
655 "localhost", serverEndpoint.getPort(), TIMEOUT);
656 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
657
658 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
659 for (int i = 0; i < 5; i++) {
660 queue.add(streamEndpoint.execute(
661 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/echo"),
662 new MultiLineEntityProducer("0123456789abcdef", 5000)),
663 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
664 }
665 while (!queue.isEmpty()) {
666 final Future<Message<HttpResponse, String>> future = queue.remove();
667 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
668 Assertions.assertNotNull(result);
669 final HttpResponse response = result.getHead();
670 Assertions.assertNotNull(response);
671 Assertions.assertEquals(200, response.getCode());
672 final String entity = result.getBody();
673 Assertions.assertNotNull(entity);
674 final StringTokenizer t = new StringTokenizer(entity, "\r\n");
675 while (t.hasMoreTokens()) {
676 Assertions.assertEquals("0123456789abcdef", t.nextToken());
677 }
678 }
679 }
680
681 @Test
682 public void testSimpleHead() throws Exception {
683 final Http1TestServer server = resources.server();
684 final Http1TestClient client = resources.client();
685
686 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
687 final InetSocketAddress serverEndpoint = server.start();
688
689 client.start();
690 final Future<ClientSessionEndpoint> connectFuture = client.connect(
691 "localhost", serverEndpoint.getPort(), TIMEOUT);
692 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
693
694 for (int i = 0; i < 5; i++) {
695 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
696 new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
697 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
698 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
699 Assertions.assertNotNull(result);
700 final HttpResponse response1 = result.getHead();
701 Assertions.assertNotNull(response1);
702 Assertions.assertEquals(200, response1.getCode());
703 Assertions.assertNull(result.getBody());
704 }
705 }
706
707 @Test
708 public void testSimpleHeadConnectionClose() throws Exception {
709 final Http1TestServer server = resources.server();
710 final Http1TestClient client = resources.client();
711
712 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
713 final InetSocketAddress serverEndpoint = server.start();
714
715 client.start();
716 final URI requestURI = createRequestURI(serverEndpoint, "/hello");
717 for (int i = 0; i < 5; i++) {
718 final Future<ClientSessionEndpoint> connectFuture = client.connect(
719 "localhost", serverEndpoint.getPort(), TIMEOUT);
720 try (final ClientSessionEndpoint streamEndpoint = connectFuture.get()) {
721 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
722 AsyncRequestBuilder.head(requestURI)
723 .addHeader(HttpHeaders.CONNECTION, "close")
724 .build(),
725 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
726 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
727 Assertions.assertNotNull(result);
728 final HttpResponse response1 = result.getHead();
729 Assertions.assertNotNull(response1);
730 Assertions.assertEquals(200, response1.getCode());
731 Assertions.assertNull(result.getBody());
732 }
733 }
734 }
735
736 @Test
737 public void testHeadPipelined() throws Exception {
738 final Http1TestServer server = resources.server();
739 final Http1TestClient client = resources.client();
740
741 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
742 final InetSocketAddress serverEndpoint = server.start();
743
744 client.start();
745 final Future<ClientSessionEndpoint> connectFuture = client.connect(
746 "localhost", serverEndpoint.getPort(), TIMEOUT);
747 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
748
749 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
750 for (int i = 0; i < 5; i++) {
751 queue.add(streamEndpoint.execute(
752 new BasicRequestProducer(Method.HEAD, createRequestURI(serverEndpoint, "/hello")),
753 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
754 }
755 while (!queue.isEmpty()) {
756 final Future<Message<HttpResponse, String>> future = queue.remove();
757 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
758 Assertions.assertNotNull(result);
759 final HttpResponse response1 = result.getHead();
760 Assertions.assertNotNull(response1);
761 Assertions.assertEquals(200, response1.getCode());
762 Assertions.assertNull(result.getBody());
763 }
764 }
765
766 @Test
767 public void testExpectationFailed() throws Exception {
768 final Http1TestServer server = resources.server();
769 final Http1TestClient client = resources.client();
770
771 server.register("*", () -> new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
772
773 @Override
774 protected void handle(
775 final Message<HttpRequest, String> request,
776 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
777 final HttpContext context) throws IOException, HttpException {
778 responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
779
780 }
781 });
782 final InetSocketAddress serverEndpoint = server.start(null, handler -> new BasicAsyncServerExpectationDecorator(handler) {
783
784 @Override
785 protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
786 final Header h = request.getFirstHeader("password");
787 if (h != null && "secret".equals(h.getValue())) {
788 return null;
789 } else {
790 return new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
791 }
792 }
793 }, Http1Config.DEFAULT);
794
795 client.start();
796 final Future<IOSession> sessionFuture = client.requestSession(
797 new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
798 final IOSession ioSession = sessionFuture.get();
799 try (final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession)) {
800
801 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
802 request1.addHeader("password", "secret");
803 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
804 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 1000)),
805 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
806 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
807 Assertions.assertNotNull(result1);
808 final HttpResponse response1 = result1.getHead();
809 Assertions.assertNotNull(response1);
810 Assertions.assertEquals(200, response1.getCode());
811 Assertions.assertEquals("All is well", result1.getBody());
812
813 Assertions.assertTrue(ioSession.isOpen());
814
815 final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
816 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
817 new BasicRequestProducer(request2, new MultiLineEntityProducer("0123456789abcdef", 5000)),
818 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
819 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
820 Assertions.assertNotNull(result2);
821 final HttpResponse response2 = result2.getHead();
822 Assertions.assertNotNull(response2);
823 Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response2.getCode());
824 Assertions.assertEquals("You shall not pass", result2.getBody());
825
826 Assertions.assertTrue(ioSession.isOpen());
827
828 final HttpRequest request3 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
829 request3.addHeader("password", "secret");
830 final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
831 new BasicRequestProducer(request3, new MultiLineEntityProducer("0123456789abcdef", 1000)),
832 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
833 final Message<HttpResponse, String> result3 = future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
834 Assertions.assertNotNull(result3);
835 final HttpResponse response3 = result3.getHead();
836 Assertions.assertNotNull(response3);
837 Assertions.assertEquals(200, response3.getCode());
838 Assertions.assertEquals("All is well", result3.getBody());
839
840 Assertions.assertTrue(ioSession.isOpen());
841
842 final HttpRequest request4 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
843 final Future<Message<HttpResponse, String>> future4 = streamEndpoint.execute(
844 new BasicRequestProducer(request4, AsyncEntityProducers.create("blah")),
845 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
846 final Message<HttpResponse, String> result4 = future4.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
847 Assertions.assertNotNull(result4);
848 final HttpResponse response4 = result4.getHead();
849 Assertions.assertNotNull(response4);
850 Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response4.getCode());
851 Assertions.assertEquals("You shall not pass", result4.getBody());
852
853 Assertions.assertFalse(ioSession.isOpen());
854 }
855 }
856
857 @Test
858 public void testExpectationFailedCloseConnection() throws Exception {
859 final Http1TestServer server = resources.server();
860 final Http1TestClient client = resources.client();
861
862 server.register("*", () -> new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
863
864 @Override
865 protected void handle(
866 final Message<HttpRequest, String> request,
867 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
868 final HttpContext context) throws IOException, HttpException {
869 responseTrigger.submitResponse(new BasicResponseProducer(HttpStatus.SC_OK, "All is well"), context);
870
871 }
872 });
873 final InetSocketAddress serverEndpoint = server.start(null, handler -> new BasicAsyncServerExpectationDecorator(handler) {
874
875 @Override
876 protected AsyncResponseProducer verify(final HttpRequest request, final HttpContext context) throws IOException, HttpException {
877 final Header h = request.getFirstHeader("password");
878 if (h != null && "secret".equals(h.getValue())) {
879 return null;
880 } else {
881 final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_UNAUTHORIZED);
882 response.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
883 return new BasicResponseProducer(response, "You shall not pass");
884 }
885 }
886 }, Http1Config.DEFAULT);
887
888 client.start();
889 final Future<IOSession> sessionFuture = client.requestSession(
890 new HttpHost("localhost", serverEndpoint.getPort()), TIMEOUT, null);
891 final IOSession ioSession = sessionFuture.get();
892 try (final ClientSessionEndpoint streamEndpoint = new ClientSessionEndpoint(ioSession)) {
893
894 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
895 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
896 new BasicRequestProducer(request1, new MultiBinEntityProducer(
897 new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'},
898 100000,
899 ContentType.TEXT_PLAIN)),
900 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
901 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
902 Assertions.assertNotNull(result1);
903 final HttpResponse response1 = result1.getHead();
904 Assertions.assertNotNull(response1);
905 Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
906 Assertions.assertNotNull("You shall not pass", result1.getBody());
907
908 Assertions.assertFalse(streamEndpoint.isOpen());
909 }
910 }
911
912 @Test
913 public void testDelayedExpectationVerification() throws Exception {
914 final Http1TestServer server = resources.server();
915 final Http1TestClient client = resources.client();
916
917 server.register("*", () -> new AsyncServerExchangeHandler() {
918
919 private final Random random = new Random(System.currentTimeMillis());
920 private final AsyncEntityProducer entityProducer = AsyncEntityProducers.create(
921 "All is well");
922
923 @Override
924 public void handleRequest(
925 final HttpRequest request,
926 final EntityDetails entityDetails,
927 final ResponseChannel responseChannel,
928 final HttpContext context) throws HttpException, IOException {
929
930 Executors.newSingleThreadExecutor().execute(() -> {
931 try {
932 if (entityDetails != null) {
933 final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
934 if (h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue())) {
935 Thread.sleep(random.nextInt(1000));
936 responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), context);
937 }
938 final HttpResponse response = new BasicHttpResponse(200);
939 lock.lock();
940 try {
941 responseChannel.sendResponse(response, entityProducer, context);
942 } finally {
943 lock.unlock();
944 }
945 }
946 } catch (final Exception ignore) {
947
948 }
949 });
950
951 }
952
953 @Override
954 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
955 capacityChannel.update(Integer.MAX_VALUE);
956 }
957
958 @Override
959 public void consume(final ByteBuffer src) throws IOException {
960 }
961
962 @Override
963 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
964 }
965
966 @Override
967 public int available() {
968 lock.lock();
969 try {
970 return entityProducer.available();
971 } finally {
972 lock.unlock();
973 }
974 }
975
976 @Override
977 public void produce(final DataStreamChannel channel) throws IOException {
978 lock.lock();
979 try {
980 entityProducer.produce(channel);
981 } finally {
982 lock.unlock();
983 }
984 }
985
986 @Override
987 public void failed(final Exception cause) {
988 }
989
990 @Override
991 public void releaseResources() {
992 }
993
994 });
995 final InetSocketAddress serverEndpoint = server.start();
996
997 client.start(Http1Config.custom().setWaitForContinueTimeout(Timeout.ofMilliseconds(100)).build());
998 final Future<ClientSessionEndpoint> connectFuture = client.connect(
999 "localhost", serverEndpoint.getPort(), TIMEOUT);
1000 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1001
1002 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
1003 for (int i = 0; i < 5; i++) {
1004 queue.add(streamEndpoint.execute(
1005 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/"),
1006 AsyncEntityProducers.create("Some important message")),
1007 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
1008 }
1009 while (!queue.isEmpty()) {
1010 final Future<Message<HttpResponse, String>> future = queue.remove();
1011 final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1012 Assertions.assertNotNull(result);
1013 final HttpResponse response = result.getHead();
1014 Assertions.assertNotNull(response);
1015 Assertions.assertEquals(200, response.getCode());
1016 Assertions.assertNotNull("All is well", result.getBody());
1017 }
1018 }
1019
1020 @Test
1021 public void testPrematureResponse() throws Exception {
1022 final Http1TestServer server = resources.server();
1023 final Http1TestClient client = resources.client();
1024
1025 server.register("*", () -> new AsyncServerExchangeHandler() {
1026
1027 private final AtomicReference<AsyncResponseProducer> responseProducer = new AtomicReference<>();
1028
1029 @Override
1030 public void handleRequest(
1031 final HttpRequest request,
1032 final EntityDetails entityDetails,
1033 final ResponseChannel responseChannel,
1034 final HttpContext context) throws HttpException, IOException {
1035 final AsyncResponseProducer producer;
1036 final Header h = request.getFirstHeader("password");
1037 if (h != null && "secret".equals(h.getValue())) {
1038 producer = new BasicResponseProducer(HttpStatus.SC_OK, "All is well");
1039 } else {
1040 producer = new BasicResponseProducer(HttpStatus.SC_UNAUTHORIZED, "You shall not pass");
1041 }
1042 responseProducer.set(producer);
1043 producer.sendResponse(responseChannel, context);
1044 }
1045
1046 @Override
1047 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1048 capacityChannel.update(Integer.MAX_VALUE);
1049 }
1050
1051 @Override
1052 public void consume(final ByteBuffer src) throws IOException {
1053 }
1054
1055 @Override
1056 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1057 }
1058
1059 @Override
1060 public int available() {
1061 final AsyncResponseProducer producer = responseProducer.get();
1062 return producer.available();
1063 }
1064
1065 @Override
1066 public void produce(final DataStreamChannel channel) throws IOException {
1067 final AsyncResponseProducer producer = responseProducer.get();
1068 producer.produce(channel);
1069 }
1070
1071 @Override
1072 public void failed(final Exception cause) {
1073 }
1074
1075 @Override
1076 public void releaseResources() {
1077 }
1078 });
1079 final InetSocketAddress serverEndpoint = server.start();
1080
1081 client.start();
1082 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1083 "localhost", serverEndpoint.getPort(), TIMEOUT);
1084 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1085
1086 for (int i = 0; i < 3; i++) {
1087 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1088 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1089 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcdef", 100000)),
1090 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1091 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1092 Assertions.assertNotNull(result1);
1093 final HttpResponse response1 = result1.getHead();
1094 Assertions.assertNotNull(response1);
1095 Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1096 Assertions.assertNotNull("You shall not pass", result1.getBody());
1097
1098 Assertions.assertTrue(streamEndpoint.isOpen());
1099 }
1100 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1101 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1102 new BasicRequestProducer(request1, new MultiBinEntityProducer(
1103 new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'},
1104 100000,
1105 ContentType.TEXT_PLAIN)),
1106 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1107 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1108 Assertions.assertNotNull(result1);
1109 final HttpResponse response1 = result1.getHead();
1110 Assertions.assertNotNull(response1);
1111 Assertions.assertEquals(HttpStatus.SC_UNAUTHORIZED, response1.getCode());
1112 Assertions.assertNotNull("You shall not pass", result1.getBody());
1113 }
1114
1115 @Test
1116 public void testSlowResponseConsumer() throws Exception {
1117 final Http1TestServer server = resources.server();
1118 final Http1TestClient client = resources.client();
1119
1120 server.register("/", () -> new MultiLineResponseHandler("0123456789abcd", 100));
1121 final InetSocketAddress serverEndpoint = server.start();
1122
1123 client.start(Http1Config.custom().setBufferSize(256).build());
1124
1125 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1126 "localhost", serverEndpoint.getPort(), TIMEOUT);
1127 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1128
1129 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/"));
1130 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1131 new BasicRequestProducer(request1, null),
1132 new BasicResponseConsumer<>(new AbstractClassicEntityConsumer<String>(16, Executors.newSingleThreadExecutor()) {
1133
1134 @Override
1135 protected String consumeData(
1136 final ContentType contentType, final InputStream inputStream) throws IOException {
1137 final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII);
1138
1139 final StringBuilder buffer = new StringBuilder();
1140 try {
1141 final byte[] tmp = new byte[16];
1142 int l;
1143 while ((l = inputStream.read(tmp)) != -1) {
1144 buffer.append(charset.decode(ByteBuffer.wrap(tmp, 0, l)));
1145 Thread.sleep(50);
1146 }
1147 } catch (final InterruptedException ex) {
1148 Thread.currentThread().interrupt();
1149 throw new InterruptedIOException(ex.getMessage());
1150 }
1151 return buffer.toString();
1152 }
1153 }),
1154 null);
1155
1156 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1157 Assertions.assertNotNull(result1);
1158 final HttpResponse response1 = result1.getHead();
1159 Assertions.assertNotNull(response1);
1160 Assertions.assertEquals(200, response1.getCode());
1161 final String s1 = result1.getBody();
1162 Assertions.assertNotNull(s1);
1163 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1164 while (t1.hasMoreTokens()) {
1165 Assertions.assertEquals("0123456789abcd", t1.nextToken());
1166 }
1167 }
1168
1169 @Test
1170 public void testSlowRequestProducer() throws Exception {
1171 final Http1TestServer server = resources.server();
1172 final Http1TestClient client = resources.client();
1173
1174 server.register("*", () -> new EchoHandler(2048));
1175 final InetSocketAddress serverEndpoint = server.start();
1176
1177 client.start();
1178 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1179 "localhost", serverEndpoint.getPort(), TIMEOUT);
1180 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1181
1182 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/echo"));
1183 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1184 new BasicRequestProducer(request1, new AbstractClassicEntityProducer(4096, ContentType.TEXT_PLAIN, Executors.newSingleThreadExecutor()) {
1185
1186 @Override
1187 protected void produceData(final ContentType contentType, final OutputStream outputStream) throws IOException {
1188 final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII);
1189 try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, charset))) {
1190 for (int i = 0; i < 500; i++) {
1191 if (i % 100 == 0) {
1192 writer.flush();
1193 Thread.sleep(500);
1194 }
1195 writer.write("0123456789abcdef\r\n");
1196 }
1197 } catch (final InterruptedException ex) {
1198 Thread.currentThread().interrupt();
1199 throw new InterruptedIOException(ex.getMessage());
1200 }
1201 }
1202
1203 }),
1204 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1205 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1206 Assertions.assertNotNull(result1);
1207 final HttpResponse response1 = result1.getHead();
1208 Assertions.assertNotNull(response1);
1209 Assertions.assertEquals(200, response1.getCode());
1210 final String s1 = result1.getBody();
1211 Assertions.assertNotNull(s1);
1212 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1213 while (t1.hasMoreTokens()) {
1214 Assertions.assertEquals("0123456789abcdef", t1.nextToken());
1215 }
1216 }
1217
1218 @Test
1219 public void testSlowResponseProducer() throws Exception {
1220 final Http1TestServer server = resources.server();
1221 final Http1TestClient client = resources.client();
1222
1223 server.register("*", () -> new AbstractClassicServerExchangeHandler(2048, Executors.newSingleThreadExecutor()) {
1224
1225 @Override
1226 protected void handle(
1227 final HttpRequest request,
1228 final InputStream requestStream,
1229 final HttpResponse response,
1230 final OutputStream responseStream,
1231 final HttpContext context) throws IOException, HttpException {
1232
1233 if (!"/hello".equals(request.getPath())) {
1234 response.setCode(HttpStatus.SC_NOT_FOUND);
1235 return;
1236 }
1237 if (!Method.POST.name().equalsIgnoreCase(request.getMethod())) {
1238 response.setCode(HttpStatus.SC_NOT_IMPLEMENTED);
1239 return;
1240 }
1241 if (requestStream == null) {
1242 return;
1243 }
1244 final Header h1 = request.getFirstHeader(HttpHeaders.CONTENT_TYPE);
1245 final ContentType contentType = h1 != null ? ContentType.parse(h1.getValue()) : null;
1246 final Charset charset = ContentType.getCharset(contentType, StandardCharsets.US_ASCII);
1247 response.setCode(HttpStatus.SC_OK);
1248 response.setHeader(h1);
1249 try (final BufferedReader reader = new BufferedReader(new InputStreamReader(requestStream, charset));
1250 final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(responseStream, charset))) {
1251 try {
1252 String l;
1253 int count = 0;
1254 while ((l = reader.readLine()) != null) {
1255 writer.write(l);
1256 writer.write("\r\n");
1257 count++;
1258 if (count % 500 == 0) {
1259 Thread.sleep(500);
1260 }
1261 }
1262 writer.flush();
1263 } catch (final InterruptedException ex) {
1264 Thread.currentThread().interrupt();
1265 throw new InterruptedIOException(ex.getMessage());
1266 }
1267 }
1268 }
1269 });
1270 final InetSocketAddress serverEndpoint = server.start();
1271
1272 client.start(Http1Config.custom().setBufferSize(256).build());
1273
1274 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1275 "localhost", serverEndpoint.getPort(), TIMEOUT);
1276 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1277
1278 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
1279 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1280 new BasicRequestProducer(request1, new MultiLineEntityProducer("0123456789abcd", 2000)),
1281 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1282 final Message<HttpResponse, String> result1 = future1.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1283 Assertions.assertNotNull(result1);
1284 final HttpResponse response1 = result1.getHead();
1285 Assertions.assertNotNull(response1);
1286 Assertions.assertEquals(200, response1.getCode());
1287 final String s1 = result1.getBody();
1288 Assertions.assertNotNull(s1);
1289 final StringTokenizer t1 = new StringTokenizer(s1, "\r\n");
1290 while (t1.hasMoreTokens()) {
1291 Assertions.assertEquals("0123456789abcd", t1.nextToken());
1292 }
1293 }
1294
1295 @Test
1296 public void testPipelinedConnectionClose() throws Exception {
1297 final Http1TestServer server = resources.server();
1298 final Http1TestClient client = resources.client();
1299
1300 server.register("/hello*", () -> new SingleLineResponseHandler("Hi back"));
1301 final InetSocketAddress serverEndpoint = server.start();
1302
1303 client.start();
1304 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1305 "localhost", serverEndpoint.getPort(), TIMEOUT);
1306 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1307
1308 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1309 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-1"),
1310 AsyncEntityProducers.create("Hi there")),
1311 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1312 final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello-2"));
1313 request2.addHeader(HttpHeaders.CONNECTION, "close");
1314 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1315 new BasicRequestProducer(request2,
1316 AsyncEntityProducers.create("Hi there")),
1317 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1318 final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
1319 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1320 AsyncEntityProducers.create("Hi there")),
1321 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1322
1323 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1324 Assertions.assertNotNull(result1);
1325 final HttpResponse response1 = result1.getHead();
1326 final String entity1 = result1.getBody();
1327 Assertions.assertNotNull(response1);
1328 Assertions.assertEquals(200, response1.getCode());
1329 Assertions.assertEquals("Hi back", entity1);
1330
1331 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1332 Assertions.assertNotNull(result2);
1333 final HttpResponse response2 = result2.getHead();
1334 final String entity2 = result2.getBody();
1335 Assertions.assertNotNull(response2);
1336 Assertions.assertEquals(200, response2.getCode());
1337 Assertions.assertEquals("Hi back", entity2);
1338
1339 final Exception exception = Assertions.assertThrows(Exception.class, () ->
1340 future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
1341 assertThat(exception, CoreMatchers.anyOf(
1342 CoreMatchers.instanceOf(CancellationException.class),
1343 CoreMatchers.instanceOf(ExecutionException.class)));
1344
1345 final Future<Message<HttpResponse, String>> future4 = streamEndpoint.execute(
1346 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1347 AsyncEntityProducers.create("Hi there")),
1348 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1349 final Exception exception2 = Assertions.assertThrows(Exception.class, () ->
1350 future4.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
1351 assertThat(exception2, CoreMatchers.anyOf(
1352 CoreMatchers.instanceOf(CancellationException.class),
1353 CoreMatchers.instanceOf(ExecutionException.class)));
1354 }
1355
1356 @Test
1357 public void testPipelinedInvalidRequest() throws Exception {
1358 final Http1TestServer server = resources.server();
1359 final Http1TestClient client = resources.client();
1360
1361 server.register("/hello*", () -> new SingleLineResponseHandler("Hi back"));
1362 final InetSocketAddress serverEndpoint = server.start();
1363
1364 client.start();
1365 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1366 "localhost", serverEndpoint.getPort(), TIMEOUT);
1367 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1368
1369 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1370 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-1"),
1371 AsyncEntityProducers.create("Hi there")),
1372 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1373 final HttpRequest request2 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello-2"));
1374 request2.addHeader(HttpHeaders.HOST, "blah:blah");
1375 final Future<Message<HttpResponse, String>> future2 = streamEndpoint.execute(
1376 new BasicRequestProducer(request2,
1377 AsyncEntityProducers.create("Hi there")),
1378 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1379 final Future<Message<HttpResponse, String>> future3 = streamEndpoint.execute(
1380 new BasicRequestProducer(Method.POST, createRequestURI(serverEndpoint, "/hello-3"),
1381 AsyncEntityProducers.create("Hi there")),
1382 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1383
1384 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1385 Assertions.assertNotNull(result1);
1386 final HttpResponse response1 = result1.getHead();
1387 final String entity1 = result1.getBody();
1388 Assertions.assertNotNull(response1);
1389 Assertions.assertEquals(200, response1.getCode());
1390 Assertions.assertEquals("Hi back", entity1);
1391
1392 final Message<HttpResponse, String> result2 = future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1393 Assertions.assertNotNull(result2);
1394 final HttpResponse response2 = result2.getHead();
1395 final String entity2 = result2.getBody();
1396 Assertions.assertNotNull(response2);
1397 Assertions.assertEquals(400, response2.getCode());
1398 Assertions.assertTrue(entity2.length() > 0);
1399
1400
1401 final Exception exception = Assertions.assertThrows(Exception.class, () ->
1402 future3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
1403 assertThat(exception, CoreMatchers.anyOf(
1404 CoreMatchers.instanceOf(CancellationException.class),
1405 CoreMatchers.instanceOf(ExecutionException.class)));
1406 }
1407
1408 private static final byte[] GARBAGE = "garbage".getBytes(StandardCharsets.US_ASCII);
1409
1410 private static class BrokenChunkEncoder extends AbstractContentEncoder {
1411
1412 private final CharArrayBuffer lineBuffer;
1413 private boolean done;
1414
1415 BrokenChunkEncoder(
1416 final WritableByteChannel channel,
1417 final SessionOutputBuffer buffer,
1418 final BasicHttpTransportMetrics metrics) {
1419 super(channel, buffer, metrics);
1420 lineBuffer = new CharArrayBuffer(16);
1421 }
1422
1423 @Override
1424 public void complete(final List<? extends Header> trailers) throws IOException {
1425 super.complete(trailers);
1426 }
1427
1428 @Override
1429 public int write(final ByteBuffer src) throws IOException {
1430 final int chunk;
1431 if (!done) {
1432 lineBuffer.clear();
1433 lineBuffer.append(Integer.toHexString(GARBAGE.length * 10));
1434 buffer().writeLine(lineBuffer);
1435 buffer().write(ByteBuffer.wrap(GARBAGE));
1436 done = true;
1437 chunk = GARBAGE.length;
1438 } else {
1439 chunk = 0;
1440 }
1441 final long bytesWritten = buffer().flush(channel());
1442 if (bytesWritten > 0) {
1443 metrics().incrementBytesTransferred(bytesWritten);
1444 }
1445 if (!buffer().hasData()) {
1446 channel().close();
1447 }
1448 return chunk;
1449 }
1450
1451 }
1452
1453 @Test
1454 public void testTruncatedChunk() throws Exception {
1455 final Http1TestServer server = resources.server();
1456 final Http1TestClient client = resources.client();
1457
1458 final InetSocketAddress serverEndpoint = server.start(new InternalServerHttp1EventHandlerFactory(
1459 HttpProcessors.server(),
1460 (request, context) -> new MessageExchangeHandler<String>(new StringAsyncEntityConsumer()) {
1461
1462 @Override
1463 protected void handle(
1464 final Message<HttpRequest, String> request,
1465 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1466 final HttpContext context) throws IOException, HttpException {
1467 responseTrigger.submitResponse(
1468 new BasicResponseProducer(new StringAsyncEntityProducer("useful stuff")), context);
1469 }
1470
1471 },
1472 Http1Config.DEFAULT,
1473 CharCodingConfig.DEFAULT,
1474 DefaultConnectionReuseStrategy.INSTANCE,
1475 scheme == URIScheme.HTTPS ? SSLTestContexts.createServerSSLContext() : null, null, null) {
1476
1477 @Override
1478 protected ServerHttp1StreamDuplexer createServerHttp1StreamDuplexer(
1479 final ProtocolIOSession ioSession,
1480 final HttpProcessor httpProcessor,
1481 final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
1482 final Http1Config http1Config,
1483 final CharCodingConfig connectionConfig,
1484 final ConnectionReuseStrategy connectionReuseStrategy,
1485 final NHttpMessageParser<HttpRequest> incomingMessageParser,
1486 final NHttpMessageWriter<HttpResponse> outgoingMessageWriter,
1487 final ContentLengthStrategy incomingContentStrategy,
1488 final ContentLengthStrategy outgoingContentStrategy,
1489 final Http1StreamListener streamListener) {
1490 return new ServerHttp1StreamDuplexer(ioSession, httpProcessor, exchangeHandlerFactory,
1491 scheme.id,
1492 http1Config, connectionConfig, connectionReuseStrategy,
1493 incomingMessageParser, outgoingMessageWriter,
1494 incomingContentStrategy, outgoingContentStrategy,
1495 streamListener) {
1496
1497 @Override
1498 protected ContentEncoder createContentEncoder(
1499 final long len,
1500 final WritableByteChannel channel,
1501 final SessionOutputBuffer buffer,
1502 final BasicHttpTransportMetrics metrics) throws HttpException {
1503 if (len == ContentLengthStrategy.CHUNKED) {
1504 return new BrokenChunkEncoder(channel, buffer, metrics);
1505 } else {
1506 return super.createContentEncoder(len, channel, buffer, metrics);
1507 }
1508 }
1509
1510 };
1511 }
1512
1513 });
1514
1515 client.start();
1516 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1517 "localhost", serverEndpoint.getPort(), TIMEOUT);
1518 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1519
1520 final AsyncRequestProducer requestProducer = new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1521 final StringAsyncEntityConsumer entityConsumer = new StringAsyncEntityConsumer() {
1522
1523 @Override
1524 public void releaseResources() {
1525
1526 }
1527
1528 };
1529 final BasicResponseConsumer<String> responseConsumer = new BasicResponseConsumer<>(entityConsumer);
1530 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(requestProducer, responseConsumer, null);
1531 final ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () ->
1532 future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
1533 final Throwable cause = exception.getCause();
1534 Assertions.assertTrue(cause instanceof MalformedChunkCodingException);
1535 Assertions.assertEquals("garbage", entityConsumer.generateContent());
1536 }
1537
1538 @Test
1539 public void testExceptionInHandler() throws Exception {
1540 final Http1TestServer server = resources.server();
1541 final Http1TestClient client = resources.client();
1542
1543 server.register("/hello", () -> new SingleLineResponseHandler("Hi there") {
1544
1545 @Override
1546 protected void handle(
1547 final Message<HttpRequest, String> request,
1548 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1549 final HttpContext context) throws IOException, HttpException {
1550 throw new HttpException("Boom");
1551 }
1552 });
1553 final InetSocketAddress serverEndpoint = server.start();
1554
1555 client.start();
1556 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1557 "localhost", serverEndpoint.getPort(), TIMEOUT);
1558 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1559
1560 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1561 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1562 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1563 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1564 Assertions.assertNotNull(result);
1565 final HttpResponse response1 = result.getHead();
1566 final String entity1 = result.getBody();
1567 Assertions.assertNotNull(response1);
1568 Assertions.assertEquals(500, response1.getCode());
1569 Assertions.assertEquals("Boom", entity1);
1570 }
1571
1572 @Test
1573 public void testNoServiceHandler() throws Exception {
1574 final Http1TestServer server = resources.server();
1575 final Http1TestClient client = resources.client();
1576
1577 final InetSocketAddress serverEndpoint = server.start();
1578
1579 client.start();
1580 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1581 "localhost", serverEndpoint.getPort(), TIMEOUT);
1582 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1583
1584 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1585 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1586 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1587 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1588 Assertions.assertNotNull(result);
1589 final HttpResponse response1 = result.getHead();
1590 final String entity1 = result.getBody();
1591 Assertions.assertNotNull(response1);
1592 Assertions.assertEquals(404, response1.getCode());
1593 Assertions.assertEquals("Resource not found", entity1);
1594 }
1595
1596 @Test
1597 public void testResponseNoContent() throws Exception {
1598 final Http1TestServer server = resources.server();
1599 final Http1TestClient client = resources.client();
1600
1601 server.register("/hello", () -> new SingleLineResponseHandler("Hi there") {
1602
1603 @Override
1604 protected void handle(
1605 final Message<HttpRequest, String> request,
1606 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1607 final HttpContext context) throws IOException, HttpException {
1608 final HttpResponse response = new BasicHttpResponse(HttpStatus.SC_NO_CONTENT);
1609 responseTrigger.submitResponse(new BasicResponseProducer(response), context);
1610 }
1611 });
1612 final InetSocketAddress serverEndpoint = server.start();
1613
1614 client.start();
1615 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1616 "localhost", serverEndpoint.getPort(), TIMEOUT);
1617 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1618
1619 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1620 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/hello")),
1621 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1622 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1623 Assertions.assertNotNull(result);
1624 final HttpResponse response1 = result.getHead();
1625 Assertions.assertNotNull(response1);
1626 Assertions.assertEquals(204, response1.getCode());
1627 Assertions.assertNull(result.getBody());
1628 }
1629
1630 @Test
1631 public void testMessageWithTrailers() throws Exception {
1632 final Http1TestServer server = resources.server();
1633 final Http1TestClient client = resources.client();
1634
1635 server.register("/hello", () -> new AbstractServerExchangeHandler<Message<HttpRequest, String>>() {
1636
1637 @Override
1638 protected AsyncRequestConsumer<Message<HttpRequest, String>> supplyConsumer(
1639 final HttpRequest request,
1640 final EntityDetails entityDetails,
1641 final HttpContext context) throws HttpException {
1642 return new BasicRequestConsumer<>(entityDetails != null ? new StringAsyncEntityConsumer() : null);
1643 }
1644
1645 @Override
1646 protected void handle(
1647 final Message<HttpRequest, String> requestMessage,
1648 final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1649 final HttpContext context) throws HttpException, IOException {
1650 responseTrigger.submitResponse(new BasicResponseProducer(
1651 HttpStatus.SC_OK,
1652 new DigestingEntityProducer("MD5",
1653 new StringAsyncEntityProducer("Hello back with some trailers"))), context);
1654 }
1655 });
1656 final InetSocketAddress serverEndpoint = server.start();
1657
1658 client.start();
1659
1660 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1661 "localhost", serverEndpoint.getPort(), TIMEOUT);
1662 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1663
1664 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1665 final DigestingEntityConsumer<String> entityConsumer = new DigestingEntityConsumer<>("MD5", new StringAsyncEntityConsumer());
1666 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1667 new BasicRequestProducer(request1, null),
1668 new BasicResponseConsumer<>(entityConsumer), null);
1669 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1670 Assertions.assertNotNull(result1);
1671 final HttpResponse response1 = result1.getHead();
1672 Assertions.assertNotNull(response1);
1673 Assertions.assertEquals(200, response1.getCode());
1674 Assertions.assertEquals("Hello back with some trailers", result1.getBody());
1675
1676 final List<Header> trailers = entityConsumer.getTrailers();
1677 Assertions.assertNotNull(trailers);
1678 Assertions.assertEquals(2, trailers.size());
1679 final Map<String, String> map = new HashMap<>();
1680 for (final Header header: trailers) {
1681 map.put(TextUtils.toLowerCase(header.getName()), header.getValue());
1682 }
1683 final String digest = TextUtils.toHexString(entityConsumer.getDigest());
1684 Assertions.assertEquals("MD5", map.get("digest-algo"));
1685 Assertions.assertEquals(digest, map.get("digest"));
1686 }
1687
1688 @Test
1689 public void testProtocolException() throws Exception {
1690 final Http1TestServer server = resources.server();
1691 final Http1TestClient client = resources.client();
1692
1693 server.register("/boom", () -> new AsyncServerExchangeHandler() {
1694
1695 private final StringAsyncEntityProducer entityProducer = new StringAsyncEntityProducer("Everyting is OK");
1696
1697 @Override
1698 public void releaseResources() {
1699 entityProducer.releaseResources();
1700 }
1701
1702 @Override
1703 public void handleRequest(
1704 final HttpRequest request,
1705 final EntityDetails entityDetails,
1706 final ResponseChannel responseChannel,
1707 final HttpContext context) throws HttpException, IOException {
1708 final String requestUri = request.getRequestUri();
1709 if (requestUri.endsWith("boom")) {
1710 throw new ProtocolException("Boom!!!");
1711 }
1712 responseChannel.sendResponse(new BasicHttpResponse(200), entityProducer, context);
1713 }
1714
1715 @Override
1716 public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
1717 capacityChannel.update(Integer.MAX_VALUE);
1718 }
1719
1720 @Override
1721 public void consume(final ByteBuffer src) throws IOException {
1722 }
1723
1724 @Override
1725 public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
1726
1727 }
1728
1729 @Override
1730 public int available() {
1731 return entityProducer.available();
1732 }
1733
1734 @Override
1735 public void produce(final DataStreamChannel channel) throws IOException {
1736 entityProducer.produce(channel);
1737 }
1738
1739 @Override
1740 public void failed(final Exception cause) {
1741 releaseResources();
1742 }
1743
1744 });
1745
1746 final InetSocketAddress serverEndpoint = server.start();
1747
1748 client.start();
1749 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1750 "localhost", serverEndpoint.getPort(), TIMEOUT);
1751 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1752 final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
1753 new BasicRequestProducer(Method.GET, createRequestURI(serverEndpoint, "/boom")),
1754 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1755 final Message<HttpResponse, String> result = future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1756 Assertions.assertNotNull(result);
1757 final HttpResponse response1 = result.getHead();
1758 final String entity1 = result.getBody();
1759 Assertions.assertNotNull(response1);
1760 Assertions.assertEquals(HttpStatus.SC_BAD_REQUEST, response1.getCode());
1761 Assertions.assertEquals("Boom!!!", entity1);
1762 }
1763
1764 @Test
1765 public void testHeaderTooLarge() throws Exception {
1766 final Http1TestServer server = resources.server();
1767 final Http1TestClient client = resources.client();
1768
1769 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
1770 final InetSocketAddress serverEndpoint = server.start(null, Http1Config.custom()
1771 .setMaxLineLength(100)
1772 .build());
1773 client.start();
1774
1775 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1776 "localhost", serverEndpoint.getPort(), TIMEOUT);
1777 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1778
1779 final HttpRequest request1 = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
1780 request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1781 "1234567890123456789012345678901234567890");
1782 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1783 new BasicRequestProducer(request1, null),
1784 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1785 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1786 Assertions.assertNotNull(result1);
1787 final HttpResponse response1 = result1.getHead();
1788 Assertions.assertNotNull(response1);
1789 Assertions.assertEquals(431, response1.getCode());
1790 Assertions.assertEquals("Maximum line length limit exceeded", result1.getBody());
1791 }
1792
1793 @Test
1794 public void testHeaderTooLargePost() throws Exception {
1795 final Http1TestServer server = resources.server();
1796 final Http1TestClient client = resources.client();
1797
1798 server.register("/hello", () -> new SingleLineResponseHandler("Hi there"));
1799 final InetSocketAddress serverEndpoint = server.start(null, Http1Config.custom()
1800 .setMaxLineLength(100)
1801 .build());
1802 client.start(
1803 new DefaultHttpProcessor(RequestContent.INSTANCE, RequestTargetHost.INSTANCE, RequestConnControl.INSTANCE), null);
1804
1805 final Future<ClientSessionEndpoint> connectFuture = client.connect(
1806 "localhost", serverEndpoint.getPort(), TIMEOUT);
1807 final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1808
1809 final HttpRequest request1 = new BasicHttpRequest(Method.POST, createRequestURI(serverEndpoint, "/hello"));
1810 request1.setHeader("big-f-header", "1234567890123456789012345678901234567890123456789012345678901234567890" +
1811 "1234567890123456789012345678901234567890");
1812
1813 final byte[] b = new byte[2048];
1814 for (int i = 0; i < b.length; i++) {
1815 b[i] = (byte) ('a' + i % 10);
1816 }
1817
1818 final Future<Message<HttpResponse, String>> future1 = streamEndpoint.execute(
1819 new BasicRequestProducer(request1, AsyncEntityProducers.create(b, ContentType.TEXT_PLAIN)),
1820 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
1821 final Message<HttpResponse, String> result1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
1822 Assertions.assertNotNull(result1);
1823 final HttpResponse response1 = result1.getHead();
1824 Assertions.assertNotNull(response1);
1825 Assertions.assertEquals(431, response1.getCode());
1826 Assertions.assertEquals("Maximum line length limit exceeded", result1.getBody());
1827 }
1828
1829 }