1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.testing.nio;
29
30 import java.io.IOException;
31 import java.net.InetSocketAddress;
32 import java.util.Arrays;
33 import java.util.Collection;
34 import java.util.LinkedList;
35 import java.util.Queue;
36 import java.util.concurrent.Future;
37
38 import org.apache.hc.core5.function.Supplier;
39 import org.apache.hc.core5.http.ContentType;
40 import org.apache.hc.core5.http.EntityDetails;
41 import org.apache.hc.core5.http.HeaderElements;
42 import org.apache.hc.core5.http.HttpException;
43 import org.apache.hc.core5.http.HttpHeaders;
44 import org.apache.hc.core5.http.HttpHost;
45 import org.apache.hc.core5.http.HttpRequest;
46 import org.apache.hc.core5.http.HttpResponse;
47 import org.apache.hc.core5.http.HttpStatus;
48 import org.apache.hc.core5.http.Message;
49 import org.apache.hc.core5.http.Method;
50 import org.apache.hc.core5.http.URIScheme;
51 import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap;
52 import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
53 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
54 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
55 import org.apache.hc.core5.http.impl.bootstrap.StandardFilter;
56 import org.apache.hc.core5.http.message.BasicHttpRequest;
57 import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
58 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
59 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
60 import org.apache.hc.core5.http.nio.AsyncFilterChain;
61 import org.apache.hc.core5.http.nio.AsyncFilterHandler;
62 import org.apache.hc.core5.http.nio.AsyncPushProducer;
63 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
64 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
65 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
66 import org.apache.hc.core5.http.nio.ssl.BasicClientTlsStrategy;
67 import org.apache.hc.core5.http.nio.ssl.BasicServerTlsStrategy;
68 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
69 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
70 import org.apache.hc.core5.http.protocol.HttpContext;
71 import org.apache.hc.core5.http.protocol.UriPatternMatcher;
72 import org.apache.hc.core5.io.CloseMode;
73 import org.apache.hc.core5.reactor.IOReactorConfig;
74 import org.apache.hc.core5.reactor.ListenerEndpoint;
75 import org.apache.hc.core5.testing.SSLTestContexts;
76 import org.apache.hc.core5.testing.classic.LoggingConnPoolListener;
77 import org.apache.hc.core5.util.Timeout;
78 import org.hamcrest.CoreMatchers;
79 import org.hamcrest.MatcherAssert;
80 import org.junit.Rule;
81 import org.junit.Test;
82 import org.junit.rules.ExternalResource;
83 import org.junit.runner.RunWith;
84 import org.junit.runners.Parameterized;
85 import org.slf4j.Logger;
86 import org.slf4j.LoggerFactory;
87
88 @RunWith(Parameterized.class)
89 public class Http1ServerAndRequesterTest {
90
91 private final Logger log = LoggerFactory.getLogger(getClass());
92
93 @Parameterized.Parameters(name = "{0}")
94 public static Collection<Object[]> protocols() {
95 return Arrays.asList(new Object[][]{
96 { URIScheme.HTTP },
97 { URIScheme.HTTPS }
98 });
99 }
100 private static final Timeout TIMEOUT = Timeout.ofSeconds(30);
101
102 private final URIScheme scheme;
103
104 public Http1ServerAndRequesterTest(final URIScheme scheme) {
105 this.scheme = scheme;
106 }
107
108 private HttpAsyncServer server;
109
110 @Rule
111 public ExternalResource serverResource = new ExternalResource() {
112
113 @Override
114 protected void before() throws Throwable {
115 log.debug("Starting up test server");
116 server = AsyncServerBootstrap.bootstrap()
117 .setLookupRegistry(new UriPatternMatcher<Supplier<AsyncServerExchangeHandler>>())
118 .setIOReactorConfig(
119 IOReactorConfig.custom()
120 .setSoTimeout(TIMEOUT)
121 .build())
122 .register("*", new Supplier<AsyncServerExchangeHandler>() {
123
124 @Override
125 public AsyncServerExchangeHandler get() {
126 return new EchoHandler(2048);
127 }
128
129 })
130 .addFilterBefore(StandardFilter.MAIN_HANDLER.name(), "no-keepalive", new AsyncFilterHandler() {
131
132 @Override
133 public AsyncDataConsumer handle(
134 final HttpRequest request,
135 final EntityDetails entityDetails,
136 final HttpContext context,
137 final AsyncFilterChain.ResponseTrigger responseTrigger,
138 final AsyncFilterChain chain) throws HttpException, IOException {
139 return chain.proceed(request, entityDetails, context, new AsyncFilterChain.ResponseTrigger() {
140
141 @Override
142 public void sendInformation(
143 final HttpResponse response) throws HttpException, IOException {
144 responseTrigger.sendInformation(response);
145 }
146
147 @Override
148 public void submitResponse(
149 final HttpResponse response,
150 final AsyncEntityProducer entityProducer) throws HttpException, IOException {
151 if (request.getPath().startsWith("/no-keep-alive")) {
152 response.setHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
153 }
154 responseTrigger.submitResponse(response, entityProducer);
155 }
156
157 @Override
158 public void pushPromise(
159 final HttpRequest promise,
160 final AsyncPushProducer responseProducer) throws HttpException, IOException {
161 responseTrigger.pushPromise(promise, responseProducer);
162 }
163
164 });
165 }
166 })
167 .setTlsStrategy(scheme == URIScheme.HTTPS ?
168 new BasicServerTlsStrategy(SSLTestContexts.createServerSSLContext()) : null)
169 .setStreamListener(LoggingHttp1StreamListener.INSTANCE_SERVER)
170 .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
171 .setExceptionCallback(LoggingExceptionCallback.INSTANCE)
172 .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
173 .create();
174 }
175
176 @Override
177 protected void after() {
178 log.debug("Shutting down test server");
179 if (server != null) {
180 server.close(CloseMode.GRACEFUL);
181 }
182 }
183
184 };
185
186 private HttpAsyncRequester requester;
187
188 @Rule
189 public ExternalResource clientResource = new ExternalResource() {
190
191 @Override
192 protected void before() throws Throwable {
193 log.debug("Starting up test client");
194 requester = AsyncRequesterBootstrap.bootstrap()
195 .setIOReactorConfig(IOReactorConfig.custom()
196 .setSoTimeout(TIMEOUT)
197 .build())
198 .setTlsStrategy(new BasicClientTlsStrategy(SSLTestContexts.createClientSSLContext()))
199 .setStreamListener(LoggingHttp1StreamListener.INSTANCE_CLIENT)
200 .setConnPoolListener(LoggingConnPoolListener.INSTANCE)
201 .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
202 .setExceptionCallback(LoggingExceptionCallback.INSTANCE)
203 .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
204 .create();
205 }
206
207 @Override
208 protected void after() {
209 log.debug("Shutting down test client");
210 if (requester != null) {
211 requester.close(CloseMode.GRACEFUL);
212 }
213 }
214
215 };
216
217 @Test
218 public void testSequentialRequests() throws Exception {
219 server.start();
220 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
221 final ListenerEndpoint listener = future.get();
222 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
223 requester.start();
224
225 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
226 final Future<Message<HttpResponse, String>> resultFuture1 = requester.execute(
227 new BasicRequestProducer(Method.POST, target, "/stuff",
228 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
229 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
230 final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
231 MatcherAssert.assertThat(message1, CoreMatchers.notNullValue());
232 final HttpResponse response1 = message1.getHead();
233 MatcherAssert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
234 final String body1 = message1.getBody();
235 MatcherAssert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
236
237 final Future<Message<HttpResponse, String>> resultFuture2 = requester.execute(
238 new BasicRequestProducer(Method.POST, target, "/other-stuff",
239 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
240 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
241 final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
242 MatcherAssert.assertThat(message2, CoreMatchers.notNullValue());
243 final HttpResponse response2 = message2.getHead();
244 MatcherAssert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
245 final String body2 = message2.getBody();
246 MatcherAssert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
247
248 final Future<Message<HttpResponse, String>> resultFuture3 = requester.execute(
249 new BasicRequestProducer(Method.POST, target, "/more-stuff",
250 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
251 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
252 final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
253 MatcherAssert.assertThat(message3, CoreMatchers.notNullValue());
254 final HttpResponse response3 = message3.getHead();
255 MatcherAssert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
256 final String body3 = message3.getBody();
257 MatcherAssert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
258 }
259
260 @Test
261 public void testSequentialRequestsNonPersistentConnection() throws Exception {
262 server.start();
263 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
264 final ListenerEndpoint listener = future.get();
265 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
266 requester.start();
267
268 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
269 final Future<Message<HttpResponse, String>> resultFuture1 = requester.execute(
270 new BasicRequestProducer(Method.POST, target, "/no-keep-alive/stuff",
271 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
272 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
273 final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
274 MatcherAssert.assertThat(message1, CoreMatchers.notNullValue());
275 final HttpResponse response1 = message1.getHead();
276 MatcherAssert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
277 final String body1 = message1.getBody();
278 MatcherAssert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
279
280 final Future<Message<HttpResponse, String>> resultFuture2 = requester.execute(
281 new BasicRequestProducer(Method.POST, target, "/no-keep-alive/other-stuff",
282 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
283 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
284 final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
285 MatcherAssert.assertThat(message2, CoreMatchers.notNullValue());
286 final HttpResponse response2 = message2.getHead();
287 MatcherAssert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
288 final String body2 = message2.getBody();
289 MatcherAssert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
290
291 final Future<Message<HttpResponse, String>> resultFuture3 = requester.execute(
292 new BasicRequestProducer(Method.POST, target, "/no-keep-alive/more-stuff",
293 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
294 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
295 final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
296 MatcherAssert.assertThat(message3, CoreMatchers.notNullValue());
297 final HttpResponse response3 = message3.getHead();
298 MatcherAssert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
299 final String body3 = message3.getBody();
300 MatcherAssert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
301 }
302
303 @Test
304 public void testSequentialRequestsSameEndpoint() throws Exception {
305 server.start();
306 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
307 final ListenerEndpoint listener = future.get();
308 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
309 requester.start();
310
311 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
312 final Future<AsyncClientEndpoint> endpointFuture = requester.connect(target, Timeout.ofSeconds(5));
313 final AsyncClientEndpoint endpoint = endpointFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
314 try {
315
316 final Future<Message<HttpResponse, String>> resultFuture1 = endpoint.execute(
317 new BasicRequestProducer(Method.POST, target, "/stuff",
318 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
319 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
320 final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
321 MatcherAssert.assertThat(message1, CoreMatchers.notNullValue());
322 final HttpResponse response1 = message1.getHead();
323 MatcherAssert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
324 final String body1 = message1.getBody();
325 MatcherAssert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
326
327 final Future<Message<HttpResponse, String>> resultFuture2 = endpoint.execute(
328 new BasicRequestProducer(Method.POST, target, "/other-stuff",
329 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
330 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
331 final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
332 MatcherAssert.assertThat(message2, CoreMatchers.notNullValue());
333 final HttpResponse response2 = message2.getHead();
334 MatcherAssert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
335 final String body2 = message2.getBody();
336 MatcherAssert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
337
338 final Future<Message<HttpResponse, String>> resultFuture3 = endpoint.execute(
339 new BasicRequestProducer(Method.POST, target, "/more-stuff",
340 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
341 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
342 final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
343 MatcherAssert.assertThat(message3, CoreMatchers.notNullValue());
344 final HttpResponse response3 = message3.getHead();
345 MatcherAssert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
346 final String body3 = message3.getBody();
347 MatcherAssert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
348
349 } finally {
350 endpoint.releaseAndReuse();
351 }
352 }
353
354 @Test
355 public void testPipelinedRequests() throws Exception {
356 server.start();
357 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
358 final ListenerEndpoint listener = future.get();
359 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
360 requester.start();
361
362 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
363 final Future<AsyncClientEndpoint> endpointFuture = requester.connect(target, Timeout.ofSeconds(5));
364 final AsyncClientEndpoint endpoint = endpointFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
365 try {
366
367 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
368
369 queue.add(endpoint.execute(
370 new BasicRequestProducer(Method.POST, target, "/stuff",
371 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
372 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
373 queue.add(endpoint.execute(
374 new BasicRequestProducer(Method.POST, target, "/other-stuff",
375 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
376 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
377 queue.add(endpoint.execute(
378 new BasicRequestProducer(Method.POST, target, "/more-stuff",
379 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
380 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
381
382 while (!queue.isEmpty()) {
383 final Future<Message<HttpResponse, String>> resultFuture = queue.remove();
384 final Message<HttpResponse, String> message = resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
385 MatcherAssert.assertThat(message, CoreMatchers.notNullValue());
386 final HttpResponse response = message.getHead();
387 MatcherAssert.assertThat(response.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
388 final String body = message.getBody();
389 MatcherAssert.assertThat(body, CoreMatchers.containsString("stuff"));
390 }
391
392 } finally {
393 endpoint.releaseAndReuse();
394 }
395 }
396
397 @Test
398 public void testNonPersistentHeads() throws Exception {
399 server.start();
400 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
401 final ListenerEndpoint listener = future.get();
402 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
403 requester.start();
404
405 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
406 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
407
408 for (int i = 0; i < 20; i++) {
409 final HttpRequest head = new BasicHttpRequest(Method.HEAD, target, "/no-keep-alive/stuff?p=" + i);
410 queue.add(requester.execute(
411 new BasicRequestProducer(head, null),
412 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null));
413 }
414
415 while (!queue.isEmpty()) {
416 final Future<Message<HttpResponse, String>> resultFuture = queue.remove();
417 final Message<HttpResponse, String> message = resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
418 MatcherAssert.assertThat(message, CoreMatchers.notNullValue());
419 final HttpResponse response = message.getHead();
420 MatcherAssert.assertThat(response.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
421 MatcherAssert.assertThat(message.getBody(), CoreMatchers.nullValue());
422 }
423 }
424
425 }