View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.nio.integration;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.util.ArrayList;
33  import java.util.Arrays;
34  import java.util.Collection;
35  import java.util.List;
36  import java.util.Queue;
37  import java.util.concurrent.ConcurrentLinkedQueue;
38  import java.util.concurrent.ExecutionException;
39  import java.util.concurrent.Future;
40  import java.util.concurrent.TimeUnit;
41  
42  import org.apache.http.ConnectionClosedException;
43  import org.apache.http.HttpEntityEnclosingRequest;
44  import org.apache.http.HttpException;
45  import org.apache.http.HttpHeaders;
46  import org.apache.http.HttpHost;
47  import org.apache.http.HttpRequest;
48  import org.apache.http.HttpResponse;
49  import org.apache.http.HttpStatus;
50  import org.apache.http.HttpVersion;
51  import org.apache.http.entity.ContentType;
52  import org.apache.http.entity.StringEntity;
53  import org.apache.http.message.BasicHttpEntityEnclosingRequest;
54  import org.apache.http.message.BasicHttpRequest;
55  import org.apache.http.message.BasicHttpResponse;
56  import org.apache.http.nio.entity.NStringEntity;
57  import org.apache.http.nio.protocol.BasicAsyncRequestConsumer;
58  import org.apache.http.nio.protocol.BasicAsyncRequestHandler;
59  import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
60  import org.apache.http.nio.protocol.BasicAsyncResponseConsumer;
61  import org.apache.http.nio.protocol.BasicAsyncResponseProducer;
62  import org.apache.http.nio.protocol.HttpAsyncExchange;
63  import org.apache.http.nio.protocol.HttpAsyncRequestConsumer;
64  import org.apache.http.nio.protocol.HttpAsyncRequestHandler;
65  import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
66  import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
67  import org.apache.http.nio.reactor.ListenerEndpoint;
68  import org.apache.http.nio.testserver.HttpCoreNIOTestBase;
69  import org.apache.http.protocol.HttpContext;
70  import org.apache.http.protocol.HttpProcessor;
71  import org.apache.http.protocol.HttpRequestHandler;
72  import org.apache.http.protocol.ImmutableHttpProcessor;
73  import org.apache.http.protocol.RequestConnControl;
74  import org.apache.http.protocol.RequestContent;
75  import org.apache.http.protocol.RequestTargetHost;
76  import org.apache.http.protocol.RequestUserAgent;
77  import org.apache.http.util.EntityUtils;
78  import org.junit.After;
79  import org.junit.Assert;
80  import org.junit.Before;
81  import org.junit.Test;
82  import org.junit.runner.RunWith;
83  import org.junit.runners.Parameterized;
84  
85  /**
86   * HttpCore NIO integration tests for pipelined request processing.
87   */
88  @RunWith(Parameterized.class)
89  public class TestHttpAsyncHandlersPipelining extends HttpCoreNIOTestBase {
90  
91      private final static long RESULT_TIMEOUT_SEC = 30;
92  
93      @Parameterized.Parameters(name = "{0}")
94      public static Collection<Object[]> protocols() {
95          return Arrays.asList(new Object[][]{
96                  {ProtocolScheme.http},
97                  {ProtocolScheme.https},
98          });
99      }
100 
101     public TestHttpAsyncHandlersPipelining(final ProtocolScheme scheme) {
102         super(scheme);
103     }
104 
105     public static final HttpProcessor DEFAULT_HTTP_PROC = new ImmutableHttpProcessor(
106             new RequestContent(),
107             new RequestTargetHost(),
108             new RequestConnControl(),
109             new RequestUserAgent("TEST-CLIENT/1.1"));
110 
111     @Before
112     public void setUp() throws Exception {
113         initServer();
114         initClient();
115     }
116 
117     @After
118     public void tearDown() throws Exception {
119         shutDownClient();
120         shutDownServer();
121     }
122 
123     private HttpHost start() throws Exception {
124         this.server.start();
125         this.client.setHttpProcessor(DEFAULT_HTTP_PROC);
126         this.client.start();
127 
128         final ListenerEndpoint endpoint = this.server.getListenerEndpoint();
129         endpoint.waitFor();
130 
131         final InetSocketAddress address = (InetSocketAddress) endpoint.getAddress();
132         return new HttpHost("localhost", address.getPort(), getScheme().name());
133     }
134 
135     private static String createRequestUri(final String pattern, final int count) {
136         return pattern + "x" + count;
137     }
138 
139     private static String createExpectedString(final String pattern, final int count) {
140         final StringBuilder buffer = new StringBuilder();
141         for (int i = 0; i < count; i++) {
142             buffer.append(pattern);
143         }
144         return buffer.toString();
145     }
146 
147     @Test
148     public void testHttpGets() throws Exception {
149         this.server.registerHandler("*", new BasicAsyncRequestHandler(new SimpleRequestHandler()));
150         final HttpHost target = start();
151 
152         this.client.setMaxPerRoute(3);
153         this.client.setMaxTotal(3);
154 
155         final String pattern = RndTestPatternGenerator.generateText();
156         final int count = RndTestPatternGenerator.generateCount(1000);
157 
158         final String expectedPattern = createExpectedString(pattern, count);
159 
160         final Queue<Future<List<HttpResponse>>> queue = new ConcurrentLinkedQueue<Future<List<HttpResponse>>>();
161         for (int i = 0; i < 10; i++) {
162             final String requestUri = createRequestUri(pattern, count);
163             final Future<List<HttpResponse>> future = this.client.executePipelined(target,
164                     new BasicHttpRequest("GET", requestUri),
165                     new BasicHttpRequest("GET", requestUri),
166                     new BasicHttpRequest("GET", requestUri));
167             queue.add(future);
168         }
169 
170         while (!queue.isEmpty()) {
171             final Future<List<HttpResponse>> future = queue.remove();
172             final List<HttpResponse> responses = future.get(RESULT_TIMEOUT_SEC, TimeUnit.SECONDS);
173             Assert.assertNotNull(responses);
174             Assert.assertEquals(3, responses.size());
175             for (final HttpResponse response: responses) {
176                 Assert.assertEquals(HttpStatus.SC_OK, response.getStatusLine().getStatusCode());
177                 Assert.assertEquals(expectedPattern, EntityUtils.toString(response.getEntity()));
178             }
179         }
180     }
181 
182     @Test
183     public void testHttpHeads() throws Exception {
184         this.server.registerHandler("*", new BasicAsyncRequestHandler(new SimpleRequestHandler()));
185         final HttpHost target = start();
186 
187         this.client.setMaxPerRoute(3);
188         this.client.setMaxTotal(3);
189 
190         final String pattern = RndTestPatternGenerator.generateText();
191         final int count = RndTestPatternGenerator.generateCount(1000);
192 
193         final Queue<Future<List<HttpResponse>>> queue = new ConcurrentLinkedQueue<Future<List<HttpResponse>>>();
194         for (int i = 0; i < 10; i++) {
195             final String requestUri = createRequestUri(pattern, count);
196             final HttpRequest head1 = new BasicHttpRequest("HEAD", requestUri);
197             final HttpRequest head2 = new BasicHttpRequest("HEAD", requestUri);
198             final BasicHttpEntityEnclosingRequest post1 = new BasicHttpEntityEnclosingRequest("POST", requestUri);
199             post1.setEntity(new NStringEntity("stuff", ContentType.TEXT_PLAIN));
200             final Future<List<HttpResponse>> future = this.client.executePipelined(target, head1, head2, post1);
201             queue.add(future);
202         }
203 
204         while (!queue.isEmpty()) {
205             final Future<List<HttpResponse>> future = queue.remove();
206             final List<HttpResponse> responses = future.get(RESULT_TIMEOUT_SEC, TimeUnit.SECONDS);
207             Assert.assertNotNull(responses);
208             Assert.assertEquals(3, responses.size());
209             for (final HttpResponse response: responses) {
210                 Assert.assertEquals(HttpStatus.SC_OK, response.getStatusLine().getStatusCode());
211             }
212         }
213     }
214 
215     @Test
216     public void testHttpPosts() throws Exception {
217         this.server.registerHandler("*", new BasicAsyncRequestHandler(new SimpleRequestHandler()));
218         final HttpHost target = start();
219 
220         this.client.setMaxPerRoute(3);
221         this.client.setMaxTotal(3);
222 
223         final String pattern = RndTestPatternGenerator.generateText();
224         final int count = RndTestPatternGenerator.generateCount(1000);
225 
226         final String expectedPattern = createExpectedString(pattern, count);
227 
228         final Queue<Future<List<HttpResponse>>> queue = new ConcurrentLinkedQueue<Future<List<HttpResponse>>>();
229         for (int i = 0; i < 10; i++) {
230             final String requestUri = createRequestUri(pattern, count);
231             final HttpEntityEnclosingRequest request1 = new BasicHttpEntityEnclosingRequest("POST", requestUri);
232             final NStringEntity entity1 = new NStringEntity(expectedPattern, ContentType.DEFAULT_TEXT);
233             entity1.setChunked(RndTestPatternGenerator.generateBoolean());
234             request1.setEntity(entity1);
235             final HttpEntityEnclosingRequest request2 = new BasicHttpEntityEnclosingRequest("POST", requestUri);
236             final NStringEntity entity2 = new NStringEntity(expectedPattern, ContentType.DEFAULT_TEXT);
237             entity2.setChunked(RndTestPatternGenerator.generateBoolean());
238             request2.setEntity(entity2);
239             final HttpEntityEnclosingRequest request3 = new BasicHttpEntityEnclosingRequest("POST", requestUri);
240             final NStringEntity entity3 = new NStringEntity(expectedPattern, ContentType.DEFAULT_TEXT);
241             entity3.setChunked(RndTestPatternGenerator.generateBoolean());
242             request3.setEntity(entity3);
243             final Future<List<HttpResponse>> future = this.client.executePipelined(target,
244                     request1, request2, request3);
245             queue.add(future);
246         }
247 
248         while (!queue.isEmpty()) {
249             final Future<List<HttpResponse>> future = queue.remove();
250             final List<HttpResponse> responses = future.get(RESULT_TIMEOUT_SEC, TimeUnit.SECONDS);
251             Assert.assertNotNull(responses);
252             Assert.assertEquals(3, responses.size());
253             for (final HttpResponse response: responses) {
254                 Assert.assertEquals(HttpStatus.SC_OK, response.getStatusLine().getStatusCode());
255                 Assert.assertEquals(expectedPattern, EntityUtils.toString(response.getEntity()));
256             }
257         }
258     }
259 
260     @Test
261     public void testHttpDelayedResponse() throws Exception {
262 
263         class DelayedRequestHandler implements HttpAsyncRequestHandler<HttpRequest> {
264 
265             private final SimpleRequestHandler requestHandler;
266 
267             public DelayedRequestHandler() {
268                 super();
269                 this.requestHandler = new SimpleRequestHandler();
270             }
271 
272             @Override
273             public HttpAsyncRequestConsumer<HttpRequest> processRequest(
274                     final HttpRequest request,
275                     final HttpContext context) {
276                 return new BasicAsyncRequestConsumer();
277             }
278 
279             @Override
280             public void handle(
281                     final HttpRequest request,
282                     final HttpAsyncExchange httpexchange,
283                     final HttpContext context) throws HttpException, IOException {
284                 final BasicHttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_1, HttpStatus.SC_OK, "OK");
285                 new Thread() {
286                     @Override
287                     public void run() {
288                         // Wait a bit, to make sure this is delayed.
289                         try { Thread.sleep(100); } catch(final InterruptedException ie) {}
290                         // Set the entity after delaying...
291                         try {
292                             requestHandler.handle(request, response, context);
293                         } catch (final Exception ex) {
294                             response.setStatusCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
295                         }
296                         httpexchange.submitResponse(new BasicAsyncResponseProducer(response));
297                     }
298                 }.start();
299             }
300 
301         }
302 
303         this.server.registerHandler("*", new DelayedRequestHandler());
304         final HttpHost target = start();
305 
306         this.client.setMaxPerRoute(3);
307         this.client.setMaxTotal(3);
308 
309         final String pattern1 = RndTestPatternGenerator.generateText();
310         final String pattern2 = RndTestPatternGenerator.generateText();
311         final String pattern3 = RndTestPatternGenerator.generateText();
312         final int count = RndTestPatternGenerator.generateCount(1000);
313 
314         final String expectedPattern1 = createExpectedString(pattern1, count);
315         final String expectedPattern2 = createExpectedString(pattern2, count);
316         final String expectedPattern3 = createExpectedString(pattern3, count);
317 
318         final Queue<Future<List<HttpResponse>>> queue = new ConcurrentLinkedQueue<Future<List<HttpResponse>>>();
319         for (int i = 0; i < 1; i++) {
320             final HttpRequest request1 = new BasicHttpRequest("GET", createRequestUri(pattern1, count));
321             final HttpEntityEnclosingRequest request2 = new BasicHttpEntityEnclosingRequest("POST",
322                     createRequestUri(pattern2, count));
323             final NStringEntity entity2 = new NStringEntity(expectedPattern2, ContentType.DEFAULT_TEXT);
324             entity2.setChunked(RndTestPatternGenerator.generateBoolean());
325             request2.setEntity(entity2);
326             final HttpEntityEnclosingRequest request3 = new BasicHttpEntityEnclosingRequest("POST",
327                     createRequestUri(pattern3, count));
328             final NStringEntity entity3 = new NStringEntity(expectedPattern3, ContentType.DEFAULT_TEXT);
329             entity3.setChunked(RndTestPatternGenerator.generateBoolean());
330             request3.setEntity(entity3);
331             final Future<List<HttpResponse>> future = this.client.executePipelined(target,
332                     request1, request2, request3);
333             queue.add(future);
334         }
335 
336         while (!queue.isEmpty()) {
337             final Future<List<HttpResponse>> future = queue.remove();
338             final List<HttpResponse> responses = future.get(RESULT_TIMEOUT_SEC, TimeUnit.SECONDS);
339             Assert.assertNotNull(responses);
340             Assert.assertEquals(3, responses.size());
341             for (final HttpResponse response: responses) {
342                 Assert.assertEquals(HttpStatus.SC_OK, response.getStatusLine().getStatusCode());
343             }
344             Assert.assertEquals(expectedPattern1, EntityUtils.toString(responses.get(0).getEntity()));
345             Assert.assertEquals(expectedPattern2, EntityUtils.toString(responses.get(1).getEntity()));
346             Assert.assertEquals(expectedPattern3, EntityUtils.toString(responses.get(2).getEntity()));
347         }
348     }
349 
350     @Test
351     public void testUnexpectedConnectionClosure() throws Exception {
352         this.server.registerHandler("*", new BasicAsyncRequestHandler(new HttpRequestHandler() {
353 
354             @Override
355             public void handle(
356                     final HttpRequest request,
357                     final HttpResponse response,
358                     final HttpContext context) throws HttpException, IOException {
359                 response.setStatusCode(HttpStatus.SC_OK);
360                 response.setEntity(new StringEntity("all is well", ContentType.TEXT_PLAIN));
361             }
362 
363         }));
364         this.server.registerHandler("/boom", new BasicAsyncRequestHandler(new HttpRequestHandler() {
365 
366             @Override
367             public void handle(
368                     final HttpRequest request,
369                     final HttpResponse response,
370                     final HttpContext context) throws HttpException, IOException {
371                 response.setStatusCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
372                 response.setHeader(HttpHeaders.CONNECTION, "Close");
373                 response.setEntity(new StringEntity("boooooom!!!!!", ContentType.TEXT_PLAIN));
374             }
375 
376         }));
377         final HttpHost target = start();
378 
379         this.client.setMaxPerRoute(3);
380         this.client.setMaxTotal(3);
381 
382         for (int i = 0; i < 3; i++) {
383 
384             final HttpAsyncRequestProducer p1 = new BasicAsyncRequestProducer(target, new BasicHttpRequest("GET", "/"));
385             final HttpAsyncRequestProducer p2 = new BasicAsyncRequestProducer(target, new BasicHttpRequest("GET", "/boom"));
386             final HttpAsyncRequestProducer p3 = new BasicAsyncRequestProducer(target, new BasicHttpRequest("GET", "/"));
387             final List<HttpAsyncRequestProducer> requestProducers = new ArrayList<HttpAsyncRequestProducer>();
388             requestProducers.add(p1);
389             requestProducers.add(p2);
390             requestProducers.add(p3);
391 
392             final HttpAsyncResponseConsumer<HttpResponse> c1 = new BasicAsyncResponseConsumer();
393             final HttpAsyncResponseConsumer<HttpResponse> c2 = new BasicAsyncResponseConsumer();
394             final HttpAsyncResponseConsumer<HttpResponse> c3 = new BasicAsyncResponseConsumer();
395             final List<HttpAsyncResponseConsumer<HttpResponse>> responseConsumers = new ArrayList<HttpAsyncResponseConsumer<HttpResponse>>();
396             responseConsumers.add(c1);
397             responseConsumers.add(c2);
398             responseConsumers.add(c3);
399 
400             final Future<List<HttpResponse>> future = this.client.executePipelined(target, requestProducers, responseConsumers, null, null);
401             try {
402                 future.get(RESULT_TIMEOUT_SEC, TimeUnit.SECONDS);
403             } catch (final ExecutionException ex) {
404                 final Throwable cause = ex.getCause();
405                 Assert.assertTrue(cause instanceof ConnectionClosedException);
406             }
407 
408             Assert.assertTrue(c1.isDone());
409             Assert.assertNotNull(c1.getResult());
410             Assert.assertTrue(c2.isDone());
411             Assert.assertNotNull(c2.getResult());
412             Assert.assertTrue(c3.isDone());
413             Assert.assertNull(c3.getResult());
414         }
415     }
416 
417 }