View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.testing.nio;
29  
30  import java.net.InetSocketAddress;
31  import java.util.Arrays;
32  import java.util.Collection;
33  import java.util.LinkedList;
34  import java.util.Queue;
35  import java.util.Random;
36  import java.util.concurrent.CountDownLatch;
37  import java.util.concurrent.Future;
38  
39  import org.apache.hc.core5.concurrent.Cancellable;
40  import org.apache.hc.core5.concurrent.FutureCallback;
41  import org.apache.hc.core5.function.Supplier;
42  import org.apache.hc.core5.http.ContentType;
43  import org.apache.hc.core5.http.HttpHost;
44  import org.apache.hc.core5.http.HttpResponse;
45  import org.apache.hc.core5.http.HttpStatus;
46  import org.apache.hc.core5.http.Message;
47  import org.apache.hc.core5.http.Method;
48  import org.apache.hc.core5.http.URIScheme;
49  import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
50  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
51  import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
52  import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
53  import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
54  import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
55  import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
56  import org.apache.hc.core5.http.protocol.HttpCoreContext;
57  import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequester;
58  import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequesterBootstrap;
59  import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
60  import org.apache.hc.core5.http2.ssl.H2ClientTlsStrategy;
61  import org.apache.hc.core5.http2.ssl.H2ServerTlsStrategy;
62  import org.apache.hc.core5.io.CloseMode;
63  import org.apache.hc.core5.reactor.IOReactorConfig;
64  import org.apache.hc.core5.reactor.ListenerEndpoint;
65  import org.apache.hc.core5.testing.SSLTestContexts;
66  import org.apache.hc.core5.util.ReflectionUtils;
67  import org.apache.hc.core5.util.TimeValue;
68  import org.apache.hc.core5.util.Timeout;
69  import org.hamcrest.CoreMatchers;
70  import org.hamcrest.MatcherAssert;
71  import org.junit.Assume;
72  import org.junit.Before;
73  import org.junit.BeforeClass;
74  import org.junit.Rule;
75  import org.junit.Test;
76  import org.junit.rules.ExternalResource;
77  import org.junit.runner.RunWith;
78  import org.junit.runners.Parameterized;
79  import org.slf4j.Logger;
80  import org.slf4j.LoggerFactory;
81  
82  @RunWith(Parameterized.class)
83  public class H2ServerAndMultiplexingRequesterTest {
84  
85      private final Logger log = LoggerFactory.getLogger(getClass());
86  
87      @Parameterized.Parameters(name = "{0}")
88      public static Collection<Object[]> protocols() {
89          return Arrays.asList(new Object[][]{
90                  { URIScheme.HTTP },
91                  { URIScheme.HTTPS }
92          });
93      }
94      private static final Timeout TIMEOUT = Timeout.ofSeconds(30);
95  
96      private final URIScheme scheme;
97  
98      public H2ServerAndMultiplexingRequesterTest(final URIScheme scheme) {
99          this.scheme = scheme;
100     }
101 
102     private HttpAsyncServer server;
103 
104     @Rule
105     public ExternalResource serverResource = new ExternalResource() {
106 
107         @Override
108         protected void before() throws Throwable {
109             log.debug("Starting up test server");
110             server = H2ServerBootstrap.bootstrap()
111                     .setIOReactorConfig(
112                             IOReactorConfig.custom()
113                                     .setSoTimeout(TIMEOUT)
114                                     .build())
115                     .setTlsStrategy(scheme == URIScheme.HTTPS  ?
116                             new H2ServerTlsStrategy(SSLTestContexts.createServerSSLContext()) : null)
117                     .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
118                     .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
119                     .setExceptionCallback(LoggingExceptionCallback.INSTANCE)
120                     .setStreamListener(LoggingH2StreamListener.INSTANCE)
121                     .register("*", new Supplier<AsyncServerExchangeHandler>() {
122 
123                         @Override
124                         public AsyncServerExchangeHandler get() {
125                             return new EchoHandler(2048);
126                         }
127 
128                     })
129                     .create();
130         }
131 
132         @Override
133         protected void after() {
134             log.debug("Shutting down test server");
135             if (server != null) {
136                 server.close(CloseMode.GRACEFUL);
137             }
138         }
139 
140     };
141 
142     private H2MultiplexingRequester requester;
143 
144     @Rule
145     public ExternalResource clientResource = new ExternalResource() {
146 
147         @Override
148         protected void before() throws Throwable {
149             log.debug("Starting up test client");
150             requester = H2MultiplexingRequesterBootstrap.bootstrap()
151                     .setIOReactorConfig(IOReactorConfig.custom()
152                             .setSoTimeout(TIMEOUT)
153                             .build())
154                     .setTlsStrategy(new H2ClientTlsStrategy(SSLTestContexts.createClientSSLContext()))
155                     .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
156                     .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
157                     .setExceptionCallback(LoggingExceptionCallback.INSTANCE)
158                     .setStreamListener(LoggingH2StreamListener.INSTANCE)
159                     .create();
160         }
161 
162         @Override
163         protected void after() {
164             log.debug("Shutting down test client");
165             if (requester != null) {
166                 requester.close(CloseMode.GRACEFUL);
167             }
168         }
169 
170     };
171 
172     private static int javaVersion;
173 
174     @BeforeClass
175     public static void determineJavaVersion() {
176         javaVersion = ReflectionUtils.determineJRELevel();
177     }
178 
179     @Before
180     public void checkVersion() {
181         if (scheme == URIScheme.HTTPS) {
182             Assume.assumeTrue("Java version must be 1.8 or greater",  javaVersion > 7);
183         }
184     }
185 
186     @Test
187     public void testSequentialRequests() throws Exception {
188         server.start();
189         final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
190         final ListenerEndpoint listener = future.get();
191         final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
192         requester.start();
193 
194         final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
195         final Future<Message<HttpResponse, String>> resultFuture1 = requester.execute(
196                 new BasicRequestProducer(Method.POST, target, "/stuff",
197                         new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
198                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
199         final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
200         MatcherAssert.assertThat(message1, CoreMatchers.notNullValue());
201         final HttpResponse response1 = message1.getHead();
202         MatcherAssert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
203         final String body1 = message1.getBody();
204         MatcherAssert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
205 
206         final Future<Message<HttpResponse, String>> resultFuture2 = requester.execute(
207                 new BasicRequestProducer(Method.POST, target, "/other-stuff",
208                         new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
209                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
210         final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
211         MatcherAssert.assertThat(message2, CoreMatchers.notNullValue());
212         final HttpResponse response2 = message2.getHead();
213         MatcherAssert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
214         final String body2 = message2.getBody();
215         MatcherAssert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
216 
217         final Future<Message<HttpResponse, String>> resultFuture3 = requester.execute(
218                 new BasicRequestProducer(Method.POST, target, "/more-stuff",
219                         new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
220                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
221         final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
222         MatcherAssert.assertThat(message3, CoreMatchers.notNullValue());
223         final HttpResponse response3 = message3.getHead();
224         MatcherAssert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
225         final String body3 = message3.getBody();
226         MatcherAssert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
227     }
228 
229     @Test
230     public void testMultiplexedRequests() throws Exception {
231         server.start();
232         final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
233         final ListenerEndpoint listener = future.get();
234         final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
235         requester.start();
236 
237         final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
238         final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
239 
240         queue.add(requester.execute(
241                 new BasicRequestProducer(Method.POST, target, "/stuff",
242                         new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
243                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null));
244         queue.add(requester.execute(
245                 new BasicRequestProducer(Method.POST, target, "/other-stuff",
246                         new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
247                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null));
248         queue.add(requester.execute(
249                 new BasicRequestProducer(Method.POST, target, "/more-stuff",
250                         new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
251                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null));
252 
253         while (!queue.isEmpty()) {
254             final Future<Message<HttpResponse, String>> resultFuture = queue.remove();
255             final Message<HttpResponse, String> message = resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
256             MatcherAssert.assertThat(message, CoreMatchers.notNullValue());
257             final HttpResponse response = message.getHead();
258             MatcherAssert.assertThat(response.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
259             final String body = message.getBody();
260             MatcherAssert.assertThat(body, CoreMatchers.containsString("stuff"));
261         }
262     }
263 
264     @Test
265     public void testValidityCheck() throws Exception {
266         server.start();
267         final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
268         final ListenerEndpoint listener = future.get();
269         final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
270         requester.start();
271         requester.setValidateAfterInactivity(TimeValue.ofMilliseconds(10));
272 
273         final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
274         final Future<Message<HttpResponse, String>> resultFuture1 = requester.execute(
275                 new BasicRequestProducer(Method.POST, target, "/stuff",
276                         new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
277                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
278         final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
279         MatcherAssert.assertThat(message1, CoreMatchers.notNullValue());
280         final HttpResponse response1 = message1.getHead();
281         MatcherAssert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
282         final String body1 = message1.getBody();
283         MatcherAssert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
284 
285         Thread.sleep(100);
286 
287         final Future<Message<HttpResponse, String>> resultFuture2 = requester.execute(
288                 new BasicRequestProducer(Method.POST, target, "/other-stuff",
289                         new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
290                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
291         final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
292         MatcherAssert.assertThat(message2, CoreMatchers.notNullValue());
293         final HttpResponse response2 = message2.getHead();
294         MatcherAssert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
295         final String body2 = message2.getBody();
296         MatcherAssert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
297 
298         Thread.sleep(100);
299 
300         final Future<Message<HttpResponse, String>> resultFuture3 = requester.execute(
301                 new BasicRequestProducer(Method.POST, target, "/more-stuff",
302                         new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
303                 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
304         final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
305         MatcherAssert.assertThat(message3, CoreMatchers.notNullValue());
306         final HttpResponse response3 = message3.getHead();
307         MatcherAssert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
308         final String body3 = message3.getBody();
309         MatcherAssert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
310     }
311 
312     @Test
313     public void testMultiplexedRequestCancellation() throws Exception {
314         server.start();
315         final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
316         final ListenerEndpoint listener = future.get();
317         final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
318         requester.start();
319 
320         final int reqNo = 20;
321 
322         final CountDownLatch countDownLatch = new CountDownLatch(reqNo);
323         final Random random = new Random();
324         final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
325         for (int i = 0; i < reqNo; i++) {
326             final Cancellable cancellable = requester.execute(
327                     new BasicClientExchangeHandler<>(new BasicRequestProducer(Method.POST, target, "/stuff",
328                             new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
329                             new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
330                             new FutureCallback<Message<HttpResponse, String>>() {
331 
332                                 @Override
333                                 public void completed(final Message<HttpResponse, String> result) {
334                                     countDownLatch.countDown();
335                                 }
336 
337                                 @Override
338                                 public void failed(final Exception ex) {
339                                     countDownLatch.countDown();
340                                 }
341 
342                                 @Override
343                                 public void cancelled() {
344                                     countDownLatch.countDown();
345                                 }
346 
347                             }),
348                     TIMEOUT,
349                     HttpCoreContext.create());
350             Thread.sleep(random.nextInt(10));
351             cancellable.cancel();
352         }
353         MatcherAssert.assertThat(countDownLatch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()), CoreMatchers.equalTo(true));
354     }
355 
356 }