001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.mina.core.session; 021 022import java.io.File; 023import java.io.FileInputStream; 024import java.io.IOException; 025import java.net.SocketAddress; 026import java.nio.channels.FileChannel; 027import java.util.Iterator; 028import java.util.Queue; 029import java.util.Set; 030import java.util.concurrent.ConcurrentLinkedQueue; 031import java.util.concurrent.atomic.AtomicBoolean; 032import java.util.concurrent.atomic.AtomicInteger; 033import java.util.concurrent.atomic.AtomicLong; 034 035import org.apache.mina.core.buffer.IoBuffer; 036import org.apache.mina.core.file.DefaultFileRegion; 037import org.apache.mina.core.file.FilenameFileRegion; 038import org.apache.mina.core.filterchain.IoFilterChain; 039import org.apache.mina.core.future.CloseFuture; 040import org.apache.mina.core.future.DefaultCloseFuture; 041import org.apache.mina.core.future.DefaultReadFuture; 042import org.apache.mina.core.future.DefaultWriteFuture; 043import org.apache.mina.core.future.IoFutureListener; 044import org.apache.mina.core.future.ReadFuture; 045import org.apache.mina.core.future.WriteFuture; 046import org.apache.mina.core.service.AbstractIoService; 047import org.apache.mina.core.service.IoAcceptor; 048import org.apache.mina.core.service.IoHandler; 049import org.apache.mina.core.service.IoProcessor; 050import org.apache.mina.core.service.IoService; 051import org.apache.mina.core.service.TransportMetadata; 052import org.apache.mina.core.write.DefaultWriteRequest; 053import org.apache.mina.core.write.WriteException; 054import org.apache.mina.core.write.WriteRequest; 055import org.apache.mina.core.write.WriteRequestQueue; 056import org.apache.mina.core.write.WriteTimeoutException; 057import org.apache.mina.core.write.WriteToClosedSessionException; 058import org.apache.mina.util.ExceptionMonitor; 059 060/** 061 * Base implementation of {@link IoSession}. 062 * 063 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 064 */ 065public abstract class AbstractIoSession implements IoSession { 066 /** The associated handler */ 067 private final IoHandler handler; 068 069 /** The session config */ 070 protected IoSessionConfig config; 071 072 /** The service which will manage this session */ 073 private final IoService service; 074 075 private static final AttributeKey READY_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class, 076 "readyReadFutures"); 077 078 private static final AttributeKey WAITING_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class, 079 "waitingReadFutures"); 080 081 private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER = new IoFutureListener<CloseFuture>() { 082 public void operationComplete(CloseFuture future) { 083 AbstractIoSession session = (AbstractIoSession) future.getSession(); 084 session.scheduledWriteBytes.set(0); 085 session.scheduledWriteMessages.set(0); 086 session.readBytesThroughput = 0; 087 session.readMessagesThroughput = 0; 088 session.writtenBytesThroughput = 0; 089 session.writtenMessagesThroughput = 0; 090 } 091 }; 092 093 /** 094 * An internal write request object that triggers session close. 095 * 096 * @see #writeRequestQueue 097 */ 098 public static final WriteRequest CLOSE_REQUEST = new DefaultWriteRequest(new Object()); 099 100 /** 101 * An internal write request object that triggers message sent events. 102 * 103 * @see #writeRequestQueue 104 */ 105 public static final WriteRequest MESSAGE_SENT_REQUEST = new DefaultWriteRequest(DefaultWriteRequest.EMPTY_MESSAGE); 106 107 private final Object lock = new Object(); 108 109 private IoSessionAttributeMap attributes; 110 111 private WriteRequestQueue writeRequestQueue; 112 113 private WriteRequest currentWriteRequest; 114 115 /** The Session creation's time */ 116 private final long creationTime; 117 118 /** An id generator guaranteed to generate unique IDs for the session */ 119 private static AtomicLong idGenerator = new AtomicLong(0); 120 121 /** The session ID */ 122 private long sessionId; 123 124 /** 125 * A future that will be set 'closed' when the connection is closed. 126 */ 127 private final CloseFuture closeFuture = new DefaultCloseFuture(this); 128 129 private volatile boolean closing; 130 131 // traffic control 132 private boolean readSuspended = false; 133 134 private boolean writeSuspended = false; 135 136 // Status variables 137 private final AtomicBoolean scheduledForFlush = new AtomicBoolean(); 138 139 private final AtomicInteger scheduledWriteBytes = new AtomicInteger(); 140 141 private final AtomicInteger scheduledWriteMessages = new AtomicInteger(); 142 143 private long readBytes; 144 145 private long writtenBytes; 146 147 private long readMessages; 148 149 private long writtenMessages; 150 151 private long lastReadTime; 152 153 private long lastWriteTime; 154 155 private long lastThroughputCalculationTime; 156 157 private long lastReadBytes; 158 159 private long lastWrittenBytes; 160 161 private long lastReadMessages; 162 163 private long lastWrittenMessages; 164 165 private double readBytesThroughput; 166 167 private double writtenBytesThroughput; 168 169 private double readMessagesThroughput; 170 171 private double writtenMessagesThroughput; 172 173 private AtomicInteger idleCountForBoth = new AtomicInteger(); 174 175 private AtomicInteger idleCountForRead = new AtomicInteger(); 176 177 private AtomicInteger idleCountForWrite = new AtomicInteger(); 178 179 private long lastIdleTimeForBoth; 180 181 private long lastIdleTimeForRead; 182 183 private long lastIdleTimeForWrite; 184 185 private boolean deferDecreaseReadBuffer = true; 186 187 /** 188 * Create a Session for a service 189 * 190 * @param service the Service for this session 191 */ 192 protected AbstractIoSession(IoService service) { 193 this.service = service; 194 this.handler = service.getHandler(); 195 196 // Initialize all the Session counters to the current time 197 long currentTime = System.currentTimeMillis(); 198 creationTime = currentTime; 199 lastThroughputCalculationTime = currentTime; 200 lastReadTime = currentTime; 201 lastWriteTime = currentTime; 202 lastIdleTimeForBoth = currentTime; 203 lastIdleTimeForRead = currentTime; 204 lastIdleTimeForWrite = currentTime; 205 206 // TODO add documentation 207 closeFuture.addListener(SCHEDULED_COUNTER_RESETTER); 208 209 // Set a new ID for this session 210 sessionId = idGenerator.incrementAndGet(); 211 } 212 213 /** 214 * {@inheritDoc} 215 * 216 * We use an AtomicLong to guarantee that the session ID are unique. 217 */ 218 public final long getId() { 219 return sessionId; 220 } 221 222 /** 223 * @return The associated IoProcessor for this session 224 */ 225 public abstract IoProcessor getProcessor(); 226 227 /** 228 * {@inheritDoc} 229 */ 230 public final boolean isConnected() { 231 return !closeFuture.isClosed(); 232 } 233 234 /** 235 * {@inheritDoc} 236 */ 237 public boolean isActive() { 238 // Return true by default 239 return true; 240 } 241 242 /** 243 * {@inheritDoc} 244 */ 245 public final boolean isClosing() { 246 return closing || closeFuture.isClosed(); 247 } 248 249 /** 250 * {@inheritDoc} 251 */ 252 public boolean isSecured() { 253 // Always false... 254 return false; 255 } 256 257 /** 258 * {@inheritDoc} 259 */ 260 public final CloseFuture getCloseFuture() { 261 return closeFuture; 262 } 263 264 /** 265 * Tells if the session is scheduled for flushed 266 * 267 * @return true if the session is scheduled for flush 268 */ 269 public final boolean isScheduledForFlush() { 270 return scheduledForFlush.get(); 271 } 272 273 /** 274 * Schedule the session for flushed 275 */ 276 public final void scheduledForFlush() { 277 scheduledForFlush.set(true); 278 } 279 280 /** 281 * Change the session's status : it's not anymore scheduled for flush 282 */ 283 public final void unscheduledForFlush() { 284 scheduledForFlush.set(false); 285 } 286 287 /** 288 * Set the scheduledForFLush flag. As we may have concurrent access to this 289 * flag, we compare and set it in one call. 290 * 291 * @param schedule 292 * the new value to set if not already set. 293 * @return true if the session flag has been set, and if it wasn't set 294 * already. 295 */ 296 public final boolean setScheduledForFlush(boolean schedule) { 297 if (schedule) { 298 // If the current tag is set to false, switch it to true, 299 // otherwise, we do nothing but return false : the session 300 // is already scheduled for flush 301 return scheduledForFlush.compareAndSet(false, schedule); 302 } 303 304 scheduledForFlush.set(schedule); 305 return true; 306 } 307 308 /** 309 * {@inheritDoc} 310 */ 311 public final CloseFuture close(boolean rightNow) { 312 if (rightNow) { 313 return closeNow(); 314 } else { 315 return closeOnFlush(); 316 } 317 } 318 319 /** 320 * {@inheritDoc} 321 */ 322 public final CloseFuture close() { 323 return closeNow(); 324 } 325 326 /** 327 * {@inheritDoc} 328 */ 329 public final CloseFuture closeOnFlush() { 330 if (!isClosing()) { 331 getWriteRequestQueue().offer(this, CLOSE_REQUEST); 332 getProcessor().flush(this); 333 } 334 335 return closeFuture; 336 } 337 338 /** 339 * {@inheritDoc} 340 */ 341 public final CloseFuture closeNow() { 342 synchronized (lock) { 343 if (isClosing()) { 344 return closeFuture; 345 } 346 347 closing = true; 348 349 try { 350 destroy(); 351 } catch (Exception e) { 352 IoFilterChain filterChain = getFilterChain(); 353 filterChain.fireExceptionCaught(e); 354 } 355 } 356 357 getFilterChain().fireFilterClose(); 358 359 return closeFuture; 360 } 361 362 /** 363 * Destroy the session 364 */ 365 protected void destroy() { 366 if (writeRequestQueue != null) { 367 while (!writeRequestQueue.isEmpty(this)) { 368 WriteRequest writeRequest = writeRequestQueue.poll(this); 369 writeRequest.getFuture().setWritten(); 370 } 371 } 372 } 373 374 /** 375 * {@inheritDoc} 376 */ 377 public IoHandler getHandler() { 378 return handler; 379 } 380 381 /** 382 * {@inheritDoc} 383 */ 384 public IoSessionConfig getConfig() { 385 return config; 386 } 387 388 /** 389 * {@inheritDoc} 390 */ 391 public final ReadFuture read() { 392 if (!getConfig().isUseReadOperation()) { 393 throw new IllegalStateException("useReadOperation is not enabled."); 394 } 395 396 Queue<ReadFuture> readyReadFutures = getReadyReadFutures(); 397 ReadFuture future; 398 synchronized (readyReadFutures) { 399 future = readyReadFutures.poll(); 400 if (future != null) { 401 if (future.isClosed()) { 402 // Let other readers get notified. 403 readyReadFutures.offer(future); 404 } 405 } else { 406 future = new DefaultReadFuture(this); 407 getWaitingReadFutures().offer(future); 408 } 409 } 410 411 return future; 412 } 413 414 /** 415 * Associates a message to a ReadFuture 416 * 417 * @param message the message to associate to the ReadFuture 418 * 419 */ 420 public final void offerReadFuture(Object message) { 421 newReadFuture().setRead(message); 422 } 423 424 /** 425 * Associates a failure to a ReadFuture 426 * 427 * @param exception the exception to associate to the ReadFuture 428 */ 429 public final void offerFailedReadFuture(Throwable exception) { 430 newReadFuture().setException(exception); 431 } 432 433 /** 434 * Inform the ReadFuture that the session has been closed 435 */ 436 public final void offerClosedReadFuture() { 437 Queue<ReadFuture> readyReadFutures = getReadyReadFutures(); 438 439 synchronized (readyReadFutures) { 440 newReadFuture().setClosed(); 441 } 442 } 443 444 /** 445 * @return a readFuture get from the waiting ReadFuture 446 */ 447 private ReadFuture newReadFuture() { 448 Queue<ReadFuture> readyReadFutures = getReadyReadFutures(); 449 Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures(); 450 ReadFuture future; 451 452 synchronized (readyReadFutures) { 453 future = waitingReadFutures.poll(); 454 455 if (future == null) { 456 future = new DefaultReadFuture(this); 457 readyReadFutures.offer(future); 458 } 459 } 460 461 return future; 462 } 463 464 /** 465 * @return a queue of ReadFuture 466 */ 467 private Queue<ReadFuture> getReadyReadFutures() { 468 Queue<ReadFuture> readyReadFutures = (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY); 469 470 if (readyReadFutures == null) { 471 readyReadFutures = new ConcurrentLinkedQueue<ReadFuture>(); 472 473 Queue<ReadFuture> oldReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(READY_READ_FUTURES_KEY, 474 readyReadFutures); 475 476 if (oldReadyReadFutures != null) { 477 readyReadFutures = oldReadyReadFutures; 478 } 479 } 480 481 return readyReadFutures; 482 } 483 484 /** 485 * @return the queue of waiting ReadFuture 486 */ 487 private Queue<ReadFuture> getWaitingReadFutures() { 488 Queue<ReadFuture> waitingReadyReadFutures = (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES_KEY); 489 490 if (waitingReadyReadFutures == null) { 491 waitingReadyReadFutures = new ConcurrentLinkedQueue<ReadFuture>(); 492 493 Queue<ReadFuture> oldWaitingReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent( 494 WAITING_READ_FUTURES_KEY, waitingReadyReadFutures); 495 496 if (oldWaitingReadyReadFutures != null) { 497 waitingReadyReadFutures = oldWaitingReadyReadFutures; 498 } 499 } 500 501 return waitingReadyReadFutures; 502 } 503 504 /** 505 * {@inheritDoc} 506 */ 507 public WriteFuture write(Object message) { 508 return write(message, null); 509 } 510 511 /** 512 * {@inheritDoc} 513 */ 514 public WriteFuture write(Object message, SocketAddress remoteAddress) { 515 if (message == null) { 516 throw new IllegalArgumentException("Trying to write a null message : not allowed"); 517 } 518 519 // We can't send a message to a connected session if we don't have 520 // the remote address 521 if (!getTransportMetadata().isConnectionless() && (remoteAddress != null)) { 522 throw new UnsupportedOperationException(); 523 } 524 525 // If the session has been closed or is closing, we can't either 526 // send a message to the remote side. We generate a future 527 // containing an exception. 528 if (isClosing() || !isConnected()) { 529 WriteFuture future = new DefaultWriteFuture(this); 530 WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress); 531 WriteException writeException = new WriteToClosedSessionException(request); 532 future.setException(writeException); 533 return future; 534 } 535 536 FileChannel openedFileChannel = null; 537 538 // TODO: remove this code as soon as we use InputStream 539 // instead of Object for the message. 540 try { 541 if ((message instanceof IoBuffer) && !((IoBuffer) message).hasRemaining()) { 542 // Nothing to write : probably an error in the user code 543 throw new IllegalArgumentException("message is empty. Forgot to call flip()?"); 544 } else if (message instanceof FileChannel) { 545 FileChannel fileChannel = (FileChannel) message; 546 message = new DefaultFileRegion(fileChannel, 0, fileChannel.size()); 547 } else if (message instanceof File) { 548 File file = (File) message; 549 openedFileChannel = new FileInputStream(file).getChannel(); 550 message = new FilenameFileRegion(file, openedFileChannel, 0, openedFileChannel.size()); 551 } 552 } catch (IOException e) { 553 ExceptionMonitor.getInstance().exceptionCaught(e); 554 return DefaultWriteFuture.newNotWrittenFuture(this, e); 555 } 556 557 // Now, we can write the message. First, create a future 558 WriteFuture writeFuture = new DefaultWriteFuture(this); 559 WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress); 560 561 // Then, get the chain and inject the WriteRequest into it 562 IoFilterChain filterChain = getFilterChain(); 563 filterChain.fireFilterWrite(writeRequest); 564 565 // TODO : This is not our business ! The caller has created a 566 // FileChannel, 567 // he has to close it ! 568 if (openedFileChannel != null) { 569 // If we opened a FileChannel, it needs to be closed when the write 570 // has completed 571 final FileChannel finalChannel = openedFileChannel; 572 writeFuture.addListener(new IoFutureListener<WriteFuture>() { 573 public void operationComplete(WriteFuture future) { 574 try { 575 finalChannel.close(); 576 } catch (IOException e) { 577 ExceptionMonitor.getInstance().exceptionCaught(e); 578 } 579 } 580 }); 581 } 582 583 // Return the WriteFuture. 584 return writeFuture; 585 } 586 587 /** 588 * {@inheritDoc} 589 */ 590 public final Object getAttachment() { 591 return getAttribute(""); 592 } 593 594 /** 595 * {@inheritDoc} 596 */ 597 public final Object setAttachment(Object attachment) { 598 return setAttribute("", attachment); 599 } 600 601 /** 602 * {@inheritDoc} 603 */ 604 public final Object getAttribute(Object key) { 605 return getAttribute(key, null); 606 } 607 608 /** 609 * {@inheritDoc} 610 */ 611 public final Object getAttribute(Object key, Object defaultValue) { 612 return attributes.getAttribute(this, key, defaultValue); 613 } 614 615 /** 616 * {@inheritDoc} 617 */ 618 public final Object setAttribute(Object key, Object value) { 619 return attributes.setAttribute(this, key, value); 620 } 621 622 /** 623 * {@inheritDoc} 624 */ 625 public final Object setAttribute(Object key) { 626 return setAttribute(key, Boolean.TRUE); 627 } 628 629 /** 630 * {@inheritDoc} 631 */ 632 public final Object setAttributeIfAbsent(Object key, Object value) { 633 return attributes.setAttributeIfAbsent(this, key, value); 634 } 635 636 /** 637 * {@inheritDoc} 638 */ 639 public final Object setAttributeIfAbsent(Object key) { 640 return setAttributeIfAbsent(key, Boolean.TRUE); 641 } 642 643 /** 644 * {@inheritDoc} 645 */ 646 public final Object removeAttribute(Object key) { 647 return attributes.removeAttribute(this, key); 648 } 649 650 /** 651 * {@inheritDoc} 652 */ 653 public final boolean removeAttribute(Object key, Object value) { 654 return attributes.removeAttribute(this, key, value); 655 } 656 657 /** 658 * {@inheritDoc} 659 */ 660 public final boolean replaceAttribute(Object key, Object oldValue, Object newValue) { 661 return attributes.replaceAttribute(this, key, oldValue, newValue); 662 } 663 664 /** 665 * {@inheritDoc} 666 */ 667 public final boolean containsAttribute(Object key) { 668 return attributes.containsAttribute(this, key); 669 } 670 671 /** 672 * {@inheritDoc} 673 */ 674 public final Set<Object> getAttributeKeys() { 675 return attributes.getAttributeKeys(this); 676 } 677 678 /** 679 * @return The map of attributes associated with the session 680 */ 681 public final IoSessionAttributeMap getAttributeMap() { 682 return attributes; 683 } 684 685 /** 686 * Set the map of attributes associated with the session 687 * 688 * @param attributes The Map of attributes 689 */ 690 public final void setAttributeMap(IoSessionAttributeMap attributes) { 691 this.attributes = attributes; 692 } 693 694 /** 695 * Create a new close aware write queue, based on the given write queue. 696 * 697 * @param writeRequestQueue The write request queue 698 */ 699 public final void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) { 700 this.writeRequestQueue = writeRequestQueue; 701 } 702 703 /** 704 * {@inheritDoc} 705 */ 706 public final void suspendRead() { 707 readSuspended = true; 708 if (isClosing() || !isConnected()) { 709 return; 710 } 711 getProcessor().updateTrafficControl(this); 712 } 713 714 /** 715 * {@inheritDoc} 716 */ 717 public final void suspendWrite() { 718 writeSuspended = true; 719 if (isClosing() || !isConnected()) { 720 return; 721 } 722 getProcessor().updateTrafficControl(this); 723 } 724 725 /** 726 * {@inheritDoc} 727 */ 728 @SuppressWarnings("unchecked") 729 public final void resumeRead() { 730 readSuspended = false; 731 if (isClosing() || !isConnected()) { 732 return; 733 } 734 getProcessor().updateTrafficControl(this); 735 } 736 737 /** 738 * {@inheritDoc} 739 */ 740 @SuppressWarnings("unchecked") 741 public final void resumeWrite() { 742 writeSuspended = false; 743 if (isClosing() || !isConnected()) { 744 return; 745 } 746 getProcessor().updateTrafficControl(this); 747 } 748 749 /** 750 * {@inheritDoc} 751 */ 752 public boolean isReadSuspended() { 753 return readSuspended; 754 } 755 756 /** 757 * {@inheritDoc} 758 */ 759 public boolean isWriteSuspended() { 760 return writeSuspended; 761 } 762 763 /** 764 * {@inheritDoc} 765 */ 766 public final long getReadBytes() { 767 return readBytes; 768 } 769 770 /** 771 * {@inheritDoc} 772 */ 773 public final long getWrittenBytes() { 774 return writtenBytes; 775 } 776 777 /** 778 * {@inheritDoc} 779 */ 780 public final long getReadMessages() { 781 return readMessages; 782 } 783 784 /** 785 * {@inheritDoc} 786 */ 787 public final long getWrittenMessages() { 788 return writtenMessages; 789 } 790 791 /** 792 * {@inheritDoc} 793 */ 794 public final double getReadBytesThroughput() { 795 return readBytesThroughput; 796 } 797 798 /** 799 * {@inheritDoc} 800 */ 801 public final double getWrittenBytesThroughput() { 802 return writtenBytesThroughput; 803 } 804 805 /** 806 * {@inheritDoc} 807 */ 808 public final double getReadMessagesThroughput() { 809 return readMessagesThroughput; 810 } 811 812 /** 813 * {@inheritDoc} 814 */ 815 public final double getWrittenMessagesThroughput() { 816 return writtenMessagesThroughput; 817 } 818 819 /** 820 * {@inheritDoc} 821 */ 822 public final void updateThroughput(long currentTime, boolean force) { 823 int interval = (int) (currentTime - lastThroughputCalculationTime); 824 825 long minInterval = getConfig().getThroughputCalculationIntervalInMillis(); 826 827 if (((minInterval == 0) || (interval < minInterval)) && !force) { 828 return; 829 } 830 831 readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval; 832 writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval; 833 readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval; 834 writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval; 835 836 lastReadBytes = readBytes; 837 lastWrittenBytes = writtenBytes; 838 lastReadMessages = readMessages; 839 lastWrittenMessages = writtenMessages; 840 841 lastThroughputCalculationTime = currentTime; 842 } 843 844 /** 845 * {@inheritDoc} 846 */ 847 public final long getScheduledWriteBytes() { 848 return scheduledWriteBytes.get(); 849 } 850 851 /** 852 * {@inheritDoc} 853 */ 854 public final int getScheduledWriteMessages() { 855 return scheduledWriteMessages.get(); 856 } 857 858 /** 859 * Set the number of scheduled write bytes 860 * 861 * @param byteCount The number of scheduled bytes for write 862 */ 863 protected void setScheduledWriteBytes(int byteCount) { 864 scheduledWriteBytes.set(byteCount); 865 } 866 867 /** 868 * Set the number of scheduled write messages 869 * 870 * @param messages The number of scheduled messages for write 871 */ 872 protected void setScheduledWriteMessages(int messages) { 873 scheduledWriteMessages.set(messages); 874 } 875 876 /** 877 * Increase the number of read bytes 878 * 879 * @param increment The number of read bytes 880 * @param currentTime The current time 881 */ 882 public final void increaseReadBytes(long increment, long currentTime) { 883 if (increment <= 0) { 884 return; 885 } 886 887 readBytes += increment; 888 lastReadTime = currentTime; 889 idleCountForBoth.set(0); 890 idleCountForRead.set(0); 891 892 if (getService() instanceof AbstractIoService) { 893 ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime); 894 } 895 } 896 897 /** 898 * Increase the number of read messages 899 * 900 * @param currentTime The current time 901 */ 902 public final void increaseReadMessages(long currentTime) { 903 readMessages++; 904 lastReadTime = currentTime; 905 idleCountForBoth.set(0); 906 idleCountForRead.set(0); 907 908 if (getService() instanceof AbstractIoService) { 909 ((AbstractIoService) getService()).getStatistics().increaseReadMessages(currentTime); 910 } 911 } 912 913 /** 914 * Increase the number of written bytes 915 * 916 * @param increment The number of written bytes 917 * @param currentTime The current time 918 */ 919 public final void increaseWrittenBytes(int increment, long currentTime) { 920 if (increment <= 0) { 921 return; 922 } 923 924 writtenBytes += increment; 925 lastWriteTime = currentTime; 926 idleCountForBoth.set(0); 927 idleCountForWrite.set(0); 928 929 if (getService() instanceof AbstractIoService) { 930 ((AbstractIoService) getService()).getStatistics().increaseWrittenBytes(increment, currentTime); 931 } 932 933 increaseScheduledWriteBytes(-increment); 934 } 935 936 /** 937 * Increase the number of written messages 938 * 939 * @param request The written message 940 * @param currentTime The current tile 941 */ 942 public final void increaseWrittenMessages(WriteRequest request, long currentTime) { 943 Object message = request.getMessage(); 944 945 if (message instanceof IoBuffer) { 946 IoBuffer b = (IoBuffer) message; 947 948 if (b.hasRemaining()) { 949 return; 950 } 951 } 952 953 writtenMessages++; 954 lastWriteTime = currentTime; 955 956 if (getService() instanceof AbstractIoService) { 957 ((AbstractIoService) getService()).getStatistics().increaseWrittenMessages(currentTime); 958 } 959 960 decreaseScheduledWriteMessages(); 961 } 962 963 /** 964 * Increase the number of scheduled write bytes for the session 965 * 966 * @param increment The number of newly added bytes to write 967 */ 968 public final void increaseScheduledWriteBytes(int increment) { 969 scheduledWriteBytes.addAndGet(increment); 970 if (getService() instanceof AbstractIoService) { 971 ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteBytes(increment); 972 } 973 } 974 975 /** 976 * Increase the number of scheduled message to write 977 */ 978 public final void increaseScheduledWriteMessages() { 979 scheduledWriteMessages.incrementAndGet(); 980 981 if (getService() instanceof AbstractIoService) { 982 ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteMessages(); 983 } 984 } 985 986 /** 987 * Decrease the number of scheduled message written 988 */ 989 private void decreaseScheduledWriteMessages() { 990 scheduledWriteMessages.decrementAndGet(); 991 if (getService() instanceof AbstractIoService) { 992 ((AbstractIoService) getService()).getStatistics().decreaseScheduledWriteMessages(); 993 } 994 } 995 996 /** 997 * Decrease the counters of written messages and written bytes when a message has been written 998 * 999 * @param request The written message 1000 */ 1001 public final void decreaseScheduledBytesAndMessages(WriteRequest request) { 1002 Object message = request.getMessage(); 1003 1004 if (message instanceof IoBuffer) { 1005 IoBuffer b = (IoBuffer) message; 1006 1007 if (b.hasRemaining()) { 1008 increaseScheduledWriteBytes(-((IoBuffer) message).remaining()); 1009 } else { 1010 decreaseScheduledWriteMessages(); 1011 } 1012 } else { 1013 decreaseScheduledWriteMessages(); 1014 } 1015 } 1016 1017 /** 1018 * {@inheritDoc} 1019 */ 1020 public final WriteRequestQueue getWriteRequestQueue() { 1021 if (writeRequestQueue == null) { 1022 throw new IllegalStateException(); 1023 } 1024 1025 return writeRequestQueue; 1026 } 1027 1028 /** 1029 * {@inheritDoc} 1030 */ 1031 public final WriteRequest getCurrentWriteRequest() { 1032 return currentWriteRequest; 1033 } 1034 1035 /** 1036 * {@inheritDoc} 1037 */ 1038 public final Object getCurrentWriteMessage() { 1039 WriteRequest req = getCurrentWriteRequest(); 1040 1041 if (req == null) { 1042 return null; 1043 } 1044 return req.getMessage(); 1045 } 1046 1047 /** 1048 * {@inheritDoc} 1049 */ 1050 public final void setCurrentWriteRequest(WriteRequest currentWriteRequest) { 1051 this.currentWriteRequest = currentWriteRequest; 1052 } 1053 1054 /** 1055 * Increase the ReadBuffer size (it will double) 1056 */ 1057 public final void increaseReadBufferSize() { 1058 int newReadBufferSize = getConfig().getReadBufferSize() << 1; 1059 if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) { 1060 getConfig().setReadBufferSize(newReadBufferSize); 1061 } else { 1062 getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize()); 1063 } 1064 1065 deferDecreaseReadBuffer = true; 1066 } 1067 1068 /** 1069 * Decrease the ReadBuffer size (it will be divided by a factor 2) 1070 */ 1071 public final void decreaseReadBufferSize() { 1072 if (deferDecreaseReadBuffer) { 1073 deferDecreaseReadBuffer = false; 1074 return; 1075 } 1076 1077 if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) { 1078 getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1); 1079 } 1080 1081 deferDecreaseReadBuffer = true; 1082 } 1083 1084 /** 1085 * {@inheritDoc} 1086 */ 1087 public final long getCreationTime() { 1088 return creationTime; 1089 } 1090 1091 /** 1092 * {@inheritDoc} 1093 */ 1094 public final long getLastIoTime() { 1095 return Math.max(lastReadTime, lastWriteTime); 1096 } 1097 1098 /** 1099 * {@inheritDoc} 1100 */ 1101 public final long getLastReadTime() { 1102 return lastReadTime; 1103 } 1104 1105 /** 1106 * {@inheritDoc} 1107 */ 1108 public final long getLastWriteTime() { 1109 return lastWriteTime; 1110 } 1111 1112 /** 1113 * {@inheritDoc} 1114 */ 1115 public final boolean isIdle(IdleStatus status) { 1116 if (status == IdleStatus.BOTH_IDLE) { 1117 return idleCountForBoth.get() > 0; 1118 } 1119 1120 if (status == IdleStatus.READER_IDLE) { 1121 return idleCountForRead.get() > 0; 1122 } 1123 1124 if (status == IdleStatus.WRITER_IDLE) { 1125 return idleCountForWrite.get() > 0; 1126 } 1127 1128 throw new IllegalArgumentException("Unknown idle status: " + status); 1129 } 1130 1131 /** 1132 * {@inheritDoc} 1133 */ 1134 public final boolean isBothIdle() { 1135 return isIdle(IdleStatus.BOTH_IDLE); 1136 } 1137 1138 /** 1139 * {@inheritDoc} 1140 */ 1141 public final boolean isReaderIdle() { 1142 return isIdle(IdleStatus.READER_IDLE); 1143 } 1144 1145 /** 1146 * {@inheritDoc} 1147 */ 1148 public final boolean isWriterIdle() { 1149 return isIdle(IdleStatus.WRITER_IDLE); 1150 } 1151 1152 /** 1153 * {@inheritDoc} 1154 */ 1155 public final int getIdleCount(IdleStatus status) { 1156 if (getConfig().getIdleTime(status) == 0) { 1157 if (status == IdleStatus.BOTH_IDLE) { 1158 idleCountForBoth.set(0); 1159 } 1160 1161 if (status == IdleStatus.READER_IDLE) { 1162 idleCountForRead.set(0); 1163 } 1164 1165 if (status == IdleStatus.WRITER_IDLE) { 1166 idleCountForWrite.set(0); 1167 } 1168 } 1169 1170 if (status == IdleStatus.BOTH_IDLE) { 1171 return idleCountForBoth.get(); 1172 } 1173 1174 if (status == IdleStatus.READER_IDLE) { 1175 return idleCountForRead.get(); 1176 } 1177 1178 if (status == IdleStatus.WRITER_IDLE) { 1179 return idleCountForWrite.get(); 1180 } 1181 1182 throw new IllegalArgumentException("Unknown idle status: " + status); 1183 } 1184 1185 /** 1186 * {@inheritDoc} 1187 */ 1188 public final long getLastIdleTime(IdleStatus status) { 1189 if (status == IdleStatus.BOTH_IDLE) { 1190 return lastIdleTimeForBoth; 1191 } 1192 1193 if (status == IdleStatus.READER_IDLE) { 1194 return lastIdleTimeForRead; 1195 } 1196 1197 if (status == IdleStatus.WRITER_IDLE) { 1198 return lastIdleTimeForWrite; 1199 } 1200 1201 throw new IllegalArgumentException("Unknown idle status: " + status); 1202 } 1203 1204 /** 1205 * Increase the count of the various Idle counter 1206 * 1207 * @param status The current status 1208 * @param currentTime The current time 1209 */ 1210 public final void increaseIdleCount(IdleStatus status, long currentTime) { 1211 if (status == IdleStatus.BOTH_IDLE) { 1212 idleCountForBoth.incrementAndGet(); 1213 lastIdleTimeForBoth = currentTime; 1214 } else if (status == IdleStatus.READER_IDLE) { 1215 idleCountForRead.incrementAndGet(); 1216 lastIdleTimeForRead = currentTime; 1217 } else if (status == IdleStatus.WRITER_IDLE) { 1218 idleCountForWrite.incrementAndGet(); 1219 lastIdleTimeForWrite = currentTime; 1220 } else { 1221 throw new IllegalArgumentException("Unknown idle status: " + status); 1222 } 1223 } 1224 1225 /** 1226 * {@inheritDoc} 1227 */ 1228 public final int getBothIdleCount() { 1229 return getIdleCount(IdleStatus.BOTH_IDLE); 1230 } 1231 1232 /** 1233 * {@inheritDoc} 1234 */ 1235 public final long getLastBothIdleTime() { 1236 return getLastIdleTime(IdleStatus.BOTH_IDLE); 1237 } 1238 1239 /** 1240 * {@inheritDoc} 1241 */ 1242 public final long getLastReaderIdleTime() { 1243 return getLastIdleTime(IdleStatus.READER_IDLE); 1244 } 1245 1246 /** 1247 * {@inheritDoc} 1248 */ 1249 public final long getLastWriterIdleTime() { 1250 return getLastIdleTime(IdleStatus.WRITER_IDLE); 1251 } 1252 1253 /** 1254 * {@inheritDoc} 1255 */ 1256 public final int getReaderIdleCount() { 1257 return getIdleCount(IdleStatus.READER_IDLE); 1258 } 1259 1260 /** 1261 * {@inheritDoc} 1262 */ 1263 public final int getWriterIdleCount() { 1264 return getIdleCount(IdleStatus.WRITER_IDLE); 1265 } 1266 1267 /** 1268 * {@inheritDoc} 1269 */ 1270 public SocketAddress getServiceAddress() { 1271 IoService service = getService(); 1272 if (service instanceof IoAcceptor) { 1273 return ((IoAcceptor) service).getLocalAddress(); 1274 } 1275 1276 return getRemoteAddress(); 1277 } 1278 1279 /** 1280 * {@inheritDoc} 1281 */ 1282 @Override 1283 public final int hashCode() { 1284 return super.hashCode(); 1285 } 1286 1287 /** 1288 * {@inheritDoc} TODO This is a ridiculous implementation. Need to be 1289 * replaced. 1290 */ 1291 @Override 1292 public final boolean equals(Object o) { 1293 return super.equals(o); 1294 } 1295 1296 /** 1297 * {@inheritDoc} 1298 */ 1299 @Override 1300 public String toString() { 1301 if (isConnected() || isClosing()) { 1302 String remote = null; 1303 String local = null; 1304 1305 try { 1306 remote = String.valueOf(getRemoteAddress()); 1307 } catch (Exception e) { 1308 remote = "Cannot get the remote address informations: " + e.getMessage(); 1309 } 1310 1311 try { 1312 local = String.valueOf(getLocalAddress()); 1313 } catch (Exception e) { 1314 } 1315 1316 if (getService() instanceof IoAcceptor) { 1317 return "(" + getIdAsString() + ": " + getServiceName() + ", server, " + remote + " => " + local + ')'; 1318 } 1319 1320 return "(" + getIdAsString() + ": " + getServiceName() + ", client, " + local + " => " + remote + ')'; 1321 } 1322 1323 return "(" + getIdAsString() + ") Session disconnected ..."; 1324 } 1325 1326 /** 1327 * Get the Id as a String 1328 */ 1329 private String getIdAsString() { 1330 String id = Long.toHexString(getId()).toUpperCase(); 1331 1332 if (id.length() <= 8) { 1333 return "0x00000000".substring(0, 10 - id.length()) + id; 1334 } else { 1335 return "0x" + id; 1336 } 1337 } 1338 1339 /** 1340 * TGet the Service name 1341 */ 1342 private String getServiceName() { 1343 TransportMetadata tm = getTransportMetadata(); 1344 if (tm == null) { 1345 return "null"; 1346 } 1347 1348 return tm.getProviderName() + ' ' + tm.getName(); 1349 } 1350 1351 /** 1352 * {@inheritDoc} 1353 */ 1354 public IoService getService() { 1355 return service; 1356 } 1357 1358 /** 1359 * Fires a {@link IoEventType#SESSION_IDLE} event to any applicable sessions 1360 * in the specified collection. 1361 * 1362 * @param sessions The sessions that are notified 1363 * @param currentTime the current time (i.e. {@link System#currentTimeMillis()}) 1364 */ 1365 public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) { 1366 while (sessions.hasNext()) { 1367 IoSession session = sessions.next(); 1368 1369 if (!session.getCloseFuture().isClosed()) { 1370 notifyIdleSession(session, currentTime); 1371 } 1372 } 1373 } 1374 1375 /** 1376 * Fires a {@link IoEventType#SESSION_IDLE} event if applicable for the 1377 * specified {@code session}. 1378 * 1379 * @param session The session that is notified 1380 * @param currentTime the current time (i.e. {@link System#currentTimeMillis()}) 1381 */ 1382 public static void notifyIdleSession(IoSession session, long currentTime) { 1383 notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE), 1384 IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); 1385 1386 notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE), 1387 IdleStatus.READER_IDLE, 1388 Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE))); 1389 1390 notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE), 1391 IdleStatus.WRITER_IDLE, 1392 Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE))); 1393 1394 notifyWriteTimeout(session, currentTime); 1395 } 1396 1397 private static void notifyIdleSession0(IoSession session, long currentTime, long idleTime, IdleStatus status, 1398 long lastIoTime) { 1399 if ((idleTime > 0) && (lastIoTime != 0) && (currentTime - lastIoTime >= idleTime)) { 1400 session.getFilterChain().fireSessionIdle(status); 1401 } 1402 } 1403 1404 private static void notifyWriteTimeout(IoSession session, long currentTime) { 1405 1406 long writeTimeout = session.getConfig().getWriteTimeoutInMillis(); 1407 if ((writeTimeout > 0) && (currentTime - session.getLastWriteTime() >= writeTimeout) 1408 && !session.getWriteRequestQueue().isEmpty(session)) { 1409 WriteRequest request = session.getCurrentWriteRequest(); 1410 if (request != null) { 1411 session.setCurrentWriteRequest(null); 1412 WriteTimeoutException cause = new WriteTimeoutException(request); 1413 request.getFuture().setException(cause); 1414 session.getFilterChain().fireExceptionCaught(cause); 1415 // WriteException is an IOException, so we close the session. 1416 session.closeNow(); 1417 } 1418 } 1419 } 1420}