1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.testing.async;
28
29 import static org.hamcrest.MatcherAssert.assertThat;
30
31 import java.util.LinkedList;
32 import java.util.Queue;
33 import java.util.Random;
34 import java.util.concurrent.ConcurrentLinkedQueue;
35 import java.util.concurrent.CountDownLatch;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.Future;
39 import java.util.concurrent.atomic.AtomicInteger;
40
41 import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
42 import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
43 import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
44 import org.apache.hc.client5.http.protocol.HttpClientContext;
45 import org.apache.hc.core5.concurrent.FutureCallback;
46 import org.apache.hc.core5.http.ContentType;
47 import org.apache.hc.core5.http.HttpHost;
48 import org.apache.hc.core5.http.HttpResponse;
49 import org.apache.hc.core5.http.Message;
50 import org.apache.hc.core5.http.Method;
51 import org.apache.hc.core5.http.URIScheme;
52 import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
53 import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityConsumer;
54 import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
55 import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
56 import org.apache.hc.core5.testing.nio.H2TestServer;
57 import org.hamcrest.CoreMatchers;
58 import org.junit.jupiter.api.Test;
59
60 public abstract class AbstractHttpAsyncFundamentalsTest<T extends CloseableHttpAsyncClient> extends AbstractIntegrationTestBase {
61
62 protected AbstractHttpAsyncFundamentalsTest(final URIScheme scheme) {
63 super(scheme);
64 }
65
66 abstract protected H2TestServer startServer() throws Exception;
67
68 abstract protected T startClient() throws Exception;
69
70 @Test
71 public void testSequentialGetRequests() throws Exception {
72 final H2TestServer server = startServer();
73 server.register("/random/*", AsyncRandomHandler::new);
74 final HttpHost target = targetHost();
75 final T client = startClient();
76 for (int i = 0; i < 3; i++) {
77 final Future<SimpleHttpResponse> future = client.execute(
78 SimpleRequestBuilder.get()
79 .setHttpHost(target)
80 .setPath("/random/2048")
81 .build(), null);
82 final SimpleHttpResponse response = future.get();
83 assertThat(response, CoreMatchers.notNullValue());
84 assertThat(response.getCode(), CoreMatchers.equalTo(200));
85 final String body = response.getBodyText();
86 assertThat(body, CoreMatchers.notNullValue());
87 assertThat(body.length(), CoreMatchers.equalTo(2048));
88 }
89 }
90
91 @Test
92 public void testSequentialHeadRequests() throws Exception {
93 final H2TestServer server = startServer();
94 server.register("/random/*", AsyncRandomHandler::new);
95 final HttpHost target = targetHost();
96 final T client = startClient();
97 for (int i = 0; i < 3; i++) {
98 final Future<SimpleHttpResponse> future = client.execute(
99 SimpleRequestBuilder.head()
100 .setHttpHost(target)
101 .setPath("/random/2048")
102 .build(), null);
103 final SimpleHttpResponse response = future.get();
104 assertThat(response, CoreMatchers.notNullValue());
105 assertThat(response.getCode(), CoreMatchers.equalTo(200));
106 final String body = response.getBodyText();
107 assertThat(body, CoreMatchers.nullValue());
108 }
109 }
110
111 @Test
112 public void testSequentialPostRequests() throws Exception {
113 final H2TestServer server = startServer();
114 server.register("/echo/*", AsyncEchoHandler::new);
115 final HttpHost target = targetHost();
116 final T client = startClient();
117 for (int i = 0; i < 3; i++) {
118 final byte[] b1 = new byte[1024];
119 final Random rnd = new Random(System.currentTimeMillis());
120 rnd.nextBytes(b1);
121 final Future<Message<HttpResponse, byte[]>> future = client.execute(
122 new BasicRequestProducer(Method.GET, target, "/echo/",
123 AsyncEntityProducers.create(b1, ContentType.APPLICATION_OCTET_STREAM)),
124 new BasicResponseConsumer<>(new BasicAsyncEntityConsumer()), HttpClientContext.create(), null);
125 final Message<HttpResponse, byte[]> responseMessage = future.get();
126 assertThat(responseMessage, CoreMatchers.notNullValue());
127 final HttpResponse response = responseMessage.getHead();
128 assertThat(response.getCode(), CoreMatchers.equalTo(200));
129 final byte[] b2 = responseMessage.getBody();
130 assertThat(b1, CoreMatchers.equalTo(b2));
131 }
132 }
133
134 @Test
135 public void testConcurrentPostRequests() throws Exception {
136 final H2TestServer server = startServer();
137 server.register("/echo/*", AsyncEchoHandler::new);
138 final HttpHost target = targetHost();
139 final T client = startClient();
140 final byte[] b1 = new byte[1024];
141 final Random rnd = new Random(System.currentTimeMillis());
142 rnd.nextBytes(b1);
143
144 final int reqCount = 20;
145
146 final Queue<Future<Message<HttpResponse, byte[]>>> queue = new LinkedList<>();
147 for (int i = 0; i < reqCount; i++) {
148 final Future<Message<HttpResponse, byte[]>> future = client.execute(
149 new BasicRequestProducer(Method.POST, target, "/echo/",
150 AsyncEntityProducers.create(b1, ContentType.APPLICATION_OCTET_STREAM)),
151 new BasicResponseConsumer<>(new BasicAsyncEntityConsumer()), HttpClientContext.create(), null);
152 queue.add(future);
153 }
154
155 while (!queue.isEmpty()) {
156 final Future<Message<HttpResponse, byte[]>> future = queue.remove();
157 final Message<HttpResponse, byte[]> responseMessage = future.get();
158 assertThat(responseMessage, CoreMatchers.notNullValue());
159 final HttpResponse response = responseMessage.getHead();
160 assertThat(response.getCode(), CoreMatchers.equalTo(200));
161 final byte[] b2 = responseMessage.getBody();
162 assertThat(b1, CoreMatchers.equalTo(b2));
163 }
164 }
165
166 @Test
167 public void testRequestExecutionFromCallback() throws Exception {
168 final H2TestServer server = startServer();
169 server.register("/random/*", AsyncRandomHandler::new);
170 final HttpHost target = targetHost();
171 final T client = startClient();
172 final int requestNum = 50;
173 final AtomicInteger count = new AtomicInteger(requestNum);
174 final Queue<SimpleHttpResponse> resultQueue = new ConcurrentLinkedQueue<>();
175 final CountDownLatch countDownLatch = new CountDownLatch(requestNum);
176
177 final FutureCallback<SimpleHttpResponse> callback = new FutureCallback<SimpleHttpResponse>() {
178
179 @Override
180 public void completed(final SimpleHttpResponse result) {
181 try {
182 resultQueue.add(result);
183 if (count.decrementAndGet() > 0) {
184 client.execute(
185 SimpleRequestBuilder.get()
186 .setHttpHost(target)
187 .setPath("/random/2048")
188 .build(), this);
189 }
190 } finally {
191 countDownLatch.countDown();
192 }
193 }
194
195 @Override
196 public void failed(final Exception ex) {
197 countDownLatch.countDown();
198 }
199
200 @Override
201 public void cancelled() {
202 countDownLatch.countDown();
203 }
204 };
205
206 final int threadNum = 5;
207 final ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
208 for (int i = 0; i < threadNum; i++) {
209 executorService.execute(() -> {
210 if (!Thread.currentThread().isInterrupted()) {
211 client.execute(
212 SimpleRequestBuilder.get()
213 .setHttpHost(target)
214 .setPath("/random/2048")
215 .build(), callback);
216 }
217 });
218 }
219
220 assertThat(countDownLatch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()), CoreMatchers.equalTo(true));
221
222 executorService.shutdownNow();
223 executorService.awaitTermination(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
224
225 for (;;) {
226 final SimpleHttpResponse response = resultQueue.poll();
227 if (response == null) {
228 break;
229 }
230 assertThat(response.getCode(), CoreMatchers.equalTo(200));
231 }
232 }
233
234 @Test
235 public void testBadRequest() throws Exception {
236 final H2TestServer server = startServer();
237 server.register("/random/*", AsyncRandomHandler::new);
238 final HttpHost target = targetHost();
239 final T client = startClient();
240 final Future<SimpleHttpResponse> future = client.execute(
241 SimpleRequestBuilder.get()
242 .setHttpHost(target)
243 .setPath("/random/boom")
244 .build(), null);
245 final SimpleHttpResponse response = future.get();
246 assertThat(response, CoreMatchers.notNullValue());
247 assertThat(response.getCode(), CoreMatchers.equalTo(400));
248 }
249
250 }