1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 package org.apache.hc.client5.http.impl.classic;
28
29 import java.io.IOException;
30 import java.util.LinkedList;
31 import java.util.Queue;
32 import java.util.concurrent.CancellationException;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.ExecutionException;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.Future;
38 import java.util.concurrent.FutureTask;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.TimeoutException;
41 import java.util.concurrent.atomic.AtomicBoolean;
42
43 import org.apache.hc.client5.http.classic.methods.HttpGet;
44 import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
45 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
46 import org.apache.hc.client5.http.protocol.HttpClientContext;
47 import org.apache.hc.core5.concurrent.FutureCallback;
48 import org.apache.hc.core5.http.ClassicHttpRequest;
49 import org.apache.hc.core5.http.ClassicHttpResponse;
50 import org.apache.hc.core5.http.HttpException;
51 import org.apache.hc.core5.http.impl.bootstrap.HttpServer;
52 import org.apache.hc.core5.http.impl.bootstrap.ServerBootstrap;
53 import org.apache.hc.core5.http.io.HttpClientResponseHandler;
54 import org.apache.hc.core5.http.io.HttpRequestHandler;
55 import org.apache.hc.core5.http.protocol.HttpContext;
56 import org.hamcrest.CoreMatchers;
57 import org.junit.After;
58 import org.junit.Assert;
59 import org.junit.Before;
60 import org.junit.Rule;
61 import org.junit.Test;
62 import org.junit.rules.ExpectedException;
63
64 @SuppressWarnings("boxing")
65 public class TestFutureRequestExecutionService {
66
67 private HttpServer localServer;
68 private String uri;
69 private FutureRequestExecutionService httpAsyncClientWithFuture;
70
71 private final AtomicBoolean blocked = new AtomicBoolean(false);
72
73 @Rule
74 public ExpectedException thrown = ExpectedException.none();
75
76 @Before
77 public void before() throws Exception {
78 this.localServer = ServerBootstrap.bootstrap()
79 .register("/wait", new HttpRequestHandler() {
80
81 @Override
82 public void handle(
83 final ClassicHttpRequest request,
84 final ClassicHttpResponse response,
85 final HttpContext context) throws HttpException, IOException {
86 try {
87 while(blocked.get()) {
88 Thread.sleep(10);
89 }
90 } catch (final InterruptedException e) {
91 throw new IllegalStateException(e);
92 }
93 response.setCode(200);
94 }
95 }).create();
96
97 this.localServer.start();
98 uri = "http://localhost:" + this.localServer.getLocalPort() + "/wait";
99 final HttpClientConnectionManager cm = PoolingHttpClientConnectionManagerBuilder.create()
100 .setMaxConnPerRoute(5)
101 .build();
102 final CloseableHttpClient httpClient = HttpClientBuilder.create()
103 .setConnectionManager(cm)
104 .build();
105 final ExecutorService executorService = Executors.newFixedThreadPool(5);
106 httpAsyncClientWithFuture = new FutureRequestExecutionService(httpClient, executorService);
107 }
108
109 @After
110 public void after() throws Exception {
111 blocked.set(false);
112 this.localServer.stop();
113 httpAsyncClientWithFuture.close();
114 }
115
116 @Test
117 public void shouldExecuteSingleCall() throws InterruptedException, ExecutionException {
118 final FutureTask<Boolean> task = httpAsyncClientWithFuture.execute(
119 new HttpGet(uri), HttpClientContext.create(), new OkidokiHandler());
120 Assert.assertTrue("request should have returned OK", task.get().booleanValue());
121 }
122
123 @Test
124 public void shouldCancel() throws InterruptedException, ExecutionException {
125 thrown.expect(CoreMatchers.anyOf(
126 CoreMatchers.instanceOf(CancellationException.class),
127 CoreMatchers.instanceOf(ExecutionException.class)));
128
129 final FutureTask<Boolean> task = httpAsyncClientWithFuture.execute(
130 new HttpGet(uri), HttpClientContext.create(), new OkidokiHandler());
131 task.cancel(true);
132 task.get();
133 }
134
135 @Test
136 public void shouldTimeout() throws InterruptedException, ExecutionException, TimeoutException {
137 thrown.expect(TimeoutException.class);
138
139 blocked.set(true);
140 final FutureTask<Boolean> task = httpAsyncClientWithFuture.execute(
141 new HttpGet(uri), HttpClientContext.create(), new OkidokiHandler());
142 task.get(10, TimeUnit.MILLISECONDS);
143 }
144
145 @Test
146 public void shouldExecuteMultipleCalls() throws Exception {
147 final int reqNo = 100;
148 final Queue<Future<Boolean>> tasks = new LinkedList<>();
149 for(int i = 0; i < reqNo; i++) {
150 final Future<Boolean> task = httpAsyncClientWithFuture.execute(
151 new HttpGet(uri), HttpClientContext.create(), new OkidokiHandler());
152 tasks.add(task);
153 }
154 for (final Future<Boolean> task : tasks) {
155 final Boolean b = task.get();
156 Assert.assertNotNull(b);
157 Assert.assertTrue("request should have returned OK", b.booleanValue());
158 }
159 }
160
161 @Test
162 public void shouldExecuteMultipleCallsAndCallback() throws Exception {
163 final int reqNo = 100;
164 final Queue<Future<Boolean>> tasks = new LinkedList<>();
165 final CountDownLatch latch = new CountDownLatch(reqNo);
166 for(int i = 0; i < reqNo; i++) {
167 final Future<Boolean> task = httpAsyncClientWithFuture.execute(
168 new HttpGet(uri), HttpClientContext.create(),
169 new OkidokiHandler(), new CountingCallback(latch));
170 tasks.add(task);
171 }
172 Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
173 for (final Future<Boolean> task : tasks) {
174 final Boolean b = task.get();
175 Assert.assertNotNull(b);
176 Assert.assertTrue("request should have returned OK", b.booleanValue());
177 }
178 }
179
180 private final class CountingCallback implements FutureCallback<Boolean> {
181
182 private final CountDownLatch latch;
183
184 CountingCallback(final CountDownLatch latch) {
185 super();
186 this.latch = latch;
187 }
188
189 @Override
190 public void failed(final Exception ex) {
191 latch.countDown();
192 }
193
194 @Override
195 public void completed(final Boolean result) {
196 latch.countDown();
197 }
198
199 @Override
200 public void cancelled() {
201 latch.countDown();
202 }
203 }
204
205
206 private final class OkidokiHandler implements HttpClientResponseHandler<Boolean> {
207 @Override
208 public Boolean handleResponse(
209 final ClassicHttpResponse response) throws IOException {
210 return response.getCode() == 200;
211 }
212 }
213
214 }