View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http.impl.nio;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.List;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  
34  import org.apache.hc.core5.http.ConnectionReuseStrategy;
35  import org.apache.hc.core5.http.EntityDetails;
36  import org.apache.hc.core5.http.Header;
37  import org.apache.hc.core5.http.HeaderElements;
38  import org.apache.hc.core5.http.HttpException;
39  import org.apache.hc.core5.http.HttpHeaders;
40  import org.apache.hc.core5.http.HttpRequest;
41  import org.apache.hc.core5.http.HttpResponse;
42  import org.apache.hc.core5.http.HttpStatus;
43  import org.apache.hc.core5.http.HttpVersion;
44  import org.apache.hc.core5.http.ProtocolException;
45  import org.apache.hc.core5.http.ProtocolVersion;
46  import org.apache.hc.core5.http.UnsupportedHttpVersionException;
47  import org.apache.hc.core5.http.config.Http1Config;
48  import org.apache.hc.core5.http.message.StatusLine;
49  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
50  import org.apache.hc.core5.http.nio.CapacityChannel;
51  import org.apache.hc.core5.http.nio.DataStreamChannel;
52  import org.apache.hc.core5.http.nio.RequestChannel;
53  import org.apache.hc.core5.http.nio.ResourceHolder;
54  import org.apache.hc.core5.http.protocol.HttpContext;
55  import org.apache.hc.core5.http.protocol.HttpCoreContext;
56  import org.apache.hc.core5.http.protocol.HttpProcessor;
57  import org.apache.hc.core5.util.Timeout;
58  
59  class ClientHttp1StreamHandler implements ResourceHolder {
60  
61      private final Http1StreamChannel<HttpRequest> outputChannel;
62      private final DataStreamChannel internalDataChannel;
63      private final HttpProcessor httpProcessor;
64      private final Http1Config http1Config;
65      private final ConnectionReuseStrategy connectionReuseStrategy;
66      private final AsyncClientExchangeHandler exchangeHandler;
67      private final HttpCoreContext context;
68      private final AtomicBoolean requestCommitted;
69      private final AtomicBoolean done;
70  
71      private volatile boolean keepAlive;
72      private volatile Timeout timeout;
73      private volatile HttpRequest committedRequest;
74      private volatile MessageState requestState;
75      private volatile MessageState responseState;
76  
77      ClientHttp1StreamHandler(
78              final Http1StreamChannel<HttpRequest> outputChannel,
79              final HttpProcessor httpProcessor,
80              final Http1Config http1Config,
81              final ConnectionReuseStrategy connectionReuseStrategy,
82              final AsyncClientExchangeHandler exchangeHandler,
83              final HttpCoreContext context) {
84          this.outputChannel = outputChannel;
85          this.internalDataChannel = new DataStreamChannel() {
86  
87              @Override
88              public void requestOutput() {
89                  outputChannel.requestOutput();
90              }
91  
92              @Override
93              public void endStream(final List<? extends Header> trailers) throws IOException {
94                  outputChannel.complete(trailers);
95                  requestState = MessageState.COMPLETE;
96              }
97  
98              @Override
99              public int write(final ByteBuffer src) throws IOException {
100                 return outputChannel.write(src);
101             }
102 
103             @Override
104             public void endStream() throws IOException {
105                 endStream(null);
106             }
107 
108         };
109 
110         this.httpProcessor = httpProcessor;
111         this.http1Config = http1Config;
112         this.connectionReuseStrategy = connectionReuseStrategy;
113         this.exchangeHandler = exchangeHandler;
114         this.context = context;
115         this.requestCommitted = new AtomicBoolean(false);
116         this.done = new AtomicBoolean(false);
117         this.keepAlive = true;
118         this.requestState = MessageState.IDLE;
119         this.responseState = MessageState.HEADERS;
120     }
121 
122     boolean isResponseFinal() {
123         return responseState == MessageState.COMPLETE;
124     }
125 
126     boolean isCompleted() {
127         return requestState == MessageState.COMPLETE && responseState == MessageState.COMPLETE;
128     }
129 
130     String getRequestMethod() {
131         return committedRequest != null ? committedRequest.getMethod() : null;
132     }
133 
134     boolean isOutputReady() {
135         switch (requestState) {
136             case IDLE:
137             case ACK:
138                 return true;
139             case BODY:
140                 return exchangeHandler.available() > 0;
141             default:
142                 return false;
143         }
144     }
145 
146     private void commitRequest(final HttpRequest request, final EntityDetails entityDetails) throws IOException, HttpException {
147         if (requestCommitted.compareAndSet(false, true)) {
148             final ProtocolVersion transportVersion = request.getVersion();
149             if (transportVersion != null && transportVersion.greaterEquals(HttpVersion.HTTP_2)) {
150                 throw new UnsupportedHttpVersionException(transportVersion);
151             }
152             context.setProtocolVersion(transportVersion != null ? transportVersion : HttpVersion.HTTP_1_1);
153             context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
154 
155             httpProcessor.process(request, entityDetails, context);
156 
157             final boolean endStream = entityDetails == null;
158             if (endStream) {
159                 outputChannel.submit(request, endStream, FlushMode.IMMEDIATE);
160                 committedRequest = request;
161                 requestState = MessageState.COMPLETE;
162             } else {
163                 final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
164                 final boolean expectContinue = h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue());
165                 outputChannel.submit(request, endStream, expectContinue ? FlushMode.IMMEDIATE : FlushMode.BUFFER);
166                 committedRequest = request;
167                 if (expectContinue) {
168                     requestState = MessageState.ACK;
169                     timeout = outputChannel.getSocketTimeout();
170                     outputChannel.setSocketTimeout(http1Config.getWaitForContinueTimeout());
171                 } else {
172                     requestState = MessageState.BODY;
173                     exchangeHandler.produce(internalDataChannel);
174                 }
175             }
176         } else {
177             throw new HttpException("Request already committed");
178         }
179     }
180 
181     void produceOutput() throws HttpException, IOException {
182         switch (requestState) {
183             case IDLE:
184                 requestState = MessageState.HEADERS;
185                 exchangeHandler.produceRequest(new RequestChannel() {
186 
187                     @Override
188                     public void sendRequest(
189                             final HttpRequest request, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
190                         commitRequest(request, entityDetails);
191                     }
192 
193                 }, context);
194                 break;
195             case ACK:
196                 outputChannel.suspendOutput();
197                 break;
198             case BODY:
199                 exchangeHandler.produce(internalDataChannel);
200                 break;
201         }
202     }
203 
204     void consumeHeader(final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
205         if (done.get() || responseState != MessageState.HEADERS) {
206             throw new ProtocolException("Unexpected message head");
207         }
208         final ProtocolVersion transportVersion = response.getVersion();
209         if (transportVersion != null && transportVersion.greaterEquals(HttpVersion.HTTP_2)) {
210             throw new UnsupportedHttpVersionException(transportVersion);
211         }
212 
213         final int status = response.getCode();
214         if (status < HttpStatus.SC_INFORMATIONAL) {
215             throw new ProtocolException("Invalid response: " + new StatusLine(response));
216         }
217         if (status > HttpStatus.SC_CONTINUE && status < HttpStatus.SC_SUCCESS) {
218             exchangeHandler.consumeInformation(response, context);
219         } else {
220             if (!connectionReuseStrategy.keepAlive(committedRequest, response, context)) {
221                 keepAlive = false;
222             }
223         }
224         if (requestState == MessageState.ACK) {
225             if (status == HttpStatus.SC_CONTINUE || status >= HttpStatus.SC_SUCCESS) {
226                 outputChannel.setSocketTimeout(timeout);
227                 requestState = MessageState.BODY;
228                 if (status < HttpStatus.SC_CLIENT_ERROR) {
229                     exchangeHandler.produce(internalDataChannel);
230                 }
231             }
232         }
233         if (status < HttpStatus.SC_SUCCESS) {
234             return;
235         }
236         if (requestState == MessageState.BODY) {
237             if (status >= HttpStatus.SC_CLIENT_ERROR) {
238                 requestState = MessageState.COMPLETE;
239                 if (!outputChannel.abortGracefully()) {
240                     keepAlive = false;
241                 }
242             }
243         }
244 
245         context.setProtocolVersion(transportVersion != null ? transportVersion : HttpVersion.HTTP_1_1);
246         context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
247         httpProcessor.process(response, entityDetails, context);
248 
249         if (entityDetails == null && !keepAlive) {
250             outputChannel.close();
251         }
252 
253         exchangeHandler.consumeResponse(response, entityDetails, context);
254         if (entityDetails == null) {
255             responseState = MessageState.COMPLETE;
256         } else {
257             responseState = MessageState.BODY;
258         }
259     }
260 
261     void consumeData(final ByteBuffer src) throws HttpException, IOException {
262         if (done.get() || responseState != MessageState.BODY) {
263             throw new ProtocolException("Unexpected message data");
264         }
265         exchangeHandler.consume(src);
266     }
267 
268     void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
269         exchangeHandler.updateCapacity(capacityChannel);
270     }
271 
272     void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
273         if (done.get() || responseState != MessageState.BODY) {
274             throw new ProtocolException("Unexpected message data");
275         }
276         if (!keepAlive) {
277             outputChannel.close();
278         }
279         responseState = MessageState.COMPLETE;
280         exchangeHandler.streamEnd(trailers);
281     }
282 
283     boolean handleTimeout() {
284         if (requestState == MessageState.ACK) {
285             requestState = MessageState.BODY;
286             outputChannel.setSocketTimeout(timeout);
287             outputChannel.requestOutput();
288             return true;
289         }
290         return false;
291     }
292 
293     void failed(final Exception cause) {
294         if (!done.get()) {
295             exchangeHandler.failed(cause);
296         }
297     }
298 
299     @Override
300     public void releaseResources() {
301         if (done.compareAndSet(false, true)) {
302             responseState = MessageState.COMPLETE;
303             requestState = MessageState.COMPLETE;
304             exchangeHandler.releaseResources();
305         }
306     }
307 
308     void appendState(final StringBuilder buf) {
309         buf.append("requestState=").append(requestState)
310                 .append(", responseState=").append(responseState)
311                 .append(", responseCommitted=").append(requestCommitted)
312                 .append(", keepAlive=").append(keepAlive)
313                 .append(", done=").append(done);
314     }
315 
316     @Override
317     public String toString() {
318         final StringBuilder buf = new StringBuilder();
319         buf.append("[");
320         appendState(buf);
321         buf.append("]");
322         return buf.toString();
323     }
324 
325 }
326