View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.nio.testserver;
29  
30  import java.io.IOException;
31  import java.net.InetSocketAddress;
32  import java.util.ArrayList;
33  import java.util.Arrays;
34  import java.util.List;
35  import java.util.concurrent.Future;
36  import java.util.concurrent.TimeUnit;
37  
38  import org.apache.http.HttpHost;
39  import org.apache.http.HttpRequest;
40  import org.apache.http.HttpResponse;
41  import org.apache.http.OoopsieRuntimeException;
42  import org.apache.http.concurrent.FutureCallback;
43  import org.apache.http.config.ConnectionConfig;
44  import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
45  import org.apache.http.impl.nio.DefaultNHttpClientConnection;
46  import org.apache.http.impl.nio.DefaultNHttpClientConnectionFactory;
47  import org.apache.http.impl.nio.pool.BasicNIOConnPool;
48  import org.apache.http.impl.nio.pool.BasicNIOPoolEntry;
49  import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
50  import org.apache.http.impl.nio.reactor.ExceptionEvent;
51  import org.apache.http.nio.NHttpClientConnection;
52  import org.apache.http.nio.NHttpClientEventHandler;
53  import org.apache.http.nio.pool.NIOConnFactory;
54  import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
55  import org.apache.http.nio.protocol.BasicAsyncResponseConsumer;
56  import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
57  import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
58  import org.apache.http.nio.protocol.HttpAsyncRequester;
59  import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
60  import org.apache.http.nio.reactor.ConnectingIOReactor;
61  import org.apache.http.nio.reactor.IOEventDispatch;
62  import org.apache.http.nio.reactor.IOReactorExceptionHandler;
63  import org.apache.http.nio.reactor.IOReactorStatus;
64  import org.apache.http.nio.reactor.IOSession;
65  import org.apache.http.nio.reactor.SessionRequest;
66  import org.apache.http.protocol.HttpContext;
67  import org.apache.http.protocol.HttpCoreContext;
68  import org.apache.http.protocol.HttpProcessor;
69  import org.apache.http.protocol.ImmutableHttpProcessor;
70  import org.apache.http.protocol.RequestConnControl;
71  import org.apache.http.protocol.RequestContent;
72  import org.apache.http.protocol.RequestExpectContinue;
73  import org.apache.http.protocol.RequestTargetHost;
74  import org.apache.http.protocol.RequestUserAgent;
75  
76  public class HttpClientNio {
77  
78      public static final HttpProcessor DEFAULT_HTTP_PROC = new ImmutableHttpProcessor(
79              new RequestContent(),
80              new RequestTargetHost(),
81              new RequestConnControl(),
82              new RequestUserAgent("TEST-CLIENT/1.1"),
83              new RequestExpectContinue(true));
84  
85      private final DefaultConnectingIOReactor ioReactor;
86      private final BasicNIOConnPool connpool;
87  
88      private volatile HttpProcessor httpProcessor;
89      private volatile HttpAsyncRequester executor;
90      private volatile IOReactorThread thread;
91      private volatile int timeout;
92  
93      public HttpClientNio(
94              final NIOConnFactory<HttpHost, NHttpClientConnection> connFactory) throws IOException {
95          super();
96          this.ioReactor = new DefaultConnectingIOReactor();
97          this.ioReactor.setExceptionHandler(new SimpleIOReactorExceptionHandler());
98          this.connpool = new BasicNIOConnPool(this.ioReactor, new NIOConnFactory<HttpHost, NHttpClientConnection>() {
99  
100             @Override
101             public NHttpClientConnection create(
102                 final HttpHost route, final IOSession session) throws IOException {
103                 final NHttpClientConnection conn = connFactory.create(route, session);
104                 conn.setSocketTimeout(timeout);
105                 return conn;
106             }
107 
108         }, 0);
109     }
110 
111     public int getTimeout() {
112         return this.timeout;
113     }
114 
115     public void setTimeout(final int timeout) {
116         this.timeout = timeout;
117     }
118 
119     public void setMaxTotal(final int max) {
120         this.connpool.setMaxTotal(max);
121     }
122 
123     public void setMaxPerRoute(final int max) {
124         this.connpool.setDefaultMaxPerRoute(max);
125     }
126 
127     public void setHttpProcessor(final HttpProcessor httpProcessor) {
128         this.httpProcessor = httpProcessor;
129     }
130 
131     public Future<BasicNIOPoolEntry> lease(
132             final HttpHost host,
133             final FutureCallback<BasicNIOPoolEntry> callback) {
134         return this.connpool.lease(host, null, this.timeout, TimeUnit.MILLISECONDS, callback);
135     }
136 
137     public void release(final BasicNIOPoolEntry poolEntry, final boolean reusable) {
138         this.connpool.release(poolEntry, reusable);
139     }
140 
141     public <T> Future<T> execute(
142             final HttpAsyncRequestProducer requestProducer,
143             final HttpAsyncResponseConsumer<T> responseConsumer,
144             final HttpContext context,
145             final FutureCallback<T> callback) {
146         return executor.execute(requestProducer, responseConsumer, this.connpool,
147                 context != null ? context : HttpCoreContext.create(), callback);
148     }
149 
150     public <T> Future<List<T>> executePipelined(
151             final HttpHost target,
152             final List<HttpAsyncRequestProducer> requestProducers,
153             final List<HttpAsyncResponseConsumer<T>> responseConsumers,
154             final HttpContext context,
155             final FutureCallback<List<T>> callback) {
156         return executor.executePipelined(target, requestProducers, responseConsumers, this.connpool,
157                 context != null ? context : HttpCoreContext.create(), callback);
158     }
159 
160     public Future<HttpResponse> execute(
161             final HttpHost target,
162             final HttpRequest request,
163             final HttpContext context,
164             final FutureCallback<HttpResponse> callback) {
165         return execute(
166                 new BasicAsyncRequestProducer(target, request),
167                 new BasicAsyncResponseConsumer(),
168                 context != null ? context : HttpCoreContext.create(),
169                 callback);
170     }
171 
172     public Future<List<HttpResponse>> executePipelined(
173             final HttpHost target,
174             final List<HttpRequest> requests,
175             final HttpContext context,
176             final FutureCallback<List<HttpResponse>> callback) {
177         final List<HttpAsyncRequestProducer> requestProducers =
178                 new ArrayList<HttpAsyncRequestProducer>(requests.size());
179         final List<HttpAsyncResponseConsumer<HttpResponse>> responseConsumers =
180                 new ArrayList<HttpAsyncResponseConsumer<HttpResponse>>(requests.size());
181         for (final HttpRequest request: requests) {
182             requestProducers.add(new BasicAsyncRequestProducer(target, request));
183             responseConsumers.add(new BasicAsyncResponseConsumer());
184         }
185         return executor.executePipelined(target, requestProducers, responseConsumers, this.connpool,
186                 context != null ? context : HttpCoreContext.create(), callback);
187     }
188 
189     public Future<HttpResponse> execute(
190             final HttpHost target,
191             final HttpRequest request,
192             final HttpContext context) {
193         return execute(target, request, context, null);
194     }
195 
196     public Future<List<HttpResponse>> executePipelined(
197             final HttpHost target,
198             final List<HttpRequest> requests,
199             final HttpContext context) {
200         return executePipelined(target, requests, context, null);
201     }
202 
203     public Future<HttpResponse> execute(
204             final HttpHost target,
205             final HttpRequest request) {
206         return execute(target, request, null, null);
207     }
208 
209     public Future<List<HttpResponse>> executePipelined(
210             final HttpHost target,
211             final HttpRequest... requests) {
212         return executePipelined(target, Arrays.asList(requests), null, null);
213     }
214 
215     private void execute(final NHttpClientEventHandler clientHandler) throws IOException {
216         final IOEventDispatch ioEventDispatch = new DefaultHttpClientIODispatch(clientHandler,
217             new DefaultNHttpClientConnectionFactory(ConnectionConfig.DEFAULT)) {
218 
219             @Override
220             protected DefaultNHttpClientConnection createConnection(final IOSession session) {
221                 final DefaultNHttpClientConnection conn = super.createConnection(session);
222                 conn.setSocketTimeout(timeout);
223                 return conn;
224             }
225 
226         };
227         this.ioReactor.execute(ioEventDispatch);
228     }
229 
230     public SessionRequest openConnection(final InetSocketAddress address, final Object attachment) {
231         final SessionRequest sessionRequest = this.ioReactor.connect(address, null, attachment, null);
232         sessionRequest.setConnectTimeout(this.timeout);
233         return sessionRequest;
234     }
235 
236     public void start() {
237         this.executor = new HttpAsyncRequester(this.httpProcessor != null ? this.httpProcessor : DEFAULT_HTTP_PROC);
238         this.thread = new IOReactorThread(new HttpAsyncRequestExecutor());
239         this.thread.start();
240     }
241 
242     public ConnectingIOReactor getIoReactor() {
243         return this.ioReactor;
244     }
245 
246     public IOReactorStatus getStatus() {
247         return this.ioReactor.getStatus();
248     }
249 
250     public List<ExceptionEvent> getAuditLog() {
251         return this.ioReactor.getAuditLog();
252     }
253 
254     public void join(final long timeout) throws InterruptedException {
255         if (this.thread != null) {
256             this.thread.join(timeout);
257         }
258     }
259 
260     public Exception getException() {
261         if (this.thread != null) {
262             return this.thread.getException();
263         } else {
264             return null;
265         }
266     }
267 
268     public void shutdown() throws IOException {
269         this.connpool.shutdown(2000);
270         try {
271             join(500);
272         } catch (final InterruptedException ignore) {
273         }
274     }
275 
276     private class IOReactorThread extends Thread {
277 
278         private final NHttpClientEventHandler clientHandler;
279 
280         private volatile Exception ex;
281 
282         public IOReactorThread(final NHttpClientEventHandler clientHandler) {
283             super();
284             this.clientHandler = clientHandler;
285         }
286 
287         @Override
288         public void run() {
289             try {
290                 execute(this.clientHandler);
291             } catch (final Exception ex) {
292                 this.ex = ex;
293             }
294         }
295 
296         public Exception getException() {
297             return this.ex;
298         }
299 
300     }
301 
302     static class SimpleIOReactorExceptionHandler implements IOReactorExceptionHandler {
303 
304         @Override
305         public boolean handle(final RuntimeException ex) {
306             if (!(ex instanceof OoopsieRuntimeException)) {
307                 ex.printStackTrace(System.out);
308             }
309             return false;
310         }
311 
312         @Override
313         public boolean handle(final IOException ex) {
314             ex.printStackTrace(System.out);
315             return false;
316         }
317 
318     }
319 
320 }