View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.testing.nio;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.util.Arrays;
33  import java.util.Collection;
34  import java.util.LinkedList;
35  import java.util.Queue;
36  import java.util.concurrent.Future;
37  
38  import org.apache.hc.core5.function.Supplier;
39  import org.apache.hc.core5.http.ContentType;
40  import org.apache.hc.core5.http.EntityDetails;
41  import org.apache.hc.core5.http.HeaderElements;
42  import org.apache.hc.core5.http.HttpException;
43  import org.apache.hc.core5.http.HttpHeaders;
44  import org.apache.hc.core5.http.HttpHost;
45  import org.apache.hc.core5.http.HttpRequest;
46  import org.apache.hc.core5.http.HttpResponse;
47  import org.apache.hc.core5.http.HttpStatus;
48  import org.apache.hc.core5.http.Message;
49  import org.apache.hc.core5.http.Method;
50  import org.apache.hc.core5.http.URIScheme;
51  import org.apache.hc.core5.http.impl.bootstrap.AsyncRequesterBootstrap;
52  import org.apache.hc.core5.http.impl.bootstrap.AsyncServerBootstrap;
53  import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
54  import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
55  import org.apache.hc.core5.http.impl.bootstrap.StandardFilter;
56  import org.apache.hc.core5.http.message.BasicHttpRequest;
57  import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
58  import org.apache.hc.core5.http.nio.AsyncDataConsumer;
59  import org.apache.hc.core5.http.nio.AsyncEntityProducer;
60  import org.apache.hc.core5.http.nio.AsyncFilterChain;
61  import org.apache.hc.core5.http.nio.AsyncFilterHandler;
62  import org.apache.hc.core5.http.nio.AsyncPushProducer;
63  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
64  import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
65  import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
66  import org.apache.hc.core5.http.nio.ssl.BasicClientTlsStrategy;
67  import org.apache.hc.core5.http.nio.ssl.BasicServerTlsStrategy;
68  import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
69  import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
70  import org.apache.hc.core5.http.protocol.HttpContext;
71  import org.apache.hc.core5.http.protocol.UriPatternMatcher;
72  import org.apache.hc.core5.io.CloseMode;
73  import org.apache.hc.core5.reactor.IOReactorConfig;
74  import org.apache.hc.core5.reactor.ListenerEndpoint;
75  import org.apache.hc.core5.testing.SSLTestContexts;
76  import org.apache.hc.core5.testing.classic.LoggingConnPoolListener;
77  import org.apache.hc.core5.util.Timeout;
78  import org.hamcrest.CoreMatchers;
79  import org.hamcrest.MatcherAssert;
80  import org.junit.Rule;
81  import org.junit.Test;
82  import org.junit.rules.ExternalResource;
83  import org.junit.runner.RunWith;
84  import org.junit.runners.Parameterized;
85  import org.slf4j.Logger;
86  import org.slf4j.LoggerFactory;
87  
88  @RunWith(Parameterized.class)
89  public class Http1ServerAndRequesterTest {
90  
91      private final Logger log = LoggerFactory.getLogger(getClass());
92  
93      @Parameterized.Parameters(name = "{0}")
94      public static Collection<Object[]> protocols() {
95          return Arrays.asList(new Object[][]{
96                  { URIScheme.HTTP },
97                  { URIScheme.HTTPS }
98          });
99      }
100     private static final Timeout TIMEOUT = Timeout.ofSeconds(30);
101 
102     private final URIScheme scheme;
103 
104     public Http1ServerAndRequesterTest(final URIScheme scheme) {
105         this.scheme = scheme;
106     }
107 
108     private HttpAsyncServer server;
109 
110     @Rule
111     public ExternalResource serverResource = new ExternalResource() {
112 
113         @Override
114         protected void before() throws Throwable {
115             log.debug("Starting up test server");
116             server = AsyncServerBootstrap.bootstrap()
117                     .setLookupRegistry(new UriPatternMatcher<Supplier<AsyncServerExchangeHandler>>())
118                     .setIOReactorConfig(
119                             IOReactorConfig.custom()
120                                     .setSoTimeout(TIMEOUT)
121                                     .build())
122                     .register("*", new Supplier<AsyncServerExchangeHandler>() {
123 
124                         @Override
125                         public AsyncServerExchangeHandler get() {
126                             return new EchoHandler(2048);
127                         }
128 
129                     })
130                     .addFilterBefore(StandardFilter.MAIN_HANDLER.name(), "no-keepalive", new AsyncFilterHandler() {
131 
132                         @Override
133                         public AsyncDataConsumer handle(
134                                 final HttpRequest request,
135                                 final EntityDetails entityDetails,
136                                 final HttpContext context,
137                                 final AsyncFilterChain.ResponseTrigger responseTrigger,
138                                 final AsyncFilterChain chain) throws HttpException, IOException {
139                             return chain.proceed(request, entityDetails, context, new AsyncFilterChain.ResponseTrigger() {
140 
141                                 @Override
142                                 public void sendInformation(
143                                         final HttpResponse response) throws HttpException, IOException {
144                                     responseTrigger.sendInformation(response);
145                                 }
146 
147                                 @Override
148                                 public void submitResponse(
149                                         final HttpResponse response,
150                                         final AsyncEntityProducer entityProducer) throws HttpException, IOException {
151                                     if (request.getPath().startsWith("/no-keep-alive")) {
152                                         response.setHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
153                                     }
154                                     responseTrigger.submitResponse(response, entityProducer);
155                                 }
156 
157                                 @Override
158                                 public void pushPromise(
159                                         final HttpRequest promise,
160                                         final AsyncPushProducer responseProducer) throws HttpException, IOException {
161                                     responseTrigger.pushPromise(promise, responseProducer);
162                                 }
163 
164                             });
165                         }
166                     })
167                     .setTlsStrategy(scheme == URIScheme.HTTPS  ?
168                             new BasicServerTlsStrategy(SSLTestContexts.createServerSSLContext()) : null)
169                     .setStreamListener(LoggingHttp1StreamListener.INSTANCE_SERVER)
170                     .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
171                     .setExceptionCallback(LoggingExceptionCallback.INSTANCE)
172                     .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
173                     .create();
174         }
175 
176         @Override
177         protected void after() {
178             log.debug("Shutting down test server");
179             if (server != null) {
180                 server.close(CloseMode.GRACEFUL);
181             }
182         }
183 
184     };
185 
186     private HttpAsyncRequester requester;
187 
188     @Rule
189     public ExternalResource clientResource = new ExternalResource() {
190 
191         @Override
192         protected void before() throws Throwable {
193             log.debug("Starting up test client");
194             requester = AsyncRequesterBootstrap.bootstrap()
195                     .setIOReactorConfig(IOReactorConfig.custom()
196                             .setSoTimeout(TIMEOUT)
197                             .build())
198                     .setTlsStrategy(new BasicClientTlsStrategy(SSLTestContexts.createClientSSLContext()))
199                     .setStreamListener(LoggingHttp1StreamListener.INSTANCE_CLIENT)
200                     .setConnPoolListener(LoggingConnPoolListener.INSTANCE)
201                     .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
202                     .setExceptionCallback(LoggingExceptionCallback.INSTANCE)
203                     .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
204                     .create();
205         }
206 
207         @Override
208         protected void after() {
209             log.debug("Shutting down test client");
210             if (requester != null) {
211                 requester.close(CloseMode.GRACEFUL);
212             }
213         }
214 
215     };
216 
217     @Test
218     public void testSequentialRequests() throws Exception {
219         server.start();
220         final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
221         final ListenerEndpoint listener = future.get();
222         final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
223         requester.start();
224 
225         final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
226         final Future<Message<HttpResponse, String>> resultFuture1 = requester.execute(
227                 new BasicRequestProducer(Method.POST, target, "/stuff",
228                         new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
229                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
230         final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
231         MatcherAssert.assertThat(message1, CoreMatchers.notNullValue());
232         final HttpResponse response1 = message1.getHead();
233         MatcherAssert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
234         final String body1 = message1.getBody();
235         MatcherAssert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
236 
237         final Future<Message<HttpResponse, String>> resultFuture2 = requester.execute(
238                 new BasicRequestProducer(Method.POST, target, "/other-stuff",
239                         new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
240                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
241         final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
242         MatcherAssert.assertThat(message2, CoreMatchers.notNullValue());
243         final HttpResponse response2 = message2.getHead();
244         MatcherAssert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
245         final String body2 = message2.getBody();
246         MatcherAssert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
247 
248         final Future<Message<HttpResponse, String>> resultFuture3 = requester.execute(
249                 new BasicRequestProducer(Method.POST, target, "/more-stuff",
250                         new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
251                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
252         final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
253         MatcherAssert.assertThat(message3, CoreMatchers.notNullValue());
254         final HttpResponse response3 = message3.getHead();
255         MatcherAssert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
256         final String body3 = message3.getBody();
257         MatcherAssert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
258     }
259 
260     @Test
261     public void testSequentialRequestsNonPersistentConnection() throws Exception {
262         server.start();
263         final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
264         final ListenerEndpoint listener = future.get();
265         final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
266         requester.start();
267 
268         final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
269         final Future<Message<HttpResponse, String>> resultFuture1 = requester.execute(
270                 new BasicRequestProducer(Method.POST, target, "/no-keep-alive/stuff",
271                         new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
272                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
273         final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
274         MatcherAssert.assertThat(message1, CoreMatchers.notNullValue());
275         final HttpResponse response1 = message1.getHead();
276         MatcherAssert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
277         final String body1 = message1.getBody();
278         MatcherAssert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
279 
280         final Future<Message<HttpResponse, String>> resultFuture2 = requester.execute(
281                 new BasicRequestProducer(Method.POST, target, "/no-keep-alive/other-stuff",
282                         new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
283                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
284         final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
285         MatcherAssert.assertThat(message2, CoreMatchers.notNullValue());
286         final HttpResponse response2 = message2.getHead();
287         MatcherAssert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
288         final String body2 = message2.getBody();
289         MatcherAssert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
290 
291         final Future<Message<HttpResponse, String>> resultFuture3 = requester.execute(
292                 new BasicRequestProducer(Method.POST, target, "/no-keep-alive/more-stuff",
293                         new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
294                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
295         final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
296         MatcherAssert.assertThat(message3, CoreMatchers.notNullValue());
297         final HttpResponse response3 = message3.getHead();
298         MatcherAssert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
299         final String body3 = message3.getBody();
300         MatcherAssert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
301     }
302 
303     @Test
304     public void testSequentialRequestsSameEndpoint() throws Exception {
305         server.start();
306         final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
307         final ListenerEndpoint listener = future.get();
308         final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
309         requester.start();
310 
311         final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
312         final Future<AsyncClientEndpoint> endpointFuture = requester.connect(target, Timeout.ofSeconds(5));
313         final AsyncClientEndpoint endpoint = endpointFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
314         try {
315 
316             final Future<Message<HttpResponse, String>> resultFuture1 = endpoint.execute(
317                     new BasicRequestProducer(Method.POST, target, "/stuff",
318                             new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
319                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
320             final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
321             MatcherAssert.assertThat(message1, CoreMatchers.notNullValue());
322             final HttpResponse response1 = message1.getHead();
323             MatcherAssert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
324             final String body1 = message1.getBody();
325             MatcherAssert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
326 
327             final Future<Message<HttpResponse, String>> resultFuture2 = endpoint.execute(
328                     new BasicRequestProducer(Method.POST, target, "/other-stuff",
329                             new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
330                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
331             final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
332             MatcherAssert.assertThat(message2, CoreMatchers.notNullValue());
333             final HttpResponse response2 = message2.getHead();
334             MatcherAssert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
335             final String body2 = message2.getBody();
336             MatcherAssert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
337 
338             final Future<Message<HttpResponse, String>> resultFuture3 = endpoint.execute(
339                     new BasicRequestProducer(Method.POST, target, "/more-stuff",
340                             new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
341                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
342             final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
343             MatcherAssert.assertThat(message3, CoreMatchers.notNullValue());
344             final HttpResponse response3 = message3.getHead();
345             MatcherAssert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
346             final String body3 = message3.getBody();
347             MatcherAssert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
348 
349         } finally {
350             endpoint.releaseAndReuse();
351         }
352     }
353 
354     @Test
355     public void testPipelinedRequests() throws Exception {
356         server.start();
357         final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
358         final ListenerEndpoint listener = future.get();
359         final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
360         requester.start();
361 
362         final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
363         final Future<AsyncClientEndpoint> endpointFuture = requester.connect(target, Timeout.ofSeconds(5));
364         final AsyncClientEndpoint endpoint = endpointFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
365         try {
366 
367             final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
368 
369             queue.add(endpoint.execute(
370                     new BasicRequestProducer(Method.POST, target, "/stuff",
371                             new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
372                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
373             queue.add(endpoint.execute(
374                     new BasicRequestProducer(Method.POST, target, "/other-stuff",
375                             new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
376                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
377             queue.add(endpoint.execute(
378                     new BasicRequestProducer(Method.POST, target, "/more-stuff",
379                             new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
380                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
381 
382             while (!queue.isEmpty()) {
383                 final Future<Message<HttpResponse, String>> resultFuture = queue.remove();
384                 final Message<HttpResponse, String> message = resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
385                 MatcherAssert.assertThat(message, CoreMatchers.notNullValue());
386                 final HttpResponse response = message.getHead();
387                 MatcherAssert.assertThat(response.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
388                 final String body = message.getBody();
389                 MatcherAssert.assertThat(body, CoreMatchers.containsString("stuff"));
390             }
391 
392         } finally {
393             endpoint.releaseAndReuse();
394         }
395     }
396 
397     @Test
398     public void testNonPersistentHeads() throws Exception {
399         server.start();
400         final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
401         final ListenerEndpoint listener = future.get();
402         final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
403         requester.start();
404 
405         final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
406         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
407 
408         for (int i = 0; i < 20; i++) {
409             final HttpRequest head = new BasicHttpRequest(Method.HEAD, target, "/no-keep-alive/stuff?p=" + i);
410             queue.add(requester.execute(
411                     new BasicRequestProducer(head, null),
412                     new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null));
413         }
414 
415         while (!queue.isEmpty()) {
416             final Future<Message<HttpResponse, String>> resultFuture = queue.remove();
417             final Message<HttpResponse, String> message = resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
418             MatcherAssert.assertThat(message, CoreMatchers.notNullValue());
419             final HttpResponse response = message.getHead();
420             MatcherAssert.assertThat(response.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
421             MatcherAssert.assertThat(message.getBody(), CoreMatchers.nullValue());
422         }
423     }
424 
425 }