1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.http.impl.nio;
29
30 import java.io.IOException;
31 import java.nio.ByteBuffer;
32 import java.nio.channels.ReadableByteChannel;
33 import java.nio.channels.WritableByteChannel;
34 import java.util.List;
35 import java.util.Queue;
36 import java.util.concurrent.ConcurrentLinkedQueue;
37
38 import org.apache.hc.core5.annotation.Internal;
39 import org.apache.hc.core5.http.ConnectionClosedException;
40 import org.apache.hc.core5.http.ConnectionReuseStrategy;
41 import org.apache.hc.core5.http.ContentLengthStrategy;
42 import org.apache.hc.core5.http.EntityDetails;
43 import org.apache.hc.core5.http.Header;
44 import org.apache.hc.core5.http.HttpException;
45 import org.apache.hc.core5.http.HttpRequest;
46 import org.apache.hc.core5.http.HttpResponse;
47 import org.apache.hc.core5.http.HttpStatus;
48 import org.apache.hc.core5.http.config.CharCodingConfig;
49 import org.apache.hc.core5.http.config.Http1Config;
50 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
51 import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
52 import org.apache.hc.core5.http.impl.DefaultConnectionReuseStrategy;
53 import org.apache.hc.core5.http.impl.Http1StreamListener;
54 import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
55 import org.apache.hc.core5.http.nio.CapacityChannel;
56 import org.apache.hc.core5.http.nio.ContentDecoder;
57 import org.apache.hc.core5.http.nio.ContentEncoder;
58 import org.apache.hc.core5.http.nio.HandlerFactory;
59 import org.apache.hc.core5.http.nio.NHttpMessageParser;
60 import org.apache.hc.core5.http.nio.NHttpMessageWriter;
61 import org.apache.hc.core5.http.nio.SessionInputBuffer;
62 import org.apache.hc.core5.http.nio.SessionOutputBuffer;
63 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
64 import org.apache.hc.core5.http.protocol.HttpCoreContext;
65 import org.apache.hc.core5.http.protocol.HttpProcessor;
66 import org.apache.hc.core5.io.CloseMode;
67 import org.apache.hc.core5.reactor.ProtocolIOSession;
68 import org.apache.hc.core5.util.Args;
69 import org.apache.hc.core5.util.Asserts;
70 import org.apache.hc.core5.util.Timeout;
71
72
73
74
75
76
77
78
79 @Internal
80 public class ServerHttp1StreamDuplexer extends AbstractHttp1StreamDuplexer<HttpRequest, HttpResponse> {
81
82 private final String scheme;
83 private final HttpProcessor httpProcessor;
84 private final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory;
85 private final Http1Config http1Config;
86 private final ConnectionReuseStrategy connectionReuseStrategy;
87 private final Http1StreamListener streamListener;
88 private final Queue<ServerHttp1StreamHandler> pipeline;
89 private final Http1StreamChannel<HttpResponse> outputChannel;
90
91 private volatile ServerHttp1StreamHandler outgoing;
92 private volatile ServerHttp1StreamHandler incoming;
93
94 public ServerHttp1StreamDuplexer(
95 final ProtocolIOSession ioSession,
96 final HttpProcessor httpProcessor,
97 final HandlerFactory<AsyncServerExchangeHandler> exchangeHandlerFactory,
98 final String scheme,
99 final Http1Config http1Config,
100 final CharCodingConfig charCodingConfig,
101 final ConnectionReuseStrategy connectionReuseStrategy,
102 final NHttpMessageParser<HttpRequest> incomingMessageParser,
103 final NHttpMessageWriter<HttpResponse> outgoingMessageWriter,
104 final ContentLengthStrategy incomingContentStrategy,
105 final ContentLengthStrategy outgoingContentStrategy,
106 final Http1StreamListener streamListener) {
107 super(ioSession, http1Config, charCodingConfig, incomingMessageParser, outgoingMessageWriter, incomingContentStrategy, outgoingContentStrategy);
108 this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
109 this.exchangeHandlerFactory = Args.notNull(exchangeHandlerFactory, "Exchange handler factory");
110 this.scheme = scheme;
111 this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
112 this.connectionReuseStrategy = connectionReuseStrategy != null ? connectionReuseStrategy :
113 DefaultConnectionReuseStrategy.INSTANCE;
114 this.streamListener = streamListener;
115 this.pipeline = new ConcurrentLinkedQueue<>();
116 this.outputChannel = new Http1StreamChannel<HttpResponse>() {
117
118 @Override
119 public void close() {
120 ServerHttp1StreamDuplexer.this.close(CloseMode.GRACEFUL);
121 }
122
123 @Override
124 public void submit(
125 final HttpResponse response,
126 final boolean endStream,
127 final FlushMode flushMode) throws HttpException, IOException {
128 if (streamListener != null) {
129 streamListener.onResponseHead(ServerHttp1StreamDuplexer.this, response);
130 }
131 commitMessageHead(response, endStream, flushMode);
132 }
133
134 @Override
135 public void requestOutput() {
136 requestSessionOutput();
137 }
138
139 @Override
140 public void suspendOutput() throws IOException {
141 suspendSessionOutput();
142 }
143
144 @Override
145 public Timeout getSocketTimeout() {
146 return getSessionTimeout();
147 }
148
149 @Override
150 public void setSocketTimeout(final Timeout timeout) {
151 setSessionTimeout(timeout);
152 }
153
154 @Override
155 public int write(final ByteBuffer src) throws IOException {
156 return streamOutput(src);
157 }
158
159 @Override
160 public void complete(final List<? extends Header> trailers) throws IOException {
161 endOutputStream(trailers);
162 }
163
164 @Override
165 public boolean isCompleted() {
166 return isOutputCompleted();
167 }
168
169 @Override
170 public boolean abortGracefully() throws IOException {
171 final MessageDelineation messageDelineation = endOutputStream(null);
172 return messageDelineation != MessageDelineation.MESSAGE_HEAD;
173 }
174
175 @Override
176 public void activate() throws HttpException, IOException {
177
178 }
179
180 @Override
181 public String toString() {
182 return "Http1StreamChannel[" + ServerHttp1StreamDuplexer.this + "]";
183 }
184
185 };
186 }
187
188 @Override
189 void terminate(final Exception exception) {
190 if (incoming != null) {
191 incoming.failed(exception);
192 incoming.releaseResources();
193 incoming = null;
194 }
195 if (outgoing != null) {
196 outgoing.failed(exception);
197 outgoing.releaseResources();
198 outgoing = null;
199 }
200 for (;;) {
201 final ServerHttp1StreamHandler handler = pipeline.poll();
202 if (handler != null) {
203 handler.failed(exception);
204 handler.releaseResources();
205 } else {
206 break;
207 }
208 }
209 }
210
211 @Override
212 void disconnected() {
213 if (incoming != null) {
214 if (!incoming.isCompleted()) {
215 incoming.failed(new ConnectionClosedException());
216 }
217 incoming.releaseResources();
218 incoming = null;
219 }
220 if (outgoing != null) {
221 if (!outgoing.isCompleted()) {
222 outgoing.failed(new ConnectionClosedException());
223 }
224 outgoing.releaseResources();
225 outgoing = null;
226 }
227 for (;;) {
228 final ServerHttp1StreamHandler handler = pipeline.poll();
229 if (handler != null) {
230 handler.failed(new ConnectionClosedException());
231 handler.releaseResources();
232 } else {
233 break;
234 }
235 }
236 }
237
238 @Override
239 void updateInputMetrics(final HttpRequest request, final BasicHttpConnectionMetrics connMetrics) {
240 connMetrics.incrementRequestCount();
241 }
242
243 @Override
244 void updateOutputMetrics(final HttpResponse response, final BasicHttpConnectionMetrics connMetrics) {
245 if (response.getCode() >= HttpStatus.SC_OK) {
246 connMetrics.incrementRequestCount();
247 }
248 }
249
250 @Override
251 protected boolean handleIncomingMessage(final HttpRequest request) throws HttpException {
252 return true;
253 }
254
255 @Override
256 protected ContentDecoder createContentDecoder(
257 final long len,
258 final ReadableByteChannel channel,
259 final SessionInputBuffer buffer,
260 final BasicHttpTransportMetrics metrics) throws HttpException {
261 if (len >= 0) {
262 return new LengthDelimitedDecoder(channel, buffer, metrics, len);
263 } else if (len == ContentLengthStrategy.CHUNKED) {
264 return new ChunkDecoder(channel, buffer, http1Config, metrics);
265 } else {
266 return null;
267 }
268 }
269
270 @Override
271 protected boolean handleOutgoingMessage(final HttpResponse response) throws HttpException {
272 return true;
273 }
274
275 @Override
276 protected ContentEncoder createContentEncoder(
277 final long len,
278 final WritableByteChannel channel,
279 final SessionOutputBuffer buffer,
280 final BasicHttpTransportMetrics metrics) throws HttpException {
281 final int chunkSizeHint = http1Config.getChunkSizeHint() >= 0 ? http1Config.getChunkSizeHint() : 2048;
282 if (len >= 0) {
283 return new LengthDelimitedEncoder(channel, buffer, metrics, len, chunkSizeHint);
284 } else if (len == ContentLengthStrategy.CHUNKED) {
285 return new ChunkEncoder(channel, buffer, metrics, chunkSizeHint);
286 } else {
287 return new IdentityEncoder(channel, buffer, metrics, chunkSizeHint);
288 }
289 }
290
291 @Override
292 boolean inputIdle() {
293 return incoming == null;
294 }
295
296 @Override
297 boolean outputIdle() {
298 return outgoing == null && pipeline.isEmpty();
299 }
300
301 @Override
302 HttpRequest parseMessageHead(final boolean endOfStream) throws IOException, HttpException {
303 try {
304 return super.parseMessageHead(endOfStream);
305 } catch (final HttpException ex) {
306 terminateExchange(ex);
307 return null;
308 }
309 }
310
311 void terminateExchange(final HttpException ex) throws HttpException, IOException {
312 suspendSessionInput();
313 final ServerHttp1StreamHandler streamHandler;
314 final HttpCoreContext context = HttpCoreContext.create();
315 context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
316 context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
317 if (outgoing == null) {
318 streamHandler = new ServerHttp1StreamHandler(
319 outputChannel,
320 httpProcessor,
321 connectionReuseStrategy,
322 exchangeHandlerFactory,
323 context);
324 outgoing = streamHandler;
325 } else {
326 streamHandler = new ServerHttp1StreamHandler(
327 new DelayedOutputChannel(outputChannel),
328 httpProcessor,
329 connectionReuseStrategy,
330 exchangeHandlerFactory,
331 context);
332 pipeline.add(streamHandler);
333 }
334 streamHandler.terminateExchange(ex);
335 incoming = null;
336 }
337
338 @Override
339 void consumeHeader(final HttpRequest request, final EntityDetails entityDetails) throws HttpException, IOException {
340 if (streamListener != null) {
341 streamListener.onRequestHead(this, request);
342 }
343 final ServerHttp1StreamHandler streamHandler;
344 final HttpCoreContext context = HttpCoreContext.create();
345 context.setAttribute(HttpCoreContext.SSL_SESSION, getSSLSession());
346 context.setAttribute(HttpCoreContext.CONNECTION_ENDPOINT, getEndpointDetails());
347 if (outgoing == null) {
348 streamHandler = new ServerHttp1StreamHandler(
349 outputChannel,
350 httpProcessor,
351 connectionReuseStrategy,
352 exchangeHandlerFactory,
353 context);
354 outgoing = streamHandler;
355 } else {
356 streamHandler = new ServerHttp1StreamHandler(
357 new DelayedOutputChannel(outputChannel),
358 httpProcessor,
359 connectionReuseStrategy,
360 exchangeHandlerFactory,
361 context);
362 pipeline.add(streamHandler);
363 }
364 request.setScheme(scheme);
365 streamHandler.consumeHeader(request, entityDetails);
366 incoming = streamHandler;
367 }
368
369 @Override
370 void consumeData(final ByteBuffer src) throws HttpException, IOException {
371 Asserts.notNull(incoming, "Request stream handler");
372 incoming.consumeData(src);
373 }
374
375 @Override
376 void updateCapacity(final CapacityChannel capacityChannel) throws HttpException, IOException {
377 Asserts.notNull(incoming, "Request stream handler");
378 incoming.updateCapacity(capacityChannel);
379 }
380
381 @Override
382 void dataEnd(final List<? extends Header> trailers) throws HttpException, IOException {
383 Asserts.notNull(incoming, "Request stream handler");
384 incoming.dataEnd(trailers);
385 }
386
387 @Override
388 void inputEnd() throws HttpException, IOException {
389 if (incoming != null) {
390 if (incoming.isCompleted()) {
391 incoming.releaseResources();
392 }
393 incoming = null;
394 }
395 }
396
397 @Override
398 void execute(final RequestExecutionCommand executionCommand) throws HttpException {
399 throw new HttpException("Illegal command: " + executionCommand.getClass());
400 }
401
402 @Override
403 boolean isOutputReady() {
404 return outgoing != null && outgoing.isOutputReady();
405 }
406
407 @Override
408 void produceOutput() throws HttpException, IOException {
409 if (outgoing != null) {
410 outgoing.produceOutput();
411 }
412 }
413
414 @Override
415 void outputEnd() throws HttpException, IOException {
416 if (outgoing != null && outgoing.isResponseFinal()) {
417 if (streamListener != null) {
418 streamListener.onExchangeComplete(this, outgoing.keepAlive());
419 }
420 if (outgoing.isCompleted()) {
421 outgoing.releaseResources();
422 }
423 outgoing = null;
424 }
425 if (outgoing == null && isOpen()) {
426 final ServerHttp1StreamHandler handler = pipeline.poll();
427 if (handler != null) {
428 outgoing = handler;
429 handler.activateChannel();
430 if (handler.isOutputReady()) {
431 handler.produceOutput();
432 }
433 }
434 }
435 }
436
437 @Override
438 boolean handleTimeout() {
439 return false;
440 }
441
442 @Override
443 void appendState(final StringBuilder buf) {
444 super.appendState(buf);
445 buf.append(", incoming=[");
446 if (incoming != null) {
447 incoming.appendState(buf);
448 }
449 buf.append("], outgoing=[");
450 if (outgoing != null) {
451 outgoing.appendState(buf);
452 }
453 buf.append("], pipeline=");
454 buf.append(pipeline.size());
455 }
456
457 @Override
458 public String toString() {
459 final StringBuilder buf = new StringBuilder();
460 buf.append("[");
461 appendState(buf);
462 buf.append("]");
463 return buf.toString();
464 }
465
466 private static class DelayedOutputChannel implements Http1StreamChannel<HttpResponse> {
467
468 private final Http1StreamChannel<HttpResponse> channel;
469
470 private volatile boolean direct;
471 private volatile HttpResponse delayedResponse;
472 private volatile boolean completed;
473
474 private DelayedOutputChannel(final Http1StreamChannel<HttpResponse> channel) {
475 this.channel = channel;
476 }
477
478 @Override
479 public void close() {
480 channel.close();
481 }
482
483 @Override
484 public void submit(
485 final HttpResponse response,
486 final boolean endStream,
487 final FlushMode flushMode) throws HttpException, IOException {
488 synchronized (this) {
489 if (direct) {
490 channel.submit(response, endStream, flushMode);
491 } else {
492 delayedResponse = response;
493 completed = endStream;
494 }
495 }
496 }
497
498 @Override
499 public void suspendOutput() throws IOException {
500 channel.suspendOutput();
501 }
502
503 @Override
504 public void requestOutput() {
505 channel.requestOutput();
506 }
507
508 @Override
509 public Timeout getSocketTimeout() {
510 return channel.getSocketTimeout();
511 }
512
513 @Override
514 public void setSocketTimeout(final Timeout timeout) {
515 channel.setSocketTimeout(timeout);
516 }
517
518 @Override
519 public int write(final ByteBuffer src) throws IOException {
520 synchronized (this) {
521 return direct ? channel.write(src) : 0;
522 }
523 }
524
525 @Override
526 public void complete(final List<? extends Header> trailers) throws IOException {
527 synchronized (this) {
528 if (direct) {
529 channel.complete(trailers);
530 } else {
531 completed = true;
532 }
533 }
534 }
535
536 @Override
537 public boolean abortGracefully() throws IOException {
538 synchronized (this) {
539 if (direct) {
540 return channel.abortGracefully();
541 }
542 completed = true;
543 return true;
544 }
545 }
546
547 @Override
548 public boolean isCompleted() {
549 synchronized (this) {
550 return direct ? channel.isCompleted() : completed;
551 }
552 }
553
554 @Override
555 public void activate() throws IOException, HttpException {
556 synchronized (this) {
557 direct = true;
558 if (delayedResponse != null) {
559 channel.submit(delayedResponse, completed, completed ? FlushMode.IMMEDIATE : FlushMode.BUFFER);
560 delayedResponse = null;
561 }
562 }
563 }
564
565 }
566
567 }