View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http2.impl.nio;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.List;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  
34  import org.apache.hc.core5.http.EntityDetails;
35  import org.apache.hc.core5.http.Header;
36  import org.apache.hc.core5.http.HttpException;
37  import org.apache.hc.core5.http.HttpRequest;
38  import org.apache.hc.core5.http.HttpResponse;
39  import org.apache.hc.core5.http.HttpStatus;
40  import org.apache.hc.core5.http.HttpVersion;
41  import org.apache.hc.core5.http.Method;
42  import org.apache.hc.core5.http.ProtocolException;
43  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
44  import org.apache.hc.core5.http.impl.IncomingEntityDetails;
45  import org.apache.hc.core5.http.impl.ServerSupport;
46  import org.apache.hc.core5.http.impl.nio.MessageState;
47  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
48  import org.apache.hc.core5.http.nio.AsyncPushProducer;
49  import org.apache.hc.core5.http.nio.AsyncResponseProducer;
50  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
51  import org.apache.hc.core5.http.nio.DataStreamChannel;
52  import org.apache.hc.core5.http.nio.HandlerFactory;
53  import org.apache.hc.core5.http.nio.ResponseChannel;
54  import org.apache.hc.core5.http.nio.support.BasicResponseProducer;
55  import org.apache.hc.core5.http.nio.support.ImmediateResponseExchangeHandler;
56  import org.apache.hc.core5.http.protocol.HttpContext;
57  import org.apache.hc.core5.http.protocol.HttpCoreContext;
58  import org.apache.hc.core5.http.protocol.HttpProcessor;
59  import org.apache.hc.core5.http2.H2ConnectionException;
60  import org.apache.hc.core5.http2.H2Error;
61  import org.apache.hc.core5.http2.H2StreamResetException;
62  import org.apache.hc.core5.http2.impl.DefaultH2RequestConverter;
63  import org.apache.hc.core5.http2.impl.DefaultH2ResponseConverter;
64  import org.apache.hc.core5.util.Asserts;
65  
66  class ServerH2StreamHandler implements H2StreamHandler {
67  
68      private final H2StreamChannel outputChannel;
69      private final DataStreamChannel dataChannel;
70      private final ResponseChannel responseChannel;
71      private final HttpProcessor httpProcessor;
72      private final BasicHttpConnectionMetrics connMetrics;
73      private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
74      private final HttpCoreContext context;
75      private final AtomicBoolean responseCommitted;
76      private final AtomicBoolean failed;
77      private final AtomicBoolean done;
78  
79      private volatile AsyncServerExchangeHandler exchangeHandler;
80      private volatile HttpRequest receivedRequest;
81      private volatile MessageState requestState;
82      private volatile MessageState responseState;
83  
84      ServerH2StreamHandler(
85              final H2StreamChannel outputChannel,
86              final HttpProcessor httpProcessor,
87              final BasicHttpConnectionMetrics connMetrics,
88              final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
89              final HttpCoreContext context) {
90          this.outputChannel = outputChannel;
91          this.dataChannel = new DataStreamChannel() {
92  
93              @Override
94              public void requestOutput() {
95                  outputChannel.requestOutput();
96              }
97  
98              @Override
99              public int write(final ByteBuffer src) throws IOException {
100                 return outputChannel.write(src);
101             }
102 
103             @Override
104             public void endStream(final List<? extends Header> trailers) throws IOException {
105                 outputChannel.endStream(trailers);
106                 responseState = MessageState.COMPLETE;
107             }
108 
109             @Override
110             public void endStream() throws IOException {
111                 outputChannel.endStream();
112                 responseState = MessageState.COMPLETE;
113             }
114 
115         };
116         this.responseChannel = new ResponseChannel() {
117 
118             @Override
119             public void sendInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
120                 commitInformation(response);
121             }
122 
123             @Override
124             public void sendResponse(
125                     final HttpResponse response, final EntityDetails responseEntityDetails, final HttpContext httpContext) throws HttpException, IOException {
126                 ServerSupport.validateResponse(response, responseEntityDetails);
127                 commitResponse(response, responseEntityDetails);
128             }
129 
130             @Override
131             public void pushPromise(
132                     final HttpRequest promise, final AsyncPushProducer pushProducer, final HttpContext httpContext) throws HttpException, IOException {
133                 commitPromise(promise, pushProducer);
134             }
135 
136         };
137         this.httpProcessor = httpProcessor;
138         this.connMetrics = connMetrics;
139         this.exchangeHandlerFactory = exchangeHandlerFactory;
140         this.context = context;
141         this.responseCommitted = new AtomicBoolean(false);
142         this.failed = new AtomicBoolean(false);
143         this.done = new AtomicBoolean(false);
144         this.requestState = MessageState.HEADERS;
145         this.responseState = MessageState.IDLE;
146     }
147 
148     @Override
149     public HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
150         return null;
151     }
152 
153     private void commitInformation(final HttpResponse response) throws IOException, HttpException {
154         if (responseCommitted.get()) {
155             throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Response already committed");
156         }
157         final int status = response.getCode();
158         if (status < HttpStatus.SC_INFORMATIONAL || status >= HttpStatus.SC_SUCCESS) {
159             throw new HttpException("Invalid intermediate response: " + status);
160         }
161         final List<Header> responseHeaders = DefaultH2ResponseConverter.INSTANCE.convert(response);
162         outputChannel.submit(responseHeaders, false);
163     }
164 
165     private void commitResponse(
166             final HttpResponse response,
167             final EntityDetails responseEntityDetails) throws HttpException, IOException {
168         if (responseCommitted.compareAndSet(false, true)) {
169 
170             final int status = response.getCode();
171             if (status < HttpStatus.SC_SUCCESS) {
172                 throw new HttpException("Invalid response: " + status);
173             }
174             context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
175             httpProcessor.process(response, responseEntityDetails, context);
176 
177             final List<Header> responseHeaders = DefaultH2ResponseConverter.INSTANCE.convert(response);
178 
179             final boolean endStream = responseEntityDetails == null ||
180                     (receivedRequest != null && Method.HEAD.isSame(receivedRequest.getMethod()));
181             outputChannel.submit(responseHeaders, endStream);
182             connMetrics.incrementResponseCount();
183             if (responseEntityDetails == null) {
184                 responseState = MessageState.COMPLETE;
185             } else {
186                 responseState = MessageState.BODY;
187                 exchangeHandler.produce(outputChannel);
188             }
189         } else {
190             throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Response already committed");
191         }
192     }
193 
194     private void commitPromise(
195             final HttpRequest promise,
196             final AsyncPushProducer pushProducer) throws HttpException, IOException {
197 
198         httpProcessor.process(promise, null, context);
199 
200         final List<Header> promiseHeaders = DefaultH2RequestConverter.INSTANCE.convert(promise);
201         outputChannel.push(promiseHeaders, pushProducer);
202         connMetrics.incrementRequestCount();
203     }
204 
205     @Override
206     public void consumePromise(final List<Header> headers) throws HttpException, IOException {
207         throw new ProtocolException("Unexpected message promise");
208     }
209 
210     @Override
211     public void consumeHeader(final List<Header> headers, final boolean endStream) throws HttpException, IOException {
212         if (done.get()) {
213             throw new ProtocolException("Unexpected message headers");
214         }
215         switch (requestState) {
216             case HEADERS:
217                 requestState = endStream ? MessageState.COMPLETE : MessageState.BODY;
218 
219                 final HttpRequest request = DefaultH2RequestConverter.INSTANCE.convert(headers);
220                 final EntityDetails requestEntityDetails = endStream ? null : new IncomingEntityDetails(request, -1);
221 
222                 final AsyncServerExchangeHandler handler;
223                 try {
224                     handler = exchangeHandlerFactory != null ? exchangeHandlerFactory.create(request, context) : null;
225                 } catch (final ProtocolException ex) {
226                     throw new H2StreamResetException(H2Error.PROTOCOL_ERROR, ex.getMessage());
227                 }
228                 if (handler == null) {
229                     throw new H2StreamResetException(H2Error.REFUSED_STREAM, "Stream refused");
230                 }
231                 exchangeHandler = handler;
232 
233                 context.setProtocolVersion(HttpVersion.HTTP_2);
234                 context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
235 
236                 try {
237                     httpProcessor.process(request, requestEntityDetails, context);
238                     connMetrics.incrementRequestCount();
239                     receivedRequest = request;
240 
241                     exchangeHandler.handleRequest(request, requestEntityDetails, responseChannel, context);
242                 } catch (final HttpException ex) {
243                     if (!responseCommitted.get()) {
244                         final AsyncResponseProducer responseProducer = new BasicResponseProducer(
245                                 ServerSupport.toStatusCode(ex),
246                                 ServerSupport.toErrorMessage(ex));
247                         exchangeHandler = new ImmediateResponseExchangeHandler(responseProducer);
248                         exchangeHandler.handleRequest(request, requestEntityDetails, responseChannel, context);
249                     } else {
250                         throw ex;
251                     }
252                 }
253                 break;
254             case BODY:
255                 responseState = MessageState.COMPLETE;
256                 exchangeHandler.streamEnd(headers);
257                 break;
258             default:
259                 throw new ProtocolException("Unexpected message headers");
260         }
261     }
262 
263     @Override
264     public void updateInputCapacity() throws IOException {
265         Asserts.notNull(exchangeHandler, "Exchange handler");
266         exchangeHandler.updateCapacity(outputChannel);
267     }
268 
269     @Override
270     public void consumeData(final ByteBuffer src, final boolean endStream) throws HttpException, IOException {
271         if (done.get() || requestState != MessageState.BODY) {
272             throw new ProtocolException("Unexpected message data");
273         }
274         Asserts.notNull(exchangeHandler, "Exchange handler");
275         if (src != null) {
276             exchangeHandler.consume(src);
277         }
278         if (endStream) {
279             requestState = MessageState.COMPLETE;
280             exchangeHandler.streamEnd(null);
281         }
282     }
283 
284     @Override
285     public boolean isOutputReady() {
286         return responseState == MessageState.BODY && exchangeHandler != null && exchangeHandler.available() > 0;
287     }
288 
289     @Override
290     public void produceOutput() throws HttpException, IOException {
291         if (responseState == MessageState.BODY) {
292             Asserts.notNull(exchangeHandler, "Exchange handler");
293             exchangeHandler.produce(dataChannel);
294         }
295     }
296 
297     @Override
298     public void handle(final HttpException ex, final boolean endStream) throws HttpException, IOException {
299         if (done.get()) {
300             throw ex;
301         }
302         switch (requestState) {
303             case HEADERS:
304                 requestState = endStream ? MessageState.COMPLETE : MessageState.BODY;
305                 if (!responseCommitted.get()) {
306                     final AsyncResponseProducer responseProducer = new BasicResponseProducer(
307                             ServerSupport.toStatusCode(ex),
308                             ServerSupport.toErrorMessage(ex));
309                     exchangeHandler = new ImmediateResponseExchangeHandler(responseProducer);
310                     exchangeHandler.handleRequest(null, null, responseChannel, context);
311                 } else {
312                     throw ex;
313                 }
314                 break;
315             case BODY:
316                 responseState = MessageState.COMPLETE;
317             default:
318                 throw ex;
319         }
320     }
321 
322     @Override
323     public void failed(final Exception cause) {
324         try {
325             if (failed.compareAndSet(false, true)) {
326                 if (exchangeHandler != null) {
327                     exchangeHandler.failed(cause);
328                 }
329             }
330         } finally {
331             releaseResources();
332         }
333     }
334 
335     @Override
336     public void releaseResources() {
337         if (done.compareAndSet(false, true)) {
338             requestState = MessageState.COMPLETE;
339             responseState = MessageState.COMPLETE;
340             if (exchangeHandler != null) {
341                 exchangeHandler.releaseResources();
342             }
343         }
344     }
345 
346     @Override
347     public String toString() {
348         return "[" +
349                 "requestState=" + requestState +
350                 ", responseState=" + responseState +
351                 ']';
352     }
353 
354 }
355