View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http2.impl.nio;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.List;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  
34  import org.apache.hc.core5.http.EntityDetails;
35  import org.apache.hc.core5.http.Header;
36  import org.apache.hc.core5.http.HttpException;
37  import org.apache.hc.core5.http.HttpRequest;
38  import org.apache.hc.core5.http.HttpResponse;
39  import org.apache.hc.core5.http.HttpStatus;
40  import org.apache.hc.core5.http.HttpVersion;
41  import org.apache.hc.core5.http.ProtocolException;
42  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
43  import org.apache.hc.core5.http.impl.nio.MessageState;
44  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
45  import org.apache.hc.core5.http.nio.AsyncPushProducer;
46  import org.apache.hc.core5.http.nio.DataStreamChannel;
47  import org.apache.hc.core5.http.nio.HandlerFactory;
48  import org.apache.hc.core5.http.nio.ResponseChannel;
49  import org.apache.hc.core5.http.protocol.HttpContext;
50  import org.apache.hc.core5.http.protocol.HttpCoreContext;
51  import org.apache.hc.core5.http.protocol.HttpProcessor;
52  import org.apache.hc.core5.http2.H2ConnectionException;
53  import org.apache.hc.core5.http2.H2Error;
54  import org.apache.hc.core5.http2.impl.DefaultH2RequestConverter;
55  import org.apache.hc.core5.http2.impl.DefaultH2ResponseConverter;
56  
57  class ServerPushH2StreamHandler implements H2StreamHandler {
58  
59      private final H2StreamChannel outputChannel;
60      private final DataStreamChannel dataChannel;
61      private final HttpProcessor httpProcessor;
62      private final BasicHttpConnectionMetrics connMetrics;
63      private final AsyncPushProducer pushProducer;
64      private final HttpCoreContext context;
65      private final AtomicBoolean responseCommitted;
66      private final AtomicBoolean failed;
67      private final AtomicBoolean done;
68  
69      private volatile MessageState requestState;
70      private volatile MessageState responseState;
71  
72      ServerPushH2StreamHandler(
73              final H2StreamChannel outputChannel,
74              final HttpProcessor httpProcessor,
75              final BasicHttpConnectionMetrics connMetrics,
76              final AsyncPushProducer pushProducer,
77              final HttpCoreContext context) {
78          this.outputChannel = outputChannel;
79          this.dataChannel = new DataStreamChannel() {
80  
81              @Override
82              public void requestOutput() {
83                  outputChannel.requestOutput();
84              }
85  
86              @Override
87              public int write(final ByteBuffer src) throws IOException {
88                  return outputChannel.write(src);
89              }
90  
91              @Override
92              public void endStream(final List<? extends Header> trailers) throws IOException {
93                  outputChannel.endStream(trailers);
94                  responseState = MessageState.COMPLETE;
95              }
96  
97              @Override
98              public void endStream() throws IOException {
99                  outputChannel.endStream();
100                 responseState = MessageState.COMPLETE;
101             }
102 
103         };
104         this.httpProcessor = httpProcessor;
105         this.connMetrics = connMetrics;
106         this.pushProducer = pushProducer;
107         this.context = context;
108         this.responseCommitted = new AtomicBoolean(false);
109         this.failed = new AtomicBoolean(false);
110         this.done = new AtomicBoolean(false);
111         this.requestState = MessageState.COMPLETE;
112         this.responseState = MessageState.IDLE;
113     }
114 
115     @Override
116     public HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
117         return null;
118     }
119 
120     @Override
121     public void consumePromise(final List<Header> headers) throws HttpException, IOException {
122         throw new ProtocolException("Unexpected message promise");
123     }
124 
125     @Override
126     public void consumeHeader(final List<Header> requestHeaders, final boolean requestEndStream) throws HttpException, IOException {
127         throw new ProtocolException("Unexpected message headers");
128     }
129 
130     @Override
131     public void updateInputCapacity() throws IOException {
132     }
133 
134     @Override
135     public void consumeData(final ByteBuffer src, final boolean endStream) throws HttpException, IOException {
136         throw new ProtocolException("Unexpected message data");
137     }
138 
139     @Override
140     public boolean isOutputReady() {
141         switch (responseState) {
142             case IDLE:
143                 return true;
144             case BODY:
145                 return pushProducer.available() > 0;
146             default:
147                 return false;
148         }
149     }
150 
151     private void commitInformation(final HttpResponse response) throws IOException, HttpException {
152         if (responseCommitted.get()) {
153             throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Response already committed");
154         }
155         final int status = response.getCode();
156         if (status < HttpStatus.SC_INFORMATIONAL || status >= HttpStatus.SC_SUCCESS) {
157             throw new HttpException("Invalid intermediate response: " + status);
158         }
159         final List<Header> responseHeaders = DefaultH2ResponseConverter.INSTANCE.convert(response);
160         outputChannel.submit(responseHeaders, false);
161     }
162 
163     private void commitResponse(
164             final HttpResponse response,
165             final EntityDetails responseEntityDetails) throws HttpException, IOException {
166         if (responseCommitted.compareAndSet(false, true)) {
167 
168             context.setProtocolVersion(HttpVersion.HTTP_2);
169             context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
170             httpProcessor.process(response, responseEntityDetails, context);
171 
172             final List<Header> headers = DefaultH2ResponseConverter.INSTANCE.convert(response);
173             outputChannel.submit(headers, responseEntityDetails == null);
174             connMetrics.incrementResponseCount();
175             if (responseEntityDetails == null) {
176                 responseState = MessageState.COMPLETE;
177             } else {
178                 responseState = MessageState.BODY;
179                 pushProducer.produce(outputChannel);
180             }
181         }
182     }
183 
184     private void commitPromise(
185             final HttpRequest promise,
186             final AsyncPushProducer pushProducer) throws HttpException, IOException {
187 
188         context.setProtocolVersion(HttpVersion.HTTP_2);
189         context.setAttribute(HttpCoreContext.HTTP_REQUEST, promise);
190         httpProcessor.process(promise, null, context);
191 
192         final List<Header> headers = DefaultH2RequestConverter.INSTANCE.convert(promise);
193 
194         outputChannel.push(headers, pushProducer);
195         connMetrics.incrementRequestCount();
196     }
197 
198     @Override
199     public void produceOutput() throws HttpException, IOException {
200         switch (responseState) {
201             case IDLE:
202                 responseState = MessageState.HEADERS;
203                 pushProducer.produceResponse(new ResponseChannel() {
204 
205                     @Override
206                     public void sendInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
207                         commitInformation(response);
208                     }
209 
210                     @Override
211                     public void sendResponse(
212                             final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
213                         commitResponse(response, entityDetails);
214                     }
215 
216                     @Override
217                     public void pushPromise(
218                             final HttpRequest promise, final AsyncPushProducer pushProducer, final HttpContext httpContext) throws HttpException, IOException {
219                         commitPromise(promise, pushProducer);
220                     }
221 
222                 }, context);
223                 break;
224             case BODY:
225                 pushProducer.produce(dataChannel);
226                 break;
227         }
228     }
229 
230     @Override
231     public void handle(final HttpException ex, final boolean endStream) throws HttpException, IOException {
232         throw ex;
233     }
234 
235     @Override
236     public void failed(final Exception cause) {
237         try {
238             if (failed.compareAndSet(false, true)) {
239                 pushProducer.failed(cause);
240             }
241         } finally {
242             releaseResources();
243         }
244     }
245 
246     @Override
247     public void releaseResources() {
248         if (done.compareAndSet(false, true)) {
249             requestState = MessageState.COMPLETE;
250             responseState = MessageState.COMPLETE;
251             pushProducer.releaseResources();
252         }
253     }
254 
255     @Override
256     public String toString() {
257         return "[" +
258                 "requestState=" + requestState +
259                 ", responseState=" + responseState +
260                 ']';
261     }
262 
263 }
264