1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.testing.nio;
29
30 import java.net.InetSocketAddress;
31 import java.util.Arrays;
32 import java.util.Collection;
33 import java.util.LinkedList;
34 import java.util.Queue;
35 import java.util.concurrent.Future;
36
37 import org.apache.hc.core5.function.Supplier;
38 import org.apache.hc.core5.http.ContentType;
39 import org.apache.hc.core5.http.HttpHost;
40 import org.apache.hc.core5.http.HttpResponse;
41 import org.apache.hc.core5.http.HttpStatus;
42 import org.apache.hc.core5.http.Message;
43 import org.apache.hc.core5.http.Method;
44 import org.apache.hc.core5.http.URIScheme;
45 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
46 import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
47 import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
48 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
49 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
50 import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
51 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
52 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
53 import org.apache.hc.core5.http.protocol.UriPatternMatcher;
54 import org.apache.hc.core5.http2.HttpVersionPolicy;
55 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2RequesterBootstrap;
56 import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
57 import org.apache.hc.core5.http2.ssl.H2ClientTlsStrategy;
58 import org.apache.hc.core5.http2.ssl.H2ServerTlsStrategy;
59 import org.apache.hc.core5.io.CloseMode;
60 import org.apache.hc.core5.reactor.IOReactorConfig;
61 import org.apache.hc.core5.reactor.ListenerEndpoint;
62 import org.apache.hc.core5.testing.SSLTestContexts;
63 import org.apache.hc.core5.testing.classic.LoggingConnPoolListener;
64 import org.apache.hc.core5.util.ReflectionUtils;
65 import org.apache.hc.core5.util.Timeout;
66 import org.hamcrest.CoreMatchers;
67 import org.hamcrest.MatcherAssert;
68 import org.junit.Assume;
69 import org.junit.Before;
70 import org.junit.BeforeClass;
71 import org.junit.Rule;
72 import org.junit.Test;
73 import org.junit.rules.ExternalResource;
74 import org.junit.runner.RunWith;
75 import org.junit.runners.Parameterized;
76 import org.slf4j.Logger;
77 import org.slf4j.LoggerFactory;
78
79 @RunWith(Parameterized.class)
80 public class H2ServerAndRequesterTest {
81
82 private final Logger log = LoggerFactory.getLogger(getClass());
83
84 @Parameterized.Parameters(name = "{0}")
85 public static Collection<Object[]> protocols() {
86 return Arrays.asList(new Object[][]{
87 { URIScheme.HTTP },
88 { URIScheme.HTTPS }
89 });
90 }
91 private static final Timeout TIMEOUT = Timeout.ofSeconds(30);
92
93 private final URIScheme scheme;
94
95 public H2ServerAndRequesterTest(final URIScheme scheme) {
96 this.scheme = scheme;
97 }
98
99 private HttpAsyncServer server;
100
101 @Rule
102 public ExternalResource serverResource = new ExternalResource() {
103
104 @Override
105 protected void before() throws Throwable {
106 log.debug("Starting up test server");
107 server = H2ServerBootstrap.bootstrap()
108 .setLookupRegistry(new UriPatternMatcher<Supplier<AsyncServerExchangeHandler>>())
109 .setVersionPolicy(HttpVersionPolicy.NEGOTIATE)
110 .setIOReactorConfig(
111 IOReactorConfig.custom()
112 .setSoTimeout(TIMEOUT)
113 .build())
114 .setTlsStrategy(scheme == URIScheme.HTTPS ?
115 new H2ServerTlsStrategy(SSLTestContexts.createServerSSLContext()) : null)
116 .setStreamListener(LoggingHttp1StreamListener.INSTANCE_SERVER)
117 .setStreamListener(LoggingH2StreamListener.INSTANCE)
118 .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
119 .setExceptionCallback(LoggingExceptionCallback.INSTANCE)
120 .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
121 .register("*", new Supplier<AsyncServerExchangeHandler>() {
122
123 @Override
124 public AsyncServerExchangeHandler get() {
125 return new EchoHandler(2048);
126 }
127
128 })
129 .create();
130 }
131
132 @Override
133 protected void after() {
134 log.debug("Shutting down test server");
135 if (server != null) {
136 server.close(CloseMode.GRACEFUL);
137 }
138 }
139
140 };
141
142 private HttpAsyncRequester requester;
143
144 @Rule
145 public ExternalResource clientResource = new ExternalResource() {
146
147 @Override
148 protected void before() throws Throwable {
149 log.debug("Starting up test client");
150 requester = H2RequesterBootstrap.bootstrap()
151 .setVersionPolicy(HttpVersionPolicy.NEGOTIATE)
152 .setIOReactorConfig(IOReactorConfig.custom()
153 .setSoTimeout(TIMEOUT)
154 .build())
155 .setTlsStrategy(new H2ClientTlsStrategy(SSLTestContexts.createClientSSLContext()))
156 .setStreamListener(LoggingHttp1StreamListener.INSTANCE_CLIENT)
157 .setStreamListener(LoggingH2StreamListener.INSTANCE)
158 .setConnPoolListener(LoggingConnPoolListener.INSTANCE)
159 .setIOSessionDecorator(LoggingIOSessionDecorator.INSTANCE)
160 .setExceptionCallback(LoggingExceptionCallback.INSTANCE)
161 .setIOSessionListener(LoggingIOSessionListener.INSTANCE)
162 .create();
163 }
164
165 @Override
166 protected void after() {
167 log.debug("Shutting down test client");
168 if (requester != null) {
169 requester.close(CloseMode.GRACEFUL);
170 }
171 }
172
173 };
174
175 private static int javaVersion;
176
177 @BeforeClass
178 public static void determineJavaVersion() {
179 javaVersion = ReflectionUtils.determineJRELevel();
180 }
181
182 @Before
183 public void checkVersion() {
184 if (scheme == URIScheme.HTTPS) {
185 Assume.assumeTrue("Java version must be 1.8 or greater", javaVersion > 7);
186 }
187 }
188
189 @Test
190 public void testSequentialRequests() throws Exception {
191 server.start();
192 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
193 final ListenerEndpoint listener = future.get();
194 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
195 requester.start();
196
197 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
198 final Future<Message<HttpResponse, String>> resultFuture1 = requester.execute(
199 new BasicRequestProducer(Method.POST, target, "/stuff",
200 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
201 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
202 final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
203 MatcherAssert.assertThat(message1, CoreMatchers.notNullValue());
204 final HttpResponse response1 = message1.getHead();
205 MatcherAssert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
206 final String body1 = message1.getBody();
207 MatcherAssert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
208
209 final Future<Message<HttpResponse, String>> resultFuture2 = requester.execute(
210 new BasicRequestProducer(Method.POST, target, "/other-stuff",
211 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
212 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
213 final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
214 MatcherAssert.assertThat(message2, CoreMatchers.notNullValue());
215 final HttpResponse response2 = message2.getHead();
216 MatcherAssert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
217 final String body2 = message2.getBody();
218 MatcherAssert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
219
220 final Future<Message<HttpResponse, String>> resultFuture3 = requester.execute(
221 new BasicRequestProducer(Method.POST, target, "/more-stuff",
222 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
223 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), TIMEOUT, null);
224 final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
225 MatcherAssert.assertThat(message3, CoreMatchers.notNullValue());
226 final HttpResponse response3 = message3.getHead();
227 MatcherAssert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
228 final String body3 = message3.getBody();
229 MatcherAssert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
230 }
231
232 @Test
233 public void testSequentialRequestsSameEndpoint() throws Exception {
234 server.start();
235 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
236 final ListenerEndpoint listener = future.get();
237 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
238 requester.start();
239
240 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
241 final Future<AsyncClientEndpoint> endpointFuture = requester.connect(target, Timeout.ofSeconds(5));
242 final AsyncClientEndpoint endpoint = endpointFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
243 try {
244
245 final Future<Message<HttpResponse, String>> resultFuture1 = endpoint.execute(
246 new BasicRequestProducer(Method.POST, target, "/stuff",
247 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
248 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
249 final Message<HttpResponse, String> message1 = resultFuture1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
250 MatcherAssert.assertThat(message1, CoreMatchers.notNullValue());
251 final HttpResponse response1 = message1.getHead();
252 MatcherAssert.assertThat(response1.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
253 final String body1 = message1.getBody();
254 MatcherAssert.assertThat(body1, CoreMatchers.equalTo("some stuff"));
255
256 final Future<Message<HttpResponse, String>> resultFuture2 = endpoint.execute(
257 new BasicRequestProducer(Method.POST, target, "/other-stuff",
258 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
259 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
260 final Message<HttpResponse, String> message2 = resultFuture2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
261 MatcherAssert.assertThat(message2, CoreMatchers.notNullValue());
262 final HttpResponse response2 = message2.getHead();
263 MatcherAssert.assertThat(response2.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
264 final String body2 = message2.getBody();
265 MatcherAssert.assertThat(body2, CoreMatchers.equalTo("some other stuff"));
266
267 final Future<Message<HttpResponse, String>> resultFuture3 = endpoint.execute(
268 new BasicRequestProducer(Method.POST, target, "/more-stuff",
269 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
270 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
271 final Message<HttpResponse, String> message3 = resultFuture3.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
272 MatcherAssert.assertThat(message3, CoreMatchers.notNullValue());
273 final HttpResponse response3 = message3.getHead();
274 MatcherAssert.assertThat(response3.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
275 final String body3 = message3.getBody();
276 MatcherAssert.assertThat(body3, CoreMatchers.equalTo("some more stuff"));
277
278 } finally {
279 endpoint.releaseAndReuse();
280 }
281 }
282
283 @Test
284 public void testPipelinedRequests() throws Exception {
285 server.start();
286 final Future<ListenerEndpoint> future = server.listen(new InetSocketAddress(0), scheme);
287 final ListenerEndpoint listener = future.get();
288 final InetSocketAddress address = (InetSocketAddress) listener.getAddress();
289 requester.start();
290
291 final HttpHost target = new HttpHost(scheme.id, "localhost", address.getPort());
292 final Future<AsyncClientEndpoint> endpointFuture = requester.connect(target, Timeout.ofSeconds(5));
293 final AsyncClientEndpoint endpoint = endpointFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
294 try {
295
296 final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
297
298 queue.add(endpoint.execute(
299 new BasicRequestProducer(Method.POST, target, "/stuff",
300 new StringAsyncEntityProducer("some stuff", ContentType.TEXT_PLAIN)),
301 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
302 queue.add(endpoint.execute(
303 new BasicRequestProducer(Method.POST, target, "/other-stuff",
304 new StringAsyncEntityProducer("some other stuff", ContentType.TEXT_PLAIN)),
305 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
306 queue.add(endpoint.execute(
307 new BasicRequestProducer(Method.POST, target, "/more-stuff",
308 new StringAsyncEntityProducer("some more stuff", ContentType.TEXT_PLAIN)),
309 new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
310
311 while (!queue.isEmpty()) {
312 final Future<Message<HttpResponse, String>> resultFuture = queue.remove();
313 final Message<HttpResponse, String> message = resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
314 MatcherAssert.assertThat(message, CoreMatchers.notNullValue());
315 final HttpResponse response = message.getHead();
316 MatcherAssert.assertThat(response.getCode(), CoreMatchers.equalTo(HttpStatus.SC_OK));
317 final String body = message.getBody();
318 MatcherAssert.assertThat(body, CoreMatchers.containsString("stuff"));
319 }
320
321 } finally {
322 endpoint.releaseAndReuse();
323 }
324 }
325
326 }