View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.nio;
29  
30  import java.io.IOException;
31  import java.nio.ByteBuffer;
32  import java.nio.channels.ReadableByteChannel;
33  import java.nio.channels.WritableByteChannel;
34  import java.util.List;
35  import java.util.Queue;
36  import java.util.concurrent.ConcurrentLinkedQueue;
37  
38  import org.apache.hc.core5.annotation.Internal;
39  import org.apache.hc.core5.http.ConnectionClosedException;
40  import org.apache.hc.core5.http.ConnectionReuseStrategy;
41  import org.apache.hc.core5.http.ContentLengthStrategy;
42  import org.apache.hc.core5.http.EntityDetails;
43  import org.apache.hc.core5.http.Header;
44  import org.apache.hc.core5.http.HttpException;
45  import org.apache.hc.core5.http.HttpRequest;
46  import org.apache.hc.core5.http.HttpResponse;
47  import org.apache.hc.core5.http.HttpStatus;
48  import org.apache.hc.core5.http.config.CharCodingConfig;
49  import org.apache.hc.core5.http.config.Http1Config;
50  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
51  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
52  import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
53  import org.apache.hc.core5.http.impl.Http1StreamListener;
54  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
55  import org.apache.hc.core5.http.nio.CapacityChannel;
56  import org.apache.hc.core5.http.nio.ContentDecoder;
57  import org.apache.hc.core5.http.nio.ContentEncoder;
58  import org.apache.hc.core5.http.nio.HandlerFactory;
59  import org.apache.hc.core5.http.nio.NHttpMessageParser;
60  import org.apache.hc.core5.http.nio.NHttpMessageWriter;
61  import org.apache.hc.core5.http.nio.SessionInputBuffer;
62  import org.apache.hc.core5.http.nio.SessionOutputBuffer;
63  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
64  import org.apache.hc.core5.http.protocol.HttpCoreContext;
65  import org.apache.hc.core5.http.protocol.HttpProcessor;
66  import org.apache.hc.core5.io.CloseMode;
67  import org.apache.hc.core5.reactor.ProtocolIOSession;
68  import org.apache.hc.core5.util.Args;
69  import org.apache.hc.core5.util.Asserts;
70  import org.apache.hc.core5.util.Timeout;
71  
72  /**
73   * I/O event handler for events fired by {@link ProtocolIOSession} that implements
74   * server side HTTP/1.1 messaging protocol with full support for
75   * duplexed message transmission and message pipelining.
76   *
77   * @since 5.0
78   */
79  @Internal
80  public class ServerHttp1StreamDuplexer extends AbstractHttp1StreamDuplexer<HttpRequest, HttpResponse> {
81  
82      private final String scheme;
83      private final HttpProcessor httpProcessor;
84      private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
85      private final Http1Config http1Config;
86      private final ConnectionReuseStrategy connectionReuseStrategy;
87      private final Http1StreamListener streamListener;
88      private final Queue<ServerHttp1StreamHandler> pipeline;
89      private final Http1StreamChannel<HttpResponse> outputChannel;
90  
91      private volatile ServerHttp1StreamHandler outgoing;
92      private volatile ServerHttp1StreamHandler incoming;
93  
94      public ServerHttp1StreamDuplexer(
95              final ProtocolIOSession ioSession,
96              final HttpProcessor httpProcessor,
97              final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
98              final String scheme,
99              final Http1Config http1Config,
100             final CharCodingConfig charCodingConfig,
101             final ConnectionReuseStrategy connectionReuseStrategy,
102             final NHttpMessageParser<HttpRequest> incomingMessageParser,
103             final NHttpMessageWriter<HttpResponse> outgoingMessageWriter,
104             final ContentLengthStrategy incomingContentStrategy,
105             final ContentLengthStrategy outgoingContentStrategy,
106             final Http1StreamListener streamListener) {
107         super(ioSession, http1Config, charCodingConfig, incomingMessageParser, outgoingMessageWriter,
108                 incomingContentStrategy, outgoingContentStrategy);
109         this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
110         this.exchangeHandlerFactory = Args.notNull(exchangeHandlerFactory, "Exchange handler factory");
111         this.scheme = scheme;
112         this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
113         this.connectionReuseStrategy = connectionReuseStrategy != null ? connectionReuseStrategy :
114                 DefaultConnectionReuseStrategy.INSTANCE;
115         this.streamListener = streamListener;
116         this.pipeline = new ConcurrentLinkedQueue<>();
117         this.outputChannel = new Http1StreamChannel<HttpResponse>() {
118 
119             @Override
120             public void close() {
121                 ServerHttp1StreamDuplexer.this.close(CloseMode.GRACEFUL);
122             }
123 
124             @Override
125             public void submit(
126                     final HttpResponse response,
127                     final boolean endStream,
128                     final FlushMode flushMode) throws HttpException, IOException {
129                 if (streamListener != null) {
130                     streamListener.onResponseHead(ServerHttp1StreamDuplexer.this, response);
131                 }
132                 commitMessageHead(response, endStream, flushMode);
133             }
134 
135             @Override
136             public void requestOutput() {
137                 requestSessionOutput();
138             }
139 
140             @Override
141             public void suspendOutput() throws IOException {
142                 suspendSessionOutput();
143             }
144 
145             @Override
146             public Timeout getSocketTimeout() {
147                 return getSessionTimeout();
148             }
149 
150             @Override
151             public void setSocketTimeout(final Timeout timeout) {
152                 setSessionTimeout(timeout);
153             }
154 
155             @Override
156             public int write(final ByteBuffer src) throws IOException {
157                 return streamOutput(src);
158             }
159 
160             @Override
161             public void complete(final List<? extends Header> trailers) throws IOException {
162                 endOutputStream(trailers);
163             }
164 
165             @Override
166             public boolean isCompleted() {
167                 return isOutputCompleted();
168             }
169 
170             @Override
171             public boolean abortGracefully() throws IOException {
172                 final MessageDelineation messageDelineation = endOutputStream(null);
173                 return messageDelineation != MessageDelineation.MESSAGE_HEAD;
174             }
175 
176             @Override
177             public void activate() throws HttpException, IOException {
178                 // empty
179             }
180 
181             @Override
182             public String toString() {
183                 return "Http1StreamChannel[" + ServerHttp1StreamDuplexer.this + "]";
184             }
185 
186         };
187     }
188 
189     @Override
190     void terminate(final Exception exception) {
191         if (incoming != null) {
192             incoming.failed(exception);
193             incoming.releaseResources();
194             incoming = null;
195         }
196         if (outgoing != null) {
197             outgoing.failed(exception);
198             outgoing.releaseResources();
199             outgoing = null;
200         }
201         for (;;) {
202             final ServerHttp1StreamHandler handler = pipeline.poll();
203             if (handler != null) {
204                 handler.failed(exception);
205                 handler.releaseResources();
206             } else {
207                 break;
208             }
209         }
210     }
211 
212     @Override
213     void disconnected() {
214         if (incoming != null) {
215             if (!incoming.isCompleted()) {
216                 incoming.failed(new ConnectionClosedException());
217             }
218             incoming.releaseResources();
219             incoming = null;
220         }
221         if (outgoing != null) {
222             if (!outgoing.isCompleted()) {
223                 outgoing.failed(new ConnectionClosedException());
224             }
225             outgoing.releaseResources();
226             outgoing = null;
227         }
228         for (;;) {
229             final ServerHttp1StreamHandler handler = pipeline.poll();
230             if (handler != null) {
231                 handler.failed(new ConnectionClosedException());
232                 handler.releaseResources();
233             } else {
234                 break;
235             }
236         }
237     }
238 
239     @Override
240     void updateInputMetrics(final HttpRequest request, final BasicHttpConnectionMetrics connMetrics) {
241         connMetrics.incrementRequestCount();
242     }
243 
244     @Override
245     void updateOutputMetrics(final HttpResponse response, final BasicHttpConnectionMetrics connMetrics) {
246         if (response.getCode() >= HttpStatus.SC_OK) {
247             connMetrics.incrementRequestCount();
248         }
249     }
250 
251     @Override
252     protected boolean handleIncomingMessage(final HttpRequest request) throws HttpException {
253         return true;
254     }
255 
256     @Override
257     protected ContentDecoder createContentDecoder(
258             final long len,
259             final ReadableByteChannel channel,
260             final SessionInputBuffer buffer,
261             final BasicHttpTransportMetrics metrics) throws HttpException {
262         if (len >= 0) {
263             return new LengthDelimitedDecoder(channel, buffer, metrics, len);
264         } else if (len == ContentLengthStrategy.CHUNKED) {
265             return new ChunkDecoder(channel, buffer, http1Config, metrics);
266         } else {
267             return null;
268         }
269     }
270 
271     @Override
272     protected boolean handleOutgoingMessage(final HttpResponse response) throws HttpException {
273         return true;
274     }
275 
276     @Override
277     protected ContentEncoder createContentEncoder(
278             final long len,
279             final WritableByteChannel channel,
280             final SessionOutputBuffer buffer,
281             final BasicHttpTransportMetrics metrics) throws HttpException {
282         final int chunkSizeHint = http1Config.getChunkSizeHint() >= 0 ? http1Config.getChunkSizeHint() : 2048;
283         if (len >= 0) {
284             return new LengthDelimitedEncoder(channel, buffer, metrics, len, chunkSizeHint);
285         } else if (len == ContentLengthStrategy.CHUNKED) {
286             return new ChunkEncoder(channel, buffer, metrics, chunkSizeHint);
287         } else {
288             return new IdentityEncoder(channel, buffer, metrics, chunkSizeHint);
289         }
290     }
291 
292     @Override
293     boolean inputIdle() {
294         return incoming == null;
295     }
296 
297     @Override
298     boolean outputIdle() {
299         return outgoing == null && pipeline.isEmpty();
300     }
301 
302     @Override
303     HttpRequest parseMessageHead(final boolean endOfStream) throws IOException, HttpException {
304         try {
305             return super.parseMessageHead(endOfStream);
306         } catch (final HttpException ex) {
307             terminateExchange(ex);
308             return null;
309         }
310     }
311 
312     void terminateExchange(final HttpException ex) throws HttpException, IOException {
313         suspendSessionInput();
314         final ServerHttp1StreamHandler streamHandler;
315         final HttpCoreContext context = HttpCoreContext.create();
316         context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
317         context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
318         if (outgoing == null) {
319             streamHandler = new ServerHttp1StreamHandler(
320                     outputChannel,
321                     httpProcessor,
322                     connectionReuseStrategy,
323                     exchangeHandlerFactory,
324                     context);
325             outgoing = streamHandler;
326         } else {
327             streamHandler = new ServerHttp1StreamHandler(
328                     new DelayedOutputChannel(outputChannel),
329                     httpProcessor,
330                     connectionReuseStrategy,
331                     exchangeHandlerFactory,
332                     context);
333             pipeline.add(streamHandler);
334         }
335         streamHandler.terminateExchange(ex);
336         incoming = null;
337     }
338 
339     @Override
340     void consumeHeader(final HttpRequest request, final EntityDetails entityDetails) throws HttpException, IOException {
341         if (streamListener != null) {
342             streamListener.onRequestHead(this, request);
343         }
344         final ServerHttp1StreamHandler streamHandler;
345         final HttpCoreContext context = HttpCoreContext.create();
346         context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
347         context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
348         if (outgoing == null) {
349             streamHandler = new ServerHttp1StreamHandler(
350                     outputChannel,
351                     httpProcessor,
352                     connectionReuseStrategy,
353                     exchangeHandlerFactory,
354                     context);
355             outgoing = streamHandler;
356         } else {
357             streamHandler = new ServerHttp1StreamHandler(
358                     new DelayedOutputChannel(outputChannel),
359                     httpProcessor,
360                     connectionReuseStrategy,
361                     exchangeHandlerFactory,
362                     context);
363             pipeline.add(streamHandler);
364         }
365         request.setScheme(scheme);
366         streamHandler.consumeHeader(request, entityDetails);
367         incoming = streamHandler;
368     }
369 
370     @Override
371     void consumeData(final ByteBuffer src) throws HttpException, IOException {
372         Asserts.notNull(incoming, "Request stream handler");
373         incoming.consumeData(src);
374     }
375 
376     @Override
377     void updateCapacity(final CapacityChannel capacityChannel) throws HttpException, IOException {
378         Asserts.notNull(incoming, "Request stream handler");
379         incoming.updateCapacity(capacityChannel);
380     }
381 
382     @Override
383     void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
384         Asserts.notNull(incoming, "Request stream handler");
385         incoming.dataEnd(trailers);
386     }
387 
388     @Override
389     void inputEnd() throws HttpException, IOException {
390         if (incoming != null) {
391             if (incoming.isCompleted()) {
392                 incoming.releaseResources();
393             }
394             incoming = null;
395         }
396         if (isShuttingDown() && outputIdle() && inputIdle()) {
397             shutdownSession(CloseMode.IMMEDIATE);
398         }
399     }
400 
401     @Override
402     void execute(final RequestExecutionCommand executionCommand) throws HttpException {
403         throw new HttpException("Illegal command: " + executionCommand.getClass());
404     }
405 
406     @Override
407     boolean isOutputReady() {
408         return outgoing != null && outgoing.isOutputReady();
409     }
410 
411     @Override
412     void produceOutput() throws HttpException, IOException {
413         if (outgoing != null) {
414             outgoing.produceOutput();
415         }
416     }
417 
418     @Override
419     void outputEnd() throws HttpException, IOException {
420         if (outgoing != null && outgoing.isResponseFinal()) {
421             if (streamListener != null) {
422                 streamListener.onExchangeComplete(this, outgoing.keepAlive());
423             }
424             if (outgoing.isCompleted()) {
425                 outgoing.releaseResources();
426             }
427             outgoing = null;
428         }
429         if (outgoing == null && isActive()) {
430             final ServerHttp1StreamHandler handler = pipeline.poll();
431             if (handler != null) {
432                 outgoing = handler;
433                 handler.activateChannel();
434                 if (handler.isOutputReady()) {
435                     handler.produceOutput();
436                 }
437             }
438         }
439         if (isShuttingDown() && outputIdle() && inputIdle()) {
440             shutdownSession(CloseMode.IMMEDIATE);
441         }
442     }
443 
444     @Override
445     boolean handleTimeout() {
446         return false;
447     }
448 
449     @Override
450     void appendState(final StringBuilder buf) {
451         super.appendState(buf);
452         buf.append(", incoming=[");
453         if (incoming != null) {
454             incoming.appendState(buf);
455         }
456         buf.append("], outgoing=[");
457         if (outgoing != null) {
458             outgoing.appendState(buf);
459         }
460         buf.append("], pipeline=");
461         buf.append(pipeline.size());
462     }
463 
464     @Override
465     public String toString() {
466         final StringBuilder buf = new StringBuilder();
467         buf.append("[");
468         appendState(buf);
469         buf.append("]");
470         return buf.toString();
471     }
472 
473     private static class DelayedOutputChannel implements Http1StreamChannel<HttpResponse> {
474 
475         private final Http1StreamChannel<HttpResponse> channel;
476 
477         private volatile boolean direct;
478         private volatile HttpResponse delayedResponse;
479         private volatile boolean completed;
480 
481         private DelayedOutputChannel(final Http1StreamChannel<HttpResponse> channel) {
482             this.channel = channel;
483         }
484 
485         @Override
486         public void close() {
487             channel.close();
488         }
489 
490         @Override
491         public void submit(
492                 final HttpResponse response,
493                 final boolean endStream,
494                 final FlushMode flushMode) throws HttpException, IOException {
495             synchronized (this) {
496                 if (direct) {
497                     channel.submit(response, endStream, flushMode);
498                 } else {
499                     delayedResponse = response;
500                     completed = endStream;
501                 }
502             }
503         }
504 
505         @Override
506         public void suspendOutput() throws IOException {
507             channel.suspendOutput();
508         }
509 
510         @Override
511         public void requestOutput() {
512             channel.requestOutput();
513         }
514 
515         @Override
516         public Timeout getSocketTimeout() {
517             return channel.getSocketTimeout();
518         }
519 
520         @Override
521         public void setSocketTimeout(final Timeout timeout) {
522             channel.setSocketTimeout(timeout);
523         }
524 
525         @Override
526         public int write(final ByteBuffer src) throws IOException {
527             synchronized (this) {
528                 return direct ? channel.write(src) : 0;
529             }
530         }
531 
532         @Override
533         public void complete(final List<? extends Header> trailers) throws IOException {
534             synchronized (this) {
535                 if (direct) {
536                     channel.complete(trailers);
537                 } else {
538                     completed = true;
539                 }
540             }
541         }
542 
543         @Override
544         public boolean abortGracefully() throws IOException {
545             synchronized (this) {
546                 if (direct) {
547                     return channel.abortGracefully();
548                 }
549                 completed = true;
550                 return true;
551             }
552         }
553 
554         @Override
555         public boolean isCompleted() {
556             synchronized (this) {
557                 return direct ? channel.isCompleted() : completed;
558             }
559         }
560 
561         @Override
562         public void activate() throws IOException, HttpException {
563             synchronized (this) {
564                 direct = true;
565                 if (delayedResponse != null) {
566                     channel.submit(delayedResponse, completed, completed ? FlushMode.IMMEDIATE : FlushMode.BUFFER);
567                     delayedResponse = null;
568                 }
569             }
570         }
571 
572     }
573 
574 }