1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.session;
21
22 import java.io.File;
23 import java.io.FileInputStream;
24 import java.io.IOException;
25 import java.net.SocketAddress;
26 import java.nio.channels.FileChannel;
27 import java.util.Iterator;
28 import java.util.Queue;
29 import java.util.Set;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicInteger;
32 import java.util.concurrent.atomic.AtomicLong;
33
34 import org.apache.mina.core.buffer.IoBuffer;
35 import org.apache.mina.core.file.DefaultFileRegion;
36 import org.apache.mina.core.filterchain.IoFilterChain;
37 import org.apache.mina.core.future.CloseFuture;
38 import org.apache.mina.core.future.DefaultCloseFuture;
39 import org.apache.mina.core.future.DefaultReadFuture;
40 import org.apache.mina.core.future.DefaultWriteFuture;
41 import org.apache.mina.core.future.IoFutureListener;
42 import org.apache.mina.core.future.ReadFuture;
43 import org.apache.mina.core.future.WriteFuture;
44 import org.apache.mina.core.service.AbstractIoService;
45 import org.apache.mina.core.service.IoAcceptor;
46 import org.apache.mina.core.service.IoProcessor;
47 import org.apache.mina.core.service.IoService;
48 import org.apache.mina.core.service.TransportMetadata;
49 import org.apache.mina.core.write.DefaultWriteRequest;
50 import org.apache.mina.core.write.WriteException;
51 import org.apache.mina.core.write.WriteRequest;
52 import org.apache.mina.core.write.WriteRequestQueue;
53 import org.apache.mina.core.write.WriteTimeoutException;
54 import org.apache.mina.core.write.WriteToClosedSessionException;
55 import org.apache.mina.util.CircularQueue;
56 import org.apache.mina.util.ExceptionMonitor;
57
58
59
60
61
62
63
64
65 public abstract class AbstractIoSession implements IoSession {
66
67 private static final AttributeKey READY_READ_FUTURES_KEY =
68 new AttributeKey(AbstractIoSession.class, "readyReadFutures");
69
70 private static final AttributeKey WAITING_READ_FUTURES_KEY =
71 new AttributeKey(AbstractIoSession.class, "waitingReadFutures");
72
73 private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER =
74 new IoFutureListener<CloseFuture>() {
75 public void operationComplete(CloseFuture future) {
76 AbstractIoSession s = (AbstractIoSession) future.getSession();
77 s.scheduledWriteBytes.set(0);
78 s.scheduledWriteMessages.set(0);
79 s.readBytesThroughput = 0;
80 s.readMessagesThroughput = 0;
81 s.writtenBytesThroughput = 0;
82 s.writtenMessagesThroughput = 0;
83 }
84 };
85
86
87
88
89
90 private static final WriteRequest CLOSE_REQUEST =
91 new DefaultWriteRequest(new Object());
92
93 private final Object lock = new Object();
94
95 private IoSessionAttributeMap attributes;
96 private WriteRequestQueue writeRequestQueue;
97 private WriteRequest currentWriteRequest;
98
99
100 private final long creationTime;
101
102
103 private static AtomicLong idGenerator = new AtomicLong(0);
104
105
106 private long sessionId;
107
108
109
110
111 private final CloseFuture closeFuture = new DefaultCloseFuture(this);
112
113 private volatile boolean closing;
114
115
116 private boolean readSuspended=false;
117 private boolean writeSuspended=false;
118
119
120 private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
121 private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
122 private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
123
124 private long readBytes;
125 private long writtenBytes;
126 private long readMessages;
127 private long writtenMessages;
128 private long lastReadTime;
129 private long lastWriteTime;
130
131 private long lastThroughputCalculationTime;
132 private long lastReadBytes;
133 private long lastWrittenBytes;
134 private long lastReadMessages;
135 private long lastWrittenMessages;
136 private double readBytesThroughput;
137 private double writtenBytesThroughput;
138 private double readMessagesThroughput;
139 private double writtenMessagesThroughput;
140
141 private int idleCountForBoth;
142 private int idleCountForRead;
143 private int idleCountForWrite;
144
145 private long lastIdleTimeForBoth;
146 private long lastIdleTimeForRead;
147 private long lastIdleTimeForWrite;
148
149 private boolean deferDecreaseReadBuffer = true;
150
151
152
153
154 protected AbstractIoSession() {
155
156 long currentTime = System.currentTimeMillis();
157 creationTime = currentTime;
158 lastThroughputCalculationTime = currentTime;
159 lastReadTime = currentTime;
160 lastWriteTime = currentTime;
161 lastIdleTimeForBoth = currentTime;
162 lastIdleTimeForRead = currentTime;
163 lastIdleTimeForWrite = currentTime;
164
165
166 closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
167
168
169 sessionId = idGenerator.incrementAndGet();
170 }
171
172
173
174
175
176
177
178 public final long getId() {
179 return sessionId;
180 }
181
182
183
184
185 public abstract IoProcessor getProcessor();
186
187
188
189
190 public final boolean isConnected() {
191 return !closeFuture.isClosed();
192 }
193
194
195
196
197 public final boolean isClosing() {
198 return closing || closeFuture.isClosed();
199 }
200
201
202
203
204 public final CloseFuture getCloseFuture() {
205 return closeFuture;
206 }
207
208
209
210
211 public final boolean isScheduledForFlush() {
212 return scheduledForFlush.get();
213 }
214
215
216
217
218 public final boolean setScheduledForFlush(boolean flag) {
219 if (flag) {
220 return scheduledForFlush.compareAndSet(false, true);
221 } else {
222 scheduledForFlush.set(false);
223 return true;
224 }
225 }
226
227
228
229
230 public final CloseFuture close(boolean rightNow) {
231 if (rightNow) {
232 return close();
233 } else {
234 return closeOnFlush();
235 }
236 }
237
238
239
240
241 public final CloseFuture close() {
242 synchronized (lock) {
243 if (isClosing()) {
244 return closeFuture;
245 } else {
246 closing = true;
247 }
248 }
249
250 getFilterChain().fireFilterClose();
251 return closeFuture;
252 }
253
254 private final CloseFuture closeOnFlush() {
255 getWriteRequestQueue().offer(this, CLOSE_REQUEST);
256 getProcessor().flush(this);
257 return closeFuture;
258 }
259
260
261
262
263 public final ReadFuture read() {
264 if (!getConfig().isUseReadOperation()) {
265 throw new IllegalStateException("useReadOperation is not enabled.");
266 }
267
268 Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
269 ReadFuture future;
270 synchronized (readyReadFutures) {
271 future = readyReadFutures.poll();
272 if (future != null) {
273 if (future.isClosed()) {
274
275 readyReadFutures.offer(future);
276 }
277 } else {
278 future = new DefaultReadFuture(this);
279 getWaitingReadFutures().offer(future);
280 }
281 }
282
283 return future;
284 }
285
286
287
288
289 public final void offerReadFuture(Object message) {
290 newReadFuture().setRead(message);
291 }
292
293
294
295
296 public final void offerFailedReadFuture(Throwable exception) {
297 newReadFuture().setException(exception);
298 }
299
300
301
302
303 public final void offerClosedReadFuture() {
304 Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
305 synchronized (readyReadFutures) {
306 newReadFuture().setClosed();
307 }
308 }
309
310
311
312
313 private ReadFuture newReadFuture() {
314 Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
315 Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures();
316 ReadFuture future;
317 synchronized (readyReadFutures) {
318 future = waitingReadFutures.poll();
319 if (future == null) {
320 future = new DefaultReadFuture(this);
321 readyReadFutures.offer(future);
322 }
323 }
324 return future;
325 }
326
327
328
329
330 private Queue<ReadFuture> getReadyReadFutures() {
331 Queue<ReadFuture> readyReadFutures =
332 (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
333 if (readyReadFutures == null) {
334 readyReadFutures = new CircularQueue<ReadFuture>();
335
336 Queue<ReadFuture> oldReadyReadFutures =
337 (Queue<ReadFuture>) setAttributeIfAbsent(
338 READY_READ_FUTURES_KEY, readyReadFutures);
339 if (oldReadyReadFutures != null) {
340 readyReadFutures = oldReadyReadFutures;
341 }
342 }
343 return readyReadFutures;
344 }
345
346
347
348
349 private Queue<ReadFuture> getWaitingReadFutures() {
350 Queue<ReadFuture> waitingReadyReadFutures =
351 (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES_KEY);
352 if (waitingReadyReadFutures == null) {
353 waitingReadyReadFutures = new CircularQueue<ReadFuture>();
354
355 Queue<ReadFuture> oldWaitingReadyReadFutures =
356 (Queue<ReadFuture>) setAttributeIfAbsent(
357 WAITING_READ_FUTURES_KEY, waitingReadyReadFutures);
358 if (oldWaitingReadyReadFutures != null) {
359 waitingReadyReadFutures = oldWaitingReadyReadFutures;
360 }
361 }
362 return waitingReadyReadFutures;
363 }
364
365
366
367
368 public WriteFuture write(Object message) {
369 return write(message, null);
370 }
371
372
373
374
375 public WriteFuture write(Object message, SocketAddress remoteAddress) {
376 if (message == null) {
377 throw new NullPointerException("message");
378 }
379
380
381
382 if (!getTransportMetadata().isConnectionless() &&
383 remoteAddress != null) {
384 throw new UnsupportedOperationException();
385 }
386
387
388
389
390
391 if (isClosing() || !isConnected()) {
392 WriteFuture future = new DefaultWriteFuture(this);
393 WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
394 WriteException writeException = new WriteToClosedSessionException(request);
395 future.setException(writeException);
396 return future;
397 }
398
399 FileChannel openedFileChannel = null;
400
401
402
403 try {
404 if (message instanceof IoBuffer
405 && !((IoBuffer) message).hasRemaining()) {
406
407 throw new IllegalArgumentException(
408 "message is empty. Forgot to call flip()?");
409 } else if (message instanceof FileChannel) {
410 FileChannel fileChannel = (FileChannel) message;
411 message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
412 } else if (message instanceof File) {
413 File file = (File) message;
414 openedFileChannel = new FileInputStream(file).getChannel();
415 message = new DefaultFileRegion(openedFileChannel, 0, openedFileChannel.size());
416 }
417 } catch (IOException e) {
418 ExceptionMonitor.getInstance().exceptionCaught(e);
419 return DefaultWriteFuture.newNotWrittenFuture(this, e);
420 }
421
422
423 WriteFuture writeFuture = new DefaultWriteFuture(this);
424 WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
425
426
427 IoFilterChain filterChain = getFilterChain();
428 filterChain.fireFilterWrite(writeRequest);
429
430
431
432 if (openedFileChannel != null) {
433
434 final FileChannel finalChannel = openedFileChannel;
435 writeFuture.addListener(new IoFutureListener<WriteFuture>() {
436 public void operationComplete(WriteFuture future) {
437 try {
438 finalChannel.close();
439 } catch (IOException e) {
440 ExceptionMonitor.getInstance().exceptionCaught(e);
441 }
442 }
443 });
444 }
445
446
447 return writeFuture;
448 }
449
450
451
452
453 public final Object getAttachment() {
454 return getAttribute("");
455 }
456
457
458
459
460 public final Object setAttachment(Object attachment) {
461 return setAttribute("", attachment);
462 }
463
464
465
466
467 public final Object getAttribute(Object key) {
468 return getAttribute(key, null);
469 }
470
471
472
473
474 public final Object getAttribute(Object key, Object defaultValue) {
475 return attributes.getAttribute(this, key, defaultValue);
476 }
477
478
479
480
481 public final Object setAttribute(Object key, Object value) {
482 return attributes.setAttribute(this, key, value);
483 }
484
485
486
487
488 public final Object setAttribute(Object key) {
489 return setAttribute(key, Boolean.TRUE);
490 }
491
492
493
494
495 public final Object setAttributeIfAbsent(Object key, Object value) {
496 return attributes.setAttributeIfAbsent(this, key, value);
497 }
498
499
500
501
502 public final Object setAttributeIfAbsent(Object key) {
503 return setAttributeIfAbsent(key, Boolean.TRUE);
504 }
505
506
507
508
509 public final Object removeAttribute(Object key) {
510 return attributes.removeAttribute(this, key);
511 }
512
513
514
515
516 public final boolean removeAttribute(Object key, Object value) {
517 return attributes.removeAttribute(this, key, value);
518 }
519
520
521
522
523 public final boolean replaceAttribute(Object key, Object oldValue, Object newValue) {
524 return attributes.replaceAttribute(this, key, oldValue, newValue);
525 }
526
527
528
529
530 public final boolean containsAttribute(Object key) {
531 return attributes.containsAttribute(this, key);
532 }
533
534
535
536
537 public final Set<Object> getAttributeKeys() {
538 return attributes.getAttributeKeys(this);
539 }
540
541
542
543
544 public final IoSessionAttributeMap getAttributeMap() {
545 return attributes;
546 }
547
548
549
550
551 public final void setAttributeMap(IoSessionAttributeMap attributes) {
552 this.attributes = attributes;
553 }
554
555
556
557
558
559
560 public final void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) {
561 this.writeRequestQueue =
562 new CloseAwareWriteQueue(writeRequestQueue);
563 }
564
565
566
567
568
569 public final void suspendRead() {
570 readSuspended = true;
571 if (isClosing() || !isConnected()) {
572 return;
573 }
574 getProcessor().updateTrafficControl(this);
575 }
576
577
578
579
580 public final void suspendWrite() {
581 writeSuspended = true;
582 if (isClosing() || !isConnected()) {
583 return;
584 }
585 getProcessor().updateTrafficControl(this);
586 }
587
588
589
590
591 @SuppressWarnings("unchecked")
592 public final void resumeRead() {
593 readSuspended = false;
594 if (isClosing() || !isConnected()) {
595 return;
596 }
597 getProcessor().updateTrafficControl(this);
598 }
599
600
601
602
603 @SuppressWarnings("unchecked")
604 public final void resumeWrite() {
605 writeSuspended = false;
606 if (isClosing() || !isConnected()) {
607 return;
608 }
609 getProcessor().updateTrafficControl(this);
610 }
611
612
613
614
615 public boolean isReadSuspended() {
616 return readSuspended;
617 }
618
619
620
621
622 public boolean isWriteSuspended() {
623 return writeSuspended;
624 }
625
626
627
628
629 public final long getReadBytes() {
630 return readBytes;
631 }
632
633
634
635
636 public final long getWrittenBytes() {
637 return writtenBytes;
638 }
639
640
641
642
643 public final long getReadMessages() {
644 return readMessages;
645 }
646
647
648
649
650 public final long getWrittenMessages() {
651 return writtenMessages;
652 }
653
654
655
656
657 public final double getReadBytesThroughput() {
658 return readBytesThroughput;
659 }
660
661
662
663
664 public final double getWrittenBytesThroughput() {
665 return writtenBytesThroughput;
666 }
667
668
669
670
671 public final double getReadMessagesThroughput() {
672 return readMessagesThroughput;
673 }
674
675
676
677
678 public final double getWrittenMessagesThroughput() {
679 return writtenMessagesThroughput;
680 }
681
682
683
684
685 public final void updateThroughput(long currentTime, boolean force) {
686 int interval = (int) (currentTime - lastThroughputCalculationTime);
687
688 long minInterval = getConfig().getThroughputCalculationIntervalInMillis();
689 if (minInterval == 0 || interval < minInterval) {
690 if (!force) {
691 return;
692 }
693 }
694
695 readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval;
696 writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval;
697 readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval;
698 writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval;
699
700 lastReadBytes = readBytes;
701 lastWrittenBytes = writtenBytes;
702 lastReadMessages = readMessages;
703 lastWrittenMessages = writtenMessages;
704
705 lastThroughputCalculationTime = currentTime;
706 }
707
708
709
710
711 public final long getScheduledWriteBytes() {
712 return scheduledWriteBytes.get();
713 }
714
715
716
717
718 public final int getScheduledWriteMessages() {
719 return scheduledWriteMessages.get();
720 }
721
722
723
724
725 protected void setScheduledWriteBytes(int byteCount){
726 scheduledWriteBytes.set(byteCount);
727 }
728
729
730
731
732 protected void setScheduledWriteMessages(int messages) {
733 scheduledWriteMessages.set(messages);
734 }
735
736
737
738
739 public final void increaseReadBytes(long increment, long currentTime) {
740 if (increment <= 0) {
741 return;
742 }
743
744 readBytes += increment;
745 lastReadTime = currentTime;
746 idleCountForBoth = 0;
747 idleCountForRead = 0;
748
749 if (getService() instanceof AbstractIoService) {
750 ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime);
751 }
752 }
753
754
755
756
757 public final void increaseReadMessages(long currentTime) {
758 readMessages++;
759 lastReadTime = currentTime;
760 idleCountForBoth = 0;
761 idleCountForRead = 0;
762
763 if (getService() instanceof AbstractIoService) {
764 ((AbstractIoService) getService()).getStatistics().increaseReadMessages(currentTime);
765 }
766 }
767
768
769
770
771 public final void increaseWrittenBytes(int increment, long currentTime) {
772 if (increment <= 0) {
773 return;
774 }
775
776 writtenBytes += increment;
777 lastWriteTime = currentTime;
778 idleCountForBoth = 0;
779 idleCountForWrite = 0;
780
781 if (getService() instanceof AbstractIoService) {
782 ((AbstractIoService) getService()).getStatistics().increaseWrittenBytes(increment, currentTime);
783 }
784
785 increaseScheduledWriteBytes(-increment);
786 }
787
788
789
790
791 public final void increaseWrittenMessages(
792 WriteRequest request, long currentTime) {
793 Object message = request.getMessage();
794 if (message instanceof IoBuffer) {
795 IoBuffer b = (IoBuffer) message;
796 if (b.hasRemaining()) {
797 return;
798 }
799 }
800
801 writtenMessages++;
802 lastWriteTime = currentTime;
803 if (getService() instanceof AbstractIoService) {
804 ((AbstractIoService) getService()).getStatistics().increaseWrittenMessages(currentTime);
805 }
806
807 decreaseScheduledWriteMessages();
808 }
809
810
811
812
813 public final void increaseScheduledWriteBytes(int increment) {
814 scheduledWriteBytes.addAndGet(increment);
815 if (getService() instanceof AbstractIoService) {
816 ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteBytes(increment);
817 }
818 }
819
820
821
822
823 public final void increaseScheduledWriteMessages() {
824 scheduledWriteMessages.incrementAndGet();
825 if (getService() instanceof AbstractIoService) {
826 ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteMessages();
827 }
828 }
829
830
831
832
833 private void decreaseScheduledWriteMessages() {
834 scheduledWriteMessages.decrementAndGet();
835 if (getService() instanceof AbstractIoService) {
836 ((AbstractIoService) getService()).getStatistics().decreaseScheduledWriteMessages();
837 }
838 }
839
840
841
842
843 public final void decreaseScheduledBytesAndMessages(WriteRequest request) {
844 Object message = request.getMessage();
845 if (message instanceof IoBuffer) {
846 IoBuffer b = (IoBuffer) message;
847 if (b.hasRemaining()) {
848 increaseScheduledWriteBytes(-((IoBuffer) message).remaining());
849 } else {
850 decreaseScheduledWriteMessages();
851 }
852 } else {
853 decreaseScheduledWriteMessages();
854 }
855 }
856
857
858
859
860 public final WriteRequestQueue getWriteRequestQueue() {
861 if (writeRequestQueue == null) {
862 throw new IllegalStateException();
863 }
864 return writeRequestQueue;
865 }
866
867
868
869
870 public final WriteRequest getCurrentWriteRequest() {
871 return currentWriteRequest;
872 }
873
874
875
876
877 public final Object getCurrentWriteMessage() {
878 WriteRequest req = getCurrentWriteRequest();
879 if (req == null) {
880 return null;
881 }
882 return req.getMessage();
883 }
884
885
886
887
888 public final void setCurrentWriteRequest(WriteRequest currentWriteRequest) {
889 this.currentWriteRequest = currentWriteRequest;
890 }
891
892
893
894
895 public final void increaseReadBufferSize() {
896 int newReadBufferSize = getConfig().getReadBufferSize() << 1;
897 if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) {
898 getConfig().setReadBufferSize(newReadBufferSize);
899 } else {
900 getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize());
901 }
902
903 deferDecreaseReadBuffer = true;
904 }
905
906
907
908
909 public final void decreaseReadBufferSize() {
910 if (deferDecreaseReadBuffer) {
911 deferDecreaseReadBuffer = false;
912 return;
913 }
914
915 if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) {
916 getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1);
917 }
918
919 deferDecreaseReadBuffer = true;
920 }
921
922
923
924
925 public final long getCreationTime() {
926 return creationTime;
927 }
928
929
930
931
932 public final long getLastIoTime() {
933 return Math.max(lastReadTime, lastWriteTime);
934 }
935
936
937
938
939 public final long getLastReadTime() {
940 return lastReadTime;
941 }
942
943
944
945
946 public final long getLastWriteTime() {
947 return lastWriteTime;
948 }
949
950
951
952
953 public final boolean isIdle(IdleStatus status) {
954 if (status == IdleStatus.BOTH_IDLE) {
955 return idleCountForBoth > 0;
956 }
957
958 if (status == IdleStatus.READER_IDLE) {
959 return idleCountForRead > 0;
960 }
961
962 if (status == IdleStatus.WRITER_IDLE) {
963 return idleCountForWrite > 0;
964 }
965
966 throw new IllegalArgumentException("Unknown idle status: " + status);
967 }
968
969
970
971
972 public final boolean isBothIdle() {
973 return isIdle(IdleStatus.BOTH_IDLE);
974 }
975
976
977
978
979 public final boolean isReaderIdle() {
980 return isIdle(IdleStatus.READER_IDLE);
981 }
982
983
984
985
986 public final boolean isWriterIdle() {
987 return isIdle(IdleStatus.WRITER_IDLE);
988 }
989
990
991
992
993 public final int getIdleCount(IdleStatus status) {
994 if (getConfig().getIdleTime(status) == 0) {
995 if (status == IdleStatus.BOTH_IDLE) {
996 idleCountForBoth = 0;
997 }
998
999 if (status == IdleStatus.READER_IDLE) {
1000 idleCountForRead = 0;
1001 }
1002
1003 if (status == IdleStatus.WRITER_IDLE) {
1004 idleCountForWrite = 0;
1005 }
1006 }
1007
1008 if (status == IdleStatus.BOTH_IDLE) {
1009 return idleCountForBoth;
1010 }
1011
1012 if (status == IdleStatus.READER_IDLE) {
1013 return idleCountForRead;
1014 }
1015
1016 if (status == IdleStatus.WRITER_IDLE) {
1017 return idleCountForWrite;
1018 }
1019
1020 throw new IllegalArgumentException("Unknown idle status: " + status);
1021 }
1022
1023
1024
1025
1026 public final long getLastIdleTime(IdleStatus status) {
1027 if (status == IdleStatus.BOTH_IDLE) {
1028 return lastIdleTimeForBoth;
1029 }
1030
1031 if (status == IdleStatus.READER_IDLE) {
1032 return lastIdleTimeForRead;
1033 }
1034
1035 if (status == IdleStatus.WRITER_IDLE) {
1036 return lastIdleTimeForWrite;
1037 }
1038
1039 throw new IllegalArgumentException("Unknown idle status: " + status);
1040 }
1041
1042
1043
1044
1045 public final void increaseIdleCount(IdleStatus status, long currentTime) {
1046 if (status == IdleStatus.BOTH_IDLE) {
1047 idleCountForBoth++;
1048 lastIdleTimeForBoth = currentTime;
1049 } else if (status == IdleStatus.READER_IDLE) {
1050 idleCountForRead++;
1051 lastIdleTimeForRead = currentTime;
1052 } else if (status == IdleStatus.WRITER_IDLE) {
1053 idleCountForWrite++;
1054 lastIdleTimeForWrite = currentTime;
1055 } else {
1056 throw new IllegalArgumentException("Unknown idle status: " + status);
1057 }
1058 }
1059
1060
1061
1062
1063 public final int getBothIdleCount() {
1064 return getIdleCount(IdleStatus.BOTH_IDLE);
1065 }
1066
1067
1068
1069
1070 public final long getLastBothIdleTime() {
1071 return getLastIdleTime(IdleStatus.BOTH_IDLE);
1072 }
1073
1074
1075
1076
1077 public final long getLastReaderIdleTime() {
1078 return getLastIdleTime(IdleStatus.READER_IDLE);
1079 }
1080
1081
1082
1083
1084 public final long getLastWriterIdleTime() {
1085 return getLastIdleTime(IdleStatus.WRITER_IDLE);
1086 }
1087
1088
1089
1090
1091 public final int getReaderIdleCount() {
1092 return getIdleCount(IdleStatus.READER_IDLE);
1093 }
1094
1095
1096
1097
1098 public final int getWriterIdleCount() {
1099 return getIdleCount(IdleStatus.WRITER_IDLE);
1100 }
1101
1102
1103
1104
1105 public SocketAddress getServiceAddress() {
1106 IoService service = getService();
1107 if (service instanceof IoAcceptor) {
1108 return ((IoAcceptor) service).getLocalAddress();
1109 } else {
1110 return getRemoteAddress();
1111 }
1112 }
1113
1114
1115
1116
1117 @Override
1118 public final int hashCode() {
1119 return super.hashCode();
1120 }
1121
1122
1123
1124
1125
1126 @Override
1127 public final boolean equals(Object o) {
1128 return super.equals(o);
1129 }
1130
1131
1132
1133
1134 @Override
1135 public String toString() {
1136 if (isConnected()||isClosing()) {
1137 if (getService() instanceof IoAcceptor) {
1138 return "(" + getIdAsString() + ": " + getServiceName() + ", server, " +
1139 getRemoteAddress() + " => " + getLocalAddress() + ')';
1140 } else {
1141 return "(" + getIdAsString() + ": " + getServiceName() + ", client, " +
1142 getLocalAddress() + " => " + getRemoteAddress() + ')';
1143 }
1144 } else {
1145 return "Session disconnected ...";
1146 }
1147
1148 }
1149
1150
1151
1152
1153 private String getIdAsString() {
1154 String id = Long.toHexString(getId()).toUpperCase();
1155
1156
1157
1158 while (id.length() < 8) {
1159 id = '0' + id;
1160 }
1161 id = "0x" + id;
1162
1163 return id;
1164 }
1165
1166
1167
1168
1169 private String getServiceName() {
1170 TransportMetadata tm = getTransportMetadata();
1171 if (tm == null) {
1172 return "null";
1173 } else {
1174 return tm.getProviderName() + ' ' + tm.getName();
1175 }
1176 }
1177
1178
1179
1180
1181
1182
1183
1184 public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) {
1185 IoSession s = null;
1186 while (sessions.hasNext()) {
1187 s = sessions.next();
1188 notifyIdleSession(s, currentTime);
1189 }
1190 }
1191
1192
1193
1194
1195
1196
1197
1198 public static void notifyIdleSession(IoSession session, long currentTime) {
1199 notifyIdleSession0(
1200 session, currentTime,
1201 session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
1202 IdleStatus.BOTH_IDLE, Math.max(
1203 session.getLastIoTime(),
1204 session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
1205
1206 notifyIdleSession0(
1207 session, currentTime,
1208 session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
1209 IdleStatus.READER_IDLE, Math.max(
1210 session.getLastReadTime(),
1211 session.getLastIdleTime(IdleStatus.READER_IDLE)));
1212
1213 notifyIdleSession0(
1214 session, currentTime,
1215 session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
1216 IdleStatus.WRITER_IDLE, Math.max(
1217 session.getLastWriteTime(),
1218 session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
1219
1220 notifyWriteTimeout(session, currentTime);
1221 }
1222
1223 private static void notifyIdleSession0(
1224 IoSession session, long currentTime,
1225 long idleTime, IdleStatus status, long lastIoTime) {
1226 if (idleTime > 0 && lastIoTime != 0
1227 && currentTime - lastIoTime >= idleTime) {
1228 session.getFilterChain().fireSessionIdle(status);
1229 }
1230 }
1231
1232 private static void notifyWriteTimeout(
1233 IoSession session, long currentTime) {
1234
1235 long writeTimeout = session.getConfig().getWriteTimeoutInMillis();
1236 if (writeTimeout > 0 &&
1237 currentTime - session.getLastWriteTime() >= writeTimeout &&
1238 !session.getWriteRequestQueue().isEmpty(session)) {
1239 WriteRequest request = session.getCurrentWriteRequest();
1240 if (request != null) {
1241 session.setCurrentWriteRequest(null);
1242 WriteTimeoutException cause = new WriteTimeoutException(request);
1243 request.getFuture().setException(cause);
1244 session.getFilterChain().fireExceptionCaught(cause);
1245
1246 session.close(true);
1247 }
1248 }
1249 }
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259 private class CloseAwareWriteQueue implements WriteRequestQueue {
1260
1261 private final WriteRequestQueue q;
1262
1263
1264
1265
1266 public CloseAwareWriteQueue(WriteRequestQueue q) {
1267 this.q = q;
1268 }
1269
1270
1271
1272
1273 public synchronized WriteRequest poll(IoSession session) {
1274 WriteRequest answer = q.poll(session);
1275
1276 if (answer == CLOSE_REQUEST) {
1277 AbstractIoSession.this.close();
1278 dispose(session);
1279 answer = null;
1280 }
1281
1282 return answer;
1283 }
1284
1285
1286
1287
1288 public void offer(IoSession session, WriteRequest e) {
1289 q.offer(session, e);
1290 }
1291
1292
1293
1294
1295 public boolean isEmpty(IoSession session) {
1296 return q.isEmpty(session);
1297 }
1298
1299
1300
1301
1302 public void clear(IoSession session) {
1303 q.clear(session);
1304 }
1305
1306
1307
1308
1309 public void dispose(IoSession session) {
1310 q.dispose(session);
1311 }
1312 }
1313 }