View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.nio;
29  
30  import java.io.IOException;
31  import java.nio.ByteBuffer;
32  import java.nio.channels.ReadableByteChannel;
33  import java.nio.channels.WritableByteChannel;
34  import java.util.List;
35  import java.util.Queue;
36  import java.util.concurrent.ConcurrentLinkedQueue;
37  
38  import org.apache.hc.core5.annotation.Internal;
39  import org.apache.hc.core5.http.ConnectionClosedException;
40  import org.apache.hc.core5.http.ConnectionReuseStrategy;
41  import org.apache.hc.core5.http.ContentLengthStrategy;
42  import org.apache.hc.core5.http.EntityDetails;
43  import org.apache.hc.core5.http.Header;
44  import org.apache.hc.core5.http.HttpException;
45  import org.apache.hc.core5.http.HttpRequest;
46  import org.apache.hc.core5.http.HttpResponse;
47  import org.apache.hc.core5.http.HttpStatus;
48  import org.apache.hc.core5.http.LengthRequiredException;
49  import org.apache.hc.core5.http.config.CharCodingConfig;
50  import org.apache.hc.core5.http.config.Http1Config;
51  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
52  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
53  import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
54  import org.apache.hc.core5.http.impl.Http1StreamListener;
55  import org.apache.hc.core5.http.message.MessageSupport;
56  import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
57  import org.apache.hc.core5.http.nio.CapacityChannel;
58  import org.apache.hc.core5.http.nio.ContentDecoder;
59  import org.apache.hc.core5.http.nio.ContentEncoder;
60  import org.apache.hc.core5.http.nio.NHttpMessageParser;
61  import org.apache.hc.core5.http.nio.NHttpMessageWriter;
62  import org.apache.hc.core5.http.nio.SessionInputBuffer;
63  import org.apache.hc.core5.http.nio.SessionOutputBuffer;
64  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
65  import org.apache.hc.core5.http.protocol.HttpCoreContext;
66  import org.apache.hc.core5.http.protocol.HttpProcessor;
67  import org.apache.hc.core5.io.CloseMode;
68  import org.apache.hc.core5.reactor.ProtocolIOSession;
69  import org.apache.hc.core5.util.Args;
70  import org.apache.hc.core5.util.Asserts;
71  import org.apache.hc.core5.util.Timeout;
72  
73  /**
74   * I/O event handler for events fired by {@link ProtocolIOSession} that implements
75   * client side HTTP/1.1 messaging protocol with full support for
76   * duplexed message transmission and message pipelining.
77   *
78   * @since 5.0
79   */
80  @Internal
81  public class ClientHttp1StreamDuplexer extends AbstractHttp1StreamDuplexer<HttpResponse, HttpRequest> {
82  
83      private final HttpProcessor httpProcessor;
84      private final ConnectionReuseStrategy connectionReuseStrategy;
85      private final Http1Config http1Config;
86      private final Http1StreamListener streamListener;
87      private final Queue<ClientHttp1StreamHandler> pipeline;
88      private final Http1StreamChannel<HttpRequest> outputChannel;
89  
90      private volatile ClientHttp1StreamHandler outgoing;
91      private volatile ClientHttp1StreamHandler incoming;
92  
93      public ClientHttp1StreamDuplexer(
94              final ProtocolIOSession ioSession,
95              final HttpProcessor httpProcessor,
96              final Http1Config http1Config,
97              final CharCodingConfig charCodingConfig,
98              final ConnectionReuseStrategy connectionReuseStrategy,
99              final NHttpMessageParser<HttpResponse> incomingMessageParser,
100             final NHttpMessageWriter<HttpRequest> outgoingMessageWriter,
101             final ContentLengthStrategy incomingContentStrategy,
102             final ContentLengthStrategy outgoingContentStrategy,
103             final Http1StreamListener streamListener) {
104         super(ioSession, http1Config, charCodingConfig, incomingMessageParser, outgoingMessageWriter,
105                 incomingContentStrategy, outgoingContentStrategy);
106         this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
107         this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
108         this.connectionReuseStrategy = connectionReuseStrategy != null ? connectionReuseStrategy :
109                 DefaultConnectionReuseStrategy.INSTANCE;
110         this.streamListener = streamListener;
111         this.pipeline = new ConcurrentLinkedQueue<>();
112         this.outputChannel = new Http1StreamChannel<HttpRequest>() {
113 
114             @Override
115             public void close() {
116                 shutdownSession(CloseMode.IMMEDIATE);
117             }
118 
119             @Override
120             public void submit(
121                     final HttpRequest request,
122                     final boolean endStream,
123                     final FlushMode flushMode) throws HttpException, IOException {
124                 if (streamListener != null) {
125                     streamListener.onRequestHead(ClientHttp1StreamDuplexer.this, request);
126                 }
127                 commitMessageHead(request, endStream, flushMode);
128             }
129 
130             @Override
131             public void suspendOutput() throws IOException {
132                 suspendSessionOutput();
133             }
134 
135             @Override
136             public void requestOutput() {
137                 requestSessionOutput();
138             }
139 
140             @Override
141             public Timeout getSocketTimeout() {
142                 return getSessionTimeout();
143             }
144 
145             @Override
146             public void setSocketTimeout(final Timeout timeout) {
147                 setSessionTimeout(timeout);
148             }
149 
150             @Override
151             public int write(final ByteBuffer src) throws IOException {
152                 return streamOutput(src);
153             }
154 
155             @Override
156             public void complete(final List<? extends Header> trailers) throws IOException {
157                 endOutputStream(trailers);
158             }
159 
160             @Override
161             public boolean isCompleted() {
162                 return isOutputCompleted();
163             }
164 
165             @Override
166             public boolean abortGracefully() throws IOException {
167                 final MessageDelineation messageDelineation = endOutputStream(null);
168                 return messageDelineation != MessageDelineation.MESSAGE_HEAD;
169             }
170 
171             @Override
172             public void activate() throws HttpException, IOException {
173             }
174 
175         };
176     }
177 
178     @Override
179     void terminate(final Exception exception) {
180         if (incoming != null) {
181             incoming.failed(exception);
182             incoming.releaseResources();
183             incoming = null;
184         }
185         if (outgoing != null) {
186             outgoing.failed(exception);
187             outgoing.releaseResources();
188             outgoing = null;
189         }
190         for (;;) {
191             final ClientHttp1StreamHandler handler = pipeline.poll();
192             if (handler != null) {
193                 handler.failed(exception);
194                 handler.releaseResources();
195             } else {
196                 break;
197             }
198         }
199     }
200 
201     @Override
202     void disconnected() {
203         if (incoming != null) {
204             if (!incoming.isCompleted()) {
205                 incoming.failed(new ConnectionClosedException());
206             }
207             incoming.releaseResources();
208             incoming = null;
209         }
210         if (outgoing != null) {
211             if (!outgoing.isCompleted()) {
212                 outgoing.failed(new ConnectionClosedException());
213             }
214             outgoing.releaseResources();
215             outgoing = null;
216         }
217         for (;;) {
218             final ClientHttp1StreamHandler handler = pipeline.poll();
219             if (handler != null) {
220                 handler.failed(new ConnectionClosedException());
221                 handler.releaseResources();
222             } else {
223                 break;
224             }
225         }
226     }
227 
228     @Override
229     void updateInputMetrics(final HttpResponse response, final BasicHttpConnectionMetrics connMetrics) {
230         if (response.getCode() >= HttpStatus.SC_OK) {
231             connMetrics.incrementRequestCount();
232         }
233     }
234 
235     @Override
236     void updateOutputMetrics(final HttpRequest request, final BasicHttpConnectionMetrics connMetrics) {
237         connMetrics.incrementRequestCount();
238     }
239 
240     @Override
241     protected boolean handleIncomingMessage(final HttpResponse response) throws HttpException {
242 
243         if (incoming == null) {
244             incoming = pipeline.poll();
245         }
246         if (incoming == null) {
247             throw new HttpException("Unexpected response");
248         }
249         return MessageSupport.canResponseHaveBody(incoming.getRequestMethod(), response);
250     }
251 
252     @Override
253     protected ContentDecoder createContentDecoder(
254             final long len,
255             final ReadableByteChannel channel,
256             final SessionInputBuffer buffer,
257             final BasicHttpTransportMetrics metrics) throws HttpException {
258 
259         if (len >= 0) {
260             return new LengthDelimitedDecoder(channel, buffer, metrics, len);
261         } else if (len == ContentLengthStrategy.CHUNKED) {
262             return new ChunkDecoder(channel, buffer, http1Config, metrics);
263         } else {
264             return new IdentityDecoder(channel, buffer, metrics);
265         }
266     }
267 
268     @Override
269     protected boolean handleOutgoingMessage(final HttpRequest request) throws HttpException {
270         return true;
271     }
272 
273     @Override
274     protected ContentEncoder createContentEncoder(
275             final long len,
276             final WritableByteChannel channel,
277             final SessionOutputBuffer buffer,
278             final BasicHttpTransportMetrics metrics) throws HttpException {
279         final int chunkSizeHint = http1Config.getChunkSizeHint() >= 0 ? http1Config.getChunkSizeHint() : 2048;
280         if (len >= 0) {
281             return new LengthDelimitedEncoder(channel, buffer, metrics, len, chunkSizeHint);
282         } else if (len == ContentLengthStrategy.CHUNKED) {
283             return new ChunkEncoder(channel, buffer, metrics, chunkSizeHint);
284         } else {
285             throw new LengthRequiredException();
286         }
287     }
288 
289     @Override
290     boolean inputIdle() {
291         return incoming == null;
292     }
293 
294     @Override
295     boolean outputIdle() {
296         return outgoing == null && pipeline.isEmpty();
297     }
298 
299     @Override
300     void outputEnd() throws HttpException, IOException {
301         if (outgoing != null) {
302             if (outgoing.isCompleted()) {
303                 outgoing.releaseResources();
304             }
305             outgoing = null;
306         }
307     }
308 
309     @Override
310     void execute(final RequestExecutionCommand executionCommand) throws HttpException, IOException {
311         final AsyncClientExchangeHandler exchangeHandler = executionCommand.getExchangeHandler();
312         final HttpCoreContext context = HttpCoreContext.adapt(executionCommand.getContext());
313         context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
314         context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
315         final ClientHttp1StreamHandlerntHttp1StreamHandler.html#ClientHttp1StreamHandler">ClientHttp1StreamHandler handler = new ClientHttp1StreamHandler(
316                 outputChannel,
317                 httpProcessor,
318                 http1Config,
319                 connectionReuseStrategy,
320                 exchangeHandler,
321                 context);
322         pipeline.add(handler);
323         outgoing = handler;
324 
325         if (handler.isOutputReady()) {
326             handler.produceOutput();
327         }
328     }
329 
330     @Override
331     boolean isOutputReady() {
332         return outgoing != null && outgoing.isOutputReady();
333     }
334 
335     @Override
336     void produceOutput() throws HttpException, IOException {
337         if (outgoing != null) {
338             outgoing.produceOutput();
339         }
340     }
341 
342     @Override
343     void consumeHeader(final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
344         if (streamListener != null) {
345             streamListener.onResponseHead(this, response);
346         }
347         Asserts.notNull(incoming, "Response stream handler");
348         incoming.consumeHeader(response, entityDetails);
349     }
350 
351     @Override
352     void consumeData(final ByteBuffer src) throws HttpException, IOException {
353         Asserts.notNull(incoming, "Response stream handler");
354         incoming.consumeData(src);
355     }
356 
357     @Override
358     void updateCapacity(final CapacityChannel capacityChannel) throws HttpException, IOException {
359         Asserts.notNull(incoming, "Response stream handler");
360         incoming.updateCapacity(capacityChannel);
361     }
362 
363     @Override
364     void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
365         Asserts.notNull(incoming, "Response stream handler");
366         incoming.dataEnd(trailers);
367     }
368 
369     @Override
370     void inputEnd() throws HttpException, IOException {
371         if (incoming != null && incoming.isResponseFinal()) {
372             if (streamListener != null) {
373                 streamListener.onExchangeComplete(this, isOpen());
374             }
375             if (incoming.isCompleted()) {
376                 incoming.releaseResources();
377             }
378             incoming = null;
379         }
380     }
381 
382     @Override
383     boolean handleTimeout() {
384         return outgoing != null && outgoing.handleTimeout();
385     }
386 
387     @Override
388     void appendState(final StringBuilder buf) {
389         super.appendState(buf);
390         super.appendState(buf);
391         buf.append(", incoming=[");
392         if (incoming != null) {
393             incoming.appendState(buf);
394         }
395         buf.append("], outgoing=[");
396         if (outgoing != null) {
397             outgoing.appendState(buf);
398         }
399         buf.append("], pipeline=");
400         buf.append(pipeline.size());
401     }
402 
403     @Override
404     public String toString() {
405         final StringBuilder buf = new StringBuilder();
406         buf.append("[");
407         appendState(buf);
408         buf.append("]");
409         return buf.toString();
410     }
411 
412 }