1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.testing.nio;
29
30 import java.net.InetSocketAddress;
31 import java.util.Arrays;
32 import java.util.Collection;
33 import java.util.LinkedList;
34 import java.util.Queue;
35 import java.util.concurrent.Future;
36
37 import org.apache.hc.core5.function.Supplier;
38 import org.apache.hc.core5.http.ContentType;
39 import org.apache.hc.core5.http.HttpHost;
40 import org.apache.hc.core5.http.HttpResponse;
41 import org.apache.hc.core5.http.HttpStatus;
42 import org.apache.hc.core5.http.Message;
43 import org.apache.hc.core5.http.Method;
44 import org.apache.hc.core5.http.URIScheme;
45 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
46 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
47 import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
48 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
49 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
50 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
51 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
52 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
53 import org.apache.hc.core5.http.protocol.UriPatternMatcher;
54 import org.apache.hc.core5.http2.HttpVersionPolicy;
55 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap;
56 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
57 import org.apache.hc.core5.http2.ssl.H2ClientTlsStrategy;
58 import org.apache.hc.core5.http2.ssl.H2ServerTlsStrategy;
59 import org.apache.hc.core5.io.CloseMode;
60 import org.apache.hc.core5.reactor.IOReactorConfig;
61 import org.apache.hc.core5.reactor.ListenerEndpoint;
62 import org.apache.hc.core5.testing.SSLTestContexts;
63 import org.apache.hc.core5.testing.classic.LoggingConnPoolListener;
64 import org.apache.hc.core5.util.ReflectionUtils;
65 import org.apache.hc.core5.util.Timeout;
66 import org.hamcrest.CoreMatchers;
67 import org.junit.Assert;
68 import org.junit.Assume;
69 import org.junit.Before;
70 import org.junit.BeforeClass;
71 import org.junit.Rule;
72 import org.junit.Test;
73 import org.junit.rules.ExternalResource;
74 import org.junit.runner.RunWith;
75 import org.junit.runners.Parameterized;
76 import org.slf4j.Logger;
77 import org.slf4j.LoggerFactory;
78
79 @RunWith(Parameterized.class)
80 public class H2ServerAndRequesterTest {
81
82 private final Logger log = LoggerFactory.getLogger(getClass());
83
84 @Parameterized.Parameters(name = "{0}")
85 public static Collection<Object[]> protocols() {
86 return Arrays.asList(new Object[][]{
87 { URIScheme.HTTP },
88 { URIScheme.HTTPS }
89 });
90 }
91 private static final Timeout TIMEOUT = Timeout.ofSeconds(30);
92
93 private final URIScheme scheme;
94
95 public H2ServerAndRequesterTest(final URIScheme scheme) {
96 this.scheme = scheme;
97 }
98
99 private HttpAsyncServer server;
100
101 @Rule
102 public ExternalResource serverResource = new ExternalResource() {
103
104 @Override
105 protected void before() throws Throwable {
106 log.debug("Starting up test server");
107 server = H2ServerBootstrap.bootstrap()
108 .setLookupRegistry(new UriPatternMatcher<Supplier<AsyncServerExchangeHandler>>())
109 .setVersionPolicy(HttpVersionPolicy.NEGOTIATE)
110 .setIOReactorConfig(
111 IOReactorConfig.custom()
112 .setSoTimeout(TIMEOUT)
113 .build())
114 .setTlsStrategy(scheme == URIScheme.HTTPS ? new H2ServerTlsStrategy(
115 SSLTestContexts.createServerSSLContext(),
116 SecureAllPortsStrategy.INSTANCE) : null)
117 .setStreamListener(LoggingHttp1StreamListener.INSTANCE_SERVER)
118 .setStreamListener(LoggingH2StreamListener.INSTANCE)
119 .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
120 .setExceptionCallback(LoggingExceptionCallback.INSTANCE)
121 .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
122 .register("*", new Supplier<AsyncServerExchangeHandler>() {
123
124 @Override
125 public AsyncServerExchangeHandler get() {
126 return new EchoHandler(2048);
127 }
128
129 })
130 .create();
131 }
132
133 @Override
134 protected void after() {
135 log.debug("Shutting down test server");
136 if (server != null) {
137 server.close(CloseMode.GRACEFUL);
138 }
139 }
140
141 };
142
143 private HttpAsyncRequester requester;
144
145 @Rule
146 public ExternalResource clientResource = new ExternalResource() {
147
148 @Override
149 protected void before() throws Throwable {
150 log.debug("Starting up test client");
151 requester = H2RequesterBootstrap.bootstrap()
152 .setVersionPolicy(HttpVersionPolicy.NEGOTIATE)
153 .setIOReactorConfig(IOReactorConfig.custom()
154 .setSoTimeout(TIMEOUT)
155 .build())
156 .setTlsStrategy(new H2ClientTlsStrategy(SSLTestContexts.createClientSSLContext()))
157 .setStreamListener(LoggingHttp1StreamListener.INSTANCE_CLIENT)
158 .setStreamListener(LoggingH2StreamListener.INSTANCE)
159 .setConnPoolListener(LoggingConnPoolListener.INSTANCE)
160 .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
161 .setExceptionCallback(LoggingExceptionCallback.INSTANCE)
162 .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
163 .create();
164 }
165
166 @Override
167 protected void after() {
168 log.debug("Shutting down test client");
169 if (requester != null) {
170 requester.close(CloseMode.GRACEFUL);
171 }
172 }
173
174 };
175
176 private static int javaVersion;
177
178 @BeforeClass
179 public static void determineJavaVersion() {
180 javaVersion = ReflectionUtils.determineJRELevel();
181 }
182
183 @Before
184 public void checkVersion() {
185 if (scheme == URIScheme.HTTPS) {
186 Assume.assumeTrue("Java version must be 1.8 or greater", javaVersion > 7);
187 }
188 }
189
190 @Test
191 public void testSequentialRequests() throws Exception {
192 server.start();
193 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0));
194 final ListenerEndpoint listener = future.get();
195 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
196 requester.start();
197
198 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
199 final Future<Message<HttpResponse, String>> resultFuture1 = requester.execute(
200 new BasicRequestProducer(Method.POST, target, "/stuff",
201 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
202 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
203 final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
204 Assert.assertThat(message1, CoreMatchers.notNullValue());
205 final HttpResponse response1 = message1.getHead();
206 Assert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
207 final String body1 = message1.getBody();
208 Assert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
209
210 final Future<Message<HttpResponse, String>> resultFuture2 = requester.execute(
211 new BasicRequestProducer(Method.POST, target, "/other-stuff",
212 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
213 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
214 final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
215 Assert.assertThat(message2, CoreMatchers.notNullValue());
216 final HttpResponse response2 = message2.getHead();
217 Assert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
218 final String body2 = message2.getBody();
219 Assert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
220
221 final Future<Message<HttpResponse, String>> resultFuture3 = requester.execute(
222 new BasicRequestProducer(Method.POST, target, "/more-stuff",
223 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
224 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
225 final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
226 Assert.assertThat(message3, CoreMatchers.notNullValue());
227 final HttpResponse response3 = message3.getHead();
228 Assert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
229 final String body3 = message3.getBody();
230 Assert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
231 }
232
233 @Test
234 public void testSequentialRequestsSameEndpoint() throws Exception {
235 server.start();
236 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0));
237 final ListenerEndpoint listener = future.get();
238 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
239 requester.start();
240
241 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
242 final Future<AsyncClientEndpoint> endpointFuture = requester.connect(target, Timeout.ofSeconds(5));
243 final AsyncClientEndpoint endpoint = endpointFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
244 try {
245
246 final Future<Message<HttpResponse, String>> resultFuture1 = endpoint.execute(
247 new BasicRequestProducer(Method.POST, target, "/stuff",
248 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
249 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
250 final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
251 Assert.assertThat(message1, CoreMatchers.notNullValue());
252 final HttpResponse response1 = message1.getHead();
253 Assert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
254 final String body1 = message1.getBody();
255 Assert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
256
257 final Future<Message<HttpResponse, String>> resultFuture2 = endpoint.execute(
258 new BasicRequestProducer(Method.POST, target, "/other-stuff",
259 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
260 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
261 final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
262 Assert.assertThat(message2, CoreMatchers.notNullValue());
263 final HttpResponse response2 = message2.getHead();
264 Assert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
265 final String body2 = message2.getBody();
266 Assert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
267
268 final Future<Message<HttpResponse, String>> resultFuture3 = endpoint.execute(
269 new BasicRequestProducer(Method.POST, target, "/more-stuff",
270 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
271 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
272 final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
273 Assert.assertThat(message3, CoreMatchers.notNullValue());
274 final HttpResponse response3 = message3.getHead();
275 Assert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
276 final String body3 = message3.getBody();
277 Assert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
278
279 } finally {
280 endpoint.releaseAndReuse();
281 }
282 }
283
284 @Test
285 public void testPipelinedRequests() throws Exception {
286 server.start();
287 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0));
288 final ListenerEndpoint listener = future.get();
289 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
290 requester.start();
291
292 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
293 final Future<AsyncClientEndpoint> endpointFuture = requester.connect(target, Timeout.ofSeconds(5));
294 final AsyncClientEndpoint endpoint = endpointFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
295 try {
296
297 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
298
299 queue.add(endpoint.execute(
300 new BasicRequestProducer(Method.POST, target, "/stuff",
301 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
302 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
303 queue.add(endpoint.execute(
304 new BasicRequestProducer(Method.POST, target, "/other-stuff",
305 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
306 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
307 queue.add(endpoint.execute(
308 new BasicRequestProducer(Method.POST, target, "/more-stuff",
309 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
310 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
311
312 while (!queue.isEmpty()) {
313 final Future<Message<HttpResponse, String>> resultFuture = queue.remove();
314 final Message<HttpResponse, String> message = resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
315 Assert.assertThat(message, CoreMatchers.notNullValue());
316 final HttpResponse response = message.getHead();
317 Assert.assertThat(response.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
318 final String body = message.getBody();
319 Assert.assertThat(body, CoreMatchers.containsString("stuff"));
320 }
321
322 } finally {
323 endpoint.releaseAndReuse();
324 }
325 }
326
327 }