001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.mina.core.session; 021 022import java.io.File; 023import java.io.FileInputStream; 024import java.io.IOException; 025import java.net.SocketAddress; 026import java.nio.channels.FileChannel; 027import java.util.Iterator; 028import java.util.Queue; 029import java.util.Set; 030import java.util.concurrent.ConcurrentLinkedQueue; 031import java.util.concurrent.atomic.AtomicBoolean; 032import java.util.concurrent.atomic.AtomicInteger; 033import java.util.concurrent.atomic.AtomicLong; 034 035import org.apache.mina.core.buffer.IoBuffer; 036import org.apache.mina.core.file.DefaultFileRegion; 037import org.apache.mina.core.file.FilenameFileRegion; 038import org.apache.mina.core.filterchain.IoFilterChain; 039import org.apache.mina.core.future.CloseFuture; 040import org.apache.mina.core.future.DefaultCloseFuture; 041import org.apache.mina.core.future.DefaultReadFuture; 042import org.apache.mina.core.future.DefaultWriteFuture; 043import org.apache.mina.core.future.IoFutureListener; 044import org.apache.mina.core.future.ReadFuture; 045import org.apache.mina.core.future.WriteFuture; 046import org.apache.mina.core.service.AbstractIoService; 047import org.apache.mina.core.service.IoAcceptor; 048import org.apache.mina.core.service.IoHandler; 049import org.apache.mina.core.service.IoProcessor; 050import org.apache.mina.core.service.IoService; 051import org.apache.mina.core.service.TransportMetadata; 052import org.apache.mina.core.write.DefaultWriteRequest; 053import org.apache.mina.core.write.WriteException; 054import org.apache.mina.core.write.WriteRequest; 055import org.apache.mina.core.write.WriteRequestQueue; 056import org.apache.mina.core.write.WriteTimeoutException; 057import org.apache.mina.core.write.WriteToClosedSessionException; 058import org.apache.mina.util.ExceptionMonitor; 059 060/** 061 * Base implementation of {@link IoSession}. 062 * 063 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 064 */ 065public abstract class AbstractIoSession implements IoSession { 066 /** The associated handler */ 067 private final IoHandler handler; 068 069 /** The session config */ 070 protected IoSessionConfig config; 071 072 /** The service which will manage this session */ 073 private final IoService service; 074 075 private static final AttributeKey READY_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class, 076 "readyReadFutures"); 077 078 private static final AttributeKey WAITING_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class, 079 "waitingReadFutures"); 080 081 private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER = new IoFutureListener<CloseFuture>() { 082 public void operationComplete(CloseFuture future) { 083 AbstractIoSession session = (AbstractIoSession) future.getSession(); 084 session.scheduledWriteBytes.set(0); 085 session.scheduledWriteMessages.set(0); 086 session.readBytesThroughput = 0; 087 session.readMessagesThroughput = 0; 088 session.writtenBytesThroughput = 0; 089 session.writtenMessagesThroughput = 0; 090 } 091 }; 092 093 /** 094 * An internal write request object that triggers session close. 095 * 096 * @see #writeRequestQueue 097 */ 098 private static final WriteRequest CLOSE_REQUEST = new DefaultWriteRequest(new Object()); 099 100 private final Object lock = new Object(); 101 102 private IoSessionAttributeMap attributes; 103 104 private WriteRequestQueue writeRequestQueue; 105 106 private WriteRequest currentWriteRequest; 107 108 /** The Session creation's time */ 109 private final long creationTime; 110 111 /** An id generator guaranteed to generate unique IDs for the session */ 112 private static AtomicLong idGenerator = new AtomicLong(0); 113 114 /** The session ID */ 115 private long sessionId; 116 117 /** 118 * A future that will be set 'closed' when the connection is closed. 119 */ 120 private final CloseFuture closeFuture = new DefaultCloseFuture(this); 121 122 private volatile boolean closing; 123 124 // traffic control 125 private boolean readSuspended = false; 126 127 private boolean writeSuspended = false; 128 129 // Status variables 130 private final AtomicBoolean scheduledForFlush = new AtomicBoolean(); 131 132 private final AtomicInteger scheduledWriteBytes = new AtomicInteger(); 133 134 private final AtomicInteger scheduledWriteMessages = new AtomicInteger(); 135 136 private long readBytes; 137 138 private long writtenBytes; 139 140 private long readMessages; 141 142 private long writtenMessages; 143 144 private long lastReadTime; 145 146 private long lastWriteTime; 147 148 private long lastThroughputCalculationTime; 149 150 private long lastReadBytes; 151 152 private long lastWrittenBytes; 153 154 private long lastReadMessages; 155 156 private long lastWrittenMessages; 157 158 private double readBytesThroughput; 159 160 private double writtenBytesThroughput; 161 162 private double readMessagesThroughput; 163 164 private double writtenMessagesThroughput; 165 166 private AtomicInteger idleCountForBoth = new AtomicInteger(); 167 168 private AtomicInteger idleCountForRead = new AtomicInteger(); 169 170 private AtomicInteger idleCountForWrite = new AtomicInteger(); 171 172 private long lastIdleTimeForBoth; 173 174 private long lastIdleTimeForRead; 175 176 private long lastIdleTimeForWrite; 177 178 private boolean deferDecreaseReadBuffer = true; 179 180 /** 181 * Create a Session for a service 182 * 183 * @param service the Service for this session 184 */ 185 protected AbstractIoSession(IoService service) { 186 this.service = service; 187 this.handler = service.getHandler(); 188 189 // Initialize all the Session counters to the current time 190 long currentTime = System.currentTimeMillis(); 191 creationTime = currentTime; 192 lastThroughputCalculationTime = currentTime; 193 lastReadTime = currentTime; 194 lastWriteTime = currentTime; 195 lastIdleTimeForBoth = currentTime; 196 lastIdleTimeForRead = currentTime; 197 lastIdleTimeForWrite = currentTime; 198 199 // TODO add documentation 200 closeFuture.addListener(SCHEDULED_COUNTER_RESETTER); 201 202 // Set a new ID for this session 203 sessionId = idGenerator.incrementAndGet(); 204 } 205 206 /** 207 * {@inheritDoc} 208 * 209 * We use an AtomicLong to guarantee that the session ID are unique. 210 */ 211 public final long getId() { 212 return sessionId; 213 } 214 215 /** 216 * @return The associated IoProcessor for this session 217 */ 218 public abstract IoProcessor getProcessor(); 219 220 /** 221 * {@inheritDoc} 222 */ 223 public final boolean isConnected() { 224 return !closeFuture.isClosed(); 225 } 226 227 /** 228 * {@inheritDoc} 229 */ 230 public boolean isActive() { 231 // Return true by default 232 return true; 233 } 234 235 /** 236 * {@inheritDoc} 237 */ 238 public final boolean isClosing() { 239 return closing || closeFuture.isClosed(); 240 } 241 242 /** 243 * {@inheritDoc} 244 */ 245 public boolean isSecured() { 246 // Always false... 247 return false; 248 } 249 250 /** 251 * {@inheritDoc} 252 */ 253 public final CloseFuture getCloseFuture() { 254 return closeFuture; 255 } 256 257 /** 258 * Tells if the session is scheduled for flushed 259 * 260 * @return true if the session is scheduled for flush 261 */ 262 public final boolean isScheduledForFlush() { 263 return scheduledForFlush.get(); 264 } 265 266 /** 267 * Schedule the session for flushed 268 */ 269 public final void scheduledForFlush() { 270 scheduledForFlush.set(true); 271 } 272 273 /** 274 * Change the session's status : it's not anymore scheduled for flush 275 */ 276 public final void unscheduledForFlush() { 277 scheduledForFlush.set(false); 278 } 279 280 /** 281 * Set the scheduledForFLush flag. As we may have concurrent access to this 282 * flag, we compare and set it in one call. 283 * 284 * @param schedule 285 * the new value to set if not already set. 286 * @return true if the session flag has been set, and if it wasn't set 287 * already. 288 */ 289 public final boolean setScheduledForFlush(boolean schedule) { 290 if (schedule) { 291 // If the current tag is set to false, switch it to true, 292 // otherwise, we do nothing but return false : the session 293 // is already scheduled for flush 294 return scheduledForFlush.compareAndSet(false, schedule); 295 } 296 297 scheduledForFlush.set(schedule); 298 return true; 299 } 300 301 /** 302 * {@inheritDoc} 303 */ 304 public final CloseFuture close(boolean rightNow) { 305 if (rightNow) { 306 return closeNow(); 307 } else { 308 return closeOnFlush(); 309 } 310 } 311 312 /** 313 * {@inheritDoc} 314 */ 315 public final CloseFuture close() { 316 try { 317 closeNow(); 318 } finally { 319 return closeFuture; 320 } 321 } 322 323 /** 324 * {@inheritDoc} 325 */ 326 public final CloseFuture closeOnFlush() { 327 if (!isClosing()) { 328 getWriteRequestQueue().offer(this, CLOSE_REQUEST); 329 getProcessor().flush(this); 330 return closeFuture; 331 } else { 332 return closeFuture; 333 } 334 } 335 336 /** 337 * {@inheritDoc} 338 */ 339 public final CloseFuture closeNow() { 340 synchronized (lock) { 341 if (isClosing()) { 342 return closeFuture; 343 } 344 345 closing = true; 346 } 347 348 getFilterChain().fireFilterClose(); 349 350 return closeFuture; 351 } 352 353 /** 354 * {@inheritDoc} 355 */ 356 public IoHandler getHandler() { 357 return handler; 358 } 359 360 /** 361 * {@inheritDoc} 362 */ 363 public IoSessionConfig getConfig() { 364 return config; 365 } 366 367 /** 368 * {@inheritDoc} 369 */ 370 public final ReadFuture read() { 371 if (!getConfig().isUseReadOperation()) { 372 throw new IllegalStateException("useReadOperation is not enabled."); 373 } 374 375 Queue<ReadFuture> readyReadFutures = getReadyReadFutures(); 376 ReadFuture future; 377 synchronized (readyReadFutures) { 378 future = readyReadFutures.poll(); 379 if (future != null) { 380 if (future.isClosed()) { 381 // Let other readers get notified. 382 readyReadFutures.offer(future); 383 } 384 } else { 385 future = new DefaultReadFuture(this); 386 getWaitingReadFutures().offer(future); 387 } 388 } 389 390 return future; 391 } 392 393 /** 394 * Associates a message to a ReadFuture 395 * 396 * @param message the message to associate to the ReadFuture 397 * 398 */ 399 public final void offerReadFuture(Object message) { 400 newReadFuture().setRead(message); 401 } 402 403 /** 404 * Associates a failure to a ReadFuture 405 * 406 * @param exception the exception to associate to the ReadFuture 407 */ 408 public final void offerFailedReadFuture(Throwable exception) { 409 newReadFuture().setException(exception); 410 } 411 412 /** 413 * Inform the ReadFuture that the session has been closed 414 */ 415 public final void offerClosedReadFuture() { 416 Queue<ReadFuture> readyReadFutures = getReadyReadFutures(); 417 418 synchronized (readyReadFutures) { 419 newReadFuture().setClosed(); 420 } 421 } 422 423 /** 424 * @return a readFuture get from the waiting ReadFuture 425 */ 426 private ReadFuture newReadFuture() { 427 Queue<ReadFuture> readyReadFutures = getReadyReadFutures(); 428 Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures(); 429 ReadFuture future; 430 431 synchronized (readyReadFutures) { 432 future = waitingReadFutures.poll(); 433 434 if (future == null) { 435 future = new DefaultReadFuture(this); 436 readyReadFutures.offer(future); 437 } 438 } 439 440 return future; 441 } 442 443 /** 444 * @return a queue of ReadFuture 445 */ 446 private Queue<ReadFuture> getReadyReadFutures() { 447 Queue<ReadFuture> readyReadFutures = (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY); 448 449 if (readyReadFutures == null) { 450 readyReadFutures = new ConcurrentLinkedQueue<ReadFuture>(); 451 452 Queue<ReadFuture> oldReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(READY_READ_FUTURES_KEY, 453 readyReadFutures); 454 455 if (oldReadyReadFutures != null) { 456 readyReadFutures = oldReadyReadFutures; 457 } 458 } 459 460 return readyReadFutures; 461 } 462 463 /** 464 * @return the queue of waiting ReadFuture 465 */ 466 private Queue<ReadFuture> getWaitingReadFutures() { 467 Queue<ReadFuture> waitingReadyReadFutures = (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES_KEY); 468 469 if (waitingReadyReadFutures == null) { 470 waitingReadyReadFutures = new ConcurrentLinkedQueue<ReadFuture>(); 471 472 Queue<ReadFuture> oldWaitingReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent( 473 WAITING_READ_FUTURES_KEY, waitingReadyReadFutures); 474 475 if (oldWaitingReadyReadFutures != null) { 476 waitingReadyReadFutures = oldWaitingReadyReadFutures; 477 } 478 } 479 480 return waitingReadyReadFutures; 481 } 482 483 /** 484 * {@inheritDoc} 485 */ 486 public WriteFuture write(Object message) { 487 return write(message, null); 488 } 489 490 /** 491 * {@inheritDoc} 492 */ 493 public WriteFuture write(Object message, SocketAddress remoteAddress) { 494 if (message == null) { 495 throw new IllegalArgumentException("Trying to write a null message : not allowed"); 496 } 497 498 // We can't send a message to a connected session if we don't have 499 // the remote address 500 if (!getTransportMetadata().isConnectionless() && (remoteAddress != null)) { 501 throw new UnsupportedOperationException(); 502 } 503 504 // If the session has been closed or is closing, we can't either 505 // send a message to the remote side. We generate a future 506 // containing an exception. 507 if (isClosing() || !isConnected()) { 508 WriteFuture future = new DefaultWriteFuture(this); 509 WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress); 510 WriteException writeException = new WriteToClosedSessionException(request); 511 future.setException(writeException); 512 return future; 513 } 514 515 FileChannel openedFileChannel = null; 516 517 // TODO: remove this code as soon as we use InputStream 518 // instead of Object for the message. 519 try { 520 if ((message instanceof IoBuffer) && !((IoBuffer) message).hasRemaining()) { 521 // Nothing to write : probably an error in the user code 522 throw new IllegalArgumentException("message is empty. Forgot to call flip()?"); 523 } else if (message instanceof FileChannel) { 524 FileChannel fileChannel = (FileChannel) message; 525 message = new DefaultFileRegion(fileChannel, 0, fileChannel.size()); 526 } else if (message instanceof File) { 527 File file = (File) message; 528 openedFileChannel = new FileInputStream(file).getChannel(); 529 message = new FilenameFileRegion(file, openedFileChannel, 0, openedFileChannel.size()); 530 } 531 } catch (IOException e) { 532 ExceptionMonitor.getInstance().exceptionCaught(e); 533 return DefaultWriteFuture.newNotWrittenFuture(this, e); 534 } 535 536 // Now, we can write the message. First, create a future 537 WriteFuture writeFuture = new DefaultWriteFuture(this); 538 WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress); 539 540 // Then, get the chain and inject the WriteRequest into it 541 IoFilterChain filterChain = getFilterChain(); 542 filterChain.fireFilterWrite(writeRequest); 543 544 // TODO : This is not our business ! The caller has created a 545 // FileChannel, 546 // he has to close it ! 547 if (openedFileChannel != null) { 548 // If we opened a FileChannel, it needs to be closed when the write 549 // has completed 550 final FileChannel finalChannel = openedFileChannel; 551 writeFuture.addListener(new IoFutureListener<WriteFuture>() { 552 public void operationComplete(WriteFuture future) { 553 try { 554 finalChannel.close(); 555 } catch (IOException e) { 556 ExceptionMonitor.getInstance().exceptionCaught(e); 557 } 558 } 559 }); 560 } 561 562 // Return the WriteFuture. 563 return writeFuture; 564 } 565 566 /** 567 * {@inheritDoc} 568 */ 569 public final Object getAttachment() { 570 return getAttribute(""); 571 } 572 573 /** 574 * {@inheritDoc} 575 */ 576 public final Object setAttachment(Object attachment) { 577 return setAttribute("", attachment); 578 } 579 580 /** 581 * {@inheritDoc} 582 */ 583 public final Object getAttribute(Object key) { 584 return getAttribute(key, null); 585 } 586 587 /** 588 * {@inheritDoc} 589 */ 590 public final Object getAttribute(Object key, Object defaultValue) { 591 return attributes.getAttribute(this, key, defaultValue); 592 } 593 594 /** 595 * {@inheritDoc} 596 */ 597 public final Object setAttribute(Object key, Object value) { 598 return attributes.setAttribute(this, key, value); 599 } 600 601 /** 602 * {@inheritDoc} 603 */ 604 public final Object setAttribute(Object key) { 605 return setAttribute(key, Boolean.TRUE); 606 } 607 608 /** 609 * {@inheritDoc} 610 */ 611 public final Object setAttributeIfAbsent(Object key, Object value) { 612 return attributes.setAttributeIfAbsent(this, key, value); 613 } 614 615 /** 616 * {@inheritDoc} 617 */ 618 public final Object setAttributeIfAbsent(Object key) { 619 return setAttributeIfAbsent(key, Boolean.TRUE); 620 } 621 622 /** 623 * {@inheritDoc} 624 */ 625 public final Object removeAttribute(Object key) { 626 return attributes.removeAttribute(this, key); 627 } 628 629 /** 630 * {@inheritDoc} 631 */ 632 public final boolean removeAttribute(Object key, Object value) { 633 return attributes.removeAttribute(this, key, value); 634 } 635 636 /** 637 * {@inheritDoc} 638 */ 639 public final boolean replaceAttribute(Object key, Object oldValue, Object newValue) { 640 return attributes.replaceAttribute(this, key, oldValue, newValue); 641 } 642 643 /** 644 * {@inheritDoc} 645 */ 646 public final boolean containsAttribute(Object key) { 647 return attributes.containsAttribute(this, key); 648 } 649 650 /** 651 * {@inheritDoc} 652 */ 653 public final Set<Object> getAttributeKeys() { 654 return attributes.getAttributeKeys(this); 655 } 656 657 /** 658 * @return The map of attributes associated with the session 659 */ 660 public final IoSessionAttributeMap getAttributeMap() { 661 return attributes; 662 } 663 664 /** 665 * Set the map of attributes associated with the session 666 * 667 * @param attributes The Map of attributes 668 */ 669 public final void setAttributeMap(IoSessionAttributeMap attributes) { 670 this.attributes = attributes; 671 } 672 673 /** 674 * Create a new close aware write queue, based on the given write queue. 675 * 676 * @param writeRequestQueue The write request queue 677 */ 678 public final void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) { 679 this.writeRequestQueue = new CloseAwareWriteQueue(writeRequestQueue); 680 } 681 682 /** 683 * {@inheritDoc} 684 */ 685 public final void suspendRead() { 686 readSuspended = true; 687 if (isClosing() || !isConnected()) { 688 return; 689 } 690 getProcessor().updateTrafficControl(this); 691 } 692 693 /** 694 * {@inheritDoc} 695 */ 696 public final void suspendWrite() { 697 writeSuspended = true; 698 if (isClosing() || !isConnected()) { 699 return; 700 } 701 getProcessor().updateTrafficControl(this); 702 } 703 704 /** 705 * {@inheritDoc} 706 */ 707 @SuppressWarnings("unchecked") 708 public final void resumeRead() { 709 readSuspended = false; 710 if (isClosing() || !isConnected()) { 711 return; 712 } 713 getProcessor().updateTrafficControl(this); 714 } 715 716 /** 717 * {@inheritDoc} 718 */ 719 @SuppressWarnings("unchecked") 720 public final void resumeWrite() { 721 writeSuspended = false; 722 if (isClosing() || !isConnected()) { 723 return; 724 } 725 getProcessor().updateTrafficControl(this); 726 } 727 728 /** 729 * {@inheritDoc} 730 */ 731 public boolean isReadSuspended() { 732 return readSuspended; 733 } 734 735 /** 736 * {@inheritDoc} 737 */ 738 public boolean isWriteSuspended() { 739 return writeSuspended; 740 } 741 742 /** 743 * {@inheritDoc} 744 */ 745 public final long getReadBytes() { 746 return readBytes; 747 } 748 749 /** 750 * {@inheritDoc} 751 */ 752 public final long getWrittenBytes() { 753 return writtenBytes; 754 } 755 756 /** 757 * {@inheritDoc} 758 */ 759 public final long getReadMessages() { 760 return readMessages; 761 } 762 763 /** 764 * {@inheritDoc} 765 */ 766 public final long getWrittenMessages() { 767 return writtenMessages; 768 } 769 770 /** 771 * {@inheritDoc} 772 */ 773 public final double getReadBytesThroughput() { 774 return readBytesThroughput; 775 } 776 777 /** 778 * {@inheritDoc} 779 */ 780 public final double getWrittenBytesThroughput() { 781 return writtenBytesThroughput; 782 } 783 784 /** 785 * {@inheritDoc} 786 */ 787 public final double getReadMessagesThroughput() { 788 return readMessagesThroughput; 789 } 790 791 /** 792 * {@inheritDoc} 793 */ 794 public final double getWrittenMessagesThroughput() { 795 return writtenMessagesThroughput; 796 } 797 798 /** 799 * {@inheritDoc} 800 */ 801 public final void updateThroughput(long currentTime, boolean force) { 802 int interval = (int) (currentTime - lastThroughputCalculationTime); 803 804 long minInterval = getConfig().getThroughputCalculationIntervalInMillis(); 805 806 if (((minInterval == 0) || (interval < minInterval)) && !force) { 807 return; 808 } 809 810 readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval; 811 writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval; 812 readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval; 813 writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval; 814 815 lastReadBytes = readBytes; 816 lastWrittenBytes = writtenBytes; 817 lastReadMessages = readMessages; 818 lastWrittenMessages = writtenMessages; 819 820 lastThroughputCalculationTime = currentTime; 821 } 822 823 /** 824 * {@inheritDoc} 825 */ 826 public final long getScheduledWriteBytes() { 827 return scheduledWriteBytes.get(); 828 } 829 830 /** 831 * {@inheritDoc} 832 */ 833 public final int getScheduledWriteMessages() { 834 return scheduledWriteMessages.get(); 835 } 836 837 /** 838 * Set the number of scheduled write bytes 839 * 840 * @param byteCount The number of scheduled bytes for write 841 */ 842 protected void setScheduledWriteBytes(int byteCount) { 843 scheduledWriteBytes.set(byteCount); 844 } 845 846 /** 847 * Set the number of scheduled write messages 848 * 849 * @param messages The number of scheduled messages for write 850 */ 851 protected void setScheduledWriteMessages(int messages) { 852 scheduledWriteMessages.set(messages); 853 } 854 855 /** 856 * Increase the number of read bytes 857 * 858 * @param increment The number of read bytes 859 * @param currentTime The current time 860 */ 861 public final void increaseReadBytes(long increment, long currentTime) { 862 if (increment <= 0) { 863 return; 864 } 865 866 readBytes += increment; 867 lastReadTime = currentTime; 868 idleCountForBoth.set(0); 869 idleCountForRead.set(0); 870 871 if (getService() instanceof AbstractIoService) { 872 ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime); 873 } 874 } 875 876 /** 877 * Increase the number of read messages 878 * 879 * @param currentTime The current time 880 */ 881 public final void increaseReadMessages(long currentTime) { 882 readMessages++; 883 lastReadTime = currentTime; 884 idleCountForBoth.set(0); 885 idleCountForRead.set(0); 886 887 if (getService() instanceof AbstractIoService) { 888 ((AbstractIoService) getService()).getStatistics().increaseReadMessages(currentTime); 889 } 890 } 891 892 /** 893 * Increase the number of written bytes 894 * 895 * @param increment The number of written bytes 896 * @param currentTime The current time 897 */ 898 public final void increaseWrittenBytes(int increment, long currentTime) { 899 if (increment <= 0) { 900 return; 901 } 902 903 writtenBytes += increment; 904 lastWriteTime = currentTime; 905 idleCountForBoth.set(0); 906 idleCountForWrite.set(0); 907 908 if (getService() instanceof AbstractIoService) { 909 ((AbstractIoService) getService()).getStatistics().increaseWrittenBytes(increment, currentTime); 910 } 911 912 increaseScheduledWriteBytes(-increment); 913 } 914 915 /** 916 * Increase the number of written messages 917 * 918 * @param request The written message 919 * @param currentTime The current tile 920 */ 921 public final void increaseWrittenMessages(WriteRequest request, long currentTime) { 922 Object message = request.getMessage(); 923 924 if (message instanceof IoBuffer) { 925 IoBuffer b = (IoBuffer) message; 926 927 if (b.hasRemaining()) { 928 return; 929 } 930 } 931 932 writtenMessages++; 933 lastWriteTime = currentTime; 934 935 if (getService() instanceof AbstractIoService) { 936 ((AbstractIoService) getService()).getStatistics().increaseWrittenMessages(currentTime); 937 } 938 939 decreaseScheduledWriteMessages(); 940 } 941 942 /** 943 * Increase the number of scheduled write bytes for the session 944 * 945 * @param increment The number of newly added bytes to write 946 */ 947 public final void increaseScheduledWriteBytes(int increment) { 948 scheduledWriteBytes.addAndGet(increment); 949 if (getService() instanceof AbstractIoService) { 950 ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteBytes(increment); 951 } 952 } 953 954 /** 955 * Increase the number of scheduled message to write 956 */ 957 public final void increaseScheduledWriteMessages() { 958 scheduledWriteMessages.incrementAndGet(); 959 960 if (getService() instanceof AbstractIoService) { 961 ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteMessages(); 962 } 963 } 964 965 /** 966 * Decrease the number of scheduled message written 967 */ 968 private void decreaseScheduledWriteMessages() { 969 scheduledWriteMessages.decrementAndGet(); 970 if (getService() instanceof AbstractIoService) { 971 ((AbstractIoService) getService()).getStatistics().decreaseScheduledWriteMessages(); 972 } 973 } 974 975 /** 976 * Decrease the counters of written messages and written bytes when a message has been written 977 * 978 * @param request The written message 979 */ 980 public final void decreaseScheduledBytesAndMessages(WriteRequest request) { 981 Object message = request.getMessage(); 982 983 if (message instanceof IoBuffer) { 984 IoBuffer b = (IoBuffer) message; 985 986 if (b.hasRemaining()) { 987 increaseScheduledWriteBytes(-((IoBuffer) message).remaining()); 988 } else { 989 decreaseScheduledWriteMessages(); 990 } 991 } else { 992 decreaseScheduledWriteMessages(); 993 } 994 } 995 996 /** 997 * {@inheritDoc} 998 */ 999 public final WriteRequestQueue getWriteRequestQueue() { 1000 if (writeRequestQueue == null) { 1001 throw new IllegalStateException(); 1002 } 1003 1004 return writeRequestQueue; 1005 } 1006 1007 /** 1008 * {@inheritDoc} 1009 */ 1010 public final WriteRequest getCurrentWriteRequest() { 1011 return currentWriteRequest; 1012 } 1013 1014 /** 1015 * {@inheritDoc} 1016 */ 1017 public final Object getCurrentWriteMessage() { 1018 WriteRequest req = getCurrentWriteRequest(); 1019 1020 if (req == null) { 1021 return null; 1022 } 1023 return req.getMessage(); 1024 } 1025 1026 /** 1027 * {@inheritDoc} 1028 */ 1029 public final void setCurrentWriteRequest(WriteRequest currentWriteRequest) { 1030 this.currentWriteRequest = currentWriteRequest; 1031 } 1032 1033 /** 1034 * Increase the ReadBuffer size (it will double) 1035 */ 1036 public final void increaseReadBufferSize() { 1037 int newReadBufferSize = getConfig().getReadBufferSize() << 1; 1038 if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) { 1039 getConfig().setReadBufferSize(newReadBufferSize); 1040 } else { 1041 getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize()); 1042 } 1043 1044 deferDecreaseReadBuffer = true; 1045 } 1046 1047 /** 1048 * Decrease the ReadBuffer size (it will be divided by a factor 2) 1049 */ 1050 public final void decreaseReadBufferSize() { 1051 if (deferDecreaseReadBuffer) { 1052 deferDecreaseReadBuffer = false; 1053 return; 1054 } 1055 1056 if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) { 1057 getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1); 1058 } 1059 1060 deferDecreaseReadBuffer = true; 1061 } 1062 1063 /** 1064 * {@inheritDoc} 1065 */ 1066 public final long getCreationTime() { 1067 return creationTime; 1068 } 1069 1070 /** 1071 * {@inheritDoc} 1072 */ 1073 public final long getLastIoTime() { 1074 return Math.max(lastReadTime, lastWriteTime); 1075 } 1076 1077 /** 1078 * {@inheritDoc} 1079 */ 1080 public final long getLastReadTime() { 1081 return lastReadTime; 1082 } 1083 1084 /** 1085 * {@inheritDoc} 1086 */ 1087 public final long getLastWriteTime() { 1088 return lastWriteTime; 1089 } 1090 1091 /** 1092 * {@inheritDoc} 1093 */ 1094 public final boolean isIdle(IdleStatus status) { 1095 if (status == IdleStatus.BOTH_IDLE) { 1096 return idleCountForBoth.get() > 0; 1097 } 1098 1099 if (status == IdleStatus.READER_IDLE) { 1100 return idleCountForRead.get() > 0; 1101 } 1102 1103 if (status == IdleStatus.WRITER_IDLE) { 1104 return idleCountForWrite.get() > 0; 1105 } 1106 1107 throw new IllegalArgumentException("Unknown idle status: " + status); 1108 } 1109 1110 /** 1111 * {@inheritDoc} 1112 */ 1113 public final boolean isBothIdle() { 1114 return isIdle(IdleStatus.BOTH_IDLE); 1115 } 1116 1117 /** 1118 * {@inheritDoc} 1119 */ 1120 public final boolean isReaderIdle() { 1121 return isIdle(IdleStatus.READER_IDLE); 1122 } 1123 1124 /** 1125 * {@inheritDoc} 1126 */ 1127 public final boolean isWriterIdle() { 1128 return isIdle(IdleStatus.WRITER_IDLE); 1129 } 1130 1131 /** 1132 * {@inheritDoc} 1133 */ 1134 public final int getIdleCount(IdleStatus status) { 1135 if (getConfig().getIdleTime(status) == 0) { 1136 if (status == IdleStatus.BOTH_IDLE) { 1137 idleCountForBoth.set(0); 1138 } 1139 1140 if (status == IdleStatus.READER_IDLE) { 1141 idleCountForRead.set(0); 1142 } 1143 1144 if (status == IdleStatus.WRITER_IDLE) { 1145 idleCountForWrite.set(0); 1146 } 1147 } 1148 1149 if (status == IdleStatus.BOTH_IDLE) { 1150 return idleCountForBoth.get(); 1151 } 1152 1153 if (status == IdleStatus.READER_IDLE) { 1154 return idleCountForRead.get(); 1155 } 1156 1157 if (status == IdleStatus.WRITER_IDLE) { 1158 return idleCountForWrite.get(); 1159 } 1160 1161 throw new IllegalArgumentException("Unknown idle status: " + status); 1162 } 1163 1164 /** 1165 * {@inheritDoc} 1166 */ 1167 public final long getLastIdleTime(IdleStatus status) { 1168 if (status == IdleStatus.BOTH_IDLE) { 1169 return lastIdleTimeForBoth; 1170 } 1171 1172 if (status == IdleStatus.READER_IDLE) { 1173 return lastIdleTimeForRead; 1174 } 1175 1176 if (status == IdleStatus.WRITER_IDLE) { 1177 return lastIdleTimeForWrite; 1178 } 1179 1180 throw new IllegalArgumentException("Unknown idle status: " + status); 1181 } 1182 1183 /** 1184 * Increase the count of the various Idle counter 1185 * 1186 * @param status The current status 1187 * @param currentTime The current time 1188 */ 1189 public final void increaseIdleCount(IdleStatus status, long currentTime) { 1190 if (status == IdleStatus.BOTH_IDLE) { 1191 idleCountForBoth.incrementAndGet(); 1192 lastIdleTimeForBoth = currentTime; 1193 } else if (status == IdleStatus.READER_IDLE) { 1194 idleCountForRead.incrementAndGet(); 1195 lastIdleTimeForRead = currentTime; 1196 } else if (status == IdleStatus.WRITER_IDLE) { 1197 idleCountForWrite.incrementAndGet(); 1198 lastIdleTimeForWrite = currentTime; 1199 } else { 1200 throw new IllegalArgumentException("Unknown idle status: " + status); 1201 } 1202 } 1203 1204 /** 1205 * {@inheritDoc} 1206 */ 1207 public final int getBothIdleCount() { 1208 return getIdleCount(IdleStatus.BOTH_IDLE); 1209 } 1210 1211 /** 1212 * {@inheritDoc} 1213 */ 1214 public final long getLastBothIdleTime() { 1215 return getLastIdleTime(IdleStatus.BOTH_IDLE); 1216 } 1217 1218 /** 1219 * {@inheritDoc} 1220 */ 1221 public final long getLastReaderIdleTime() { 1222 return getLastIdleTime(IdleStatus.READER_IDLE); 1223 } 1224 1225 /** 1226 * {@inheritDoc} 1227 */ 1228 public final long getLastWriterIdleTime() { 1229 return getLastIdleTime(IdleStatus.WRITER_IDLE); 1230 } 1231 1232 /** 1233 * {@inheritDoc} 1234 */ 1235 public final int getReaderIdleCount() { 1236 return getIdleCount(IdleStatus.READER_IDLE); 1237 } 1238 1239 /** 1240 * {@inheritDoc} 1241 */ 1242 public final int getWriterIdleCount() { 1243 return getIdleCount(IdleStatus.WRITER_IDLE); 1244 } 1245 1246 /** 1247 * {@inheritDoc} 1248 */ 1249 public SocketAddress getServiceAddress() { 1250 IoService service = getService(); 1251 if (service instanceof IoAcceptor) { 1252 return ((IoAcceptor) service).getLocalAddress(); 1253 } 1254 1255 return getRemoteAddress(); 1256 } 1257 1258 /** 1259 * {@inheritDoc} 1260 */ 1261 @Override 1262 public final int hashCode() { 1263 return super.hashCode(); 1264 } 1265 1266 /** 1267 * {@inheritDoc} TODO This is a ridiculous implementation. Need to be 1268 * replaced. 1269 */ 1270 @Override 1271 public final boolean equals(Object o) { 1272 return super.equals(o); 1273 } 1274 1275 /** 1276 * {@inheritDoc} 1277 */ 1278 @Override 1279 public String toString() { 1280 if (isConnected() || isClosing()) { 1281 String remote = null; 1282 String local = null; 1283 1284 try { 1285 remote = String.valueOf(getRemoteAddress()); 1286 } catch (Exception e) { 1287 remote = "Cannot get the remote address informations: " + e.getMessage(); 1288 } 1289 1290 try { 1291 local = String.valueOf(getLocalAddress()); 1292 } catch (Exception e) { 1293 } 1294 1295 if (getService() instanceof IoAcceptor) { 1296 return "(" + getIdAsString() + ": " + getServiceName() + ", server, " + remote + " => " + local + ')'; 1297 } 1298 1299 return "(" + getIdAsString() + ": " + getServiceName() + ", client, " + local + " => " + remote + ')'; 1300 } 1301 1302 return "(" + getIdAsString() + ") Session disconnected ..."; 1303 } 1304 1305 /** 1306 * Get the Id as a String 1307 */ 1308 private String getIdAsString() { 1309 String id = Long.toHexString(getId()).toUpperCase(); 1310 1311 if (id.length() <= 8) { 1312 return "0x00000000".substring(0, 10 - id.length()) + id; 1313 } else { 1314 return "0x" + id; 1315 } 1316 } 1317 1318 /** 1319 * TGet the Service name 1320 */ 1321 private String getServiceName() { 1322 TransportMetadata tm = getTransportMetadata(); 1323 if (tm == null) { 1324 return "null"; 1325 } 1326 1327 return tm.getProviderName() + ' ' + tm.getName(); 1328 } 1329 1330 /** 1331 * {@inheritDoc} 1332 */ 1333 public IoService getService() { 1334 return service; 1335 } 1336 1337 /** 1338 * Fires a {@link IoEventType#SESSION_IDLE} event to any applicable sessions 1339 * in the specified collection. 1340 * 1341 * @param sessions The sessions that are notified 1342 * @param currentTime the current time (i.e. {@link System#currentTimeMillis()}) 1343 */ 1344 public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) { 1345 while (sessions.hasNext()) { 1346 IoSession session = sessions.next(); 1347 1348 if (!session.getCloseFuture().isClosed()) { 1349 notifyIdleSession(session, currentTime); 1350 } 1351 } 1352 } 1353 1354 /** 1355 * Fires a {@link IoEventType#SESSION_IDLE} event if applicable for the 1356 * specified {@code session}. 1357 * 1358 * @param session The session that is notified 1359 * @param currentTime the current time (i.e. {@link System#currentTimeMillis()}) 1360 */ 1361 public static void notifyIdleSession(IoSession session, long currentTime) { 1362 notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE), 1363 IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); 1364 1365 notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE), 1366 IdleStatus.READER_IDLE, 1367 Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE))); 1368 1369 notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE), 1370 IdleStatus.WRITER_IDLE, 1371 Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE))); 1372 1373 notifyWriteTimeout(session, currentTime); 1374 } 1375 1376 private static void notifyIdleSession0(IoSession session, long currentTime, long idleTime, IdleStatus status, 1377 long lastIoTime) { 1378 if ((idleTime > 0) && (lastIoTime != 0) && (currentTime - lastIoTime >= idleTime)) { 1379 session.getFilterChain().fireSessionIdle(status); 1380 } 1381 } 1382 1383 private static void notifyWriteTimeout(IoSession session, long currentTime) { 1384 1385 long writeTimeout = session.getConfig().getWriteTimeoutInMillis(); 1386 if ((writeTimeout > 0) && (currentTime - session.getLastWriteTime() >= writeTimeout) 1387 && !session.getWriteRequestQueue().isEmpty(session)) { 1388 WriteRequest request = session.getCurrentWriteRequest(); 1389 if (request != null) { 1390 session.setCurrentWriteRequest(null); 1391 WriteTimeoutException cause = new WriteTimeoutException(request); 1392 request.getFuture().setException(cause); 1393 session.getFilterChain().fireExceptionCaught(cause); 1394 // WriteException is an IOException, so we close the session. 1395 session.close(true); 1396 } 1397 } 1398 } 1399 1400 /** 1401 * A queue which handles the CLOSE request. 1402 * 1403 * TODO : Check that when closing a session, all the pending requests are 1404 * correctly sent. 1405 */ 1406 private class CloseAwareWriteQueue implements WriteRequestQueue { 1407 1408 private final WriteRequestQueue queue; 1409 1410 /** 1411 * {@inheritDoc} 1412 */ 1413 public CloseAwareWriteQueue(WriteRequestQueue queue) { 1414 this.queue = queue; 1415 } 1416 1417 /** 1418 * {@inheritDoc} 1419 */ 1420 public synchronized WriteRequest poll(IoSession session) { 1421 WriteRequest answer = queue.poll(session); 1422 1423 if (answer == CLOSE_REQUEST) { 1424 AbstractIoSession.this.close( true ); 1425 dispose(session); 1426 answer = null; 1427 } 1428 1429 return answer; 1430 } 1431 1432 /** 1433 * {@inheritDoc} 1434 */ 1435 public void offer(IoSession session, WriteRequest e) { 1436 queue.offer(session, e); 1437 } 1438 1439 /** 1440 * {@inheritDoc} 1441 */ 1442 public boolean isEmpty(IoSession session) { 1443 return queue.isEmpty(session); 1444 } 1445 1446 /** 1447 * {@inheritDoc} 1448 */ 1449 public void clear(IoSession session) { 1450 queue.clear(session); 1451 } 1452 1453 /** 1454 * {@inheritDoc} 1455 */ 1456 public void dispose(IoSession session) { 1457 queue.dispose(session); 1458 } 1459 1460 /** 1461 * {@inheritDoc} 1462 */ 1463 public int size() { 1464 return queue.size(); 1465 } 1466 } 1467}