1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.testing.async;
28
29 import java.nio.ByteBuffer;
30 import java.util.Arrays;
31 import java.util.Collection;
32
33 import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
34 import org.apache.hc.client5.http.async.methods.SimpleHttpRequests;
35 import org.apache.hc.client5.http.config.RequestConfig;
36 import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
37 import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
38 import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
39 import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
40 import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
41 import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
42 import org.apache.hc.client5.testing.SSLTestContexts;
43 import org.apache.hc.core5.http.HeaderElements;
44 import org.apache.hc.core5.http.HttpHeaders;
45 import org.apache.hc.core5.http.HttpHost;
46 import org.apache.hc.core5.http.HttpResponse;
47 import org.apache.hc.core5.http.Message;
48 import org.apache.hc.core5.http.URIScheme;
49 import org.apache.hc.core5.http.config.Http1Config;
50 import org.apache.hc.core5.http.nio.AsyncRequestProducer;
51 import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
52 import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
53 import org.hamcrest.CoreMatchers;
54 import org.junit.Assert;
55 import org.junit.Rule;
56 import org.junit.Test;
57 import org.junit.rules.ExternalResource;
58 import org.junit.runner.RunWith;
59 import org.junit.runners.Parameterized;
60 import org.reactivestreams.Publisher;
61
62 @RunWith(Parameterized.class)
63 public class TestHttp1Reactive extends AbstractHttpReactiveFundamentalsTest<CloseableHttpAsyncClient> {
64
65 @Parameterized.Parameters(name = "HTTP/1.1 {0}")
66 public static Collection<Object[]> protocols() {
67 return Arrays.asList(new Object[][]{
68 { URIScheme.HTTP },
69 { URIScheme.HTTPS },
70 });
71 }
72
73 protected HttpAsyncClientBuilder clientBuilder;
74 protected PoolingAsyncClientConnectionManager connManager;
75
76 @Rule
77 public ExternalResource connManagerResource = new ExternalResource() {
78
79 @Override
80 protected void before() throws Throwable {
81 connManager = PoolingAsyncClientConnectionManagerBuilder.create()
82 .setTlsStrategy(new DefaultClientTlsStrategy(SSLTestContexts.createClientSSLContext()))
83 .build();
84 }
85
86 @Override
87 protected void after() {
88 if (connManager != null) {
89 connManager.close();
90 connManager = null;
91 }
92 }
93
94 };
95
96 @Rule
97 public ExternalResource clientResource = new ExternalResource() {
98
99 @Override
100 protected void before() throws Throwable {
101 clientBuilder = HttpAsyncClientBuilder.create()
102 .setDefaultRequestConfig(RequestConfig.custom()
103 .setConnectionRequestTimeout(TIMEOUT)
104 .setConnectTimeout(TIMEOUT)
105 .build())
106 .setConnectionManager(connManager);
107 }
108
109 };
110
111 public TestHttp1Reactive(final URIScheme scheme) {
112 super(scheme);
113 }
114
115 @Override
116 protected CloseableHttpAsyncClient createClient() {
117 return clientBuilder.build();
118 }
119
120 @Override
121 public HttpHost start() throws Exception {
122 return super.start(null, Http1Config.DEFAULT);
123 }
124
125 @Test(timeout = 60_000)
126 public void testSequentialGetRequestsCloseConnection() throws Exception {
127 final HttpHost target = start();
128 for (int i = 0; i < 3; i++) {
129 final SimpleHttpRequest get = SimpleHttpRequests.get(target, "/random/2048");
130 get.setHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
131 final AsyncRequestProducer request = AsyncRequestBuilder.get(target + "/random/2048").build();
132 final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
133
134 httpclient.execute(request, consumer, null);
135
136 final Message<HttpResponse, Publisher<ByteBuffer>> response = consumer.getResponseFuture().get();
137 Assert.assertThat(response, CoreMatchers.notNullValue());
138 Assert.assertThat(response.getHead().getCode(), CoreMatchers.equalTo(200));
139 final String body = publisherToString(response.getBody());
140 Assert.assertThat(body, CoreMatchers.notNullValue());
141 Assert.assertThat(body.length(), CoreMatchers.equalTo(2048));
142 }
143 }
144
145 @Test(timeout = 60_000)
146 public void testConcurrentPostsOverMultipleConnections() throws Exception {
147 connManager.setDefaultMaxPerRoute(20);
148 connManager.setMaxTotal(100);
149 super.testConcurrentPostRequests();
150 }
151
152 @Test(timeout = 60_000)
153 public void testConcurrentPostsOverSingleConnection() throws Exception {
154 connManager.setDefaultMaxPerRoute(1);
155 connManager.setMaxTotal(100);
156 super.testConcurrentPostRequests();
157 }
158
159 @Test(timeout = 60_000)
160 public void testSharedPool() throws Exception {
161 final HttpHost target = start();
162 final AsyncRequestProducer request1 = AsyncRequestBuilder.get(target + "/random/2048").build();
163 final ReactiveResponseConsumer consumer1 = new ReactiveResponseConsumer();
164
165 httpclient.execute(request1, consumer1, null);
166
167 final Message<HttpResponse, Publisher<ByteBuffer>> response1 = consumer1.getResponseFuture().get();
168 Assert.assertThat(response1, CoreMatchers.notNullValue());
169 Assert.assertThat(response1.getHead(), CoreMatchers.notNullValue());
170 Assert.assertThat(response1.getHead().getCode(), CoreMatchers.equalTo(200));
171 final String body1 = publisherToString(response1.getBody());
172 Assert.assertThat(body1, CoreMatchers.notNullValue());
173 Assert.assertThat(body1.length(), CoreMatchers.equalTo(2048));
174
175
176 try (final CloseableHttpAsyncClient httpclient2 = HttpAsyncClients.custom()
177 .setConnectionManager(connManager)
178 .setConnectionManagerShared(true)
179 .build()) {
180 httpclient2.start();
181 final AsyncRequestProducer request2 = AsyncRequestBuilder.get(target + "/random/2048").build();
182 final ReactiveResponseConsumer consumer2 = new ReactiveResponseConsumer();
183
184 httpclient2.execute(request2, consumer2, null);
185
186 final Message<HttpResponse, Publisher<ByteBuffer>> response2 = consumer2.getResponseFuture().get();
187 Assert.assertThat(response2, CoreMatchers.notNullValue());
188 Assert.assertThat(response2.getHead().getCode(), CoreMatchers.equalTo(200));
189 final String body2 = publisherToString(response2.getBody());
190 Assert.assertThat(body2, CoreMatchers.notNullValue());
191 Assert.assertThat(body2.length(), CoreMatchers.equalTo(2048));
192 }
193
194 final AsyncRequestProducer request3 = AsyncRequestBuilder.get(target + "/random/2048").build();
195 final ReactiveResponseConsumer consumer3 = new ReactiveResponseConsumer();
196
197 httpclient.execute(request3, consumer3, null);
198
199 final Message<HttpResponse, Publisher<ByteBuffer>> response3 = consumer3.getResponseFuture().get();
200 Assert.assertThat(response3, CoreMatchers.notNullValue());
201 Assert.assertThat(response3.getHead().getCode(), CoreMatchers.equalTo(200));
202 final String body3 = publisherToString(response3.getBody());
203 Assert.assertThat(body3, CoreMatchers.notNullValue());
204 Assert.assertThat(body3.length(), CoreMatchers.equalTo(2048));
205 }
206
207 }