View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http.impl.nio;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.List;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  
34  import org.apache.hc.core5.http.ConnectionReuseStrategy;
35  import org.apache.hc.core5.http.EntityDetails;
36  import org.apache.hc.core5.http.Header;
37  import org.apache.hc.core5.http.HeaderElements;
38  import org.apache.hc.core5.http.HttpException;
39  import org.apache.hc.core5.http.HttpHeaders;
40  import org.apache.hc.core5.http.HttpRequest;
41  import org.apache.hc.core5.http.HttpResponse;
42  import org.apache.hc.core5.http.HttpStatus;
43  import org.apache.hc.core5.http.HttpVersion;
44  import org.apache.hc.core5.http.ProtocolException;
45  import org.apache.hc.core5.http.ProtocolVersion;
46  import org.apache.hc.core5.http.UnsupportedHttpVersionException;
47  import org.apache.hc.core5.http.config.Http1Config;
48  import org.apache.hc.core5.http.message.StatusLine;
49  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
50  import org.apache.hc.core5.http.nio.CapacityChannel;
51  import org.apache.hc.core5.http.nio.DataStreamChannel;
52  import org.apache.hc.core5.http.nio.ResourceHolder;
53  import org.apache.hc.core5.http.protocol.HttpCoreContext;
54  import org.apache.hc.core5.http.protocol.HttpProcessor;
55  import org.apache.hc.core5.util.Timeout;
56  
57  class ClientHttp1StreamHandler implements ResourceHolder {
58  
59      private final Http1StreamChannel<HttpRequest> outputChannel;
60      private final DataStreamChannel internalDataChannel;
61      private final HttpProcessor httpProcessor;
62      private final Http1Config http1Config;
63      private final ConnectionReuseStrategy connectionReuseStrategy;
64      private final AsyncClientExchangeHandler exchangeHandler;
65      private final HttpCoreContext context;
66      private final AtomicBoolean requestCommitted;
67      private final AtomicBoolean done;
68  
69      private volatile boolean keepAlive;
70      private volatile Timeout timeout;
71      private volatile HttpRequest committedRequest;
72      private volatile MessageState requestState;
73      private volatile MessageState responseState;
74  
75      ClientHttp1StreamHandler(
76              final Http1StreamChannel<HttpRequest> outputChannel,
77              final HttpProcessor httpProcessor,
78              final Http1Config http1Config,
79              final ConnectionReuseStrategy connectionReuseStrategy,
80              final AsyncClientExchangeHandler exchangeHandler,
81              final HttpCoreContext context) {
82          this.outputChannel = outputChannel;
83          this.internalDataChannel = new DataStreamChannel() {
84  
85              @Override
86              public void requestOutput() {
87                  outputChannel.requestOutput();
88              }
89  
90              @Override
91              public void endStream(final List<? extends Header> trailers) throws IOException {
92                  outputChannel.complete(trailers);
93                  requestState = MessageState.COMPLETE;
94              }
95  
96              @Override
97              public int write(final ByteBuffer src) throws IOException {
98                  return outputChannel.write(src);
99              }
100 
101             @Override
102             public void endStream() throws IOException {
103                 endStream(null);
104             }
105 
106         };
107 
108         this.httpProcessor = httpProcessor;
109         this.http1Config = http1Config;
110         this.connectionReuseStrategy = connectionReuseStrategy;
111         this.exchangeHandler = exchangeHandler;
112         this.context = context;
113         this.requestCommitted = new AtomicBoolean(false);
114         this.done = new AtomicBoolean(false);
115         this.keepAlive = true;
116         this.requestState = MessageState.IDLE;
117         this.responseState = MessageState.HEADERS;
118     }
119 
120     boolean isResponseFinal() {
121         return responseState == MessageState.COMPLETE;
122     }
123 
124     boolean isCompleted() {
125         return requestState == MessageState.COMPLETE && responseState == MessageState.COMPLETE;
126     }
127 
128     String getRequestMethod() {
129         return committedRequest != null ? committedRequest.getMethod() : null;
130     }
131 
132     boolean isOutputReady() {
133         switch (requestState) {
134             case IDLE:
135             case ACK:
136                 return true;
137             case BODY:
138                 return exchangeHandler.available() > 0;
139             default:
140                 return false;
141         }
142     }
143 
144     private void commitRequest(final HttpRequest request, final EntityDetails entityDetails) throws IOException, HttpException {
145         if (requestCommitted.compareAndSet(false, true)) {
146             final ProtocolVersion transportVersion = request.getVersion();
147             if (transportVersion != null && transportVersion.greaterEquals(HttpVersion.HTTP_2)) {
148                 throw new UnsupportedHttpVersionException(transportVersion);
149             }
150             context.setProtocolVersion(transportVersion != null ? transportVersion : HttpVersion.HTTP_1_1);
151             context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
152 
153             httpProcessor.process(request, entityDetails, context);
154 
155             final boolean endStream = entityDetails == null;
156             if (endStream) {
157                 outputChannel.submit(request, true, FlushMode.IMMEDIATE);
158                 committedRequest = request;
159                 requestState = MessageState.COMPLETE;
160             } else {
161                 final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
162                 final boolean expectContinue = h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue());
163                 outputChannel.submit(request, false, expectContinue ? FlushMode.IMMEDIATE : FlushMode.BUFFER);
164                 committedRequest = request;
165                 if (expectContinue) {
166                     requestState = MessageState.ACK;
167                     timeout = outputChannel.getSocketTimeout();
168                     outputChannel.setSocketTimeout(http1Config.getWaitForContinueTimeout());
169                 } else {
170                     requestState = MessageState.BODY;
171                     exchangeHandler.produce(internalDataChannel);
172                 }
173             }
174         } else {
175             throw new HttpException("Request already committed");
176         }
177     }
178 
179     void produceOutput() throws HttpException, IOException {
180         switch (requestState) {
181             case IDLE:
182                 requestState = MessageState.HEADERS;
183                 exchangeHandler.produceRequest((request, entityDetails, httpContext) -> commitRequest(request, entityDetails), context);
184                 break;
185             case ACK:
186                 outputChannel.suspendOutput();
187                 break;
188             case BODY:
189                 exchangeHandler.produce(internalDataChannel);
190                 break;
191         }
192     }
193 
194     void consumeHeader(final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
195         if (done.get() || responseState != MessageState.HEADERS) {
196             throw new ProtocolException("Unexpected message head");
197         }
198         final ProtocolVersion transportVersion = response.getVersion();
199         if (transportVersion != null && transportVersion.greaterEquals(HttpVersion.HTTP_2)) {
200             throw new UnsupportedHttpVersionException(transportVersion);
201         }
202 
203         final int status = response.getCode();
204         if (status < HttpStatus.SC_INFORMATIONAL) {
205             throw new ProtocolException("Invalid response: " + new StatusLine(response));
206         }
207         if (status > HttpStatus.SC_CONTINUE && status < HttpStatus.SC_SUCCESS) {
208             exchangeHandler.consumeInformation(response, context);
209         } else {
210             if (!connectionReuseStrategy.keepAlive(committedRequest, response, context)) {
211                 keepAlive = false;
212             }
213         }
214         if (requestState == MessageState.ACK) {
215             if (status == HttpStatus.SC_CONTINUE || status >= HttpStatus.SC_SUCCESS) {
216                 outputChannel.setSocketTimeout(timeout);
217                 requestState = MessageState.BODY;
218                 if (status < HttpStatus.SC_CLIENT_ERROR) {
219                     exchangeHandler.produce(internalDataChannel);
220                 }
221             }
222         }
223         if (status < HttpStatus.SC_SUCCESS) {
224             return;
225         }
226         if (requestState == MessageState.BODY) {
227             if (status >= HttpStatus.SC_CLIENT_ERROR) {
228                 requestState = MessageState.COMPLETE;
229                 if (!outputChannel.abortGracefully()) {
230                     keepAlive = false;
231                 }
232             }
233         }
234 
235         context.setProtocolVersion(transportVersion != null ? transportVersion : HttpVersion.HTTP_1_1);
236         context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
237         httpProcessor.process(response, entityDetails, context);
238 
239         if (entityDetails == null && !keepAlive) {
240             outputChannel.close();
241         }
242 
243         exchangeHandler.consumeResponse(response, entityDetails, context);
244         if (entityDetails == null) {
245             responseState = MessageState.COMPLETE;
246         } else {
247             responseState = MessageState.BODY;
248         }
249     }
250 
251     void consumeData(final ByteBuffer src) throws HttpException, IOException {
252         if (done.get() || responseState != MessageState.BODY) {
253             throw new ProtocolException("Unexpected message data");
254         }
255         exchangeHandler.consume(src);
256     }
257 
258     void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
259         exchangeHandler.updateCapacity(capacityChannel);
260     }
261 
262     void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
263         if (done.get() || responseState != MessageState.BODY) {
264             throw new ProtocolException("Unexpected message data");
265         }
266         if (!keepAlive) {
267             outputChannel.close();
268         }
269         responseState = MessageState.COMPLETE;
270         exchangeHandler.streamEnd(trailers);
271     }
272 
273     boolean handleTimeout() {
274         if (requestState == MessageState.ACK) {
275             requestState = MessageState.BODY;
276             outputChannel.setSocketTimeout(timeout);
277             outputChannel.requestOutput();
278             return true;
279         }
280         return false;
281     }
282 
283     void failed(final Exception cause) {
284         if (!done.get()) {
285             exchangeHandler.failed(cause);
286         }
287     }
288 
289     @Override
290     public void releaseResources() {
291         if (done.compareAndSet(false, true)) {
292             responseState = MessageState.COMPLETE;
293             requestState = MessageState.COMPLETE;
294             exchangeHandler.releaseResources();
295         }
296     }
297 
298     void appendState(final StringBuilder buf) {
299         buf.append("requestState=").append(requestState)
300                 .append(", responseState=").append(responseState)
301                 .append(", responseCommitted=").append(requestCommitted)
302                 .append(", keepAlive=").append(keepAlive)
303                 .append(", done=").append(done);
304     }
305 
306     @Override
307     public String toString() {
308         final StringBuilder buf = new StringBuilder();
309         buf.append("[");
310         appendState(buf);
311         buf.append("]");
312         return buf.toString();
313     }
314 
315 }
316