View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.testing.async;
28  
29  import static org.hamcrest.MatcherAssert.assertThat;
30  
31  import java.util.LinkedList;
32  import java.util.Queue;
33  import java.util.Random;
34  import java.util.concurrent.ConcurrentLinkedQueue;
35  import java.util.concurrent.CountDownLatch;
36  import java.util.concurrent.ExecutorService;
37  import java.util.concurrent.Executors;
38  import java.util.concurrent.Future;
39  import java.util.concurrent.atomic.AtomicInteger;
40  
41  import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
42  import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
43  import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
44  import org.apache.hc.client5.http.protocol.HttpClientContext;
45  import org.apache.hc.core5.concurrent.FutureCallback;
46  import org.apache.hc.core5.http.ContentType;
47  import org.apache.hc.core5.http.HttpHost;
48  import org.apache.hc.core5.http.HttpResponse;
49  import org.apache.hc.core5.http.Message;
50  import org.apache.hc.core5.http.Method;
51  import org.apache.hc.core5.http.URIScheme;
52  import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
53  import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityConsumer;
54  import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
55  import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
56  import org.apache.hc.core5.testing.nio.H2TestServer;
57  import org.hamcrest.CoreMatchers;
58  import org.junit.jupiter.api.Test;
59  
60  public abstract class AbstractHttpAsyncFundamentalsTest<T extends CloseableHttpAsyncClient> extends AbstractIntegrationTestBase {
61  
62      protected AbstractHttpAsyncFundamentalsTest(final URIScheme scheme) {
63          super(scheme);
64      }
65  
66      abstract protected H2TestServer startServer() throws Exception;
67  
68      abstract protected T startClient() throws Exception;
69  
70      @Test
71      public void testSequentialGetRequests() throws Exception {
72          final H2TestServer server = startServer();
73          server.register("/random/*", AsyncRandomHandler::new);
74          final HttpHost target = targetHost();
75          final T client = startClient();
76          for (int i = 0; i < 3; i++) {
77              final Future<SimpleHttpResponse> future = client.execute(
78                      SimpleRequestBuilder.get()
79                              .setHttpHost(target)
80                              .setPath("/random/2048")
81                              .build(), null);
82              final SimpleHttpResponse response = future.get();
83              assertThat(response, CoreMatchers.notNullValue());
84              assertThat(response.getCode(), CoreMatchers.equalTo(200));
85              final String body = response.getBodyText();
86              assertThat(body, CoreMatchers.notNullValue());
87              assertThat(body.length(), CoreMatchers.equalTo(2048));
88          }
89      }
90  
91      @Test
92      public void testSequentialHeadRequests() throws Exception {
93          final H2TestServer server = startServer();
94          server.register("/random/*", AsyncRandomHandler::new);
95          final HttpHost target = targetHost();
96          final T client = startClient();
97          for (int i = 0; i < 3; i++) {
98              final Future<SimpleHttpResponse> future = client.execute(
99                      SimpleRequestBuilder.head()
100                             .setHttpHost(target)
101                             .setPath("/random/2048")
102                             .build(), null);
103             final SimpleHttpResponse response = future.get();
104             assertThat(response, CoreMatchers.notNullValue());
105             assertThat(response.getCode(), CoreMatchers.equalTo(200));
106             final String body = response.getBodyText();
107             assertThat(body, CoreMatchers.nullValue());
108         }
109     }
110 
111     @Test
112     public void testSequentialPostRequests() throws Exception {
113         final H2TestServer server = startServer();
114         server.register("/echo/*", AsyncEchoHandler::new);
115         final HttpHost target = targetHost();
116         final T client = startClient();
117         for (int i = 0; i < 3; i++) {
118             final byte[] b1 = new byte[1024];
119             final Random rnd = new Random(System.currentTimeMillis());
120             rnd.nextBytes(b1);
121             final Future<Message<HttpResponse, byte[]>> future = client.execute(
122                     new BasicRequestProducer(Method.GET, target, "/echo/",
123                             AsyncEntityProducers.create(b1, ContentType.APPLICATION_OCTET_STREAM)),
124                     new BasicResponseConsumer<>(new BasicAsyncEntityConsumer()), HttpClientContext.create(), null);
125             final Message<HttpResponse, byte[]> responseMessage = future.get();
126             assertThat(responseMessage, CoreMatchers.notNullValue());
127             final HttpResponse response = responseMessage.getHead();
128             assertThat(response.getCode(), CoreMatchers.equalTo(200));
129             final byte[] b2 = responseMessage.getBody();
130             assertThat(b1, CoreMatchers.equalTo(b2));
131         }
132     }
133 
134     @Test
135     public void testConcurrentPostRequests() throws Exception {
136         final H2TestServer server = startServer();
137         server.register("/echo/*", AsyncEchoHandler::new);
138         final HttpHost target = targetHost();
139         final T client = startClient();
140         final byte[] b1 = new byte[1024];
141         final Random rnd = new Random(System.currentTimeMillis());
142         rnd.nextBytes(b1);
143 
144         final int reqCount = 20;
145 
146         final Queue<Future<Message<HttpResponse, byte[]>>> queue = new LinkedList<>();
147         for (int i = 0; i < reqCount; i++) {
148             final Future<Message<HttpResponse, byte[]>> future = client.execute(
149                     new BasicRequestProducer(Method.POST, target, "/echo/",
150                             AsyncEntityProducers.create(b1, ContentType.APPLICATION_OCTET_STREAM)),
151                     new BasicResponseConsumer<>(new BasicAsyncEntityConsumer()), HttpClientContext.create(), null);
152             queue.add(future);
153         }
154 
155         while (!queue.isEmpty()) {
156             final Future<Message<HttpResponse, byte[]>> future = queue.remove();
157             final Message<HttpResponse, byte[]> responseMessage = future.get();
158             assertThat(responseMessage, CoreMatchers.notNullValue());
159             final HttpResponse response = responseMessage.getHead();
160             assertThat(response.getCode(), CoreMatchers.equalTo(200));
161             final byte[] b2 = responseMessage.getBody();
162             assertThat(b1, CoreMatchers.equalTo(b2));
163         }
164     }
165 
166     @Test
167     public void testRequestExecutionFromCallback() throws Exception {
168         final H2TestServer server = startServer();
169         server.register("/random/*", AsyncRandomHandler::new);
170         final HttpHost target = targetHost();
171         final T client = startClient();
172         final int requestNum = 50;
173         final AtomicInteger count = new AtomicInteger(requestNum);
174         final Queue<SimpleHttpResponse> resultQueue = new ConcurrentLinkedQueue<>();
175         final CountDownLatch countDownLatch = new CountDownLatch(requestNum);
176 
177         final FutureCallback<SimpleHttpResponse> callback = new FutureCallback<SimpleHttpResponse>() {
178 
179             @Override
180             public void completed(final SimpleHttpResponse result) {
181                 try {
182                     resultQueue.add(result);
183                     if (count.decrementAndGet() > 0) {
184                         client.execute(
185                                 SimpleRequestBuilder.get()
186                                         .setHttpHost(target)
187                                         .setPath("/random/2048")
188                                         .build(), this);
189                     }
190                 } finally {
191                     countDownLatch.countDown();
192                 }
193             }
194 
195             @Override
196             public void failed(final Exception ex) {
197                 countDownLatch.countDown();
198             }
199 
200             @Override
201             public void cancelled() {
202                 countDownLatch.countDown();
203             }
204         };
205 
206         final int threadNum = 5;
207         final ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
208         for (int i = 0; i < threadNum; i++) {
209             executorService.execute(() -> {
210                 if (!Thread.currentThread().isInterrupted()) {
211                     client.execute(
212                             SimpleRequestBuilder.get()
213                                     .setHttpHost(target)
214                                     .setPath("/random/2048")
215                                     .build(), callback);
216                 }
217             });
218         }
219 
220         assertThat(countDownLatch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()), CoreMatchers.equalTo(true));
221 
222         executorService.shutdownNow();
223         executorService.awaitTermination(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
224 
225         for (;;) {
226             final SimpleHttpResponse response = resultQueue.poll();
227             if (response == null) {
228                 break;
229             }
230             assertThat(response.getCode(), CoreMatchers.equalTo(200));
231         }
232     }
233 
234     @Test
235     public void testBadRequest() throws Exception {
236         final H2TestServer server = startServer();
237         server.register("/random/*", AsyncRandomHandler::new);
238         final HttpHost target = targetHost();
239         final T client = startClient();
240         final Future<SimpleHttpResponse> future = client.execute(
241                 SimpleRequestBuilder.get()
242                         .setHttpHost(target)
243                         .setPath("/random/boom")
244                         .build(), null);
245         final SimpleHttpResponse response = future.get();
246         assertThat(response, CoreMatchers.notNullValue());
247         assertThat(response.getCode(), CoreMatchers.equalTo(400));
248     }
249 
250 }