View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  package org.apache.hc.core5.http2.impl.nio;
28  
29  import java.io.IOException;
30  import java.nio.ByteBuffer;
31  import java.util.List;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  
34  import org.apache.hc.core5.http.EntityDetails;
35  import org.apache.hc.core5.http.Header;
36  import org.apache.hc.core5.http.HeaderElements;
37  import org.apache.hc.core5.http.HttpException;
38  import org.apache.hc.core5.http.HttpHeaders;
39  import org.apache.hc.core5.http.HttpRequest;
40  import org.apache.hc.core5.http.HttpResponse;
41  import org.apache.hc.core5.http.HttpStatus;
42  import org.apache.hc.core5.http.HttpVersion;
43  import org.apache.hc.core5.http.ProtocolException;
44  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
45  import org.apache.hc.core5.http.impl.IncomingEntityDetails;
46  import org.apache.hc.core5.http.impl.nio.MessageState;
47  import org.apache.hc.core5.http.message.StatusLine;
48  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
49  import org.apache.hc.core5.http.nio.AsyncPushConsumer;
50  import org.apache.hc.core5.http.nio.DataStreamChannel;
51  import org.apache.hc.core5.http.nio.HandlerFactory;
52  import org.apache.hc.core5.http.nio.RequestChannel;
53  import org.apache.hc.core5.http.protocol.HttpContext;
54  import org.apache.hc.core5.http.protocol.HttpCoreContext;
55  import org.apache.hc.core5.http.protocol.HttpProcessor;
56  import org.apache.hc.core5.http2.H2ConnectionException;
57  import org.apache.hc.core5.http2.H2Error;
58  import org.apache.hc.core5.http2.impl.DefaultH2RequestConverter;
59  import org.apache.hc.core5.http2.impl.DefaultH2ResponseConverter;
60  
61  class ClientH2StreamHandler implements H2StreamHandler {
62  
63      private final H2StreamChannel outputChannel;
64      private final DataStreamChannel dataChannel;
65      private final HttpProcessor httpProcessor;
66      private final BasicHttpConnectionMetrics connMetrics;
67      private final AsyncClientExchangeHandler exchangeHandler;
68      private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
69      private final HttpCoreContext context;
70      private final AtomicBoolean requestCommitted;
71      private final AtomicBoolean failed;
72      private final AtomicBoolean done;
73  
74      private volatile MessageState requestState;
75      private volatile MessageState responseState;
76  
77      ClientH2StreamHandler(
78              final H2StreamChannel outputChannel,
79              final HttpProcessor httpProcessor,
80              final BasicHttpConnectionMetrics connMetrics,
81              final AsyncClientExchangeHandler exchangeHandler,
82              final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
83              final HttpCoreContext context) {
84          this.outputChannel = outputChannel;
85          this.dataChannel = new DataStreamChannel() {
86  
87              @Override
88              public void requestOutput() {
89                  outputChannel.requestOutput();
90              }
91  
92              @Override
93              public int write(final ByteBuffer src) throws IOException {
94                  return outputChannel.write(src);
95              }
96  
97              @Override
98              public void endStream(final List<? extends Header> trailers) throws IOException {
99                  outputChannel.endStream(trailers);
100                 requestState = MessageState.COMPLETE;
101             }
102 
103             @Override
104             public void endStream() throws IOException {
105                 outputChannel.endStream();
106                 requestState = MessageState.COMPLETE;
107             }
108 
109         };
110         this.httpProcessor = httpProcessor;
111         this.connMetrics = connMetrics;
112         this.exchangeHandler = exchangeHandler;
113         this.pushHandlerFactory = pushHandlerFactory;
114         this.context = context;
115         this.requestCommitted = new AtomicBoolean(false);
116         this.failed = new AtomicBoolean(false);
117         this.done = new AtomicBoolean(false);
118         this.requestState = MessageState.HEADERS;
119         this.responseState = MessageState.HEADERS;
120     }
121 
122     @Override
123     public HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
124         return pushHandlerFactory;
125     }
126 
127     @Override
128     public boolean isOutputReady() {
129         switch (requestState) {
130             case HEADERS:
131                 return true;
132             case BODY:
133                 return exchangeHandler.available() > 0;
134             default:
135                 return false;
136         }
137     }
138 
139     private void commitRequest(final HttpRequest request, final EntityDetails entityDetails) throws HttpException, IOException {
140         if (requestCommitted.compareAndSet(false, true)) {
141             context.setProtocolVersion(HttpVersion.HTTP_2);
142             context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
143 
144             httpProcessor.process(request, entityDetails, context);
145 
146             final List<Header> headers = DefaultH2RequestConverter.INSTANCE.convert(request);
147             outputChannel.submit(headers, entityDetails == null);
148             connMetrics.incrementRequestCount();
149 
150             if (entityDetails == null) {
151                 requestState = MessageState.COMPLETE;
152             } else {
153                 final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
154                 final boolean expectContinue = h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue());
155                 if (expectContinue) {
156                     requestState = MessageState.ACK;
157                 } else {
158                     requestState = MessageState.BODY;
159                     exchangeHandler.produce(dataChannel);
160                 }
161             }
162         } else {
163             throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Request already committed");
164         }
165     }
166 
167     @Override
168     public void produceOutput() throws HttpException, IOException {
169         switch (requestState) {
170             case HEADERS:
171                 exchangeHandler.produceRequest(new RequestChannel() {
172 
173                     @Override
174                     public void sendRequest(
175                             final HttpRequest request, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
176                         commitRequest(request, entityDetails);
177                     }
178 
179                 }, context);
180                 break;
181             case BODY:
182                 exchangeHandler.produce(dataChannel);
183                 break;
184         }
185     }
186 
187     @Override
188     public void consumePromise(final List<Header> headers) throws HttpException, IOException {
189         throw new ProtocolException("Unexpected message promise");
190     }
191 
192     @Override
193     public void consumeHeader(final List<Header> headers, final boolean endStream) throws HttpException, IOException {
194         if (done.get()) {
195             throw new ProtocolException("Unexpected message headers");
196         }
197         switch (responseState) {
198             case HEADERS:
199                 final HttpResponse response = DefaultH2ResponseConverter.INSTANCE.convert(headers);
200                 final int status = response.getCode();
201                 if (status < HttpStatus.SC_INFORMATIONAL) {
202                     throw new ProtocolException("Invalid response: " + new StatusLine(response));
203                 }
204                 if (status > HttpStatus.SC_CONTINUE && status < HttpStatus.SC_SUCCESS) {
205                     exchangeHandler.consumeInformation(response, context);
206                 }
207                 if (requestState == MessageState.ACK) {
208                     if (status == HttpStatus.SC_CONTINUE || status >= HttpStatus.SC_SUCCESS) {
209                         requestState = MessageState.BODY;
210                         exchangeHandler.produce(dataChannel);
211                     }
212                 }
213                 if (status < HttpStatus.SC_SUCCESS) {
214                     return;
215                 }
216 
217                 final EntityDetails entityDetails = endStream ? null : new IncomingEntityDetails(response, -1);
218                 context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
219                 httpProcessor.process(response, entityDetails, context);
220                 connMetrics.incrementResponseCount();
221 
222                 exchangeHandler.consumeResponse(response, entityDetails, context);
223                 responseState = endStream ? MessageState.COMPLETE : MessageState.BODY;
224                 break;
225             case BODY:
226                 responseState = MessageState.COMPLETE;
227                 exchangeHandler.streamEnd(headers);
228                 break;
229             default:
230                 throw new ProtocolException("Unexpected message headers");
231         }
232     }
233 
234     @Override
235     public void updateInputCapacity() throws IOException {
236         exchangeHandler.updateCapacity(outputChannel);
237     }
238 
239     @Override
240     public void consumeData(final ByteBuffer src, final boolean endStream) throws HttpException, IOException {
241         if (done.get() || responseState != MessageState.BODY) {
242             throw new ProtocolException("Unexpected message data");
243         }
244         if (src != null) {
245             exchangeHandler.consume(src);
246         }
247         if (endStream) {
248             responseState = MessageState.COMPLETE;
249             exchangeHandler.streamEnd(null);
250         }
251     }
252 
253     @Override
254     public void handle(final HttpException ex, final boolean endStream) throws HttpException, IOException {
255         throw ex;
256     }
257 
258     @Override
259     public void failed(final Exception cause) {
260         try {
261             if (failed.compareAndSet(false, true)) {
262                 if (exchangeHandler != null) {
263                     exchangeHandler.failed(cause);
264                 }
265             }
266         } finally {
267             releaseResources();
268         }
269     }
270 
271     @Override
272     public void releaseResources() {
273         if (done.compareAndSet(false, true)) {
274             responseState = MessageState.COMPLETE;
275             requestState = MessageState.COMPLETE;
276             exchangeHandler.releaseResources();
277         }
278     }
279 
280     @Override
281     public String toString() {
282         return "[" +
283                 "requestState=" + requestState +
284                 ", responseState=" + responseState +
285                 ']';
286     }
287 
288 }
289