View Javadoc
1   /*
2    * ====================================================================
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *   http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing,
14   * software distributed under the License is distributed on an
15   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16   * KIND, either express or implied.  See the License for the
17   * specific language governing permissions and limitations
18   * under the License.
19   * ====================================================================
20   *
21   * This software consists of voluntary contributions made by many
22   * individuals on behalf of the Apache Software Foundation.  For more
23   * information on the Apache Software Foundation, please see
24   * <http://www.apache.org/>.
25   *
26   */
27  
28  package org.apache.hc.core5.http.impl.nio;
29  
30  import java.io.IOException;
31  import java.nio.ByteBuffer;
32  import java.nio.channels.ReadableByteChannel;
33  import java.nio.channels.WritableByteChannel;
34  import java.util.List;
35  import java.util.Queue;
36  import java.util.concurrent.ConcurrentLinkedQueue;
37  
38  import org.apache.hc.core5.annotation.Internal;
39  import org.apache.hc.core5.http.ConnectionClosedException;
40  import org.apache.hc.core5.http.ConnectionReuseStrategy;
41  import org.apache.hc.core5.http.ContentLengthStrategy;
42  import org.apache.hc.core5.http.EntityDetails;
43  import org.apache.hc.core5.http.Header;
44  import org.apache.hc.core5.http.HttpException;
45  import org.apache.hc.core5.http.HttpRequest;
46  import org.apache.hc.core5.http.HttpResponse;
47  import org.apache.hc.core5.http.HttpStatus;
48  import org.apache.hc.core5.http.config.CharCodingConfig;
49  import org.apache.hc.core5.http.config.Http1Config;
50  import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
51  import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
52  import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
53  import org.apache.hc.core5.http.impl.Http1StreamListener;
54  import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
55  import org.apache.hc.core5.http.nio.CapacityChannel;
56  import org.apache.hc.core5.http.nio.ContentDecoder;
57  import org.apache.hc.core5.http.nio.ContentEncoder;
58  import org.apache.hc.core5.http.nio.HandlerFactory;
59  import org.apache.hc.core5.http.nio.NHttpMessageParser;
60  import org.apache.hc.core5.http.nio.NHttpMessageWriter;
61  import org.apache.hc.core5.http.nio.SessionInputBuffer;
62  import org.apache.hc.core5.http.nio.SessionOutputBuffer;
63  import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
64  import org.apache.hc.core5.http.protocol.HttpCoreContext;
65  import org.apache.hc.core5.http.protocol.HttpProcessor;
66  import org.apache.hc.core5.io.CloseMode;
67  import org.apache.hc.core5.reactor.ProtocolIOSession;
68  import org.apache.hc.core5.util.Args;
69  import org.apache.hc.core5.util.Asserts;
70  import org.apache.hc.core5.util.Timeout;
71  
72  /**
73   * I/O event handler for events fired by {@link ProtocolIOSession} that implements
74   * server side HTTP/1.1 messaging protocol with full support for
75   * duplexed message transmission and message pipelining.
76   *
77   * @since 5.0
78   */
79  @Internal
80  public class ServerHttp1StreamDuplexer extends AbstractHttp1StreamDuplexer<HttpRequest, HttpResponse> {
81  
82      private final String scheme;
83      private final HttpProcessor httpProcessor;
84      private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
85      private final Http1Config http1Config;
86      private final ConnectionReuseStrategy connectionReuseStrategy;
87      private final Http1StreamListener streamListener;
88      private final Queue<ServerHttp1StreamHandler> pipeline;
89      private final Http1StreamChannel<HttpResponse> outputChannel;
90  
91      private volatile ServerHttp1StreamHandler outgoing;
92      private volatile ServerHttp1StreamHandler incoming;
93  
94      public ServerHttp1StreamDuplexer(
95              final ProtocolIOSession ioSession,
96              final HttpProcessor httpProcessor,
97              final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
98              final String scheme,
99              final Http1Config http1Config,
100             final CharCodingConfig charCodingConfig,
101             final ConnectionReuseStrategy connectionReuseStrategy,
102             final NHttpMessageParser<HttpRequest> incomingMessageParser,
103             final NHttpMessageWriter<HttpResponse> outgoingMessageWriter,
104             final ContentLengthStrategy incomingContentStrategy,
105             final ContentLengthStrategy outgoingContentStrategy,
106             final Http1StreamListener streamListener) {
107         super(ioSession, http1Config, charCodingConfig, incomingMessageParser, outgoingMessageWriter,
108                 incomingContentStrategy, outgoingContentStrategy);
109         this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
110         this.exchangeHandlerFactory = Args.notNull(exchangeHandlerFactory, "Exchange handler factory");
111         this.scheme = scheme;
112         this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
113         this.connectionReuseStrategy = connectionReuseStrategy != null ? connectionReuseStrategy :
114                 DefaultConnectionReuseStrategy.INSTANCE;
115         this.streamListener = streamListener;
116         this.pipeline = new ConcurrentLinkedQueue<>();
117         this.outputChannel = new Http1StreamChannel<HttpResponse>() {
118 
119             @Override
120             public void close() {
121                 ServerHttp1StreamDuplexer.this.close(CloseMode.GRACEFUL);
122             }
123 
124             @Override
125             public void submit(
126                     final HttpResponse response,
127                     final boolean endStream,
128                     final FlushMode flushMode) throws HttpException, IOException {
129                 if (streamListener != null) {
130                     streamListener.onResponseHead(ServerHttp1StreamDuplexer.this, response);
131                 }
132                 commitMessageHead(response, endStream, flushMode);
133             }
134 
135             @Override
136             public void requestOutput() {
137                 requestSessionOutput();
138             }
139 
140             @Override
141             public void suspendOutput() throws IOException {
142                 suspendSessionOutput();
143             }
144 
145             @Override
146             public Timeout getSocketTimeout() {
147                 return getSessionTimeout();
148             }
149 
150             @Override
151             public void setSocketTimeout(final Timeout timeout) {
152                 setSessionTimeout(timeout);
153             }
154 
155             @Override
156             public int write(final ByteBuffer src) throws IOException {
157                 return streamOutput(src);
158             }
159 
160             @Override
161             public void complete(final List<? extends Header> trailers) throws IOException {
162                 endOutputStream(trailers);
163             }
164 
165             @Override
166             public boolean isCompleted() {
167                 return isOutputCompleted();
168             }
169 
170             @Override
171             public boolean abortGracefully() throws IOException {
172                 final MessageDelineation messageDelineation = endOutputStream(null);
173                 return messageDelineation != MessageDelineation.MESSAGE_HEAD;
174             }
175 
176             @Override
177             public void activate() throws HttpException, IOException {
178                 // empty
179             }
180 
181             @Override
182             public String toString() {
183                 return "Http1StreamChannel[" + ServerHttp1StreamDuplexer.this + "]";
184             }
185 
186         };
187     }
188 
189     @Override
190     void terminate(final Exception exception) {
191         if (incoming != null) {
192             incoming.failed(exception);
193             incoming.releaseResources();
194             incoming = null;
195         }
196         if (outgoing != null) {
197             outgoing.failed(exception);
198             outgoing.releaseResources();
199             outgoing = null;
200         }
201         for (;;) {
202             final ServerHttp1StreamHandler handler = pipeline.poll();
203             if (handler != null) {
204                 handler.failed(exception);
205                 handler.releaseResources();
206             } else {
207                 break;
208             }
209         }
210     }
211 
212     @Override
213     void disconnected() {
214         if (incoming != null) {
215             if (!incoming.isCompleted()) {
216                 incoming.failed(new ConnectionClosedException());
217             }
218             incoming.releaseResources();
219             incoming = null;
220         }
221         if (outgoing != null) {
222             if (!outgoing.isCompleted()) {
223                 outgoing.failed(new ConnectionClosedException());
224             }
225             outgoing.releaseResources();
226             outgoing = null;
227         }
228         for (;;) {
229             final ServerHttp1StreamHandler handler = pipeline.poll();
230             if (handler != null) {
231                 handler.failed(new ConnectionClosedException());
232                 handler.releaseResources();
233             } else {
234                 break;
235             }
236         }
237     }
238 
239     @Override
240     void updateInputMetrics(final HttpRequest request, final BasicHttpConnectionMetrics connMetrics) {
241         connMetrics.incrementRequestCount();
242     }
243 
244     @Override
245     void updateOutputMetrics(final HttpResponse response, final BasicHttpConnectionMetrics connMetrics) {
246         if (response.getCode() >= HttpStatus.SC_OK) {
247             connMetrics.incrementRequestCount();
248         }
249     }
250 
251     @Override
252     protected boolean handleIncomingMessage(final HttpRequest request) throws HttpException {
253         return true;
254     }
255 
256     @Override
257     protected ContentDecoder createContentDecoder(
258             final long len,
259             final ReadableByteChannel channel,
260             final SessionInputBuffer buffer,
261             final BasicHttpTransportMetrics metrics) throws HttpException {
262         if (len >= 0) {
263             return new LengthDelimitedDecoder(channel, buffer, metrics, len);
264         } else if (len == ContentLengthStrategy.CHUNKED) {
265             return new ChunkDecoder(channel, buffer, http1Config, metrics);
266         } else {
267             return null;
268         }
269     }
270 
271     @Override
272     protected boolean handleOutgoingMessage(final HttpResponse response) throws HttpException {
273         return true;
274     }
275 
276     @Override
277     protected ContentEncoder createContentEncoder(
278             final long len,
279             final WritableByteChannel channel,
280             final SessionOutputBuffer buffer,
281             final BasicHttpTransportMetrics metrics) throws HttpException {
282         final int chunkSizeHint = http1Config.getChunkSizeHint() >= 0 ? http1Config.getChunkSizeHint() : 2048;
283         if (len >= 0) {
284             return new LengthDelimitedEncoder(channel, buffer, metrics, len, chunkSizeHint);
285         } else if (len == ContentLengthStrategy.CHUNKED) {
286             return new ChunkEncoder(channel, buffer, metrics, chunkSizeHint);
287         } else {
288             return new IdentityEncoder(channel, buffer, metrics, chunkSizeHint);
289         }
290     }
291 
292     @Override
293     boolean inputIdle() {
294         return incoming == null;
295     }
296 
297     @Override
298     boolean outputIdle() {
299         return outgoing == null && pipeline.isEmpty();
300     }
301 
302     @Override
303     HttpRequest parseMessageHead(final boolean endOfStream) throws IOException, HttpException {
304         try {
305             return super.parseMessageHead(endOfStream);
306         } catch (final HttpException ex) {
307             terminateExchange(ex);
308             return null;
309         }
310     }
311 
312     void terminateExchange(final HttpException ex) throws HttpException, IOException {
313         suspendSessionInput();
314         final ServerHttp1StreamHandler streamHandler;
315         final HttpCoreContext context = HttpCoreContext.create();
316         context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
317         context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
318         if (outgoing == null) {
319             streamHandler = new ServerHttp1StreamHandler(
320                     outputChannel,
321                     httpProcessor,
322                     connectionReuseStrategy,
323                     exchangeHandlerFactory,
324                     context);
325             outgoing = streamHandler;
326         } else {
327             streamHandler = new ServerHttp1StreamHandler(
328                     new DelayedOutputChannel(outputChannel),
329                     httpProcessor,
330                     connectionReuseStrategy,
331                     exchangeHandlerFactory,
332                     context);
333             pipeline.add(streamHandler);
334         }
335         streamHandler.terminateExchange(ex);
336         incoming = null;
337     }
338 
339     @Override
340     void consumeHeader(final HttpRequest request, final EntityDetails entityDetails) throws HttpException, IOException {
341         if (streamListener != null) {
342             streamListener.onRequestHead(this, request);
343         }
344         final ServerHttp1StreamHandler streamHandler;
345         final HttpCoreContext context = HttpCoreContext.create();
346         context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
347         context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
348         if (outgoing == null) {
349             streamHandler = new ServerHttp1StreamHandler(
350                     outputChannel,
351                     httpProcessor,
352                     connectionReuseStrategy,
353                     exchangeHandlerFactory,
354                     context);
355             outgoing = streamHandler;
356         } else {
357             streamHandler = new ServerHttp1StreamHandler(
358                     new DelayedOutputChannel(outputChannel),
359                     httpProcessor,
360                     connectionReuseStrategy,
361                     exchangeHandlerFactory,
362                     context);
363             pipeline.add(streamHandler);
364         }
365         request.setScheme(scheme);
366         streamHandler.consumeHeader(request, entityDetails);
367         incoming = streamHandler;
368     }
369 
370     @Override
371     void consumeData(final ByteBuffer src) throws HttpException, IOException {
372         Asserts.notNull(incoming, "Request stream handler");
373         incoming.consumeData(src);
374     }
375 
376     @Override
377     void updateCapacity(final CapacityChannel capacityChannel) throws HttpException, IOException {
378         Asserts.notNull(incoming, "Request stream handler");
379         incoming.updateCapacity(capacityChannel);
380     }
381 
382     @Override
383     void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
384         Asserts.notNull(incoming, "Request stream handler");
385         incoming.dataEnd(trailers);
386     }
387 
388     @Override
389     void inputEnd() throws HttpException, IOException {
390         if (incoming != null) {
391             if (incoming.isCompleted()) {
392                 incoming.releaseResources();
393             }
394             incoming = null;
395         }
396     }
397 
398     @Override
399     void execute(final RequestExecutionCommand executionCommand) throws HttpException {
400         throw new HttpException("Illegal command: " + executionCommand.getClass());
401     }
402 
403     @Override
404     boolean isOutputReady() {
405         return outgoing != null && outgoing.isOutputReady();
406     }
407 
408     @Override
409     void produceOutput() throws HttpException, IOException {
410         if (outgoing != null) {
411             outgoing.produceOutput();
412         }
413     }
414 
415     @Override
416     void outputEnd() throws HttpException, IOException {
417         if (outgoing != null && outgoing.isResponseFinal()) {
418             if (streamListener != null) {
419                 streamListener.onExchangeComplete(this, outgoing.keepAlive());
420             }
421             if (outgoing.isCompleted()) {
422                 outgoing.releaseResources();
423             }
424             outgoing = null;
425         }
426         if (outgoing == null && isOpen()) {
427             final ServerHttp1StreamHandler handler = pipeline.poll();
428             if (handler != null) {
429                 outgoing = handler;
430                 handler.activateChannel();
431                 if (handler.isOutputReady()) {
432                     handler.produceOutput();
433                 }
434             }
435         }
436     }
437 
438     @Override
439     boolean handleTimeout() {
440         return false;
441     }
442 
443     @Override
444     void appendState(final StringBuilder buf) {
445         super.appendState(buf);
446         buf.append(", incoming=[");
447         if (incoming != null) {
448             incoming.appendState(buf);
449         }
450         buf.append("], outgoing=[");
451         if (outgoing != null) {
452             outgoing.appendState(buf);
453         }
454         buf.append("], pipeline=");
455         buf.append(pipeline.size());
456     }
457 
458     @Override
459     public String toString() {
460         final StringBuilder buf = new StringBuilder();
461         buf.append("[");
462         appendState(buf);
463         buf.append("]");
464         return buf.toString();
465     }
466 
467     private static class DelayedOutputChannel implements Http1StreamChannel<HttpResponse> {
468 
469         private final Http1StreamChannel<HttpResponse> channel;
470 
471         private volatile boolean direct;
472         private volatile HttpResponse delayedResponse;
473         private volatile boolean completed;
474 
475         private DelayedOutputChannel(final Http1StreamChannel<HttpResponse> channel) {
476             this.channel = channel;
477         }
478 
479         @Override
480         public void close() {
481             channel.close();
482         }
483 
484         @Override
485         public void submit(
486                 final HttpResponse response,
487                 final boolean endStream,
488                 final FlushMode flushMode) throws HttpException, IOException {
489             synchronized (this) {
490                 if (direct) {
491                     channel.submit(response, endStream, flushMode);
492                 } else {
493                     delayedResponse = response;
494                     completed = endStream;
495                 }
496             }
497         }
498 
499         @Override
500         public void suspendOutput() throws IOException {
501             channel.suspendOutput();
502         }
503 
504         @Override
505         public void requestOutput() {
506             channel.requestOutput();
507         }
508 
509         @Override
510         public Timeout getSocketTimeout() {
511             return channel.getSocketTimeout();
512         }
513 
514         @Override
515         public void setSocketTimeout(final Timeout timeout) {
516             channel.setSocketTimeout(timeout);
517         }
518 
519         @Override
520         public int write(final ByteBuffer src) throws IOException {
521             synchronized (this) {
522                 return direct ? channel.write(src) : 0;
523             }
524         }
525 
526         @Override
527         public void complete(final List<? extends Header> trailers) throws IOException {
528             synchronized (this) {
529                 if (direct) {
530                     channel.complete(trailers);
531                 } else {
532                     completed = true;
533                 }
534             }
535         }
536 
537         @Override
538         public boolean abortGracefully() throws IOException {
539             synchronized (this) {
540                 if (direct) {
541                     return channel.abortGracefully();
542                 }
543                 completed = true;
544                 return true;
545             }
546         }
547 
548         @Override
549         public boolean isCompleted() {
550             synchronized (this) {
551                 return direct ? channel.isCompleted() : completed;
552             }
553         }
554 
555         @Override
556         public void activate() throws IOException, HttpException {
557             synchronized (this) {
558                 direct = true;
559                 if (delayedResponse != null) {
560                     channel.submit(delayedResponse, completed, completed ? FlushMode.IMMEDIATE : FlushMode.BUFFER);
561                     delayedResponse = null;
562                 }
563             }
564         }
565 
566     }
567 
568 }