1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.http.impl.nio;
29
30 import java.io.IOException;
31 import java.net.SocketAddress;
32 import java.nio.ByteBuffer;
33 import java.nio.channels.ReadableByteChannel;
34 import java.nio.channels.SelectionKey;
35 import java.nio.channels.WritableByteChannel;
36 import java.util.List;
37 import java.util.concurrent.atomic.AtomicInteger;
38
39 import javax.net.ssl.SSLSession;
40
41 import org.apache.hc.core5.http.ConnectionClosedException;
42 import org.apache.hc.core5.http.ContentLengthStrategy;
43 import org.apache.hc.core5.http.EndpointDetails;
44 import org.apache.hc.core5.http.EntityDetails;
45 import org.apache.hc.core5.http.Header;
46 import org.apache.hc.core5.http.HttpConnection;
47 import org.apache.hc.core5.http.HttpException;
48 import org.apache.hc.core5.http.HttpMessage;
49 import org.apache.hc.core5.http.Message;
50 import org.apache.hc.core5.http.ProtocolVersion;
51 import org.apache.hc.core5.http.config.CharCodingConfig;
52 import org.apache.hc.core5.http.config.Http1Config;
53 import org.apache.hc.core5.http.impl.BasicEndpointDetails;
54 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
55 import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
56 import org.apache.hc.core5.http.impl.CharCodingSupport;
57 import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
58 import org.apache.hc.core5.http.impl.IncomingEntityDetails;
59 import org.apache.hc.core5.http.nio.CapacityChannel;
60 import org.apache.hc.core5.http.nio.ContentDecoder;
61 import org.apache.hc.core5.http.nio.ContentEncoder;
62 import org.apache.hc.core5.http.nio.NHttpMessageParser;
63 import org.apache.hc.core5.http.nio.NHttpMessageWriter;
64 import org.apache.hc.core5.http.nio.SessionInputBuffer;
65 import org.apache.hc.core5.http.nio.SessionOutputBuffer;
66 import org.apache.hc.core5.http.nio.command.CommandSupport;
67 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
68 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
69 import org.apache.hc.core5.io.CloseMode;
70 import org.apache.hc.core5.io.SocketTimeoutExceptionFactory;
71 import org.apache.hc.core5.reactor.Command;
72 import org.apache.hc.core5.reactor.EventMask;
73 import org.apache.hc.core5.reactor.IOSession;
74 import org.apache.hc.core5.reactor.ProtocolIOSession;
75 import org.apache.hc.core5.reactor.ssl.TlsDetails;
76 import org.apache.hc.core5.util.Args;
77 import org.apache.hc.core5.util.Identifiable;
78 import org.apache.hc.core5.util.Timeout;
79
80 abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, OutgoingMessage extends HttpMessage>
81 implements Identifiable, HttpConnection {
82
83 private enum ConnectionState { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
84
85 private final ProtocolIOSession ioSession;
86 private final Http1Config http1Config;
87 private final SessionInputBufferImpl inbuf;
88 private final SessionOutputBufferImpl outbuf;
89 private final BasicHttpTransportMetrics inTransportMetrics;
90 private final BasicHttpTransportMetrics outTransportMetrics;
91 private final BasicHttpConnectionMetrics connMetrics;
92 private final NHttpMessageParser<IncomingMessage> incomingMessageParser;
93 private final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter;
94 private final ContentLengthStrategy incomingContentStrategy;
95 private final ContentLengthStrategy outgoingContentStrategy;
96 private final ByteBuffer contentBuffer;
97 private final AtomicInteger outputRequests;
98
99 private volatile Message<IncomingMessage, ContentDecoder> incomingMessage;
100 private volatile Message<OutgoingMessage, ContentEncoder> outgoingMessage;
101 private volatile ConnectionState connState;
102 private volatile CapacityWindow capacityWindow;
103
104 private volatile ProtocolVersion version;
105 private volatile EndpointDetails endpointDetails;
106
107 AbstractHttp1StreamDuplexer(
108 final ProtocolIOSession ioSession,
109 final Http1Config http1Config,
110 final CharCodingConfig charCodingConfig,
111 final NHttpMessageParser<IncomingMessage> incomingMessageParser,
112 final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter,
113 final ContentLengthStrategy incomingContentStrategy,
114 final ContentLengthStrategy outgoingContentStrategy) {
115 this.ioSession = Args.notNull(ioSession, "I/O session");
116 this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
117 final int bufferSize = this.http1Config.getBufferSize();
118 this.inbuf = new SessionInputBufferImpl(bufferSize, Math.min(bufferSize, 512),
119 this.http1Config.getMaxLineLength(),
120 CharCodingSupport.createDecoder(charCodingConfig));
121 this.outbuf = new SessionOutputBufferImpl(bufferSize, Math.min(bufferSize, 512),
122 CharCodingSupport.createEncoder(charCodingConfig));
123 this.inTransportMetrics = new BasicHttpTransportMetrics();
124 this.outTransportMetrics = new BasicHttpTransportMetrics();
125 this.connMetrics = new BasicHttpConnectionMetrics(inTransportMetrics, outTransportMetrics);
126 this.incomingMessageParser = incomingMessageParser;
127 this.outgoingMessageWriter = outgoingMessageWriter;
128 this.incomingContentStrategy = incomingContentStrategy != null ? incomingContentStrategy :
129 DefaultContentLengthStrategy.INSTANCE;
130 this.outgoingContentStrategy = outgoingContentStrategy != null ? outgoingContentStrategy :
131 DefaultContentLengthStrategy.INSTANCE;
132 this.contentBuffer = ByteBuffer.allocate(this.http1Config.getBufferSize());
133 this.outputRequests = new AtomicInteger(0);
134 this.connState = ConnectionState.READY;
135 }
136
137 @Override
138 public String getId() {
139 return ioSession.getId();
140 }
141
142 boolean isActive() {
143 return connState == ConnectionState.ACTIVE;
144 }
145
146 boolean isShuttingDown() {
147 return connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0;
148 }
149
150 void shutdownSession(final CloseMode closeMode) {
151 if (closeMode == CloseMode.GRACEFUL) {
152 connState = ConnectionState.GRACEFUL_SHUTDOWN;
153 ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL);
154 } else {
155 connState = ConnectionState.SHUTDOWN;
156 ioSession.close();
157 }
158 }
159
160 void shutdownSession(final Exception cause) {
161 connState = ConnectionState.SHUTDOWN;
162 try {
163 terminate(cause);
164 } finally {
165 final CloseMode closeMode;
166 if (cause instanceof ConnectionClosedException) {
167 closeMode = CloseMode.GRACEFUL;
168 } else if (cause instanceof IOException) {
169 closeMode = CloseMode.IMMEDIATE;
170 } else {
171 closeMode = CloseMode.GRACEFUL;
172 }
173 ioSession.close(closeMode);
174 }
175 }
176
177 abstract void disconnected();
178
179 abstract void terminate(final Exception exception);
180
181 abstract void updateInputMetrics(IncomingMessage incomingMessage, BasicHttpConnectionMetrics connMetrics);
182
183 abstract void updateOutputMetrics(OutgoingMessage outgoingMessage, BasicHttpConnectionMetrics connMetrics);
184
185 abstract void consumeHeader(IncomingMessage messageHead, EntityDetails entityDetails) throws HttpException, IOException;
186
187 abstract boolean handleIncomingMessage(IncomingMessage incomingMessage) throws HttpException;
188
189 abstract boolean handleOutgoingMessage(OutgoingMessage outgoingMessage) throws HttpException;
190
191 abstract ContentDecoder createContentDecoder(
192 long contentLength,
193 ReadableByteChannel channel,
194 SessionInputBuffer buffer,
195 BasicHttpTransportMetrics metrics) throws HttpException;
196
197 abstract ContentEncoder createContentEncoder(
198 long contentLength,
199 WritableByteChannel channel,
200 SessionOutputBuffer buffer,
201 BasicHttpTransportMetrics metrics) throws HttpException;
202
203 abstract void consumeData(ByteBuffer src) throws HttpException, IOException;
204
205 abstract void updateCapacity(CapacityChannel capacityChannel) throws HttpException, IOException;
206
207 abstract void dataEnd(List<? extends Header> trailers) throws HttpException, IOException;
208
209 abstract boolean isOutputReady();
210
211 abstract void produceOutput() throws HttpException, IOException;
212
213 abstract void execute(RequestExecutionCommand executionCommand) throws HttpException, IOException;
214
215 abstract void inputEnd() throws HttpException, IOException;
216
217 abstract void outputEnd() throws HttpException, IOException;
218
219 abstract boolean inputIdle();
220
221 abstract boolean outputIdle();
222
223 abstract boolean handleTimeout();
224
225 private void processCommands() throws HttpException, IOException {
226 for (;;) {
227 final Command command = ioSession.poll();
228 if (command == null) {
229 return;
230 }
231 if (command instanceof ShutdownCommand) {
232 final ShutdownCommand shutdownCommand = (ShutdownCommand) command;
233 requestShutdown(shutdownCommand.getType());
234 } else if (command instanceof RequestExecutionCommand) {
235 if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0) {
236 command.cancel();
237 } else {
238 execute((RequestExecutionCommand) command);
239 return;
240 }
241 } else {
242 throw new HttpException("Unexpected command: " + command.getClass());
243 }
244 }
245 }
246
247 public final void onConnect() throws HttpException, IOException {
248 if (connState == ConnectionState.READY) {
249 connState = ConnectionState.ACTIVE;
250 processCommands();
251 }
252 }
253
254 IncomingMessage parseMessageHead(final boolean endOfStream) throws IOException, HttpException {
255 final IncomingMessage messageHead = incomingMessageParser.parse(inbuf, endOfStream);
256 if (messageHead != null) {
257 incomingMessageParser.reset();
258 }
259 return messageHead;
260 }
261
262 public final void onInput(final ByteBuffer src) throws HttpException, IOException {
263 if (src != null) {
264 final int n = src.remaining();
265 inbuf.put(src);
266 inTransportMetrics.incrementBytesTransferred(n);
267 }
268
269 if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inbuf.hasData() && inputIdle()) {
270 ioSession.clearEvent(SelectionKey.OP_READ);
271 return;
272 }
273
274 boolean endOfStream = false;
275 if (incomingMessage == null) {
276 final int bytesRead = inbuf.fill(ioSession);
277 if (bytesRead > 0) {
278 inTransportMetrics.incrementBytesTransferred(bytesRead);
279 }
280 endOfStream = bytesRead == -1;
281 }
282
283 do {
284 if (incomingMessage == null) {
285
286 final IncomingMessage messageHead = parseMessageHead(endOfStream);
287 if (messageHead != null) {
288 this.version = messageHead.getVersion();
289
290 updateInputMetrics(messageHead, connMetrics);
291 final ContentDecoder contentDecoder;
292 if (handleIncomingMessage(messageHead)) {
293 final long len = incomingContentStrategy.determineLength(messageHead);
294 contentDecoder = createContentDecoder(len, ioSession, inbuf, inTransportMetrics);
295 consumeHeader(messageHead, contentDecoder != null ? new IncomingEntityDetails(messageHead, len) : null);
296 } else {
297 consumeHeader(messageHead, null);
298 contentDecoder = null;
299 }
300 capacityWindow = new CapacityWindow(http1Config.getInitialWindowSize(), ioSession);
301 if (contentDecoder != null) {
302 incomingMessage = new Message<>(messageHead, contentDecoder);
303 } else {
304 inputEnd();
305 if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
306 ioSession.setEvent(SelectionKey.OP_READ);
307 }
308 }
309 } else {
310 break;
311 }
312 }
313
314 if (incomingMessage != null) {
315 final ContentDecoder contentDecoder = incomingMessage.getBody();
316
317
318
319
320
321 final int bytesRead = contentDecoder.read(contentBuffer);
322 if (bytesRead > 0) {
323 contentBuffer.flip();
324 consumeData(contentBuffer);
325 contentBuffer.clear();
326 final int capacity = capacityWindow.removeCapacity(bytesRead);
327 if (capacity <= 0) {
328 if (!contentDecoder.isCompleted()) {
329 updateCapacity(capacityWindow);
330 }
331 }
332 }
333 if (contentDecoder.isCompleted()) {
334 dataEnd(contentDecoder.getTrailers());
335 capacityWindow.close();
336 incomingMessage = null;
337 ioSession.setEvent(SelectionKey.OP_READ);
338 inputEnd();
339 } else if (bytesRead == 0) {
340 break;
341 }
342 }
343 } while (inbuf.hasData());
344
345 if (endOfStream && !inbuf.hasData()) {
346 if (outputIdle() && inputIdle()) {
347 requestShutdown(CloseMode.GRACEFUL);
348 } else {
349 shutdownSession(new ConnectionClosedException("Connection closed by peer"));
350 }
351 }
352 }
353
354 public final void onOutput() throws IOException, HttpException {
355 ioSession.getLock().lock();
356 try {
357 if (outbuf.hasData()) {
358 final int bytesWritten = outbuf.flush(ioSession);
359 if (bytesWritten > 0) {
360 outTransportMetrics.incrementBytesTransferred(bytesWritten);
361 }
362 }
363 } finally {
364 ioSession.getLock().unlock();
365 }
366 if (connState.compareTo(ConnectionState.SHUTDOWN) < 0) {
367 final int pendingOutputRequests = outputRequests.get();
368 produceOutput();
369 final boolean outputPending = isOutputReady();
370 final boolean outputEnd;
371 ioSession.getLock().lock();
372 try {
373 if (!outputPending && !outbuf.hasData() && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
374 ioSession.clearEvent(SelectionKey.OP_WRITE);
375 } else {
376 outputRequests.addAndGet(-pendingOutputRequests);
377 }
378 outputEnd = outgoingMessage == null && !outbuf.hasData();
379 } finally {
380 ioSession.getLock().unlock();
381 }
382 if (outputEnd) {
383 outputEnd();
384 if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
385 processCommands();
386 } else if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle() && outputIdle()) {
387 connState = ConnectionState.SHUTDOWN;
388 }
389 }
390 }
391 if (connState.compareTo(ConnectionState.SHUTDOWN) >= 0) {
392 ioSession.close();
393 }
394 }
395
396 public final void onTimeout(final Timeout timeout) throws IOException, HttpException {
397 if (!handleTimeout()) {
398 onException(SocketTimeoutExceptionFactory.create(timeout));
399 }
400 }
401
402 public final void onException(final Exception ex) {
403 shutdownSession(ex);
404 CommandSupport.failCommands(ioSession, ex);
405 }
406
407 public final void onDisconnect() {
408 disconnected();
409 CommandSupport.cancelCommands(ioSession);
410 }
411
412 void requestShutdown(final CloseMode closeMode) {
413 switch (closeMode) {
414 case GRACEFUL:
415 if (connState == ConnectionState.ACTIVE) {
416 connState = ConnectionState.GRACEFUL_SHUTDOWN;
417 }
418 break;
419 case IMMEDIATE:
420 connState = ConnectionState.SHUTDOWN;
421 break;
422 }
423 ioSession.setEvent(SelectionKey.OP_WRITE);
424 }
425
426 void commitMessageHead(
427 final OutgoingMessage messageHead,
428 final boolean endStream,
429 final FlushMode flushMode) throws HttpException, IOException {
430 ioSession.getLock().lock();
431 try {
432 outgoingMessageWriter.write(messageHead, outbuf);
433 updateOutputMetrics(messageHead, connMetrics);
434 if (!endStream) {
435 final ContentEncoder contentEncoder;
436 if (handleOutgoingMessage(messageHead)) {
437 final long len = outgoingContentStrategy.determineLength(messageHead);
438 contentEncoder = createContentEncoder(len, ioSession, outbuf, outTransportMetrics);
439 } else {
440 contentEncoder = null;
441 }
442 if (contentEncoder != null) {
443 outgoingMessage = new Message<>(messageHead, contentEncoder);
444 }
445 }
446 outgoingMessageWriter.reset();
447 if (flushMode == FlushMode.IMMEDIATE) {
448 final int bytesWritten = outbuf.flush(ioSession);
449 if (bytesWritten > 0) {
450 outTransportMetrics.incrementBytesTransferred(bytesWritten);
451 }
452 }
453 ioSession.setEvent(EventMask.WRITE);
454 } finally {
455 ioSession.getLock().unlock();
456 }
457 }
458
459 void requestSessionInput() {
460 ioSession.setEvent(SelectionKey.OP_READ);
461 }
462
463 void requestSessionOutput() {
464 outputRequests.incrementAndGet();
465 ioSession.setEvent(SelectionKey.OP_WRITE);
466 }
467
468 Timeout getSessionTimeout() {
469 return ioSession.getSocketTimeout();
470 }
471
472 void setSessionTimeout(final Timeout timeout) {
473 ioSession.setSocketTimeout(timeout);
474 }
475
476 void suspendSessionInput() {
477 ioSession.clearEvent(SelectionKey.OP_READ);
478 }
479
480 void suspendSessionOutput() throws IOException {
481 ioSession.getLock().lock();
482 try {
483 if (outbuf.hasData()) {
484 final int bytesWritten = outbuf.flush(ioSession);
485 if (bytesWritten > 0) {
486 outTransportMetrics.incrementBytesTransferred(bytesWritten);
487 }
488 } else {
489 ioSession.clearEvent(SelectionKey.OP_WRITE);
490 }
491 } finally {
492 ioSession.getLock().unlock();
493 }
494 }
495
496 int streamOutput(final ByteBuffer src) throws IOException {
497 ioSession.getLock().lock();
498 try {
499 if (outgoingMessage == null) {
500 throw new ConnectionClosedException();
501 }
502 final ContentEncoder contentEncoder = outgoingMessage.getBody();
503 final int bytesWritten = contentEncoder.write(src);
504 if (bytesWritten > 0) {
505 ioSession.setEvent(SelectionKey.OP_WRITE);
506 }
507 return bytesWritten;
508 } finally {
509 ioSession.getLock().unlock();
510 }
511 }
512
513 enum MessageDelineation { NONE, CHUNK_CODED, MESSAGE_HEAD}
514
515 MessageDelineation endOutputStream(final List<? extends Header> trailers) throws IOException {
516 ioSession.getLock().lock();
517 try {
518 if (outgoingMessage == null) {
519 return MessageDelineation.NONE;
520 }
521 final ContentEncoder contentEncoder = outgoingMessage.getBody();
522 contentEncoder.complete(trailers);
523 ioSession.setEvent(SelectionKey.OP_WRITE);
524 outgoingMessage = null;
525 return contentEncoder instanceof ChunkEncoder
526 ? MessageDelineation.CHUNK_CODED
527 : MessageDelineation.MESSAGE_HEAD;
528 } finally {
529 ioSession.getLock().unlock();
530 }
531 }
532
533 boolean isOutputCompleted() {
534 ioSession.getLock().lock();
535 try {
536 if (outgoingMessage == null) {
537 return true;
538 }
539 final ContentEncoder contentEncoder = outgoingMessage.getBody();
540 return contentEncoder.isCompleted();
541 } finally {
542 ioSession.getLock().unlock();
543 }
544 }
545
546 @Override
547 public void close() throws IOException {
548 ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL);
549 }
550
551 @Override
552 public void close(final CloseMode closeMode) {
553 ioSession.enqueue(new ShutdownCommand(closeMode), Command.Priority.IMMEDIATE);
554 }
555
556 @Override
557 public boolean isOpen() {
558 return connState == ConnectionState.ACTIVE;
559 }
560
561 @Override
562 public Timeout getSocketTimeout() {
563 return ioSession.getSocketTimeout();
564 }
565
566 @Override
567 public void setSocketTimeout(final Timeout timeout) {
568 ioSession.setSocketTimeout(timeout);
569 }
570
571 @Override
572 public EndpointDetails getEndpointDetails() {
573 if (endpointDetails == null) {
574 endpointDetails = new BasicEndpointDetails(
575 ioSession.getRemoteAddress(),
576 ioSession.getLocalAddress(),
577 connMetrics,
578 ioSession.getSocketTimeout());
579 }
580 return endpointDetails;
581 }
582
583 @Override
584 public ProtocolVersion getProtocolVersion() {
585 return version;
586 }
587
588 @Override
589 public SocketAddress getRemoteAddress() {
590 return ioSession.getRemoteAddress();
591 }
592
593 @Override
594 public SocketAddress getLocalAddress() {
595 return ioSession.getLocalAddress();
596 }
597
598 @Override
599 public SSLSession getSSLSession() {
600 final TlsDetails tlsDetails = ioSession.getTlsDetails();
601 return tlsDetails != null ? tlsDetails.getSSLSession() : null;
602 }
603
604 void appendState(final StringBuilder buf) {
605 buf.append("connState=").append(connState)
606 .append(", inbuf=").append(inbuf)
607 .append(", outbuf=").append(outbuf)
608 .append(", inputWindow=").append(capacityWindow != null ? capacityWindow.getWindow() : 0);
609 }
610
611 static class CapacityWindow implements CapacityChannel {
612 private final IOSession ioSession;
613 private final Object lock;
614 private int window;
615 private boolean closed;
616
617 CapacityWindow(final int window, final IOSession ioSession) {
618 this.window = window;
619 this.ioSession = ioSession;
620 this.lock = new Object();
621 }
622
623 @Override
624 public void update(final int increment) throws IOException {
625 synchronized (lock) {
626 if (closed) {
627 return;
628 }
629 if (increment > 0) {
630 updateWindow(increment);
631 ioSession.setEvent(SelectionKey.OP_READ);
632 }
633 }
634 }
635
636
637
638
639
640 int removeCapacity(final int delta) {
641 synchronized (lock) {
642 updateWindow(-delta);
643 if (window <= 0) {
644 ioSession.clearEvent(SelectionKey.OP_READ);
645 }
646 return window;
647 }
648 }
649
650 private void updateWindow(final int delta) {
651 int newValue = window + delta;
652
653 if (((window ^ newValue) & (delta ^ newValue)) < 0) {
654 newValue = delta < 0 ? Integer.MIN_VALUE : Integer.MAX_VALUE;
655 }
656 window = newValue;
657 }
658
659
660
661
662
663 void close() {
664 synchronized (lock) {
665 closed = true;
666 }
667 }
668
669
670 int getWindow() {
671 return window;
672 }
673 }
674 }