View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.testing.async;
28  
29  import static org.hamcrest.MatcherAssert.assertThat;
30  
31  import java.util.LinkedList;
32  import java.util.Queue;
33  import java.util.Random;
34  import java.util.concurrent.ConcurrentLinkedQueue;
35  import java.util.concurrent.CountDownLatch;
36  import java.util.concurrent.ExecutorService;
37  import java.util.concurrent.Executors;
38  import java.util.concurrent.Future;
39  import java.util.concurrent.atomic.AtomicInteger;
40  
41  import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
42  import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
43  import org.apache.hc.client5.http.protocol.HttpClientContext;
44  import org.apache.hc.client5.testing.extension.async.ClientProtocolLevel;
45  import org.apache.hc.client5.testing.extension.async.ServerProtocolLevel;
46  import org.apache.hc.client5.testing.extension.async.TestAsyncClient;
47  import org.apache.hc.core5.concurrent.FutureCallback;
48  import org.apache.hc.core5.http.ContentType;
49  import org.apache.hc.core5.http.HttpHost;
50  import org.apache.hc.core5.http.HttpResponse;
51  import org.apache.hc.core5.http.Message;
52  import org.apache.hc.core5.http.Method;
53  import org.apache.hc.core5.http.URIScheme;
54  import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
55  import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityConsumer;
56  import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
57  import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
58  import org.hamcrest.CoreMatchers;
59  import org.junit.jupiter.api.Test;
60  
61  public abstract class AbstractHttpAsyncFundamentalsTest extends AbstractIntegrationTestBase {
62  
63      protected AbstractHttpAsyncFundamentalsTest(final URIScheme scheme, final ClientProtocolLevel clientProtocolLevel, final ServerProtocolLevel serverProtocolLevel) {
64          super(scheme, clientProtocolLevel, serverProtocolLevel);
65      }
66  
67      @Test
68      public void testSequentialGetRequests() throws Exception {
69          configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new));
70          final HttpHost target = startServer();
71  
72          final TestAsyncClient client = startClient();
73  
74          for (int i = 0; i < 3; i++) {
75              final Future<SimpleHttpResponse> future = client.execute(
76                      SimpleRequestBuilder.get()
77                              .setHttpHost(target)
78                              .setPath("/random/2048")
79                              .build(), null);
80              final SimpleHttpResponse response = future.get();
81              assertThat(response, CoreMatchers.notNullValue());
82              assertThat(response.getCode(), CoreMatchers.equalTo(200));
83              final String body = response.getBodyText();
84              assertThat(body, CoreMatchers.notNullValue());
85              assertThat(body.length(), CoreMatchers.equalTo(2048));
86          }
87      }
88  
89      @Test
90      public void testSequentialHeadRequests() throws Exception {
91          configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new));
92          final HttpHost target = startServer();
93          final TestAsyncClient client = startClient();
94          for (int i = 0; i < 3; i++) {
95              final Future<SimpleHttpResponse> future = client.execute(
96                      SimpleRequestBuilder.head()
97                              .setHttpHost(target)
98                              .setPath("/random/2048")
99                              .build(), null);
100             final SimpleHttpResponse response = future.get();
101             assertThat(response, CoreMatchers.notNullValue());
102             assertThat(response.getCode(), CoreMatchers.equalTo(200));
103             final String body = response.getBodyText();
104             assertThat(body, CoreMatchers.nullValue());
105         }
106     }
107 
108     @Test
109     public void testSequentialPostRequests() throws Exception {
110         configureServer(bootstrap -> bootstrap.register("/echo/*", AsyncEchoHandler::new));
111         final HttpHost target = startServer();
112         final TestAsyncClient client = startClient();
113         for (int i = 0; i < 3; i++) {
114             final byte[] b1 = new byte[1024];
115             final Random rnd = new Random(System.currentTimeMillis());
116             rnd.nextBytes(b1);
117             final Future<Message<HttpResponse, byte[]>> future = client.execute(
118                     new BasicRequestProducer(Method.GET, target, "/echo/",
119                             AsyncEntityProducers.create(b1, ContentType.APPLICATION_OCTET_STREAM)),
120                     new BasicResponseConsumer<>(new BasicAsyncEntityConsumer()), HttpClientContext.create(), null);
121             final Message<HttpResponse, byte[]> responseMessage = future.get();
122             assertThat(responseMessage, CoreMatchers.notNullValue());
123             final HttpResponse response = responseMessage.getHead();
124             assertThat(response.getCode(), CoreMatchers.equalTo(200));
125             final byte[] b2 = responseMessage.getBody();
126             assertThat(b1, CoreMatchers.equalTo(b2));
127         }
128     }
129 
130     @Test
131     public void testConcurrentPostRequests() throws Exception {
132         configureServer(bootstrap -> bootstrap.register("/echo/*", AsyncEchoHandler::new));
133         final HttpHost target = startServer();
134         final TestAsyncClient client = startClient();
135         final byte[] b1 = new byte[1024];
136         final Random rnd = new Random(System.currentTimeMillis());
137         rnd.nextBytes(b1);
138 
139         final int reqCount = 20;
140 
141         final Queue<Future<Message<HttpResponse, byte[]>>> queue = new LinkedList<>();
142         for (int i = 0; i < reqCount; i++) {
143             final Future<Message<HttpResponse, byte[]>> future = client.execute(
144                     new BasicRequestProducer(Method.POST, target, "/echo/",
145                             AsyncEntityProducers.create(b1, ContentType.APPLICATION_OCTET_STREAM)),
146                     new BasicResponseConsumer<>(new BasicAsyncEntityConsumer()), HttpClientContext.create(), null);
147             queue.add(future);
148         }
149 
150         while (!queue.isEmpty()) {
151             final Future<Message<HttpResponse, byte[]>> future = queue.remove();
152             final Message<HttpResponse, byte[]> responseMessage = future.get();
153             assertThat(responseMessage, CoreMatchers.notNullValue());
154             final HttpResponse response = responseMessage.getHead();
155             assertThat(response.getCode(), CoreMatchers.equalTo(200));
156             final byte[] b2 = responseMessage.getBody();
157             assertThat(b1, CoreMatchers.equalTo(b2));
158         }
159     }
160 
161     @Test
162     public void testRequestExecutionFromCallback() throws Exception {
163         configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new));
164         final HttpHost target = startServer();
165         final TestAsyncClient client = startClient();
166         final int requestNum = 50;
167         final AtomicInteger count = new AtomicInteger(requestNum);
168         final Queue<SimpleHttpResponse> resultQueue = new ConcurrentLinkedQueue<>();
169         final CountDownLatch countDownLatch = new CountDownLatch(requestNum);
170 
171         final FutureCallback<SimpleHttpResponse> callback = new FutureCallback<SimpleHttpResponse>() {
172 
173             @Override
174             public void completed(final SimpleHttpResponse result) {
175                 try {
176                     resultQueue.add(result);
177                     if (count.decrementAndGet() > 0) {
178                         client.execute(
179                                 SimpleRequestBuilder.get()
180                                         .setHttpHost(target)
181                                         .setPath("/random/2048")
182                                         .build(), this);
183                     }
184                 } finally {
185                     countDownLatch.countDown();
186                 }
187             }
188 
189             @Override
190             public void failed(final Exception ex) {
191                 countDownLatch.countDown();
192             }
193 
194             @Override
195             public void cancelled() {
196                 countDownLatch.countDown();
197             }
198         };
199 
200         final int threadNum = 5;
201         final ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
202         for (int i = 0; i < threadNum; i++) {
203             executorService.execute(() -> {
204                 if (!Thread.currentThread().isInterrupted()) {
205                     client.execute(
206                             SimpleRequestBuilder.get()
207                                     .setHttpHost(target)
208                                     .setPath("/random/2048")
209                                     .build(), callback);
210                 }
211             });
212         }
213 
214         assertThat(countDownLatch.await(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()), CoreMatchers.equalTo(true));
215 
216         executorService.shutdownNow();
217         executorService.awaitTermination(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
218 
219         for (;;) {
220             final SimpleHttpResponse response = resultQueue.poll();
221             if (response == null) {
222                 break;
223             }
224             assertThat(response.getCode(), CoreMatchers.equalTo(200));
225         }
226     }
227 
228     @Test
229     public void testBadRequest() throws Exception {
230         configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new));
231         final HttpHost target = startServer();
232         final TestAsyncClient client = startClient();
233         final Future<SimpleHttpResponse> future = client.execute(
234                 SimpleRequestBuilder.get()
235                         .setHttpHost(target)
236                         .setPath("/random/boom")
237                         .build(), null);
238         final SimpleHttpResponse response = future.get();
239         assertThat(response, CoreMatchers.notNullValue());
240         assertThat(response.getCode(), CoreMatchers.equalTo(400));
241     }
242 
243 }