1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.session;
21
22 import java.io.File;
23 import java.io.FileInputStream;
24 import java.io.IOException;
25 import java.net.SocketAddress;
26 import java.nio.channels.FileChannel;
27 import java.util.Iterator;
28 import java.util.Queue;
29 import java.util.Set;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.atomic.AtomicLong;
34
35 import org.apache.mina.core.buffer.IoBuffer;
36 import org.apache.mina.core.file.DefaultFileRegion;
37 import org.apache.mina.core.file.FilenameFileRegion;
38 import org.apache.mina.core.filterchain.IoFilterChain;
39 import org.apache.mina.core.future.CloseFuture;
40 import org.apache.mina.core.future.DefaultCloseFuture;
41 import org.apache.mina.core.future.DefaultReadFuture;
42 import org.apache.mina.core.future.DefaultWriteFuture;
43 import org.apache.mina.core.future.IoFutureListener;
44 import org.apache.mina.core.future.ReadFuture;
45 import org.apache.mina.core.future.WriteFuture;
46 import org.apache.mina.core.service.AbstractIoService;
47 import org.apache.mina.core.service.IoAcceptor;
48 import org.apache.mina.core.service.IoProcessor;
49 import org.apache.mina.core.service.IoService;
50 import org.apache.mina.core.service.TransportMetadata;
51 import org.apache.mina.core.write.DefaultWriteRequest;
52 import org.apache.mina.core.write.WriteException;
53 import org.apache.mina.core.write.WriteRequest;
54 import org.apache.mina.core.write.WriteRequestQueue;
55 import org.apache.mina.core.write.WriteTimeoutException;
56 import org.apache.mina.core.write.WriteToClosedSessionException;
57 import org.apache.mina.util.ExceptionMonitor;
58
59
60
61
62
63
64
65 public abstract class AbstractIoSession implements IoSession {
66
67 private static final AttributeKey READY_READ_FUTURES_KEY =
68 new AttributeKey(AbstractIoSession.class, "readyReadFutures");
69
70 private static final AttributeKey WAITING_READ_FUTURES_KEY =
71 new AttributeKey(AbstractIoSession.class, "waitingReadFutures");
72
73 private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER =
74 new IoFutureListener<CloseFuture>() {
75 public void operationComplete(CloseFuture future) {
76 AbstractIoSession session = (AbstractIoSession) future.getSession();
77 session.scheduledWriteBytes.set(0);
78 session.scheduledWriteMessages.set(0);
79 session.readBytesThroughput = 0;
80 session.readMessagesThroughput = 0;
81 session.writtenBytesThroughput = 0;
82 session.writtenMessagesThroughput = 0;
83 }
84 };
85
86
87
88
89
90 private static final WriteRequest CLOSE_REQUEST =
91 new DefaultWriteRequest(new Object());
92
93 private final Object lock = new Object();
94
95 private IoSessionAttributeMap attributes;
96 private WriteRequestQueue writeRequestQueue;
97 private WriteRequest currentWriteRequest;
98
99
100 private final long creationTime;
101
102
103 private static AtomicLong idGenerator = new AtomicLong(0);
104
105
106 private long sessionId;
107
108
109
110
111 private final CloseFuture closeFuture = new DefaultCloseFuture(this);
112
113 private volatile boolean closing;
114
115
116 private boolean readSuspended=false;
117 private boolean writeSuspended=false;
118
119
120 private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
121 private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
122 private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
123
124 private long readBytes;
125 private long writtenBytes;
126 private long readMessages;
127 private long writtenMessages;
128 private long lastReadTime;
129 private long lastWriteTime;
130
131 private long lastThroughputCalculationTime;
132 private long lastReadBytes;
133 private long lastWrittenBytes;
134 private long lastReadMessages;
135 private long lastWrittenMessages;
136 private double readBytesThroughput;
137 private double writtenBytesThroughput;
138 private double readMessagesThroughput;
139 private double writtenMessagesThroughput;
140
141 private AtomicInteger idleCountForBoth = new AtomicInteger();
142 private AtomicInteger idleCountForRead = new AtomicInteger();
143 private AtomicInteger idleCountForWrite = new AtomicInteger();
144
145 private long lastIdleTimeForBoth;
146 private long lastIdleTimeForRead;
147 private long lastIdleTimeForWrite;
148
149 private boolean deferDecreaseReadBuffer = true;
150
151
152
153
154 protected AbstractIoSession() {
155
156 long currentTime = System.currentTimeMillis();
157 creationTime = currentTime;
158 lastThroughputCalculationTime = currentTime;
159 lastReadTime = currentTime;
160 lastWriteTime = currentTime;
161 lastIdleTimeForBoth = currentTime;
162 lastIdleTimeForRead = currentTime;
163 lastIdleTimeForWrite = currentTime;
164
165
166 closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
167
168
169 sessionId = idGenerator.incrementAndGet();
170 }
171
172
173
174
175
176
177
178 public final long getId() {
179 return sessionId;
180 }
181
182
183
184
185 public abstract IoProcessor getProcessor();
186
187
188
189
190 public final boolean isConnected() {
191 return !closeFuture.isClosed();
192 }
193
194
195
196
197 public final boolean isClosing() {
198 return closing || closeFuture.isClosed();
199 }
200
201
202
203
204 public final CloseFuture getCloseFuture() {
205 return closeFuture;
206 }
207
208
209
210
211
212 public final boolean isScheduledForFlush() {
213 return scheduledForFlush.get();
214 }
215
216
217
218
219 public final void scheduledForFlush() {
220 scheduledForFlush.set(true);
221 }
222
223
224
225
226 public final void unscheduledForFlush() {
227 scheduledForFlush.set(false);
228 }
229
230
231
232
233
234
235
236
237 public final boolean setScheduledForFlush(boolean schedule) {
238 if (schedule) {
239
240
241
242 return scheduledForFlush.compareAndSet(false, schedule);
243 }
244
245 scheduledForFlush.set(schedule);
246 return true;
247 }
248
249
250
251
252 public final CloseFuture close(boolean rightNow) {
253 if (rightNow) {
254 return close();
255 }
256
257 return closeOnFlush();
258 }
259
260
261
262
263 public final CloseFuture close() {
264 synchronized (lock) {
265 if (isClosing()) {
266 return closeFuture;
267 }
268
269 closing = true;
270 }
271
272 getFilterChain().fireFilterClose();
273 return closeFuture;
274 }
275
276 private final CloseFuture closeOnFlush() {
277 getWriteRequestQueue().offer(this, CLOSE_REQUEST);
278 getProcessor().flush(this);
279 return closeFuture;
280 }
281
282
283
284
285 public final ReadFuture read() {
286 if (!getConfig().isUseReadOperation()) {
287 throw new IllegalStateException("useReadOperation is not enabled.");
288 }
289
290 Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
291 ReadFuture future;
292 synchronized (readyReadFutures) {
293 future = readyReadFutures.poll();
294 if (future != null) {
295 if (future.isClosed()) {
296
297 readyReadFutures.offer(future);
298 }
299 } else {
300 future = new DefaultReadFuture(this);
301 getWaitingReadFutures().offer(future);
302 }
303 }
304
305 return future;
306 }
307
308
309
310
311 public final void offerReadFuture(Object message) {
312 newReadFuture().setRead(message);
313 }
314
315
316
317
318 public final void offerFailedReadFuture(Throwable exception) {
319 newReadFuture().setException(exception);
320 }
321
322
323
324
325 public final void offerClosedReadFuture() {
326 Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
327 synchronized (readyReadFutures) {
328 newReadFuture().setClosed();
329 }
330 }
331
332
333
334
335 private ReadFuture newReadFuture() {
336 Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
337 Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures();
338 ReadFuture future;
339 synchronized (readyReadFutures) {
340 future = waitingReadFutures.poll();
341 if (future == null) {
342 future = new DefaultReadFuture(this);
343 readyReadFutures.offer(future);
344 }
345 }
346 return future;
347 }
348
349
350
351
352 private Queue<ReadFuture> getReadyReadFutures() {
353 Queue<ReadFuture> readyReadFutures =
354 (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
355 if (readyReadFutures == null) {
356 readyReadFutures = new ConcurrentLinkedQueue<ReadFuture>();
357
358 Queue<ReadFuture> oldReadyReadFutures =
359 (Queue<ReadFuture>) setAttributeIfAbsent(
360 READY_READ_FUTURES_KEY, readyReadFutures);
361 if (oldReadyReadFutures != null) {
362 readyReadFutures = oldReadyReadFutures;
363 }
364 }
365 return readyReadFutures;
366 }
367
368
369
370
371 private Queue<ReadFuture> getWaitingReadFutures() {
372 Queue<ReadFuture> waitingReadyReadFutures =
373 (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES_KEY);
374 if (waitingReadyReadFutures == null) {
375 waitingReadyReadFutures = new ConcurrentLinkedQueue<ReadFuture>();
376
377 Queue<ReadFuture> oldWaitingReadyReadFutures =
378 (Queue<ReadFuture>) setAttributeIfAbsent(
379 WAITING_READ_FUTURES_KEY, waitingReadyReadFutures);
380 if (oldWaitingReadyReadFutures != null) {
381 waitingReadyReadFutures = oldWaitingReadyReadFutures;
382 }
383 }
384 return waitingReadyReadFutures;
385 }
386
387
388
389
390 public WriteFuture write(Object message) {
391 return write(message, null);
392 }
393
394
395
396
397 public WriteFuture write(Object message, SocketAddress remoteAddress) {
398 if (message == null) {
399 throw new IllegalArgumentException("message");
400 }
401
402
403
404 if (!getTransportMetadata().isConnectionless() &&
405 remoteAddress != null) {
406 throw new UnsupportedOperationException();
407 }
408
409
410
411
412
413 if (isClosing() || !isConnected()) {
414 WriteFuture future = new DefaultWriteFuture(this);
415 WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
416 WriteException writeException = new WriteToClosedSessionException(request);
417 future.setException(writeException);
418 return future;
419 }
420
421 FileChannel openedFileChannel = null;
422
423
424
425 try {
426 if (message instanceof IoBuffer
427 && !((IoBuffer) message).hasRemaining()) {
428
429 throw new IllegalArgumentException(
430 "message is empty. Forgot to call flip()?");
431 } else if (message instanceof FileChannel) {
432 FileChannel fileChannel = (FileChannel) message;
433 message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
434 } else if (message instanceof File) {
435 File file = (File) message;
436 openedFileChannel = new FileInputStream(file).getChannel();
437 message = new FilenameFileRegion(file, openedFileChannel, 0, openedFileChannel.size());
438 }
439 } catch (IOException e) {
440 ExceptionMonitor.getInstance().exceptionCaught(e);
441 return DefaultWriteFuture.newNotWrittenFuture(this, e);
442 }
443
444
445 WriteFuture writeFuture = new DefaultWriteFuture(this);
446 WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
447
448
449 IoFilterChain filterChain = getFilterChain();
450 filterChain.fireFilterWrite(writeRequest);
451
452
453
454 if (openedFileChannel != null) {
455
456 final FileChannel finalChannel = openedFileChannel;
457 writeFuture.addListener(new IoFutureListener<WriteFuture>() {
458 public void operationComplete(WriteFuture future) {
459 try {
460 finalChannel.close();
461 } catch (IOException e) {
462 ExceptionMonitor.getInstance().exceptionCaught(e);
463 }
464 }
465 });
466 }
467
468
469 return writeFuture;
470 }
471
472
473
474
475 public final Object getAttachment() {
476 return getAttribute("");
477 }
478
479
480
481
482 public final Object setAttachment(Object attachment) {
483 return setAttribute("", attachment);
484 }
485
486
487
488
489 public final Object getAttribute(Object key) {
490 return getAttribute(key, null);
491 }
492
493
494
495
496 public final Object getAttribute(Object key, Object defaultValue) {
497 return attributes.getAttribute(this, key, defaultValue);
498 }
499
500
501
502
503 public final Object setAttribute(Object key, Object value) {
504 return attributes.setAttribute(this, key, value);
505 }
506
507
508
509
510 public final Object setAttribute(Object key) {
511 return setAttribute(key, Boolean.TRUE);
512 }
513
514
515
516
517 public final Object setAttributeIfAbsent(Object key, Object value) {
518 return attributes.setAttributeIfAbsent(this, key, value);
519 }
520
521
522
523
524 public final Object setAttributeIfAbsent(Object key) {
525 return setAttributeIfAbsent(key, Boolean.TRUE);
526 }
527
528
529
530
531 public final Object removeAttribute(Object key) {
532 return attributes.removeAttribute(this, key);
533 }
534
535
536
537
538 public final boolean removeAttribute(Object key, Object value) {
539 return attributes.removeAttribute(this, key, value);
540 }
541
542
543
544
545 public final boolean replaceAttribute(Object key, Object oldValue, Object newValue) {
546 return attributes.replaceAttribute(this, key, oldValue, newValue);
547 }
548
549
550
551
552 public final boolean containsAttribute(Object key) {
553 return attributes.containsAttribute(this, key);
554 }
555
556
557
558
559 public final Set<Object> getAttributeKeys() {
560 return attributes.getAttributeKeys(this);
561 }
562
563
564
565
566 public final IoSessionAttributeMap getAttributeMap() {
567 return attributes;
568 }
569
570
571
572
573 public final void setAttributeMap(IoSessionAttributeMap attributes) {
574 this.attributes = attributes;
575 }
576
577
578
579
580
581
582 public final void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) {
583 this.writeRequestQueue =
584 new CloseAwareWriteQueue(writeRequestQueue);
585 }
586
587
588
589
590
591 public final void suspendRead() {
592 readSuspended = true;
593 if (isClosing() || !isConnected()) {
594 return;
595 }
596 getProcessor().updateTrafficControl(this);
597 }
598
599
600
601
602 public final void suspendWrite() {
603 writeSuspended = true;
604 if (isClosing() || !isConnected()) {
605 return;
606 }
607 getProcessor().updateTrafficControl(this);
608 }
609
610
611
612
613 @SuppressWarnings("unchecked")
614 public final void resumeRead() {
615 readSuspended = false;
616 if (isClosing() || !isConnected()) {
617 return;
618 }
619 getProcessor().updateTrafficControl(this);
620 }
621
622
623
624
625 @SuppressWarnings("unchecked")
626 public final void resumeWrite() {
627 writeSuspended = false;
628 if (isClosing() || !isConnected()) {
629 return;
630 }
631 getProcessor().updateTrafficControl(this);
632 }
633
634
635
636
637 public boolean isReadSuspended() {
638 return readSuspended;
639 }
640
641
642
643
644 public boolean isWriteSuspended() {
645 return writeSuspended;
646 }
647
648
649
650
651 public final long getReadBytes() {
652 return readBytes;
653 }
654
655
656
657
658 public final long getWrittenBytes() {
659 return writtenBytes;
660 }
661
662
663
664
665 public final long getReadMessages() {
666 return readMessages;
667 }
668
669
670
671
672 public final long getWrittenMessages() {
673 return writtenMessages;
674 }
675
676
677
678
679 public final double getReadBytesThroughput() {
680 return readBytesThroughput;
681 }
682
683
684
685
686 public final double getWrittenBytesThroughput() {
687 return writtenBytesThroughput;
688 }
689
690
691
692
693 public final double getReadMessagesThroughput() {
694 return readMessagesThroughput;
695 }
696
697
698
699
700 public final double getWrittenMessagesThroughput() {
701 return writtenMessagesThroughput;
702 }
703
704
705
706
707 public final void updateThroughput(long currentTime, boolean force) {
708 int interval = (int) (currentTime - lastThroughputCalculationTime);
709
710 long minInterval = getConfig().getThroughputCalculationIntervalInMillis();
711 if (minInterval == 0 || interval < minInterval) {
712 if (!force) {
713 return;
714 }
715 }
716
717 readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval;
718 writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval;
719 readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval;
720 writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval;
721
722 lastReadBytes = readBytes;
723 lastWrittenBytes = writtenBytes;
724 lastReadMessages = readMessages;
725 lastWrittenMessages = writtenMessages;
726
727 lastThroughputCalculationTime = currentTime;
728 }
729
730
731
732
733 public final long getScheduledWriteBytes() {
734 return scheduledWriteBytes.get();
735 }
736
737
738
739
740 public final int getScheduledWriteMessages() {
741 return scheduledWriteMessages.get();
742 }
743
744
745
746
747 protected void setScheduledWriteBytes(int byteCount){
748 scheduledWriteBytes.set(byteCount);
749 }
750
751
752
753
754 protected void setScheduledWriteMessages(int messages) {
755 scheduledWriteMessages.set(messages);
756 }
757
758
759
760
761 public final void increaseReadBytes(long increment, long currentTime) {
762 if (increment <= 0) {
763 return;
764 }
765
766 readBytes += increment;
767 lastReadTime = currentTime;
768 idleCountForBoth.set(0);
769 idleCountForRead.set(0);
770
771 if (getService() instanceof AbstractIoService) {
772 ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime);
773 }
774 }
775
776
777
778
779 public final void increaseReadMessages(long currentTime) {
780 readMessages++;
781 lastReadTime = currentTime;
782 idleCountForBoth.set(0);
783 idleCountForRead.set(0);
784
785 if (getService() instanceof AbstractIoService) {
786 ((AbstractIoService) getService()).getStatistics().increaseReadMessages(currentTime);
787 }
788 }
789
790
791
792
793 public final void increaseWrittenBytes(int increment, long currentTime) {
794 if (increment <= 0) {
795 return;
796 }
797
798 writtenBytes += increment;
799 lastWriteTime = currentTime;
800 idleCountForBoth.set(0);
801 idleCountForWrite.set(0);
802
803 if (getService() instanceof AbstractIoService) {
804 ((AbstractIoService) getService()).getStatistics().increaseWrittenBytes(increment, currentTime);
805 }
806
807 increaseScheduledWriteBytes(-increment);
808 }
809
810
811
812
813 public final void increaseWrittenMessages(
814 WriteRequest request, long currentTime) {
815 Object message = request.getMessage();
816 if (message instanceof IoBuffer) {
817 IoBuffer b = (IoBuffer) message;
818 if (b.hasRemaining()) {
819 return;
820 }
821 }
822
823 writtenMessages++;
824 lastWriteTime = currentTime;
825 if (getService() instanceof AbstractIoService) {
826 ((AbstractIoService) getService()).getStatistics().increaseWrittenMessages(currentTime);
827 }
828
829 decreaseScheduledWriteMessages();
830 }
831
832
833
834
835 public final void increaseScheduledWriteBytes(int increment) {
836 scheduledWriteBytes.addAndGet(increment);
837 if (getService() instanceof AbstractIoService) {
838 ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteBytes(increment);
839 }
840 }
841
842
843
844
845 public final void increaseScheduledWriteMessages() {
846 scheduledWriteMessages.incrementAndGet();
847 if (getService() instanceof AbstractIoService) {
848 ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteMessages();
849 }
850 }
851
852
853
854
855 private void decreaseScheduledWriteMessages() {
856 scheduledWriteMessages.decrementAndGet();
857 if (getService() instanceof AbstractIoService) {
858 ((AbstractIoService) getService()).getStatistics().decreaseScheduledWriteMessages();
859 }
860 }
861
862
863
864
865 public final void decreaseScheduledBytesAndMessages(WriteRequest request) {
866 Object message = request.getMessage();
867 if (message instanceof IoBuffer) {
868 IoBuffer b = (IoBuffer) message;
869 if (b.hasRemaining()) {
870 increaseScheduledWriteBytes(-((IoBuffer) message).remaining());
871 } else {
872 decreaseScheduledWriteMessages();
873 }
874 } else {
875 decreaseScheduledWriteMessages();
876 }
877 }
878
879
880
881
882 public final WriteRequestQueue getWriteRequestQueue() {
883 if (writeRequestQueue == null) {
884 throw new IllegalStateException();
885 }
886 return writeRequestQueue;
887 }
888
889
890
891
892 public final WriteRequest getCurrentWriteRequest() {
893 return currentWriteRequest;
894 }
895
896
897
898
899 public final Object getCurrentWriteMessage() {
900 WriteRequest req = getCurrentWriteRequest();
901 if (req == null) {
902 return null;
903 }
904 return req.getMessage();
905 }
906
907
908
909
910 public final void setCurrentWriteRequest(WriteRequest currentWriteRequest) {
911 this.currentWriteRequest = currentWriteRequest;
912 }
913
914
915
916
917 public final void increaseReadBufferSize() {
918 int newReadBufferSize = getConfig().getReadBufferSize() << 1;
919 if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) {
920 getConfig().setReadBufferSize(newReadBufferSize);
921 } else {
922 getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize());
923 }
924
925 deferDecreaseReadBuffer = true;
926 }
927
928
929
930
931 public final void decreaseReadBufferSize() {
932 if (deferDecreaseReadBuffer) {
933 deferDecreaseReadBuffer = false;
934 return;
935 }
936
937 if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) {
938 getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1);
939 }
940
941 deferDecreaseReadBuffer = true;
942 }
943
944
945
946
947 public final long getCreationTime() {
948 return creationTime;
949 }
950
951
952
953
954 public final long getLastIoTime() {
955 return Math.max(lastReadTime, lastWriteTime);
956 }
957
958
959
960
961 public final long getLastReadTime() {
962 return lastReadTime;
963 }
964
965
966
967
968 public final long getLastWriteTime() {
969 return lastWriteTime;
970 }
971
972
973
974
975 public final boolean isIdle(IdleStatus status) {
976 if (status == IdleStatus.BOTH_IDLE) {
977 return idleCountForBoth.get() > 0;
978 }
979
980 if (status == IdleStatus.READER_IDLE) {
981 return idleCountForRead.get() > 0;
982 }
983
984 if (status == IdleStatus.WRITER_IDLE) {
985 return idleCountForWrite.get() > 0;
986 }
987
988 throw new IllegalArgumentException("Unknown idle status: " + status);
989 }
990
991
992
993
994 public final boolean isBothIdle() {
995 return isIdle(IdleStatus.BOTH_IDLE);
996 }
997
998
999
1000
1001 public final boolean isReaderIdle() {
1002 return isIdle(IdleStatus.READER_IDLE);
1003 }
1004
1005
1006
1007
1008 public final boolean isWriterIdle() {
1009 return isIdle(IdleStatus.WRITER_IDLE);
1010 }
1011
1012
1013
1014
1015 public final int getIdleCount(IdleStatus status) {
1016 if (getConfig().getIdleTime(status) == 0) {
1017 if (status == IdleStatus.BOTH_IDLE) {
1018 idleCountForBoth.set(0);
1019 }
1020
1021 if (status == IdleStatus.READER_IDLE) {
1022 idleCountForRead.set(0);
1023 }
1024
1025 if (status == IdleStatus.WRITER_IDLE) {
1026 idleCountForWrite.set(0);
1027 }
1028 }
1029
1030 if (status == IdleStatus.BOTH_IDLE) {
1031 return idleCountForBoth.get();
1032 }
1033
1034 if (status == IdleStatus.READER_IDLE) {
1035 return idleCountForRead.get();
1036 }
1037
1038 if (status == IdleStatus.WRITER_IDLE) {
1039 return idleCountForWrite.get();
1040 }
1041
1042 throw new IllegalArgumentException("Unknown idle status: " + status);
1043 }
1044
1045
1046
1047
1048 public final long getLastIdleTime(IdleStatus status) {
1049 if (status == IdleStatus.BOTH_IDLE) {
1050 return lastIdleTimeForBoth;
1051 }
1052
1053 if (status == IdleStatus.READER_IDLE) {
1054 return lastIdleTimeForRead;
1055 }
1056
1057 if (status == IdleStatus.WRITER_IDLE) {
1058 return lastIdleTimeForWrite;
1059 }
1060
1061 throw new IllegalArgumentException("Unknown idle status: " + status);
1062 }
1063
1064
1065
1066
1067 public final void increaseIdleCount(IdleStatus status, long currentTime) {
1068 if (status == IdleStatus.BOTH_IDLE) {
1069 idleCountForBoth.incrementAndGet();
1070 lastIdleTimeForBoth = currentTime;
1071 } else if (status == IdleStatus.READER_IDLE) {
1072 idleCountForRead.incrementAndGet();
1073 lastIdleTimeForRead = currentTime;
1074 } else if (status == IdleStatus.WRITER_IDLE) {
1075 idleCountForWrite.incrementAndGet();
1076 lastIdleTimeForWrite = currentTime;
1077 } else {
1078 throw new IllegalArgumentException("Unknown idle status: " + status);
1079 }
1080 }
1081
1082
1083
1084
1085 public final int getBothIdleCount() {
1086 return getIdleCount(IdleStatus.BOTH_IDLE);
1087 }
1088
1089
1090
1091
1092 public final long getLastBothIdleTime() {
1093 return getLastIdleTime(IdleStatus.BOTH_IDLE);
1094 }
1095
1096
1097
1098
1099 public final long getLastReaderIdleTime() {
1100 return getLastIdleTime(IdleStatus.READER_IDLE);
1101 }
1102
1103
1104
1105
1106 public final long getLastWriterIdleTime() {
1107 return getLastIdleTime(IdleStatus.WRITER_IDLE);
1108 }
1109
1110
1111
1112
1113 public final int getReaderIdleCount() {
1114 return getIdleCount(IdleStatus.READER_IDLE);
1115 }
1116
1117
1118
1119
1120 public final int getWriterIdleCount() {
1121 return getIdleCount(IdleStatus.WRITER_IDLE);
1122 }
1123
1124
1125
1126
1127 public SocketAddress getServiceAddress() {
1128 IoService service = getService();
1129 if (service instanceof IoAcceptor) {
1130 return ((IoAcceptor) service).getLocalAddress();
1131 }
1132
1133 return getRemoteAddress();
1134 }
1135
1136
1137
1138
1139 @Override
1140 public final int hashCode() {
1141 return super.hashCode();
1142 }
1143
1144
1145
1146
1147
1148 @Override
1149 public final boolean equals(Object o) {
1150 return super.equals(o);
1151 }
1152
1153
1154
1155
1156 @Override
1157 public String toString() {
1158 if (isConnected()||isClosing()) {
1159 try {
1160 SocketAddress remote = getRemoteAddress();
1161 SocketAddress local = getLocalAddress();
1162
1163 if (getService() instanceof IoAcceptor) {
1164 return "(" + getIdAsString() + ": " + getServiceName() + ", server, " +
1165 remote + " => " + local + ')';
1166 }
1167
1168 return "(" + getIdAsString() + ": " + getServiceName() + ", client, " +
1169 local + " => " + remote + ')';
1170 } catch (Exception e) {
1171 return "Session is disconnecting ...";
1172 }
1173 }
1174
1175 return "Session disconnected ...";
1176 }
1177
1178
1179
1180
1181 private String getIdAsString() {
1182 String id = Long.toHexString(getId()).toUpperCase();
1183
1184
1185
1186 while (id.length() < 8) {
1187 id = '0' + id;
1188 }
1189 id = "0x" + id;
1190
1191 return id;
1192 }
1193
1194
1195
1196
1197 private String getServiceName() {
1198 TransportMetadata tm = getTransportMetadata();
1199 if (tm == null) {
1200 return "null";
1201 }
1202
1203 return tm.getProviderName() + ' ' + tm.getName();
1204 }
1205
1206
1207
1208
1209
1210
1211
1212 public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) {
1213 IoSession s = null;
1214 while (sessions.hasNext()) {
1215 s = sessions.next();
1216 notifyIdleSession(s, currentTime);
1217 }
1218 }
1219
1220
1221
1222
1223
1224
1225
1226 public static void notifyIdleSession(IoSession session, long currentTime) {
1227 notifyIdleSession0(
1228 session, currentTime,
1229 session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
1230 IdleStatus.BOTH_IDLE, Math.max(
1231 session.getLastIoTime(),
1232 session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
1233
1234 notifyIdleSession0(
1235 session, currentTime,
1236 session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
1237 IdleStatus.READER_IDLE, Math.max(
1238 session.getLastReadTime(),
1239 session.getLastIdleTime(IdleStatus.READER_IDLE)));
1240
1241 notifyIdleSession0(
1242 session, currentTime,
1243 session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
1244 IdleStatus.WRITER_IDLE, Math.max(
1245 session.getLastWriteTime(),
1246 session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
1247
1248 notifyWriteTimeout(session, currentTime);
1249 }
1250
1251 private static void notifyIdleSession0(
1252 IoSession session, long currentTime,
1253 long idleTime, IdleStatus status, long lastIoTime) {
1254 if (idleTime > 0 && lastIoTime != 0
1255 && currentTime - lastIoTime >= idleTime) {
1256 session.getFilterChain().fireSessionIdle(status);
1257 }
1258 }
1259
1260 private static void notifyWriteTimeout(
1261 IoSession session, long currentTime) {
1262
1263 long writeTimeout = session.getConfig().getWriteTimeoutInMillis();
1264 if (writeTimeout > 0 &&
1265 currentTime - session.getLastWriteTime() >= writeTimeout &&
1266 !session.getWriteRequestQueue().isEmpty(session)) {
1267 WriteRequest request = session.getCurrentWriteRequest();
1268 if (request != null) {
1269 session.setCurrentWriteRequest(null);
1270 WriteTimeoutException cause = new WriteTimeoutException(request);
1271 request.getFuture().setException(cause);
1272 session.getFilterChain().fireExceptionCaught(cause);
1273
1274 session.close(true);
1275 }
1276 }
1277 }
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287 private class CloseAwareWriteQueue implements WriteRequestQueue {
1288
1289 private final WriteRequestQueue queue;
1290
1291
1292
1293
1294 public CloseAwareWriteQueue(WriteRequestQueue queue) {
1295 this.queue = queue;
1296 }
1297
1298
1299
1300
1301 public synchronized WriteRequest poll(IoSession session) {
1302 WriteRequest answer = queue.poll(session);
1303
1304 if (answer == CLOSE_REQUEST) {
1305 AbstractIoSession.this.close();
1306 dispose(session);
1307 answer = null;
1308 }
1309
1310 return answer;
1311 }
1312
1313
1314
1315
1316 public void offer(IoSession session, WriteRequest e) {
1317 queue.offer(session, e);
1318 }
1319
1320
1321
1322
1323 public boolean isEmpty(IoSession session) {
1324 return queue.isEmpty(session);
1325 }
1326
1327
1328
1329
1330 public void clear(IoSession session) {
1331 queue.clear(session);
1332 }
1333
1334
1335
1336
1337 public void dispose(IoSession session) {
1338 queue.dispose(session);
1339 }
1340 }
1341 }