View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.client5.http.impl.classic;
28  
29  import static org.hamcrest.MatcherAssert.assertThat;
30  
31  import java.io.IOException;
32  import java.util.LinkedList;
33  import java.util.Queue;
34  import java.util.concurrent.CancellationException;
35  import java.util.concurrent.CountDownLatch;
36  import java.util.concurrent.ExecutionException;
37  import java.util.concurrent.ExecutorService;
38  import java.util.concurrent.Executors;
39  import java.util.concurrent.Future;
40  import java.util.concurrent.FutureTask;
41  import java.util.concurrent.TimeUnit;
42  import java.util.concurrent.TimeoutException;
43  import java.util.concurrent.atomic.AtomicBoolean;
44  
45  import org.apache.hc.client5.http.classic.methods.HttpGet;
46  import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
47  import org.apache.hc.client5.http.io.HttpClientConnectionManager;
48  import org.apache.hc.client5.http.protocol.HttpClientContext;
49  import org.apache.hc.core5.concurrent.FutureCallback;
50  import org.apache.hc.core5.http.ClassicHttpResponse;
51  import org.apache.hc.core5.http.impl.bootstrap.HttpServer;
52  import org.apache.hc.core5.http.impl.bootstrap.ServerBootstrap;
53  import org.apache.hc.core5.http.io.HttpClientResponseHandler;
54  import org.hamcrest.CoreMatchers;
55  import org.junit.jupiter.api.AfterEach;
56  import org.junit.jupiter.api.Assertions;
57  import org.junit.jupiter.api.BeforeEach;
58  import org.junit.jupiter.api.Test;
59  
60  @SuppressWarnings("boxing") // test code
61  public class TestFutureRequestExecutionService {
62  
63      private HttpServer localServer;
64      private String uri;
65      private FutureRequestExecutionService httpAsyncClientWithFuture;
66  
67      private final AtomicBoolean blocked = new AtomicBoolean(false);
68  
69      @BeforeEach
70      public void before() throws Exception {
71          this.localServer = ServerBootstrap.bootstrap()
72                  .register("/wait", (request, response, context) -> {
73                      try {
74                          while(blocked.get()) {
75                              Thread.sleep(10);
76                          }
77                      } catch (final InterruptedException e) {
78                          throw new IllegalStateException(e);
79                      }
80                      response.setCode(200);
81                  }).create();
82  
83          this.localServer.start();
84          uri = "http://localhost:" + this.localServer.getLocalPort() + "/wait";
85          final HttpClientConnectionManager cm = PoolingHttpClientConnectionManagerBuilder.create()
86                  .setMaxConnPerRoute(5)
87                  .build();
88          final CloseableHttpClient httpClient = HttpClientBuilder.create()
89                  .setConnectionManager(cm)
90                  .build();
91          final ExecutorService executorService = Executors.newFixedThreadPool(5);
92          httpAsyncClientWithFuture = new FutureRequestExecutionService(httpClient, executorService);
93      }
94  
95      @AfterEach
96      public void after() throws Exception {
97          blocked.set(false); // any remaining requests should unblock
98          this.localServer.stop();
99          httpAsyncClientWithFuture.close();
100     }
101 
102     @Test
103     public void shouldExecuteSingleCall() throws InterruptedException, ExecutionException {
104         final FutureTask<Boolean> task = httpAsyncClientWithFuture.execute(
105             new HttpGet(uri), HttpClientContext.create(), new OkidokiHandler());
106         Assertions.assertTrue(task.get(), "request should have returned OK");
107     }
108 
109     @Test
110     public void shouldCancel() throws InterruptedException, ExecutionException {
111         final FutureTask<Boolean> task = httpAsyncClientWithFuture.execute(
112             new HttpGet(uri), HttpClientContext.create(), new OkidokiHandler());
113         task.cancel(true);
114         final Exception exception = Assertions.assertThrows(Exception.class, task::get);
115         assertThat(exception, CoreMatchers.anyOf(
116                 CoreMatchers.instanceOf(CancellationException.class),
117                 CoreMatchers.instanceOf(ExecutionException.class)));
118     }
119 
120     @Test
121     public void shouldTimeout() throws InterruptedException, ExecutionException, TimeoutException {
122         blocked.set(true);
123         final FutureTask<Boolean> task = httpAsyncClientWithFuture.execute(
124             new HttpGet(uri), HttpClientContext.create(), new OkidokiHandler());
125         Assertions.assertThrows(TimeoutException.class, () ->
126                 task.get(10, TimeUnit.MILLISECONDS));
127     }
128 
129     @Test
130     public void shouldExecuteMultipleCalls() throws Exception {
131         final int reqNo = 100;
132         final Queue<Future<Boolean>> tasks = new LinkedList<>();
133         for(int i = 0; i < reqNo; i++) {
134             final Future<Boolean> task = httpAsyncClientWithFuture.execute(
135                     new HttpGet(uri), HttpClientContext.create(), new OkidokiHandler());
136             tasks.add(task);
137         }
138         for (final Future<Boolean> task : tasks) {
139             final Boolean b = task.get();
140             Assertions.assertNotNull(b);
141             Assertions.assertTrue(b, "request should have returned OK");
142         }
143     }
144 
145     @Test
146     public void shouldExecuteMultipleCallsAndCallback() throws Exception {
147         final int reqNo = 100;
148         final Queue<Future<Boolean>> tasks = new LinkedList<>();
149         final CountDownLatch latch = new CountDownLatch(reqNo);
150         for(int i = 0; i < reqNo; i++) {
151             final Future<Boolean> task = httpAsyncClientWithFuture.execute(
152                     new HttpGet(uri), HttpClientContext.create(),
153                     new OkidokiHandler(), new CountingCallback(latch));
154             tasks.add(task);
155         }
156         Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS));
157         for (final Future<Boolean> task : tasks) {
158             final Boolean b = task.get();
159             Assertions.assertNotNull(b);
160             Assertions.assertTrue(b, "request should have returned OK");
161         }
162     }
163 
164     private final class CountingCallback implements FutureCallback<Boolean> {
165 
166         private final CountDownLatch latch;
167 
168         CountingCallback(final CountDownLatch latch) {
169             super();
170             this.latch = latch;
171         }
172 
173         @Override
174         public void failed(final Exception ex) {
175             latch.countDown();
176         }
177 
178         @Override
179         public void completed(final Boolean result) {
180             latch.countDown();
181         }
182 
183         @Override
184         public void cancelled() {
185             latch.countDown();
186         }
187     }
188 
189 
190     private final class OkidokiHandler implements HttpClientResponseHandler<Boolean> {
191         @Override
192         public Boolean handleResponse(
193                 final ClassicHttpResponse response) throws IOException {
194             return response.getCode() == 200;
195         }
196     }
197 
198 }