View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.http.nio.protocol;
29  
30  import java.io.IOException;
31  import java.net.SocketTimeoutException;
32  import java.util.Queue;
33  import java.util.concurrent.ConcurrentLinkedQueue;
34  
35  import org.apache.http.ConnectionClosedException;
36  import org.apache.http.ExceptionLogger;
37  import org.apache.http.HttpEntity;
38  import org.apache.http.HttpEntityEnclosingRequest;
39  import org.apache.http.HttpException;
40  import org.apache.http.HttpRequest;
41  import org.apache.http.HttpResponse;
42  import org.apache.http.HttpStatus;
43  import org.apache.http.HttpVersion;
44  import org.apache.http.ProtocolException;
45  import org.apache.http.ProtocolVersion;
46  import org.apache.http.annotation.Contract;
47  import org.apache.http.annotation.ThreadingBehavior;
48  import org.apache.http.nio.ContentDecoder;
49  import org.apache.http.nio.ContentEncoder;
50  import org.apache.http.nio.NHttpClientConnection;
51  import org.apache.http.nio.NHttpClientEventHandler;
52  import org.apache.http.nio.NHttpConnection;
53  import org.apache.http.protocol.HttpContext;
54  import org.apache.http.util.Args;
55  import org.apache.http.util.Asserts;
56  
57  /**
58   * {@code HttpAsyncRequestExecutor} is a fully asynchronous HTTP client side
59   * protocol handler based on the NIO (non-blocking) I/O model.
60   * {@code HttpAsyncRequestExecutor} translates individual events fired through
61   * the {@link NHttpClientEventHandler} interface into logically related HTTP
62   * message exchanges.
63   * <p> The caller is expected to pass an instance of
64   * {@link HttpAsyncClientExchangeHandler} to be used for the next series
65   * of HTTP message exchanges through the connection context using
66   * {@link #HTTP_HANDLER} attribute. HTTP exchange sequence is considered
67   * complete when the {@link HttpAsyncClientExchangeHandler#isDone()} method
68   * returns {@code true}. The {@link HttpAsyncRequester} utility class can
69   * be used to facilitate initiation of asynchronous HTTP request execution.
70   * <p>
71   * Individual {@code HttpAsyncClientExchangeHandler}s are expected to make use of
72   * a {@link org.apache.http.protocol.HttpProcessor} to generate mandatory protocol
73   * headers for all outgoing messages and apply common, cross-cutting message
74   * transformations to all incoming and outgoing messages.
75   * {@code HttpAsyncClientExchangeHandler}s can delegate implementation of
76   * application specific content generation and processing to
77   * a {@link HttpAsyncRequestProducer} and a {@link HttpAsyncResponseConsumer}.
78   *
79   * @see HttpAsyncClientExchangeHandler
80   *
81   * @since 4.2
82   */
83  @Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
84  public class HttpAsyncRequestExecutor implements NHttpClientEventHandler {
85  
    /**
     * Wait-for-continue period used by the no-argument constructor.
     * NOTE(review): presumably milliseconds, since the value ends up being
     * applied as a socket timeout - confirm against the connection API.
     */
    public static final int DEFAULT_WAIT_FOR_CONTINUE = 3000;
    /**
     * Name of the connection context attribute under which the current
     * {@link HttpAsyncClientExchangeHandler} is looked up.
     */
    public static final String HTTP_HANDLER = "http.nio.exchange-handler";

    /**
     * Socket timeout set on the connection while an expect-continue
     * acknowledgement is pending.
     */
    private final int waitForContinue;
    /** Receiver of the exceptions handed to {@link #log(Exception)}. */
    private final ExceptionLogger exceptionLogger;
91  
92      /**
93       * Creates new instance of {@code HttpAsyncRequestExecutor}.
94       * @param waitForContinue wait for continue time period.
95       * @param exceptionLogger Exception logger. If {@code null}
96       *   {@link ExceptionLogger#NO_OP} will be used. Please note that the exception
97       *   logger will be only used to log I/O exception thrown while closing
98       *   {@link java.io.Closeable} objects (such as {@link org.apache.http.HttpConnection}).
99       *
100      * @since 4.4
101      */
102     public HttpAsyncRequestExecutor(
103             final int waitForContinue,
104             final ExceptionLogger exceptionLogger) {
105         super();
106         this.waitForContinue = Args.positive(waitForContinue, "Wait for continue time");
107         this.exceptionLogger = exceptionLogger != null ? exceptionLogger : ExceptionLogger.NO_OP;
108     }
109 
110     /**
111      * Creates new instance of HttpAsyncRequestExecutor.
112      *
113      * @since 4.3
114      */
115     public HttpAsyncRequestExecutor(final int waitForContinue) {
116         this(waitForContinue, null);
117     }
118 
119     public HttpAsyncRequestExecutor() {
120         this(DEFAULT_WAIT_FOR_CONTINUE, null);
121     }
122 
123     private static boolean pipelining(final HttpAsyncClientExchangeHandler handler) {
124         return handler.getClass().getAnnotation(Pipelined.class) != null;
125     }
126 
127     @Override
128     public void connected(
129             final NHttpClientConnection conn,
130             final Object attachment) throws IOException, HttpException {
131         final State state = new State();
132         final HttpContext context = conn.getContext();
133         context.setAttribute(HTTP_EXCHANGE_STATE, state);
134         requestReady(conn);
135     }
136 
137     @Override
138     public void closed(final NHttpClientConnection conn) {
139         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
140         if (handler == null) {
141             return;
142         }
143         final State state = getState(conn);
144         if (state != null) {
145             if (state.getRequestState() != MessageState.READY || state.getResponseState() != MessageState.READY) {
146                 handler.failed(new ConnectionClosedException("Connection closed unexpectedly"));
147             }
148         }
149         if (!handler.isDone() && pipelining(handler)) {
150             handler.failed(new ConnectionClosedException("Connection closed unexpectedly"));
151         }
152         if (state == null || handler.isDone()) {
153             closeHandler(handler);
154         }
155     }
156 
157     @Override
158     public void exception(
159             final NHttpClientConnection conn, final Exception cause) {
160         shutdownConnection(conn);
161         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
162         if (handler != null) {
163             handler.failed(cause);
164         } else {
165             log(cause);
166         }
167     }
168 
169     @Override
170     public void requestReady(
171             final NHttpClientConnection conn) throws IOException, HttpException {
172         final State state = getState(conn);
173         Asserts.notNull(state, "Connection state");
174         Asserts.check(state.getRequestState() == MessageState.READY ||
175                         state.getRequestState() == MessageState.COMPLETED,
176                 "Unexpected request state %s", state.getRequestState());
177 
178         if (state.getRequestState() == MessageState.COMPLETED) {
179             conn.suspendOutput();
180             return;
181         }
182         final HttpContext context = conn.getContext();
183         final HttpAsyncClientExchangeHandler handler;
184         synchronized (context) {
185             handler = getHandler(conn);
186             if (handler == null || handler.isDone()) {
187                 conn.suspendOutput();
188                 return;
189             }
190         }
191         final boolean pipelined = pipelining(handler);
192 
193         final HttpRequest request = handler.generateRequest();
194         if (request == null) {
195             conn.suspendOutput();
196             return;
197         }
198         final ProtocolVersion version = request.getRequestLine().getProtocolVersion();
199         if (pipelined && version.lessEquals(HttpVersion.HTTP_1_0)) {
200             throw new ProtocolException(version + " cannot be used with request pipelining");
201         }
202         state.setRequest(request);
203         if (pipelined) {
204             state.getRequestQueue().add(request);
205         }
206         if (request instanceof HttpEntityEnclosingRequest) {
207             final boolean expectContinue = ((HttpEntityEnclosingRequest) request).expectContinue();
208             if (expectContinue && pipelined) {
209                 throw new ProtocolException("Expect-continue handshake cannot be used with request pipelining");
210             }
211             conn.submitRequest(request);
212             if (expectContinue) {
213                 final int timeout = conn.getSocketTimeout();
214                 state.setTimeout(timeout);
215                 conn.setSocketTimeout(this.waitForContinue);
216                 state.setRequestState(MessageState.ACK_EXPECTED);
217             } else {
218                 final HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();
219                 if (entity != null) {
220                     state.setRequestState(MessageState.BODY_STREAM);
221                 } else {
222                     handler.requestCompleted();
223                     state.setRequestState(pipelined ? MessageState.READY : MessageState.COMPLETED);
224                 }
225             }
226         } else {
227             conn.submitRequest(request);
228             handler.requestCompleted();
229             state.setRequestState(pipelined ? MessageState.READY : MessageState.COMPLETED);
230         }
231     }
232 
233     @Override
234     public void outputReady(
235             final NHttpClientConnection conn,
236             final ContentEncoder encoder) throws IOException, HttpException {
237         final State state = getState(conn);
238         Asserts.notNull(state, "Connection state");
239         Asserts.check(state.getRequestState() == MessageState.BODY_STREAM ||
240                         state.getRequestState() == MessageState.ACK_EXPECTED,
241                 "Unexpected request state %s", state.getRequestState());
242 
243         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
244         Asserts.notNull(handler, "Client exchange handler");
245         if (state.getRequestState() == MessageState.ACK_EXPECTED) {
246             conn.suspendOutput();
247             return;
248         }
249         handler.produceContent(encoder, conn);
250         if (encoder.isCompleted()) {
251             handler.requestCompleted();
252             state.setRequestState(pipelining(handler) ? MessageState.READY : MessageState.COMPLETED);
253         }
254     }
255 
256     @Override
257     public void responseReceived(
258             final NHttpClientConnection conn) throws HttpException, IOException {
259         final State state = getState(conn);
260         Asserts.notNull(state, "Connection state");
261         Asserts.check(state.getResponseState() == MessageState.READY,
262                 "Unexpected request state %s", state.getResponseState());
263 
264         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
265         Asserts.notNull(handler, "Client exchange handler");
266 
267         final HttpRequest request;
268         if (pipelining(handler)) {
269             request = state.getRequestQueue().poll();
270             Asserts.notNull(request, "HTTP request");
271         } else {
272             request = state.getRequest();
273             if (request == null) {
274                 throw new HttpException("Out of sequence response");
275             }
276         }
277 
278         final HttpResponse response = conn.getHttpResponse();
279 
280         final int statusCode = response.getStatusLine().getStatusCode();
281         if (statusCode < HttpStatus.SC_CONTINUE) {
282             throw new ProtocolException("Invalid response: " + response.getStatusLine());
283         }
284         if (statusCode < HttpStatus.SC_OK) {
285             // 1xx intermediate response
286             if (statusCode != HttpStatus.SC_CONTINUE) {
287                 throw new ProtocolException(
288                         "Unexpected response: " + response.getStatusLine());
289             }
290             if (state.getRequestState() == MessageState.ACK_EXPECTED) {
291                 final int timeout = state.getTimeout();
292                 conn.setSocketTimeout(timeout);
293                 conn.requestOutput();
294                 state.setRequestState(MessageState.BODY_STREAM);
295             }
296             return;
297         }
298         state.setResponse(response);
299         if (state.getRequestState() == MessageState.ACK_EXPECTED) {
300             final int timeout = state.getTimeout();
301             conn.setSocketTimeout(timeout);
302             conn.resetOutput();
303             state.setRequestState(MessageState.COMPLETED);
304         } else if (state.getRequestState() == MessageState.BODY_STREAM) {
305             // Early response
306             if (statusCode >= 400) {
307                 conn.resetOutput();
308                 conn.suspendOutput();
309                 state.setRequestState(MessageState.COMPLETED);
310                 state.invalidate();
311             }
312         }
313 
314         if (canResponseHaveBody(request, response)) {
315             handler.responseReceived(response);
316             state.setResponseState(MessageState.BODY_STREAM);
317         } else {
318             response.setEntity(null);
319             handler.responseReceived(response);
320             conn.resetInput();
321             processResponse(conn, state, handler);
322         }
323     }
324 
325     @Override
326     public void inputReady(
327             final NHttpClientConnection conn,
328             final ContentDecoder decoder) throws IOException, HttpException {
329         final State state = getState(conn);
330         Asserts.notNull(state, "Connection state");
331         Asserts.check(state.getResponseState() == MessageState.BODY_STREAM,
332                 "Unexpected request state %s", state.getResponseState());
333 
334         final HttpAsyncClientExchangeHandler handler = getHandler(conn);
335         Asserts.notNull(handler, "Client exchange handler");
336         handler.consumeContent(decoder, conn);
337         if (decoder.isCompleted()) {
338             processResponse(conn, state, handler);
339         }
340     }
341 
342     @Override
343     public void endOfInput(final NHttpClientConnection conn) throws IOException {
344         final State state = getState(conn);
345         final HttpContext context = conn.getContext();
346         synchronized (context) {
347             if (state != null) {
348                 if (state.getRequestState().compareTo(MessageState.READY) != 0) {
349                     state.invalidate();
350                 }
351                 final HttpAsyncClientExchangeHandler handler = getHandler(conn);
352                 if (handler != null) {
353                     if (state.isValid()) {
354                         handler.inputTerminated();
355                     } else {
356                         handler.failed(new ConnectionClosedException());
357                     }
358                 }
359             }
360             // Closing connection in an orderly manner and
361             // waiting for output buffer to get flushed.
362             // Do not want to wait indefinitely, though, in case
363             // the opposite end is not reading
364             if (conn.getSocketTimeout() <= 0) {
365                 conn.setSocketTimeout(1000);
366             }
367             conn.close();
368         }
369     }
370 
371     @Override
372     public void timeout(
373             final NHttpClientConnection conn) throws IOException {
374         final State state = getState(conn);
375         if (state != null) {
376             if (state.getRequestState() == MessageState.ACK_EXPECTED) {
377                 final int timeout = state.getTimeout();
378                 conn.setSocketTimeout(timeout);
379                 conn.requestOutput();
380                 state.setRequestState(MessageState.BODY_STREAM);
381                 state.setTimeout(0);
382                 return;
383             }
384             state.invalidate();
385             final HttpAsyncClientExchangeHandler handler = getHandler(conn);
386             if (handler != null) {
387                 handler.failed(new SocketTimeoutException(
388                         String.format("%,d milliseconds timeout on connection %s", conn.getSocketTimeout(), conn)));
389                 handler.close();
390             }
391         }
392         if (conn.getStatus() == NHttpConnection.ACTIVE) {
393             conn.close();
394             if (conn.getStatus() == NHttpConnection.CLOSING) {
395                 // Give the connection some grace time to
396                 // close itself nicely
397                 conn.setSocketTimeout(250);
398             }
399         } else {
400             conn.shutdown();
401         }
402     }
403 
404     /**
405      * This method can be used to log I/O exception thrown while closing
406      * {@link java.io.Closeable} objects (such as
407      * {@link org.apache.http.HttpConnection}}).
408      *
409      * @param ex I/O exception thrown by {@link java.io.Closeable#close()}
410      */
411     protected void log(final Exception ex) {
412         this.exceptionLogger.log(ex);
413     }
414 
415     private static State getState(final NHttpConnection conn) {
416         return (State) conn.getContext().getAttribute(HTTP_EXCHANGE_STATE);
417     }
418 
419     private static HttpAsyncClientExchangeHandler getHandler(final NHttpConnection conn) {
420         return (HttpAsyncClientExchangeHandler) conn.getContext().getAttribute(HTTP_HANDLER);
421     }
422 
423     private void shutdownConnection(final NHttpConnection conn) {
424         try {
425             conn.shutdown();
426         } catch (final IOException ex) {
427             log(ex);
428         }
429     }
430 
431     private void closeHandler(final HttpAsyncClientExchangeHandler handler) {
432         if (handler != null) {
433             try {
434                 handler.close();
435             } catch (final IOException ioex) {
436                 log(ioex);
437             }
438         }
439     }
440 
441     private void processResponse(
442             final NHttpClientConnection conn,
443             final State state,
444             final HttpAsyncClientExchangeHandler handler) throws IOException, HttpException {
445         if (!state.isValid()) {
446             conn.close();
447         }
448         handler.responseCompleted();
449 
450         if (!pipelining(handler)) {
451             state.setRequestState(MessageState.READY);
452             state.setRequest(null);
453         }
454         state.setResponseState(MessageState.READY);
455         state.setResponse(null);
456         if (!handler.isDone() && conn.isOpen()) {
457             conn.requestOutput();
458         }
459     }
460 
461     private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
462 
463         final String method = request.getRequestLine().getMethod();
464         final int status = response.getStatusLine().getStatusCode();
465 
466         if (method.equalsIgnoreCase("HEAD")) {
467             return false;
468         }
469         if (method.equalsIgnoreCase("CONNECT") && status < 300) {
470             return false;
471         }
472         return status >= HttpStatus.SC_OK
473             && status != HttpStatus.SC_NO_CONTENT
474             && status != HttpStatus.SC_NOT_MODIFIED
475             && status != HttpStatus.SC_RESET_CONTENT;
476     }
477 
    /**
     * Name of the connection context attribute under which the per-connection
     * exchange state is stored.
     */
    static final String HTTP_EXCHANGE_STATE = "http.nio.http-exchange-state";
479 
480     static class State {
481 
482         private final Queue<HttpRequest> requestQueue;
483         private volatile MessageState requestState;
484         private volatile MessageState responseState;
485         private volatile HttpRequest request;
486         private volatile HttpResponse response;
487         private volatile boolean valid;
488         private volatile int timeout;
489 
490         State() {
491             super();
492             this.requestQueue = new ConcurrentLinkedQueue<HttpRequest>();
493             this.valid = true;
494             this.requestState = MessageState.READY;
495             this.responseState = MessageState.READY;
496         }
497 
498         public MessageState getRequestState() {
499             return this.requestState;
500         }
501 
502         public void setRequestState(final MessageState state) {
503             this.requestState = state;
504         }
505 
506         public MessageState getResponseState() {
507             return this.responseState;
508         }
509 
510         public void setResponseState(final MessageState state) {
511             this.responseState = state;
512         }
513 
514         public HttpRequest getRequest() {
515             return this.request;
516         }
517 
518         public void setRequest(final HttpRequest request) {
519             this.request = request;
520         }
521 
522         public HttpResponse getResponse() {
523             return this.response;
524         }
525 
526         public void setResponse(final HttpResponse response) {
527             this.response = response;
528         }
529 
530         public Queue<HttpRequest> getRequestQueue() {
531             return this.requestQueue;
532         }
533 
534         public int getTimeout() {
535             return this.timeout;
536         }
537 
538         public void setTimeout(final int timeout) {
539             this.timeout = timeout;
540         }
541 
542         public boolean isValid() {
543             return this.valid;
544         }
545 
546         public void invalidate() {
547             this.valid = false;
548         }
549 
550         @Override
551         public String toString() {
552             final StringBuilder buf = new StringBuilder();
553             buf.append("request state: ");
554             buf.append(this.requestState);
555             buf.append("; request: ");
556             if (this.request != null) {
557                 buf.append(this.request.getRequestLine());
558             }
559             buf.append("; response state: ");
560             buf.append(this.responseState);
561             buf.append("; response: ");
562             if (this.response != null) {
563                 buf.append(this.response.getStatusLine());
564             }
565             buf.append("; valid: ");
566             buf.append(this.valid);
567             buf.append(";");
568             return buf.toString();
569         }
570 
571     }
572 
573 }