1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.testing.nio;
29
30 import java.net.InetSocketAddress;
31 import java.util.Arrays;
32 import java.util.Collection;
33 import java.util.LinkedList;
34 import java.util.Queue;
35 import java.util.Random;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.Future;
38
39 import org.apache.hc.core5.concurrent.Cancellable;
40 import org.apache.hc.core5.concurrent.FutureCallback;
41 import org.apache.hc.core5.function.Supplier;
42 import org.apache.hc.core5.http.ContentType;
43 import org.apache.hc.core5.http.HttpHost;
44 import org.apache.hc.core5.http.HttpResponse;
45 import org.apache.hc.core5.http.HttpStatus;
46 import org.apache.hc.core5.http.Message;
47 import org.apache.hc.core5.http.Method;
48 import org.apache.hc.core5.http.URIScheme;
49 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
50 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
51 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
52 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
53 import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
54 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
55 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
56 import org.apache.hc.core5.http.protocol.HttpCoreContext;
57 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequester;
58 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequesterBootstrap;
59 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
60 import org.apache.hc.core5.http2.ssl.H2ClientTlsStrategy;
61 import org.apache.hc.core5.http2.ssl.H2ServerTlsStrategy;
62 import org.apache.hc.core5.io.CloseMode;
63 import org.apache.hc.core5.reactor.IOReactorConfig;
64 import org.apache.hc.core5.reactor.ListenerEndpoint;
65 import org.apache.hc.core5.testing.SSLTestContexts;
66 import org.apache.hc.core5.util.ReflectionUtils;
67 import org.apache.hc.core5.util.TimeValue;
68 import org.apache.hc.core5.util.Timeout;
69 import org.hamcrest.CoreMatchers;
70 import org.hamcrest.MatcherAssert;
71 import org.junit.Assume;
72 import org.junit.Before;
73 import org.junit.BeforeClass;
74 import org.junit.Rule;
75 import org.junit.Test;
76 import org.junit.rules.ExternalResource;
77 import org.junit.runner.RunWith;
78 import org.junit.runners.Parameterized;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
81
82 @RunWith(Parameterized.class)
83 public class H2ServerAndMultiplexingRequesterTest {
84
85 private final Logger log = LoggerFactory.getLogger(getClass());
86
87 @Parameterized.Parameters(name = "{0}")
88 public static Collection<Object[]> protocols() {
89 return Arrays.asList(new Object[][]{
90 { URIScheme.HTTP },
91 { URIScheme.HTTPS }
92 });
93 }
94 private static final Timeout TIMEOUT = Timeout.ofSeconds(30);
95
96 private final URIScheme scheme;
97
98 public H2ServerAndMultiplexingRequesterTest(final URIScheme scheme) {
99 this.scheme = scheme;
100 }
101
102 private HttpAsyncServer server;
103
104 @Rule
105 public ExternalResource serverResource = new ExternalResource() {
106
107 @Override
108 protected void before() throws Throwable {
109 log.debug("Starting up test server");
110 server = H2ServerBootstrap.bootstrap()
111 .setIOReactorConfig(
112 IOReactorConfig.custom()
113 .setSoTimeout(TIMEOUT)
114 .build())
115 .setTlsStrategy(scheme == URIScheme.HTTPS ?
116 new H2ServerTlsStrategy(SSLTestContexts.createServerSSLContext()) : null)
117 .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
118 .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
119 .setExceptionCallback(LoggingExceptionCallback.INSTANCE)
120 .setStreamListener(LoggingH2StreamListener.INSTANCE)
121 .register("*", new Supplier<AsyncServerExchangeHandler>() {
122
123 @Override
124 public AsyncServerExchangeHandler get() {
125 return new EchoHandler(2048);
126 }
127
128 })
129 .create();
130 }
131
132 @Override
133 protected void after() {
134 log.debug("Shutting down test server");
135 if (server != null) {
136 server.close(CloseMode.GRACEFUL);
137 }
138 }
139
140 };
141
142 private H2MultiplexingRequester requester;
143
144 @Rule
145 public ExternalResource clientResource = new ExternalResource() {
146
147 @Override
148 protected void before() throws Throwable {
149 log.debug("Starting up test client");
150 requester = H2MultiplexingRequesterBootstrap.bootstrap()
151 .setIOReactorConfig(IOReactorConfig.custom()
152 .setSoTimeout(TIMEOUT)
153 .build())
154 .setTlsStrategy(new H2ClientTlsStrategy(SSLTestContexts.createClientSSLContext()))
155 .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
156 .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
157 .setExceptionCallback(LoggingExceptionCallback.INSTANCE)
158 .setStreamListener(LoggingH2StreamListener.INSTANCE)
159 .create();
160 }
161
162 @Override
163 protected void after() {
164 log.debug("Shutting down test client");
165 if (requester != null) {
166 requester.close(CloseMode.GRACEFUL);
167 }
168 }
169
170 };
171
172 private static int javaVersion;
173
174 @BeforeClass
175 public static void determineJavaVersion() {
176 javaVersion = ReflectionUtils.determineJRELevel();
177 }
178
179 @Before
180 public void checkVersion() {
181 if (scheme == URIScheme.HTTPS) {
182 Assume.assumeTrue("Java version must be 1.8 or greater", javaVersion > 7);
183 }
184 }
185
186 @Test
187 public void testSequentialRequests() throws Exception {
188 server.start();
189 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
190 final ListenerEndpoint listener = future.get();
191 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
192 requester.start();
193
194 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
195 final Future<Message<HttpResponse, String>> resultFuture1 = requester.execute(
196 new BasicRequestProducer(Method.POST, target, "/stuff",
197 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
198 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
199 final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
200 MatcherAssert.assertThat(message1, CoreMatchers.notNullValue());
201 final HttpResponse response1 = message1.getHead();
202 MatcherAssert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
203 final String body1 = message1.getBody();
204 MatcherAssert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
205
206 final Future<Message<HttpResponse, String>> resultFuture2 = requester.execute(
207 new BasicRequestProducer(Method.POST, target, "/other-stuff",
208 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
209 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
210 final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
211 MatcherAssert.assertThat(message2, CoreMatchers.notNullValue());
212 final HttpResponse response2 = message2.getHead();
213 MatcherAssert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
214 final String body2 = message2.getBody();
215 MatcherAssert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
216
217 final Future<Message<HttpResponse, String>> resultFuture3 = requester.execute(
218 new BasicRequestProducer(Method.POST, target, "/more-stuff",
219 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
220 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
221 final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
222 MatcherAssert.assertThat(message3, CoreMatchers.notNullValue());
223 final HttpResponse response3 = message3.getHead();
224 MatcherAssert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
225 final String body3 = message3.getBody();
226 MatcherAssert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
227 }
228
229 @Test
230 public void testMultiplexedRequests() throws Exception {
231 server.start();
232 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
233 final ListenerEndpoint listener = future.get();
234 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
235 requester.start();
236
237 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
238 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
239
240 queue.add(requester.execute(
241 new BasicRequestProducer(Method.POST, target, "/stuff",
242 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
243 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null));
244 queue.add(requester.execute(
245 new BasicRequestProducer(Method.POST, target, "/other-stuff",
246 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
247 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null));
248 queue.add(requester.execute(
249 new BasicRequestProducer(Method.POST, target, "/more-stuff",
250 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
251 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null));
252
253 while (!queue.isEmpty()) {
254 final Future<Message<HttpResponse, String>> resultFuture = queue.remove();
255 final Message<HttpResponse, String> message = resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
256 MatcherAssert.assertThat(message, CoreMatchers.notNullValue());
257 final HttpResponse response = message.getHead();
258 MatcherAssert.assertThat(response.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
259 final String body = message.getBody();
260 MatcherAssert.assertThat(body, CoreMatchers.containsString("stuff"));
261 }
262 }
263
264 @Test
265 public void testValidityCheck() throws Exception {
266 server.start();
267 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
268 final ListenerEndpoint listener = future.get();
269 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
270 requester.start();
271 requester.setValidateAfterInactivity(TimeValue.ofMilliseconds(10));
272
273 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
274 final Future<Message<HttpResponse, String>> resultFuture1 = requester.execute(
275 new BasicRequestProducer(Method.POST, target, "/stuff",
276 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
277 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
278 final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
279 MatcherAssert.assertThat(message1, CoreMatchers.notNullValue());
280 final HttpResponse response1 = message1.getHead();
281 MatcherAssert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
282 final String body1 = message1.getBody();
283 MatcherAssert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
284
285 Thread.sleep(100);
286
287 final Future<Message<HttpResponse, String>> resultFuture2 = requester.execute(
288 new BasicRequestProducer(Method.POST, target, "/other-stuff",
289 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
290 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
291 final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
292 MatcherAssert.assertThat(message2, CoreMatchers.notNullValue());
293 final HttpResponse response2 = message2.getHead();
294 MatcherAssert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
295 final String body2 = message2.getBody();
296 MatcherAssert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
297
298 Thread.sleep(100);
299
300 final Future<Message<HttpResponse, String>> resultFuture3 = requester.execute(
301 new BasicRequestProducer(Method.POST, target, "/more-stuff",
302 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
303 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
304 final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
305 MatcherAssert.assertThat(message3, CoreMatchers.notNullValue());
306 final HttpResponse response3 = message3.getHead();
307 MatcherAssert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
308 final String body3 = message3.getBody();
309 MatcherAssert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
310 }
311
312 @Test
313 public void testMultiplexedRequestCancellation() throws Exception {
314 server.start();
315 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
316 final ListenerEndpoint listener = future.get();
317 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
318 requester.start();
319
320 final int reqNo = 20;
321
322 final CountDownLatch countDownLatch = new CountDownLatch(reqNo);
323 final Random random = new Random();
324 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
325 for (int i = 0; i < reqNo; i++) {
326 final Cancellable cancellable = requester.execute(
327 new BasicClientExchangeHandler<>(new BasicRequestProducer(Method.POST, target, "/stuff",
328 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
329 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
330 new FutureCallback<Message<HttpResponse, String>>() {
331
332 @Override
333 public void completed(final Message<HttpResponse, String> result) {
334 countDownLatch.countDown();
335 }
336
337 @Override
338 public void failed(final Exception ex) {
339 countDownLatch.countDown();
340 }
341
342 @Override
343 public void cancelled() {
344 countDownLatch.countDown();
345 }
346
347 }),
348 TIMEOUT,
349 HttpCoreContext.create());
350 Thread.sleep(random.nextInt(10));
351 cancellable.cancel();
352 }
353 MatcherAssert.assertThat(countDownLatch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()), CoreMatchers.equalTo(true));
354 }
355
356 }