1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.http.impl.nio;
29
30 import java.io.IOException;
31 import java.nio.ByteBuffer;
32 import java.nio.channels.ReadableByteChannel;
33 import java.nio.channels.WritableByteChannel;
34 import java.util.List;
35 import java.util.Queue;
36 import java.util.concurrent.ConcurrentLinkedQueue;
37
38 import org.apache.hc.core5.annotation.Internal;
39 import org.apache.hc.core5.http.ConnectionClosedException;
40 import org.apache.hc.core5.http.ConnectionReuseStrategy;
41 import org.apache.hc.core5.http.ContentLengthStrategy;
42 import org.apache.hc.core5.http.EntityDetails;
43 import org.apache.hc.core5.http.Header;
44 import org.apache.hc.core5.http.HttpException;
45 import org.apache.hc.core5.http.HttpRequest;
46 import org.apache.hc.core5.http.HttpResponse;
47 import org.apache.hc.core5.http.HttpStatus;
48 import org.apache.hc.core5.http.config.CharCodingConfig;
49 import org.apache.hc.core5.http.config.Http1Config;
50 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
51 import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
52 import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
53 import org.apache.hc.core5.http.impl.Http1StreamListener;
54 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
55 import org.apache.hc.core5.http.nio.CapacityChannel;
56 import org.apache.hc.core5.http.nio.ContentDecoder;
57 import org.apache.hc.core5.http.nio.ContentEncoder;
58 import org.apache.hc.core5.http.nio.HandlerFactory;
59 import org.apache.hc.core5.http.nio.NHttpMessageParser;
60 import org.apache.hc.core5.http.nio.NHttpMessageWriter;
61 import org.apache.hc.core5.http.nio.SessionInputBuffer;
62 import org.apache.hc.core5.http.nio.SessionOutputBuffer;
63 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
64 import org.apache.hc.core5.http.protocol.HttpCoreContext;
65 import org.apache.hc.core5.http.protocol.HttpProcessor;
66 import org.apache.hc.core5.io.CloseMode;
67 import org.apache.hc.core5.reactor.ProtocolIOSession;
68 import org.apache.hc.core5.util.Args;
69 import org.apache.hc.core5.util.Asserts;
70 import org.apache.hc.core5.util.Timeout;
71
72
73
74
75
76
77
78
79 @Internal
80 public class ServerHttp1StreamDuplexer extends AbstractHttp1StreamDuplexer<HttpRequest, HttpResponse> {
81
82 private final String scheme;
83 private final HttpProcessor httpProcessor;
84 private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
85 private final Http1Config http1Config;
86 private final ConnectionReuseStrategy connectionReuseStrategy;
87 private final Http1StreamListener streamListener;
88 private final Queue<ServerHttp1StreamHandler> pipeline;
89 private final Http1StreamChannel<HttpResponse> outputChannel;
90
91 private volatile ServerHttp1StreamHandler outgoing;
92 private volatile ServerHttp1StreamHandler incoming;
93
94 public ServerHttp1StreamDuplexer(
95 final ProtocolIOSession ioSession,
96 final HttpProcessor httpProcessor,
97 final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
98 final String scheme,
99 final Http1Config http1Config,
100 final CharCodingConfig charCodingConfig,
101 final ConnectionReuseStrategy connectionReuseStrategy,
102 final NHttpMessageParser<HttpRequest> incomingMessageParser,
103 final NHttpMessageWriter<HttpResponse> outgoingMessageWriter,
104 final ContentLengthStrategy incomingContentStrategy,
105 final ContentLengthStrategy outgoingContentStrategy,
106 final Http1StreamListener streamListener) {
107 super(ioSession, http1Config, charCodingConfig, incomingMessageParser, outgoingMessageWriter,
108 incomingContentStrategy, outgoingContentStrategy);
109 this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
110 this.exchangeHandlerFactory = Args.notNull(exchangeHandlerFactory, "Exchange handler factory");
111 this.scheme = scheme;
112 this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
113 this.connectionReuseStrategy = connectionReuseStrategy != null ? connectionReuseStrategy :
114 DefaultConnectionReuseStrategy.INSTANCE;
115 this.streamListener = streamListener;
116 this.pipeline = new ConcurrentLinkedQueue<>();
117 this.outputChannel = new Http1StreamChannel<HttpResponse>() {
118
119 @Override
120 public void close() {
121 ServerHttp1StreamDuplexer.this.close(CloseMode.GRACEFUL);
122 }
123
124 @Override
125 public void submit(
126 final HttpResponse response,
127 final boolean endStream,
128 final FlushMode flushMode) throws HttpException, IOException {
129 if (streamListener != null) {
130 streamListener.onResponseHead(ServerHttp1StreamDuplexer.this, response);
131 }
132 commitMessageHead(response, endStream, flushMode);
133 }
134
135 @Override
136 public void requestOutput() {
137 requestSessionOutput();
138 }
139
140 @Override
141 public void suspendOutput() throws IOException {
142 suspendSessionOutput();
143 }
144
145 @Override
146 public Timeout getSocketTimeout() {
147 return getSessionTimeout();
148 }
149
150 @Override
151 public void setSocketTimeout(final Timeout timeout) {
152 setSessionTimeout(timeout);
153 }
154
155 @Override
156 public int write(final ByteBuffer src) throws IOException {
157 return streamOutput(src);
158 }
159
160 @Override
161 public void complete(final List<? extends Header> trailers) throws IOException {
162 endOutputStream(trailers);
163 }
164
165 @Override
166 public boolean isCompleted() {
167 return isOutputCompleted();
168 }
169
170 @Override
171 public boolean abortGracefully() throws IOException {
172 final MessageDelineation messageDelineation = endOutputStream(null);
173 return messageDelineation != MessageDelineation.MESSAGE_HEAD;
174 }
175
176 @Override
177 public void activate() throws HttpException, IOException {
178
179 }
180
181 @Override
182 public String toString() {
183 return "Http1StreamChannel[" + ServerHttp1StreamDuplexer.this + "]";
184 }
185
186 };
187 }
188
189 @Override
190 void terminate(final Exception exception) {
191 if (incoming != null) {
192 incoming.failed(exception);
193 incoming.releaseResources();
194 incoming = null;
195 }
196 if (outgoing != null) {
197 outgoing.failed(exception);
198 outgoing.releaseResources();
199 outgoing = null;
200 }
201 for (;;) {
202 final ServerHttp1StreamHandler handler = pipeline.poll();
203 if (handler != null) {
204 handler.failed(exception);
205 handler.releaseResources();
206 } else {
207 break;
208 }
209 }
210 }
211
212 @Override
213 void disconnected() {
214 if (incoming != null) {
215 if (!incoming.isCompleted()) {
216 incoming.failed(new ConnectionClosedException());
217 }
218 incoming.releaseResources();
219 incoming = null;
220 }
221 if (outgoing != null) {
222 if (!outgoing.isCompleted()) {
223 outgoing.failed(new ConnectionClosedException());
224 }
225 outgoing.releaseResources();
226 outgoing = null;
227 }
228 for (;;) {
229 final ServerHttp1StreamHandler handler = pipeline.poll();
230 if (handler != null) {
231 handler.failed(new ConnectionClosedException());
232 handler.releaseResources();
233 } else {
234 break;
235 }
236 }
237 }
238
239 @Override
240 void updateInputMetrics(final HttpRequest request, final BasicHttpConnectionMetrics connMetrics) {
241 connMetrics.incrementRequestCount();
242 }
243
244 @Override
245 void updateOutputMetrics(final HttpResponse response, final BasicHttpConnectionMetrics connMetrics) {
246 if (response.getCode() >= HttpStatus.SC_OK) {
247 connMetrics.incrementRequestCount();
248 }
249 }
250
251 @Override
252 protected boolean handleIncomingMessage(final HttpRequest request) throws HttpException {
253 return true;
254 }
255
256 @Override
257 protected ContentDecoder createContentDecoder(
258 final long len,
259 final ReadableByteChannel channel,
260 final SessionInputBuffer buffer,
261 final BasicHttpTransportMetrics metrics) throws HttpException {
262 if (len >= 0) {
263 return new LengthDelimitedDecoder(channel, buffer, metrics, len);
264 } else if (len == ContentLengthStrategy.CHUNKED) {
265 return new ChunkDecoder(channel, buffer, http1Config, metrics);
266 } else {
267 return null;
268 }
269 }
270
271 @Override
272 protected boolean handleOutgoingMessage(final HttpResponse response) throws HttpException {
273 return true;
274 }
275
276 @Override
277 protected ContentEncoder createContentEncoder(
278 final long len,
279 final WritableByteChannel channel,
280 final SessionOutputBuffer buffer,
281 final BasicHttpTransportMetrics metrics) throws HttpException {
282 final int chunkSizeHint = http1Config.getChunkSizeHint() >= 0 ? http1Config.getChunkSizeHint() : 2048;
283 if (len >= 0) {
284 return new LengthDelimitedEncoder(channel, buffer, metrics, len, chunkSizeHint);
285 } else if (len == ContentLengthStrategy.CHUNKED) {
286 return new ChunkEncoder(channel, buffer, metrics, chunkSizeHint);
287 } else {
288 return new IdentityEncoder(channel, buffer, metrics, chunkSizeHint);
289 }
290 }
291
292 @Override
293 boolean inputIdle() {
294 return incoming == null;
295 }
296
297 @Override
298 boolean outputIdle() {
299 return outgoing == null && pipeline.isEmpty();
300 }
301
302 @Override
303 HttpRequest parseMessageHead(final boolean endOfStream) throws IOException, HttpException {
304 try {
305 return super.parseMessageHead(endOfStream);
306 } catch (final HttpException ex) {
307 terminateExchange(ex);
308 return null;
309 }
310 }
311
312 void terminateExchange(final HttpException ex) throws HttpException, IOException {
313 suspendSessionInput();
314 final ServerHttp1StreamHandler streamHandler;
315 final HttpCoreContext context = HttpCoreContext.create();
316 context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
317 context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
318 if (outgoing == null) {
319 streamHandler = new ServerHttp1StreamHandler(
320 outputChannel,
321 httpProcessor,
322 connectionReuseStrategy,
323 exchangeHandlerFactory,
324 context);
325 outgoing = streamHandler;
326 } else {
327 streamHandler = new ServerHttp1StreamHandler(
328 new DelayedOutputChannel(outputChannel),
329 httpProcessor,
330 connectionReuseStrategy,
331 exchangeHandlerFactory,
332 context);
333 pipeline.add(streamHandler);
334 }
335 streamHandler.terminateExchange(ex);
336 incoming = null;
337 }
338
339 @Override
340 void consumeHeader(final HttpRequest request, final EntityDetails entityDetails) throws HttpException, IOException {
341 if (streamListener != null) {
342 streamListener.onRequestHead(this, request);
343 }
344 final ServerHttp1StreamHandler streamHandler;
345 final HttpCoreContext context = HttpCoreContext.create();
346 context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
347 context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
348 if (outgoing == null) {
349 streamHandler = new ServerHttp1StreamHandler(
350 outputChannel,
351 httpProcessor,
352 connectionReuseStrategy,
353 exchangeHandlerFactory,
354 context);
355 outgoing = streamHandler;
356 } else {
357 streamHandler = new ServerHttp1StreamHandler(
358 new DelayedOutputChannel(outputChannel),
359 httpProcessor,
360 connectionReuseStrategy,
361 exchangeHandlerFactory,
362 context);
363 pipeline.add(streamHandler);
364 }
365 request.setScheme(scheme);
366 streamHandler.consumeHeader(request, entityDetails);
367 incoming = streamHandler;
368 }
369
370 @Override
371 void consumeData(final ByteBuffer src) throws HttpException, IOException {
372 Asserts.notNull(incoming, "Request stream handler");
373 incoming.consumeData(src);
374 }
375
376 @Override
377 void updateCapacity(final CapacityChannel capacityChannel) throws HttpException, IOException {
378 Asserts.notNull(incoming, "Request stream handler");
379 incoming.updateCapacity(capacityChannel);
380 }
381
382 @Override
383 void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
384 Asserts.notNull(incoming, "Request stream handler");
385 incoming.dataEnd(trailers);
386 }
387
388 @Override
389 void inputEnd() throws HttpException, IOException {
390 if (incoming != null) {
391 if (incoming.isCompleted()) {
392 incoming.releaseResources();
393 }
394 incoming = null;
395 }
396 }
397
398 @Override
399 void execute(final RequestExecutionCommand executionCommand) throws HttpException {
400 throw new HttpException("Illegal command: " + executionCommand.getClass());
401 }
402
403 @Override
404 boolean isOutputReady() {
405 return outgoing != null && outgoing.isOutputReady();
406 }
407
408 @Override
409 void produceOutput() throws HttpException, IOException {
410 if (outgoing != null) {
411 outgoing.produceOutput();
412 }
413 }
414
415 @Override
416 void outputEnd() throws HttpException, IOException {
417 if (outgoing != null && outgoing.isResponseFinal()) {
418 if (streamListener != null) {
419 streamListener.onExchangeComplete(this, outgoing.keepAlive());
420 }
421 if (outgoing.isCompleted()) {
422 outgoing.releaseResources();
423 }
424 outgoing = null;
425 }
426 if (outgoing == null && isOpen()) {
427 final ServerHttp1StreamHandler handler = pipeline.poll();
428 if (handler != null) {
429 outgoing = handler;
430 handler.activateChannel();
431 if (handler.isOutputReady()) {
432 handler.produceOutput();
433 }
434 }
435 }
436 }
437
438 @Override
439 boolean handleTimeout() {
440 return false;
441 }
442
443 @Override
444 void appendState(final StringBuilder buf) {
445 super.appendState(buf);
446 buf.append(", incoming=[");
447 if (incoming != null) {
448 incoming.appendState(buf);
449 }
450 buf.append("], outgoing=[");
451 if (outgoing != null) {
452 outgoing.appendState(buf);
453 }
454 buf.append("], pipeline=");
455 buf.append(pipeline.size());
456 }
457
458 @Override
459 public String toString() {
460 final StringBuilder buf = new StringBuilder();
461 buf.append("[");
462 appendState(buf);
463 buf.append("]");
464 return buf.toString();
465 }
466
467 private static class DelayedOutputChannel implements Http1StreamChannel<HttpResponse> {
468
469 private final Http1StreamChannel<HttpResponse> channel;
470
471 private volatile boolean direct;
472 private volatile HttpResponse delayedResponse;
473 private volatile boolean completed;
474
475 private DelayedOutputChannel(final Http1StreamChannel<HttpResponse> channel) {
476 this.channel = channel;
477 }
478
479 @Override
480 public void close() {
481 channel.close();
482 }
483
484 @Override
485 public void submit(
486 final HttpResponse response,
487 final boolean endStream,
488 final FlushMode flushMode) throws HttpException, IOException {
489 synchronized (this) {
490 if (direct) {
491 channel.submit(response, endStream, flushMode);
492 } else {
493 delayedResponse = response;
494 completed = endStream;
495 }
496 }
497 }
498
499 @Override
500 public void suspendOutput() throws IOException {
501 channel.suspendOutput();
502 }
503
504 @Override
505 public void requestOutput() {
506 channel.requestOutput();
507 }
508
509 @Override
510 public Timeout getSocketTimeout() {
511 return channel.getSocketTimeout();
512 }
513
514 @Override
515 public void setSocketTimeout(final Timeout timeout) {
516 channel.setSocketTimeout(timeout);
517 }
518
519 @Override
520 public int write(final ByteBuffer src) throws IOException {
521 synchronized (this) {
522 return direct ? channel.write(src) : 0;
523 }
524 }
525
526 @Override
527 public void complete(final List<? extends Header> trailers) throws IOException {
528 synchronized (this) {
529 if (direct) {
530 channel.complete(trailers);
531 } else {
532 completed = true;
533 }
534 }
535 }
536
537 @Override
538 public boolean abortGracefully() throws IOException {
539 synchronized (this) {
540 if (direct) {
541 return channel.abortGracefully();
542 }
543 completed = true;
544 return true;
545 }
546 }
547
548 @Override
549 public boolean isCompleted() {
550 synchronized (this) {
551 return direct ? channel.isCompleted() : completed;
552 }
553 }
554
555 @Override
556 public void activate() throws IOException, HttpException {
557 synchronized (this) {
558 direct = true;
559 if (delayedResponse != null) {
560 channel.submit(delayedResponse, completed, completed ? FlushMode.IMMEDIATE : FlushMode.BUFFER);
561 delayedResponse = null;
562 }
563 }
564 }
565
566 }
567
568 }