1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.apache.hc.core5.http.impl.nio;
29
30 import java.io.IOException;
31 import java.net.SocketAddress;
32 import java.nio.ByteBuffer;
33 import java.nio.channels.ClosedChannelException;
34 import java.nio.channels.ReadableByteChannel;
35 import java.nio.channels.SelectionKey;
36 import java.nio.channels.WritableByteChannel;
37 import java.util.List;
38 import java.util.concurrent.atomic.AtomicInteger;
39
40 import javax.net.ssl.SSLSession;
41
42 import org.apache.hc.core5.http.ConnectionClosedException;
43 import org.apache.hc.core5.http.ContentLengthStrategy;
44 import org.apache.hc.core5.http.EndpointDetails;
45 import org.apache.hc.core5.http.EntityDetails;
46 import org.apache.hc.core5.http.Header;
47 import org.apache.hc.core5.http.HttpConnection;
48 import org.apache.hc.core5.http.HttpException;
49 import org.apache.hc.core5.http.HttpMessage;
50 import org.apache.hc.core5.http.Message;
51 import org.apache.hc.core5.http.ProtocolVersion;
52 import org.apache.hc.core5.http.config.CharCodingConfig;
53 import org.apache.hc.core5.http.config.Http1Config;
54 import org.apache.hc.core5.http.impl.BasicEndpointDetails;
55 import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
56 import org.apache.hc.core5.http.impl.BasicHttpTransportMetrics;
57 import org.apache.hc.core5.http.impl.CharCodingSupport;
58 import org.apache.hc.core5.http.impl.DefaultContentLengthStrategy;
59 import org.apache.hc.core5.http.impl.IncomingEntityDetails;
60 import org.apache.hc.core5.http.nio.CapacityChannel;
61 import org.apache.hc.core5.http.nio.ContentDecoder;
62 import org.apache.hc.core5.http.nio.ContentEncoder;
63 import org.apache.hc.core5.http.nio.NHttpMessageParser;
64 import org.apache.hc.core5.http.nio.NHttpMessageWriter;
65 import org.apache.hc.core5.http.nio.SessionInputBuffer;
66 import org.apache.hc.core5.http.nio.SessionOutputBuffer;
67 import org.apache.hc.core5.http.nio.command.CommandSupport;
68 import org.apache.hc.core5.http.nio.command.RequestExecutionCommand;
69 import org.apache.hc.core5.http.nio.command.ShutdownCommand;
70 import org.apache.hc.core5.io.CloseMode;
71 import org.apache.hc.core5.io.SocketTimeoutExceptionFactory;
72 import org.apache.hc.core5.reactor.Command;
73 import org.apache.hc.core5.reactor.EventMask;
74 import org.apache.hc.core5.reactor.IOSession;
75 import org.apache.hc.core5.reactor.ProtocolIOSession;
76 import org.apache.hc.core5.reactor.ssl.TlsDetails;
77 import org.apache.hc.core5.util.Args;
78 import org.apache.hc.core5.util.Identifiable;
79 import org.apache.hc.core5.util.Timeout;
80
81 abstract class AbstractHttp1StreamDuplexer<IncomingMessage extends HttpMessage, OutgoingMessage extends HttpMessage>
82 implements Identifiable, HttpConnection {
83
/**
 * Lifecycle phases of the connection. The constants are declared in the
 * order in which they are traversed; this class compares states with
 * {@code compareTo}, so the declaration order is significant.
 */
private enum ConnectionState {
    READY,
    ACTIVE,
    GRACEFUL_SHUTDOWN,
    SHUTDOWN
}

// Underlying I/O session and HTTP/1.1 settings.
private final ProtocolIOSession ioSession;
private final Http1Config http1Config;

// Session level buffers and the metrics fed from them.
private final SessionInputBufferImpl inbuf;
private final SessionOutputBufferImpl outbuf;
private final BasicHttpTransportMetrics inTransportMetrics;
private final BasicHttpTransportMetrics outTransportMetrics;
private final BasicHttpConnectionMetrics connMetrics;

// Message head codecs and entity length strategies for both directions.
private final NHttpMessageParser<IncomingMessage> incomingMessageParser;
private final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter;
private final ContentLengthStrategy incomingContentStrategy;
private final ContentLengthStrategy outgoingContentStrategy;

// Scratch buffer that decoded message content is read into before it is
// passed on to consumeData.
private final ByteBuffer contentBuffer;
// Incremented by requestSessionOutput and reconciled by onOutput.
private final AtomicInteger outputRequests;

// Message currently being received / sent, or null when there is none.
private volatile Message<IncomingMessage, ContentDecoder> incomingMessage;
private volatile Message<OutgoingMessage, ContentEncoder> outgoingMessage;
private volatile ConnectionState connState;
private volatile CapacityWindow capacityWindow;

// Version of the most recently parsed incoming message head.
private volatile ProtocolVersion version;
// Created on first use by getEndpointDetails.
private volatile EndpointDetails endpointDetails;

/**
 * Creates a duplexer bound to the given I/O session.
 *
 * @param ioSession the I/O session; must not be {@code null}.
 * @param http1Config HTTP/1.1 settings; {@link Http1Config#DEFAULT} is used when {@code null}.
 * @param charCodingConfig character coding settings handed to
 *                         {@link CharCodingSupport} to create the buffer decoder and encoder.
 * @param incomingMessageParser parser for incoming message heads.
 * @param outgoingMessageWriter writer for outgoing message heads.
 * @param incomingContentStrategy length strategy for incoming entities;
 *                                {@link DefaultContentLengthStrategy#INSTANCE} when {@code null}.
 * @param outgoingContentStrategy length strategy for outgoing entities;
 *                                {@link DefaultContentLengthStrategy#INSTANCE} when {@code null}.
 */
AbstractHttp1StreamDuplexer(
        final ProtocolIOSession ioSession,
        final Http1Config http1Config,
        final CharCodingConfig charCodingConfig,
        final NHttpMessageParser<IncomingMessage> incomingMessageParser,
        final NHttpMessageWriter<OutgoingMessage> outgoingMessageWriter,
        final ContentLengthStrategy incomingContentStrategy,
        final ContentLengthStrategy outgoingContentStrategy) {
    this.ioSession = Args.notNull(ioSession, "I/O session");
    if (http1Config != null) {
        this.http1Config = http1Config;
    } else {
        this.http1Config = Http1Config.DEFAULT;
    }

    // The second buffer dimension never exceeds 512.
    final int sessionBufferSize = this.http1Config.getBufferSize();
    final int cappedSize = Math.min(sessionBufferSize, 512);
    this.inbuf = new SessionInputBufferImpl(
            sessionBufferSize,
            cappedSize,
            this.http1Config.getMaxLineLength(),
            CharCodingSupport.createDecoder(charCodingConfig));
    this.outbuf = new SessionOutputBufferImpl(
            sessionBufferSize,
            cappedSize,
            CharCodingSupport.createEncoder(charCodingConfig));

    this.inTransportMetrics = new BasicHttpTransportMetrics();
    this.outTransportMetrics = new BasicHttpTransportMetrics();
    this.connMetrics = new BasicHttpConnectionMetrics(inTransportMetrics, outTransportMetrics);

    this.incomingMessageParser = incomingMessageParser;
    this.outgoingMessageWriter = outgoingMessageWriter;
    if (incomingContentStrategy != null) {
        this.incomingContentStrategy = incomingContentStrategy;
    } else {
        this.incomingContentStrategy = DefaultContentLengthStrategy.INSTANCE;
    }
    if (outgoingContentStrategy != null) {
        this.outgoingContentStrategy = outgoingContentStrategy;
    } else {
        this.outgoingContentStrategy = DefaultContentLengthStrategy.INSTANCE;
    }

    this.contentBuffer = ByteBuffer.allocate(this.http1Config.getBufferSize());
    this.outputRequests = new AtomicInteger(0);
    this.connState = ConnectionState.READY;
}
137
138 @Override
139 public String getId() {
140 return ioSession.getId();
141 }
142
143 void shutdownSession(final CloseMode closeMode) {
144 if (closeMode == CloseMode.GRACEFUL) {
145 connState = ConnectionState.GRACEFUL_SHUTDOWN;
146 ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL);
147 } else {
148 connState = ConnectionState.SHUTDOWN;
149 ioSession.close();
150 }
151 }
152
153 void shutdownSession(final Exception cause) {
154 connState = ConnectionState.SHUTDOWN;
155 try {
156 terminate(cause);
157 } finally {
158 final CloseMode closeMode;
159 if (cause instanceof ConnectionClosedException) {
160 closeMode = CloseMode.GRACEFUL;
161 } else if (cause instanceof IOException) {
162 closeMode = CloseMode.IMMEDIATE;
163 } else {
164 closeMode = CloseMode.GRACEFUL;
165 }
166 ioSession.close(closeMode);
167 }
168 }
169
170 abstract void disconnected();
171
172 abstract void terminate(final Exception exception);
173
174 abstract void updateInputMetrics(IncomingMessage incomingMessage, BasicHttpConnectionMetrics connMetrics);
175
176 abstract void updateOutputMetrics(OutgoingMessage outgoingMessage, BasicHttpConnectionMetrics connMetrics);
177
178 abstract void consumeHeader(IncomingMessage messageHead, EntityDetails entityDetails) throws HttpException, IOException;
179
180 abstract boolean handleIncomingMessage(IncomingMessage incomingMessage) throws HttpException;
181
182 abstract boolean handleOutgoingMessage(OutgoingMessage outgoingMessage) throws HttpException;
183
184 abstract ContentDecoder createContentDecoder(
185 long contentLength,
186 ReadableByteChannel channel,
187 SessionInputBuffer buffer,
188 BasicHttpTransportMetrics metrics) throws HttpException;
189
190 abstract ContentEncoder createContentEncoder(
191 long contentLength,
192 WritableByteChannel channel,
193 SessionOutputBuffer buffer,
194 BasicHttpTransportMetrics metrics) throws HttpException;
195
196 abstract void consumeData(ByteBuffer src) throws HttpException, IOException;
197
198 abstract void updateCapacity(CapacityChannel capacityChannel) throws HttpException, IOException;
199
200 abstract void dataEnd(List<? extends Header> trailers) throws HttpException, IOException;
201
202 abstract boolean isOutputReady();
203
204 abstract void produceOutput() throws HttpException, IOException;
205
206 abstract void execute(RequestExecutionCommand executionCommand) throws HttpException, IOException;
207
208 abstract void inputEnd() throws HttpException, IOException;
209
210 abstract void outputEnd() throws HttpException, IOException;
211
212 abstract boolean inputIdle();
213
214 abstract boolean outputIdle();
215
216 abstract boolean handleTimeout();
217
218 private void processCommands() throws HttpException, IOException {
219 for (;;) {
220 final Command command = ioSession.poll();
221 if (command == null) {
222 return;
223 }
224 if (command instanceof ShutdownCommand) {
225 final ShutdownCommand../org/apache/hc/core5/http/nio/command/ShutdownCommand.html#ShutdownCommand">ShutdownCommand shutdownCommand = (ShutdownCommand) command;
226 requestShutdown(shutdownCommand.getType());
227 } else if (command instanceof RequestExecutionCommand) {
228 if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0) {
229 command.cancel();
230 } else {
231 execute((RequestExecutionCommand) command);
232 return;
233 }
234 } else {
235 throw new HttpException("Unexpected command: " + command.getClass());
236 }
237 }
238 }
239
240 public final void onConnect() throws HttpException, IOException {
241 if (connState == ConnectionState.READY) {
242 connState = ConnectionState.ACTIVE;
243 processCommands();
244 }
245 }
246
247 IncomingMessage parseMessageHead(final boolean endOfStream) throws IOException, HttpException {
248 final IncomingMessage messageHead = incomingMessageParser.parse(inbuf, endOfStream);
249 if (messageHead != null) {
250 incomingMessageParser.reset();
251 }
252 return messageHead;
253 }
254
255 public final void onInput(final ByteBuffer src) throws HttpException, IOException {
256 if (src != null) {
257 inbuf.put(src);
258 }
259
260 if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inbuf.hasData() && inputIdle()) {
261 ioSession.clearEvent(SelectionKey.OP_READ);
262 return;
263 }
264
265 boolean endOfStream = false;
266 if (incomingMessage == null) {
267 final int bytesRead = inbuf.fill(ioSession);
268 if (bytesRead > 0) {
269 inTransportMetrics.incrementBytesTransferred(bytesRead);
270 }
271 endOfStream = bytesRead == -1;
272 }
273
274 do {
275 if (incomingMessage == null) {
276
277 final IncomingMessage messageHead = parseMessageHead(endOfStream);
278 if (messageHead != null) {
279 this.version = messageHead.getVersion();
280
281 updateInputMetrics(messageHead, connMetrics);
282 final ContentDecoder contentDecoder;
283 if (handleIncomingMessage(messageHead)) {
284 final long len = incomingContentStrategy.determineLength(messageHead);
285 contentDecoder = createContentDecoder(len, ioSession, inbuf, inTransportMetrics);
286 consumeHeader(messageHead, contentDecoder != null ? new IncomingEntityDetails(messageHead, len) : null);
287 } else {
288 consumeHeader(messageHead, null);
289 contentDecoder = null;
290 }
291 capacityWindow = new CapacityWindow(http1Config.getInitialWindowSize(), ioSession);
292 if (contentDecoder != null) {
293 incomingMessage = new Message<>(messageHead, contentDecoder);
294 } else {
295 inputEnd();
296 if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
297 ioSession.setEvent(SelectionKey.OP_READ);
298 }
299 }
300 } else {
301 break;
302 }
303 }
304
305 if (incomingMessage != null) {
306 final ContentDecoder contentDecoder = incomingMessage.getBody();
307
308
309
310
311
312 final int bytesRead = contentDecoder.read(contentBuffer);
313 if (bytesRead > 0) {
314 contentBuffer.flip();
315 consumeData(contentBuffer);
316 contentBuffer.clear();
317 final int capacity = capacityWindow.removeCapacity(bytesRead);
318 if (capacity <= 0) {
319 if (!contentDecoder.isCompleted()) {
320 updateCapacity(capacityWindow);
321 }
322 }
323 }
324 if (contentDecoder.isCompleted()) {
325 dataEnd(contentDecoder.getTrailers());
326 capacityWindow.close();
327 incomingMessage = null;
328 ioSession.setEvent(SelectionKey.OP_READ);
329 inputEnd();
330 }
331 if (bytesRead == 0) {
332 break;
333 }
334 }
335 } while (inbuf.hasData());
336
337 if (endOfStream && !inbuf.hasData()) {
338 if (outputIdle() && inputIdle()) {
339 requestShutdown(CloseMode.GRACEFUL);
340 } else {
341 shutdownSession(new ConnectionClosedException("Connection closed by peer"));
342 }
343 }
344 }
345
346 public final void onOutput() throws IOException, HttpException {
347 ioSession.getLock().lock();
348 try {
349 if (outbuf.hasData()) {
350 final int bytesWritten = outbuf.flush(ioSession);
351 if (bytesWritten > 0) {
352 outTransportMetrics.incrementBytesTransferred(bytesWritten);
353 }
354 }
355 } finally {
356 ioSession.getLock().unlock();
357 }
358 if (connState.compareTo(ConnectionState.SHUTDOWN) < 0) {
359 final int pendingOutputRequests = outputRequests.get();
360 produceOutput();
361 final boolean outputPending = isOutputReady();
362 final boolean outputEnd;
363 ioSession.getLock().lock();
364 try {
365 if (!outputPending && !outbuf.hasData() && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
366 ioSession.clearEvent(SelectionKey.OP_WRITE);
367 } else {
368 outputRequests.addAndGet(-pendingOutputRequests);
369 }
370 outputEnd = outgoingMessage == null && !outbuf.hasData();
371 } finally {
372 ioSession.getLock().unlock();
373 }
374 if (outputEnd) {
375 outputEnd();
376 if (connState.compareTo(ConnectionState.ACTIVE) == 0) {
377 processCommands();
378 } else if (connState.compareTo(ConnectionState.GRACEFUL_SHUTDOWN) >= 0 && inputIdle() && outputIdle()) {
379 connState = ConnectionState.SHUTDOWN;
380 }
381 }
382 }
383 if (connState.compareTo(ConnectionState.SHUTDOWN) >= 0) {
384 ioSession.close();
385 }
386 }
387
388 public final void onTimeout(final Timeout timeout) throws IOException, HttpException {
389 if (!handleTimeout()) {
390 onException(SocketTimeoutExceptionFactory.create(timeout));
391 }
392 }
393
394 public final void onException(final Exception ex) {
395 shutdownSession(ex);
396 CommandSupport.failCommands(ioSession, ex);
397 }
398
399 public final void onDisconnect() {
400 disconnected();
401 CommandSupport.cancelCommands(ioSession);
402 }
403
404 void requestShutdown(final CloseMode closeMode) {
405 switch (closeMode) {
406 case GRACEFUL:
407 if (connState == ConnectionState.ACTIVE) {
408 connState = ConnectionState.GRACEFUL_SHUTDOWN;
409 }
410 break;
411 case IMMEDIATE:
412 connState = ConnectionState.SHUTDOWN;
413 break;
414 }
415 ioSession.setEvent(SelectionKey.OP_WRITE);
416 }
417
418 void commitMessageHead(
419 final OutgoingMessage messageHead,
420 final boolean endStream,
421 final FlushMode flushMode) throws HttpException, IOException {
422 ioSession.getLock().lock();
423 try {
424 outgoingMessageWriter.write(messageHead, outbuf);
425 updateOutputMetrics(messageHead, connMetrics);
426 if (!endStream) {
427 final ContentEncoder contentEncoder;
428 if (handleOutgoingMessage(messageHead)) {
429 final long len = outgoingContentStrategy.determineLength(messageHead);
430 contentEncoder = createContentEncoder(len, ioSession, outbuf, outTransportMetrics);
431 } else {
432 contentEncoder = null;
433 }
434 if (contentEncoder != null) {
435 outgoingMessage = new Message<>(messageHead, contentEncoder);
436 }
437 }
438 outgoingMessageWriter.reset();
439 if (flushMode == FlushMode.IMMEDIATE) {
440 outbuf.flush(ioSession);
441 }
442 ioSession.setEvent(EventMask.WRITE);
443 } finally {
444 ioSession.getLock().unlock();
445 }
446 }
447
448 void requestSessionInput() {
449 ioSession.setEvent(SelectionKey.OP_READ);
450 }
451
452 void requestSessionOutput() {
453 outputRequests.incrementAndGet();
454 ioSession.setEvent(SelectionKey.OP_WRITE);
455 }
456
457 Timeout getSessionTimeout() {
458 return ioSession.getSocketTimeout();
459 }
460
461 void setSessionTimeout(final Timeout timeout) {
462 ioSession.setSocketTimeout(timeout);
463 }
464
465 void suspendSessionInput() {
466 ioSession.clearEvent(SelectionKey.OP_READ);
467 }
468
469 void suspendSessionOutput() throws IOException {
470 ioSession.getLock().lock();
471 try {
472 if (outbuf.hasData()) {
473 outbuf.flush(ioSession);
474 } else {
475 ioSession.clearEvent(SelectionKey.OP_WRITE);
476 }
477 } finally {
478 ioSession.getLock().unlock();
479 }
480 }
481
482 int streamOutput(final ByteBuffer src) throws IOException {
483 ioSession.getLock().lock();
484 try {
485 if (outgoingMessage == null) {
486 throw new ClosedChannelException();
487 }
488 final ContentEncoder contentEncoder = outgoingMessage.getBody();
489 final int bytesWritten = contentEncoder.write(src);
490 if (bytesWritten > 0) {
491 ioSession.setEvent(SelectionKey.OP_WRITE);
492 }
493 return bytesWritten;
494 } finally {
495 ioSession.getLock().unlock();
496 }
497 }
498
499 enum MessageDelineation { NONE, CHUNK_CODED, MESSAGE_HEAD}
500
501 MessageDelineation endOutputStream(final List<? extends Header> trailers) throws IOException {
502 ioSession.getLock().lock();
503 try {
504 if (outgoingMessage == null) {
505 return MessageDelineation.NONE;
506 }
507 final ContentEncoder contentEncoder = outgoingMessage.getBody();
508 contentEncoder.complete(trailers);
509 ioSession.setEvent(SelectionKey.OP_WRITE);
510 outgoingMessage = null;
511 return contentEncoder instanceof ChunkEncoder
512 ? MessageDelineation.CHUNK_CODED
513 : MessageDelineation.MESSAGE_HEAD;
514 } finally {
515 ioSession.getLock().unlock();
516 }
517 }
518
519 boolean isOutputCompleted() {
520 ioSession.getLock().lock();
521 try {
522 if (outgoingMessage == null) {
523 return true;
524 }
525 final ContentEncoder contentEncoder = outgoingMessage.getBody();
526 return contentEncoder.isCompleted();
527 } finally {
528 ioSession.getLock().unlock();
529 }
530 }
531
532 @Override
533 public void close() throws IOException {
534 ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.NORMAL);
535 }
536
537 @Override
538 public void close(final CloseMode closeMode) {
539 ioSession.enqueue(new ShutdownCommand(closeMode), Command.Priority.IMMEDIATE);
540 }
541
542 @Override
543 public boolean isOpen() {
544 return connState == ConnectionState.ACTIVE;
545 }
546
547 @Override
548 public Timeout getSocketTimeout() {
549 return ioSession.getSocketTimeout();
550 }
551
552 @Override
553 public void setSocketTimeout(final Timeout timeout) {
554 ioSession.setSocketTimeout(timeout);
555 }
556
557 @Override
558 public EndpointDetails getEndpointDetails() {
559 if (endpointDetails == null) {
560 endpointDetails = new BasicEndpointDetails(
561 ioSession.getRemoteAddress(),
562 ioSession.getLocalAddress(),
563 connMetrics,
564 ioSession.getSocketTimeout());
565 }
566 return endpointDetails;
567 }
568
569 @Override
570 public ProtocolVersion getProtocolVersion() {
571 return version;
572 }
573
574 @Override
575 public SocketAddress getRemoteAddress() {
576 return ioSession.getRemoteAddress();
577 }
578
579 @Override
580 public SocketAddress getLocalAddress() {
581 return ioSession.getLocalAddress();
582 }
583
584 @Override
585 public SSLSession getSSLSession() {
586 final TlsDetails tlsDetails = ioSession.getTlsDetails();
587 return tlsDetails != null ? tlsDetails.getSSLSession() : null;
588 }
589
590 void appendState(final StringBuilder buf) {
591 buf.append("connState=").append(connState)
592 .append(", inbuf=").append(inbuf)
593 .append(", outbuf=").append(outbuf)
594 .append(", inputWindow=").append(capacityWindow != null ? capacityWindow.getWindow() : 0);
595 }
596
597 static class CapacityWindow implements CapacityChannel {
598 private final IOSession ioSession;
599 private final Object lock;
600 private int window;
601 private boolean closed;
602
603 CapacityWindow(final int window, final IOSession ioSession) {
604 this.window = window;
605 this.ioSession = ioSession;
606 this.lock = new Object();
607 }
608
609 @Override
610 public void update(final int increment) throws IOException {
611 synchronized (lock) {
612 if (closed) {
613 return;
614 }
615 if (increment > 0) {
616 updateWindow(increment);
617 ioSession.setEvent(SelectionKey.OP_READ);
618 }
619 }
620 }
621
622
623
624
625
626 int removeCapacity(final int delta) {
627 synchronized (lock) {
628 updateWindow(-delta);
629 if (window <= 0) {
630 ioSession.clearEvent(SelectionKey.OP_READ);
631 }
632 return window;
633 }
634 }
635
636 private void updateWindow(final int delta) {
637 int newValue = window + delta;
638
639 if (((window ^ newValue) & (delta ^ newValue)) < 0) {
640 newValue = delta < 0 ? Integer.MIN_VALUE : Integer.MAX_VALUE;
641 }
642 window = newValue;
643 }
644
645
646
647
648
649 void close() {
650 synchronized (lock) {
651 closed = true;
652 }
653 }
654
655
656 int getWindow() {
657 return window;
658 }
659 }
660 }