View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http.impl.nio;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.List;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  
34  import org.apache.hc.core5.http.ConnectionReuseStrategy;
35  import org.apache.hc.core5.http.EntityDetails;
36  import org.apache.hc.core5.http.Header;
37  import org.apache.hc.core5.http.HeaderElements;
38  import org.apache.hc.core5.http.HttpException;
39  import org.apache.hc.core5.http.HttpHeaders;
40  import org.apache.hc.core5.http.HttpRequest;
41  import org.apache.hc.core5.http.HttpResponse;
42  import org.apache.hc.core5.http.HttpStatus;
43  import org.apache.hc.core5.http.HttpVersion;
44  import org.apache.hc.core5.http.Method;
45  import org.apache.hc.core5.http.MisdirectedRequestException;
46  import org.apache.hc.core5.http.ProtocolException;
47  import org.apache.hc.core5.http.ProtocolVersion;
48  import org.apache.hc.core5.http.UnsupportedHttpVersionException;
49  import org.apache.hc.core5.http.impl.ServerSupport;
50  import org.apache.hc.core5.http.message.BasicHttpResponse;
51  import org.apache.hc.core5.http.nio.AsyncPushProducer;
52  import org.apache.hc.core5.http.nio.AsyncResponseProducer;
53  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
54  import org.apache.hc.core5.http.nio.CapacityChannel;
55  import org.apache.hc.core5.http.nio.DataStreamChannel;
56  import org.apache.hc.core5.http.nio.HandlerFactory;
57  import org.apache.hc.core5.http.nio.ResourceHolder;
58  import org.apache.hc.core5.http.nio.ResponseChannel;
59  import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
60  import org.apache.hc.core5.http.nio.support.ImmediateResponseExchangeHandler;
61  import org.apache.hc.core5.http.protocol.HttpContext;
62  import org.apache.hc.core5.http.protocol.HttpCoreContext;
63  import org.apache.hc.core5.http.protocol.HttpProcessor;
64  
65  class ServerHttp1StreamHandler implements ResourceHolder {
66  
67      private final Http1StreamChannel<HttpResponse> outputChannel;
68      private final DataStreamChannel internalDataChannel;
69      private final ResponseChannel responseChannel;
70      private final HttpProcessor httpProcessor;
71      private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
72      private final ConnectionReuseStrategy connectionReuseStrategy;
73      private final HttpCoreContext context;
74      private final AtomicBoolean responseCommitted;
75      private final AtomicBoolean done;
76  
77      private volatile boolean keepAlive;
78      private volatile AsyncServerExchangeHandler exchangeHandler;
79      private volatile HttpRequest receivedRequest;
80      private volatile MessageState requestState;
81      private volatile MessageState responseState;
82  
83      ServerHttp1StreamHandler(
84              final Http1StreamChannel<HttpResponse> outputChannel,
85              final HttpProcessor httpProcessor,
86              final ConnectionReuseStrategy connectionReuseStrategy,
87              final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
88              final HttpCoreContext context) {
89          this.outputChannel = outputChannel;
90          this.internalDataChannel = new DataStreamChannel() {
91  
92              @Override
93              public void requestOutput() {
94                  outputChannel.requestOutput();
95              }
96  
97              @Override
98              public void endStream(final List<? extends Header> trailers) throws IOException {
99                  outputChannel.complete(trailers);
100                 if (!keepAlive) {
101                     outputChannel.close();
102                 }
103                 responseState = MessageState.COMPLETE;
104             }
105 
106             @Override
107             public int write(final ByteBuffer src) throws IOException {
108                 return outputChannel.write(src);
109             }
110 
111             @Override
112             public void endStream() throws IOException {
113                 endStream(null);
114             }
115 
116         };
117 
118         this.responseChannel = new ResponseChannel() {
119 
120             @Override
121             public void sendInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
122                 commitInformation(response);
123             }
124 
125             @Override
126             public void sendResponse(
127                     final HttpResponse response, final EntityDetails responseEntityDetails, final HttpContext httpContext) throws HttpException, IOException {
128                 ServerSupport.validateResponse(response, responseEntityDetails);
129                 commitResponse(response, responseEntityDetails);
130             }
131 
132             @Override
133             public void pushPromise(
134                     final HttpRequest promise, final AsyncPushProducer pushProducer, final HttpContext httpContext) throws HttpException, IOException {
135                 commitPromise();
136             }
137 
138             @Override
139             public String toString() {
140                 return super.toString() + " " + ServerHttp1StreamHandler.this;
141             }
142 
143         };
144 
145         this.httpProcessor = httpProcessor;
146         this.connectionReuseStrategy = connectionReuseStrategy;
147         this.exchangeHandlerFactory = exchangeHandlerFactory;
148         this.context = context;
149         this.responseCommitted = new AtomicBoolean(false);
150         this.done = new AtomicBoolean(false);
151         this.keepAlive = true;
152         this.requestState = MessageState.HEADERS;
153         this.responseState = MessageState.IDLE;
154     }
155 
156     private void commitResponse(
157             final HttpResponse response,
158             final EntityDetails responseEntityDetails) throws HttpException, IOException {
159         if (responseCommitted.compareAndSet(false, true)) {
160 
161             final ProtocolVersion transportVersion = response.getVersion();
162             if (transportVersion != null && transportVersion.greaterEquals(HttpVersion.HTTP_2)) {
163                 throw new UnsupportedHttpVersionException(transportVersion);
164             }
165 
166             final int status = response.getCode();
167             if (status < HttpStatus.SC_SUCCESS) {
168                 throw new HttpException("Invalid response: " + status);
169             }
170 
171             context.setProtocolVersion(transportVersion != null ? transportVersion : HttpVersion.HTTP_1_1);
172             context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
173             httpProcessor.process(response, responseEntityDetails, context);
174 
175             final boolean endStream = responseEntityDetails == null ||
176                     (receivedRequest != null && Method.HEAD.isSame(receivedRequest.getMethod()));
177 
178             if (!connectionReuseStrategy.keepAlive(receivedRequest, response, context)) {
179                 keepAlive = false;
180             }
181 
182             outputChannel.submit(response, endStream, endStream ? FlushMode.IMMEDIATE : FlushMode.BUFFER);
183             if (endStream) {
184                 if (!keepAlive) {
185                     outputChannel.close();
186                 }
187                 responseState = MessageState.COMPLETE;
188             } else {
189                 responseState = MessageState.BODY;
190                 exchangeHandler.produce(internalDataChannel);
191             }
192         } else {
193             throw new HttpException("Response already committed");
194         }
195     }
196 
197     private void commitInformation(final HttpResponse response) throws IOException, HttpException {
198         if (responseCommitted.get()) {
199             throw new HttpException("Response already committed");
200         }
201         final int status = response.getCode();
202         if (status < HttpStatus.SC_INFORMATIONAL || status >= HttpStatus.SC_SUCCESS) {
203             throw new HttpException("Invalid intermediate response: " + status);
204         }
205         outputChannel.submit(response, true, FlushMode.IMMEDIATE);
206     }
207 
208     private void commitPromise() throws HttpException {
209         throw new HttpException("HTTP/1.1 does not support server push");
210     }
211 
212     void activateChannel() throws IOException, HttpException {
213         outputChannel.activate();
214     }
215 
216     boolean isResponseFinal() {
217         return responseState == MessageState.COMPLETE;
218     }
219 
220     boolean keepAlive() {
221         return keepAlive;
222     }
223 
224     boolean isCompleted() {
225         return requestState == MessageState.COMPLETE && responseState == MessageState.COMPLETE;
226     }
227 
228     void terminateExchange(final HttpException ex) throws HttpException, IOException {
229         if (done.get() || requestState != MessageState.HEADERS) {
230             throw new ProtocolException("Unexpected message head");
231         }
232         receivedRequest = null;
233         requestState = MessageState.COMPLETE;
234         final HttpResponse response = new BasicHttpResponse(ServerSupport.toStatusCode(ex));
235         response.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
236         final AsyncResponseProducer responseProducer = new BasicResponseProducer(response, ServerSupport.toErrorMessage(ex));
237         exchangeHandler = new ImmediateResponseExchangeHandler(responseProducer);
238         exchangeHandler.handleRequest(null, null, responseChannel, context);
239     }
240 
241     void consumeHeader(final HttpRequest request, final EntityDetails requestEntityDetails) throws HttpException, IOException {
242         if (done.get() || requestState != MessageState.HEADERS) {
243             throw new ProtocolException("Unexpected message head");
244         }
245         receivedRequest = request;
246         requestState = requestEntityDetails == null ? MessageState.COMPLETE : MessageState.BODY;
247 
248         AsyncServerExchangeHandler handler;
249         try {
250             handler = exchangeHandlerFactory.create(request, context);
251         } catch (final MisdirectedRequestException ex) {
252             handler =  new ImmediateResponseExchangeHandler(HttpStatus.SC_MISDIRECTED_REQUEST, ex.getMessage());
253         } catch (final HttpException ex) {
254             handler =  new ImmediateResponseExchangeHandler(HttpStatus.SC_INTERNAL_SERVER_ERROR, ex.getMessage());
255         }
256         if (handler == null) {
257             handler = new ImmediateResponseExchangeHandler(HttpStatus.SC_NOT_FOUND, "Cannot handle request");
258         }
259 
260         exchangeHandler = handler;
261 
262         final ProtocolVersion transportVersion = request.getVersion();
263         if (transportVersion != null && transportVersion.greaterEquals(HttpVersion.HTTP_2)) {
264             throw new UnsupportedHttpVersionException(transportVersion);
265         }
266         context.setProtocolVersion(transportVersion != null ? transportVersion : HttpVersion.HTTP_1_1);
267         context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
268 
269         try {
270             httpProcessor.process(request, requestEntityDetails, context);
271             exchangeHandler.handleRequest(request, requestEntityDetails, responseChannel, context);
272         } catch (final HttpException ex) {
273             if (!responseCommitted.get()) {
274                 final HttpResponse response = new BasicHttpResponse(ServerSupport.toStatusCode(ex));
275                 response.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
276                 final AsyncResponseProducer responseProducer = new BasicResponseProducer(response, ServerSupport.toErrorMessage(ex));
277                 exchangeHandler = new ImmediateResponseExchangeHandler(responseProducer);
278                 exchangeHandler.handleRequest(request, requestEntityDetails, responseChannel, context);
279             } else {
280                 throw ex;
281             }
282         }
283 
284     }
285 
286     boolean isOutputReady() {
287         switch (responseState) {
288             case BODY:
289                 return exchangeHandler.available() > 0;
290             default:
291                 return false;
292         }
293     }
294 
295     void produceOutput() throws HttpException, IOException {
296         switch (responseState) {
297             case BODY:
298                 exchangeHandler.produce(internalDataChannel);
299                 break;
300         }
301     }
302 
303     void consumeData(final ByteBuffer src) throws HttpException, IOException {
304         if (done.get() || requestState != MessageState.BODY) {
305             throw new ProtocolException("Unexpected message data");
306         }
307         if (responseState == MessageState.ACK) {
308             outputChannel.requestOutput();
309         }
310         exchangeHandler.consume(src);
311     }
312 
313     void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
314         exchangeHandler.updateCapacity(capacityChannel);
315     }
316 
317     void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
318         if (done.get() || requestState != MessageState.BODY) {
319             throw new ProtocolException("Unexpected message data");
320         }
321         requestState = MessageState.COMPLETE;
322         exchangeHandler.streamEnd(trailers);
323     }
324 
325     void failed(final Exception cause) {
326         if (!done.get()) {
327             exchangeHandler.failed(cause);
328         }
329     }
330 
331     @Override
332     public void releaseResources() {
333         if (done.compareAndSet(false, true)) {
334             requestState = MessageState.COMPLETE;
335             responseState = MessageState.COMPLETE;
336             exchangeHandler.releaseResources();
337         }
338     }
339 
340     void appendState(final StringBuilder buf) {
341         buf.append("requestState=").append(requestState)
342                 .append(", responseState=").append(responseState)
343                 .append(", responseCommitted=").append(responseCommitted)
344                 .append(", keepAlive=").append(keepAlive)
345                 .append(", done=").append(done);
346     }
347 
348     @Override
349     public String toString() {
350         final StringBuilder buf = new StringBuilder();
351         buf.append("[");
352         appendState(buf);
353         buf.append("]");
354         return buf.toString();
355     }
356 
357 }
358