1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.http.impl.nio;
29
30 import java.io.IOException;
31 import java.nio.ByteBuffer;
32 import java.nio.channels.ReadableByteChannel;
33 import java.nio.channels.WritableByteChannel;
34 import java.util.List;
35 import java.util.Queue;
36 import java.util.concurrent.ConcurrentLinkedQueue;
37
38 import org.apache.hc.core5.annotation.Internal;
39 import org.apache.hc.core5.http.ConnectionClosedException;
40 import org.apache.hc.core5.http.ConnectionReuseStrategy;
41 import org.apache.hc.core5.http.ContentLengthStrategy;
42 import org.apache.hc.core5.http.EntityDetails;
43 import org.apache.hc.core5.http.Header;
44 import org.apache.hc.core5.http.HttpException;
45 import org.apache.hc.core5.http.HttpRequest;
46 import org.apache.hc.core5.http.HttpResponse;
47 import org.apache.hc.core5.http.HttpStatus;
48 import org.apache.hc.core5.http.config.CharCodingConfig;
49 import org.apache.hc.core5.http.config.Http1Config;
50 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
51 import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
52 import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
53 import org.apache.hc.core5.http.impl.Http1StreamListener;
54 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
55 import org.apache.hc.core5.http.nio.CapacityChannel;
56 import org.apache.hc.core5.http.nio.ContentDecoder;
57 import org.apache.hc.core5.http.nio.ContentEncoder;
58 import org.apache.hc.core5.http.nio.HandlerFactory;
59 import org.apache.hc.core5.http.nio.NHttpMessageParser;
60 import org.apache.hc.core5.http.nio.NHttpMessageWriter;
61 import org.apache.hc.core5.http.nio.SessionInputBuffer;
62 import org.apache.hc.core5.http.nio.SessionOutputBuffer;
63 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
64 import org.apache.hc.core5.http.protocol.HttpCoreContext;
65 import org.apache.hc.core5.http.protocol.HttpProcessor;
66 import org.apache.hc.core5.io.CloseMode;
67 import org.apache.hc.core5.reactor.ProtocolIOSession;
68 import org.apache.hc.core5.util.Args;
69 import org.apache.hc.core5.util.Asserts;
70 import org.apache.hc.core5.util.Timeout;
71
72
73
74
75
76
77
78
79 @Internal
80 public class ServerHttp1StreamDuplexer extends AbstractHttp1StreamDuplexer<HttpRequest, HttpResponse> {
81
82 private final String scheme;
83 private final HttpProcessor httpProcessor;
84 private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
85 private final Http1Config http1Config;
86 private final ConnectionReuseStrategy connectionReuseStrategy;
87 private final Http1StreamListener streamListener;
88 private final Queue<ServerHttp1StreamHandler> pipeline;
89 private final Http1StreamChannel<HttpResponse> outputChannel;
90
91 private volatile ServerHttp1StreamHandler outgoing;
92 private volatile ServerHttp1StreamHandler incoming;
93
94 public ServerHttp1StreamDuplexer(
95 final ProtocolIOSession ioSession,
96 final HttpProcessor httpProcessor,
97 final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
98 final String scheme,
99 final Http1Config http1Config,
100 final CharCodingConfig charCodingConfig,
101 final ConnectionReuseStrategy connectionReuseStrategy,
102 final NHttpMessageParser<HttpRequest> incomingMessageParser,
103 final NHttpMessageWriter<HttpResponse> outgoingMessageWriter,
104 final ContentLengthStrategy incomingContentStrategy,
105 final ContentLengthStrategy outgoingContentStrategy,
106 final Http1StreamListener streamListener) {
107 super(ioSession, http1Config, charCodingConfig, incomingMessageParser, outgoingMessageWriter,
108 incomingContentStrategy, outgoingContentStrategy);
109 this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
110 this.exchangeHandlerFactory = Args.notNull(exchangeHandlerFactory, "Exchange handler factory");
111 this.scheme = scheme;
112 this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
113 this.connectionReuseStrategy = connectionReuseStrategy != null ? connectionReuseStrategy :
114 DefaultConnectionReuseStrategy.INSTANCE;
115 this.streamListener = streamListener;
116 this.pipeline = new ConcurrentLinkedQueue<>();
117 this.outputChannel = new Http1StreamChannel<HttpResponse>() {
118
119 @Override
120 public void close() {
121 ServerHttp1StreamDuplexer.this.close(CloseMode.GRACEFUL);
122 }
123
124 @Override
125 public void submit(
126 final HttpResponse response,
127 final boolean endStream,
128 final FlushMode flushMode) throws HttpException, IOException {
129 if (streamListener != null) {
130 streamListener.onResponseHead(ServerHttp1StreamDuplexer.this, response);
131 }
132 commitMessageHead(response, endStream, flushMode);
133 }
134
135 @Override
136 public void requestOutput() {
137 requestSessionOutput();
138 }
139
140 @Override
141 public void suspendOutput() throws IOException {
142 suspendSessionOutput();
143 }
144
145 @Override
146 public Timeout getSocketTimeout() {
147 return getSessionTimeout();
148 }
149
150 @Override
151 public void setSocketTimeout(final Timeout timeout) {
152 setSessionTimeout(timeout);
153 }
154
155 @Override
156 public int write(final ByteBuffer src) throws IOException {
157 return streamOutput(src);
158 }
159
160 @Override
161 public void complete(final List<? extends Header> trailers) throws IOException {
162 endOutputStream(trailers);
163 }
164
165 @Override
166 public boolean isCompleted() {
167 return isOutputCompleted();
168 }
169
170 @Override
171 public boolean abortGracefully() throws IOException {
172 final MessageDelineation messageDelineation = endOutputStream(null);
173 return messageDelineation != MessageDelineation.MESSAGE_HEAD;
174 }
175
176 @Override
177 public void activate() throws HttpException, IOException {
178
179 }
180
181 @Override
182 public String toString() {
183 return "Http1StreamChannel[" + ServerHttp1StreamDuplexer.this + "]";
184 }
185
186 };
187 }
188
189 @Override
190 void terminate(final Exception exception) {
191 if (incoming != null) {
192 incoming.failed(exception);
193 incoming.releaseResources();
194 incoming = null;
195 }
196 if (outgoing != null) {
197 outgoing.failed(exception);
198 outgoing.releaseResources();
199 outgoing = null;
200 }
201 for (;;) {
202 final ServerHttp1StreamHandler handler = pipeline.poll();
203 if (handler != null) {
204 handler.failed(exception);
205 handler.releaseResources();
206 } else {
207 break;
208 }
209 }
210 }
211
212 @Override
213 void disconnected() {
214 if (incoming != null) {
215 if (!incoming.isCompleted()) {
216 incoming.failed(new ConnectionClosedException());
217 }
218 incoming.releaseResources();
219 incoming = null;
220 }
221 if (outgoing != null) {
222 if (!outgoing.isCompleted()) {
223 outgoing.failed(new ConnectionClosedException());
224 }
225 outgoing.releaseResources();
226 outgoing = null;
227 }
228 for (;;) {
229 final ServerHttp1StreamHandler handler = pipeline.poll();
230 if (handler != null) {
231 handler.failed(new ConnectionClosedException());
232 handler.releaseResources();
233 } else {
234 break;
235 }
236 }
237 }
238
239 @Override
240 void updateInputMetrics(final HttpRequest request, final BasicHttpConnectionMetrics connMetrics) {
241 connMetrics.incrementRequestCount();
242 }
243
244 @Override
245 void updateOutputMetrics(final HttpResponse response, final BasicHttpConnectionMetrics connMetrics) {
246 if (response.getCode() >= HttpStatus.SC_OK) {
247 connMetrics.incrementResponseCount();
248 }
249 }
250
251 @Override
252 protected boolean handleIncomingMessage(final HttpRequest request) throws HttpException {
253 return true;
254 }
255
256 @Override
257 protected ContentDecoder createContentDecoder(
258 final long len,
259 final ReadableByteChannel channel,
260 final SessionInputBuffer buffer,
261 final BasicHttpTransportMetrics metrics) throws HttpException {
262 if (len >= 0) {
263 return new LengthDelimitedDecoder(channel, buffer, metrics, len);
264 } else if (len == ContentLengthStrategy.CHUNKED) {
265 return new ChunkDecoder(channel, buffer, http1Config, metrics);
266 } else {
267 return null;
268 }
269 }
270
271 @Override
272 protected boolean handleOutgoingMessage(final HttpResponse response) throws HttpException {
273 return true;
274 }
275
276 @Override
277 protected ContentEncoder createContentEncoder(
278 final long len,
279 final WritableByteChannel channel,
280 final SessionOutputBuffer buffer,
281 final BasicHttpTransportMetrics metrics) throws HttpException {
282 final int chunkSizeHint = http1Config.getChunkSizeHint() >= 0 ? http1Config.getChunkSizeHint() : 2048;
283 if (len >= 0) {
284 return new LengthDelimitedEncoder(channel, buffer, metrics, len, chunkSizeHint);
285 } else if (len == ContentLengthStrategy.CHUNKED) {
286 return new ChunkEncoder(channel, buffer, metrics, chunkSizeHint);
287 } else {
288 return new IdentityEncoder(channel, buffer, metrics, chunkSizeHint);
289 }
290 }
291
292 @Override
293 boolean inputIdle() {
294 return incoming == null;
295 }
296
297 @Override
298 boolean outputIdle() {
299 return outgoing == null && pipeline.isEmpty();
300 }
301
302 @Override
303 HttpRequest parseMessageHead(final boolean endOfStream) throws IOException, HttpException {
304 try {
305 return super.parseMessageHead(endOfStream);
306 } catch (final HttpException ex) {
307 terminateExchange(ex);
308 return null;
309 }
310 }
311
312 void terminateExchange(final HttpException ex) throws HttpException, IOException {
313 suspendSessionInput();
314 final ServerHttp1StreamHandler streamHandler;
315 final HttpCoreContext context = HttpCoreContext.create();
316 context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
317 context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
318 if (outgoing == null) {
319 streamHandler = new ServerHttp1StreamHandler(
320 outputChannel,
321 httpProcessor,
322 connectionReuseStrategy,
323 exchangeHandlerFactory,
324 context);
325 outgoing = streamHandler;
326 } else {
327 streamHandler = new ServerHttp1StreamHandler(
328 new DelayedOutputChannel(outputChannel),
329 httpProcessor,
330 connectionReuseStrategy,
331 exchangeHandlerFactory,
332 context);
333 pipeline.add(streamHandler);
334 }
335 streamHandler.terminateExchange(ex);
336 incoming = null;
337 }
338
339 @Override
340 void consumeHeader(final HttpRequest request, final EntityDetails entityDetails) throws HttpException, IOException {
341 if (streamListener != null) {
342 streamListener.onRequestHead(this, request);
343 }
344 final ServerHttp1StreamHandler streamHandler;
345 final HttpCoreContext context = HttpCoreContext.create();
346 context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
347 context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
348 if (outgoing == null) {
349 streamHandler = new ServerHttp1StreamHandler(
350 outputChannel,
351 httpProcessor,
352 connectionReuseStrategy,
353 exchangeHandlerFactory,
354 context);
355 outgoing = streamHandler;
356 } else {
357 streamHandler = new ServerHttp1StreamHandler(
358 new DelayedOutputChannel(outputChannel),
359 httpProcessor,
360 connectionReuseStrategy,
361 exchangeHandlerFactory,
362 context);
363 pipeline.add(streamHandler);
364 }
365 request.setScheme(scheme);
366 streamHandler.consumeHeader(request, entityDetails);
367 incoming = streamHandler;
368 }
369
370 @Override
371 void consumeData(final ByteBuffer src) throws HttpException, IOException {
372 Asserts.notNull(incoming, "Request stream handler");
373 incoming.consumeData(src);
374 }
375
376 @Override
377 void updateCapacity(final CapacityChannel capacityChannel) throws HttpException, IOException {
378 Asserts.notNull(incoming, "Request stream handler");
379 incoming.updateCapacity(capacityChannel);
380 }
381
382 @Override
383 void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
384 Asserts.notNull(incoming, "Request stream handler");
385 incoming.dataEnd(trailers);
386 }
387
388 @Override
389 void inputEnd() throws HttpException, IOException {
390 if (incoming != null) {
391 if (incoming.isCompleted()) {
392 incoming.releaseResources();
393 }
394 incoming = null;
395 }
396 if (isShuttingDown() && outputIdle() && inputIdle()) {
397 shutdownSession(CloseMode.IMMEDIATE);
398 }
399 }
400
401 @Override
402 void execute(final RequestExecutionCommand executionCommand) throws HttpException {
403 throw new HttpException("Illegal command: " + executionCommand.getClass());
404 }
405
406 @Override
407 boolean isOutputReady() {
408 return outgoing != null && outgoing.isOutputReady();
409 }
410
411 @Override
412 void produceOutput() throws HttpException, IOException {
413 if (outgoing != null) {
414 outgoing.produceOutput();
415 }
416 }
417
418 @Override
419 void outputEnd() throws HttpException, IOException {
420 if (outgoing != null && outgoing.isResponseFinal()) {
421 if (streamListener != null) {
422 streamListener.onExchangeComplete(this, outgoing.keepAlive());
423 }
424 if (outgoing.isCompleted()) {
425 outgoing.releaseResources();
426 }
427 outgoing = null;
428 }
429 if (outgoing == null && isActive()) {
430 final ServerHttp1StreamHandler handler = pipeline.poll();
431 if (handler != null) {
432 outgoing = handler;
433 handler.activateChannel();
434 if (handler.isOutputReady()) {
435 handler.produceOutput();
436 }
437 }
438 }
439 if (isShuttingDown() && outputIdle() && inputIdle()) {
440 shutdownSession(CloseMode.IMMEDIATE);
441 }
442 }
443
444 @Override
445 boolean handleTimeout() {
446 return false;
447 }
448
449 @Override
450 void appendState(final StringBuilder buf) {
451 super.appendState(buf);
452 buf.append(", incoming=[");
453 if (incoming != null) {
454 incoming.appendState(buf);
455 }
456 buf.append("], outgoing=[");
457 if (outgoing != null) {
458 outgoing.appendState(buf);
459 }
460 buf.append("], pipeline=");
461 buf.append(pipeline.size());
462 }
463
464 @Override
465 public String toString() {
466 final StringBuilder buf = new StringBuilder();
467 buf.append("[");
468 appendState(buf);
469 buf.append("]");
470 return buf.toString();
471 }
472
473 private static class DelayedOutputChannel implements Http1StreamChannel<HttpResponse> {
474
475 private final Http1StreamChannel<HttpResponse> channel;
476
477 private volatile boolean direct;
478 private volatile HttpResponse delayedResponse;
479 private volatile boolean completed;
480
481 private DelayedOutputChannel(final Http1StreamChannel<HttpResponse> channel) {
482 this.channel = channel;
483 }
484
485 @Override
486 public void close() {
487 channel.close();
488 }
489
490 @Override
491 public void submit(
492 final HttpResponse response,
493 final boolean endStream,
494 final FlushMode flushMode) throws HttpException, IOException {
495 synchronized (this) {
496 if (direct) {
497 channel.submit(response, endStream, flushMode);
498 } else {
499 delayedResponse = response;
500 completed = endStream;
501 }
502 }
503 }
504
505 @Override
506 public void suspendOutput() throws IOException {
507 channel.suspendOutput();
508 }
509
510 @Override
511 public void requestOutput() {
512 channel.requestOutput();
513 }
514
515 @Override
516 public Timeout getSocketTimeout() {
517 return channel.getSocketTimeout();
518 }
519
520 @Override
521 public void setSocketTimeout(final Timeout timeout) {
522 channel.setSocketTimeout(timeout);
523 }
524
525 @Override
526 public int write(final ByteBuffer src) throws IOException {
527 synchronized (this) {
528 return direct ? channel.write(src) : 0;
529 }
530 }
531
532 @Override
533 public void complete(final List<? extends Header> trailers) throws IOException {
534 synchronized (this) {
535 if (direct) {
536 channel.complete(trailers);
537 } else {
538 completed = true;
539 }
540 }
541 }
542
543 @Override
544 public boolean abortGracefully() throws IOException {
545 synchronized (this) {
546 if (direct) {
547 return channel.abortGracefully();
548 }
549 completed = true;
550 return true;
551 }
552 }
553
554 @Override
555 public boolean isCompleted() {
556 synchronized (this) {
557 return direct ? channel.isCompleted() : completed;
558 }
559 }
560
561 @Override
562 public void activate() throws IOException, HttpException {
563 synchronized (this) {
564 direct = true;
565 if (delayedResponse != null) {
566 channel.submit(delayedResponse, completed, completed ? FlushMode.IMMEDIATE : FlushMode.BUFFER);
567 delayedResponse = null;
568 }
569 }
570 }
571
572 }
573
574 }