1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.session;
21
22 import java.io.File;
23 import java.io.FileInputStream;
24 import java.io.IOException;
25 import java.net.SocketAddress;
26 import java.nio.channels.FileChannel;
27 import java.util.Iterator;
28 import java.util.Queue;
29 import java.util.Set;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicInteger;
32 import java.util.concurrent.atomic.AtomicLong;
33
34 import org.apache.mina.core.buffer.IoBuffer;
35 import org.apache.mina.core.file.DefaultFileRegion;
36 import org.apache.mina.core.filterchain.IoFilterChain;
37 import org.apache.mina.core.future.CloseFuture;
38 import org.apache.mina.core.future.DefaultCloseFuture;
39 import org.apache.mina.core.future.DefaultReadFuture;
40 import org.apache.mina.core.future.DefaultWriteFuture;
41 import org.apache.mina.core.future.IoFutureListener;
42 import org.apache.mina.core.future.ReadFuture;
43 import org.apache.mina.core.future.WriteFuture;
44 import org.apache.mina.core.service.AbstractIoService;
45 import org.apache.mina.core.service.IoAcceptor;
46 import org.apache.mina.core.service.IoProcessor;
47 import org.apache.mina.core.service.IoService;
48 import org.apache.mina.core.service.TransportMetadata;
49 import org.apache.mina.core.write.DefaultWriteRequest;
50 import org.apache.mina.core.write.WriteException;
51 import org.apache.mina.core.write.WriteRequest;
52 import org.apache.mina.core.write.WriteRequestQueue;
53 import org.apache.mina.core.write.WriteTimeoutException;
54 import org.apache.mina.core.write.WriteToClosedSessionException;
55 import org.apache.mina.util.CircularQueue;
56 import org.apache.mina.util.ExceptionMonitor;
57
58
59
60
61
62
63
64 public abstract class AbstractIoSession implements IoSession {
65
66 private static final AttributeKey READY_READ_FUTURES_KEY =
67 new AttributeKey(AbstractIoSession.class, "readyReadFutures");
68
69 private static final AttributeKey WAITING_READ_FUTURES_KEY =
70 new AttributeKey(AbstractIoSession.class, "waitingReadFutures");
71
72 private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER =
73 new IoFutureListener<CloseFuture>() {
74 public void operationComplete(CloseFuture future) {
75 AbstractIoSession session = (AbstractIoSession) future.getSession();
76 session.scheduledWriteBytes.set(0);
77 session.scheduledWriteMessages.set(0);
78 session.readBytesThroughput = 0;
79 session.readMessagesThroughput = 0;
80 session.writtenBytesThroughput = 0;
81 session.writtenMessagesThroughput = 0;
82 }
83 };
84
85
86
87
88
89 private static final WriteRequest CLOSE_REQUEST =
90 new DefaultWriteRequest(new Object());
91
92 private final Object lock = new Object();
93
94 private IoSessionAttributeMap attributes;
95 private WriteRequestQueue writeRequestQueue;
96 private WriteRequest currentWriteRequest;
97
98
99 private final long creationTime;
100
101
102 private static AtomicLong idGenerator = new AtomicLong(0);
103
104
105 private long sessionId;
106
107
108
109
110 private final CloseFuture closeFuture = new DefaultCloseFuture(this);
111
112 private volatile boolean closing;
113
114
115 private boolean readSuspended=false;
116 private boolean writeSuspended=false;
117
118
119 private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
120 private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
121 private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
122
123 private long readBytes;
124 private long writtenBytes;
125 private long readMessages;
126 private long writtenMessages;
127 private long lastReadTime;
128 private long lastWriteTime;
129
130 private long lastThroughputCalculationTime;
131 private long lastReadBytes;
132 private long lastWrittenBytes;
133 private long lastReadMessages;
134 private long lastWrittenMessages;
135 private double readBytesThroughput;
136 private double writtenBytesThroughput;
137 private double readMessagesThroughput;
138 private double writtenMessagesThroughput;
139
140 private int idleCountForBoth;
141 private int idleCountForRead;
142 private int idleCountForWrite;
143
144 private long lastIdleTimeForBoth;
145 private long lastIdleTimeForRead;
146 private long lastIdleTimeForWrite;
147
148 private boolean deferDecreaseReadBuffer = true;
149
150
151
152
153 protected AbstractIoSession() {
154
155 long currentTime = System.currentTimeMillis();
156 creationTime = currentTime;
157 lastThroughputCalculationTime = currentTime;
158 lastReadTime = currentTime;
159 lastWriteTime = currentTime;
160 lastIdleTimeForBoth = currentTime;
161 lastIdleTimeForRead = currentTime;
162 lastIdleTimeForWrite = currentTime;
163
164
165 closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
166
167
168 sessionId = idGenerator.incrementAndGet();
169 }
170
171
172
173
174
175
176
177 public final long getId() {
178 return sessionId;
179 }
180
181
182
183
184 public abstract IoProcessor getProcessor();
185
186
187
188
189 public final boolean isConnected() {
190 return !closeFuture.isClosed();
191 }
192
193
194
195
196 public final boolean isClosing() {
197 return closing || closeFuture.isClosed();
198 }
199
200
201
202
203 public final CloseFuture getCloseFuture() {
204 return closeFuture;
205 }
206
207
208
209
210 public final boolean isScheduledForFlush() {
211 return scheduledForFlush.get();
212 }
213
214
215
216
217 public final boolean setScheduledForFlush(boolean flag) {
218 if (flag) {
219 return scheduledForFlush.compareAndSet(false, true);
220 }
221
222 scheduledForFlush.set(false);
223 return true;
224 }
225
226
227
228
229 public final CloseFuture close(boolean rightNow) {
230 if (rightNow) {
231 return close();
232 }
233
234 return closeOnFlush();
235 }
236
237
238
239
240 public final CloseFuture close() {
241 synchronized (lock) {
242 if (isClosing()) {
243 return closeFuture;
244 }
245
246 closing = true;
247 }
248
249 getFilterChain().fireFilterClose();
250 return closeFuture;
251 }
252
253 private final CloseFuture closeOnFlush() {
254 getWriteRequestQueue().offer(this, CLOSE_REQUEST);
255 getProcessor().flush(this);
256 return closeFuture;
257 }
258
259
260
261
262 public final ReadFuture read() {
263 if (!getConfig().isUseReadOperation()) {
264 throw new IllegalStateException("useReadOperation is not enabled.");
265 }
266
267 Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
268 ReadFuture future;
269 synchronized (readyReadFutures) {
270 future = readyReadFutures.poll();
271 if (future != null) {
272 if (future.isClosed()) {
273
274 readyReadFutures.offer(future);
275 }
276 } else {
277 future = new DefaultReadFuture(this);
278 getWaitingReadFutures().offer(future);
279 }
280 }
281
282 return future;
283 }
284
285
286
287
288 public final void offerReadFuture(Object message) {
289 newReadFuture().setRead(message);
290 }
291
292
293
294
295 public final void offerFailedReadFuture(Throwable exception) {
296 newReadFuture().setException(exception);
297 }
298
299
300
301
302 public final void offerClosedReadFuture() {
303 Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
304 synchronized (readyReadFutures) {
305 newReadFuture().setClosed();
306 }
307 }
308
309
310
311
312 private ReadFuture newReadFuture() {
313 Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
314 Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures();
315 ReadFuture future;
316 synchronized (readyReadFutures) {
317 future = waitingReadFutures.poll();
318 if (future == null) {
319 future = new DefaultReadFuture(this);
320 readyReadFutures.offer(future);
321 }
322 }
323 return future;
324 }
325
326
327
328
329 private Queue<ReadFuture> getReadyReadFutures() {
330 Queue<ReadFuture> readyReadFutures =
331 (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
332 if (readyReadFutures == null) {
333 readyReadFutures = new CircularQueue<ReadFuture>();
334
335 Queue<ReadFuture> oldReadyReadFutures =
336 (Queue<ReadFuture>) setAttributeIfAbsent(
337 READY_READ_FUTURES_KEY, readyReadFutures);
338 if (oldReadyReadFutures != null) {
339 readyReadFutures = oldReadyReadFutures;
340 }
341 }
342 return readyReadFutures;
343 }
344
345
346
347
348 private Queue<ReadFuture> getWaitingReadFutures() {
349 Queue<ReadFuture> waitingReadyReadFutures =
350 (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES_KEY);
351 if (waitingReadyReadFutures == null) {
352 waitingReadyReadFutures = new CircularQueue<ReadFuture>();
353
354 Queue<ReadFuture> oldWaitingReadyReadFutures =
355 (Queue<ReadFuture>) setAttributeIfAbsent(
356 WAITING_READ_FUTURES_KEY, waitingReadyReadFutures);
357 if (oldWaitingReadyReadFutures != null) {
358 waitingReadyReadFutures = oldWaitingReadyReadFutures;
359 }
360 }
361 return waitingReadyReadFutures;
362 }
363
364
365
366
367 public WriteFuture write(Object message) {
368 return write(message, null);
369 }
370
371
372
373
374 public WriteFuture write(Object message, SocketAddress remoteAddress) {
375 if (message == null) {
376 throw new NullPointerException("message");
377 }
378
379
380
381 if (!getTransportMetadata().isConnectionless() &&
382 remoteAddress != null) {
383 throw new UnsupportedOperationException();
384 }
385
386
387
388
389
390 if (isClosing() || !isConnected()) {
391 WriteFuture future = new DefaultWriteFuture(this);
392 WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
393 WriteException writeException = new WriteToClosedSessionException(request);
394 future.setException(writeException);
395 return future;
396 }
397
398 FileChannel openedFileChannel = null;
399
400
401
402 try {
403 if (message instanceof IoBuffer
404 && !((IoBuffer) message).hasRemaining()) {
405
406 throw new IllegalArgumentException(
407 "message is empty. Forgot to call flip()?");
408 } else if (message instanceof FileChannel) {
409 FileChannel fileChannel = (FileChannel) message;
410 message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
411 } else if (message instanceof File) {
412 File file = (File) message;
413 openedFileChannel = new FileInputStream(file).getChannel();
414 message = new DefaultFileRegion(openedFileChannel, 0, openedFileChannel.size());
415 }
416 } catch (IOException e) {
417 ExceptionMonitor.getInstance().exceptionCaught(e);
418 return DefaultWriteFuture.newNotWrittenFuture(this, e);
419 }
420
421
422 WriteFuture writeFuture = new DefaultWriteFuture(this);
423 WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
424
425
426 IoFilterChain filterChain = getFilterChain();
427 filterChain.fireFilterWrite(writeRequest);
428
429
430
431 if (openedFileChannel != null) {
432
433 final FileChannel finalChannel = openedFileChannel;
434 writeFuture.addListener(new IoFutureListener<WriteFuture>() {
435 public void operationComplete(WriteFuture future) {
436 try {
437 finalChannel.close();
438 } catch (IOException e) {
439 ExceptionMonitor.getInstance().exceptionCaught(e);
440 }
441 }
442 });
443 }
444
445
446 return writeFuture;
447 }
448
449
450
451
452 public final Object getAttachment() {
453 return getAttribute("");
454 }
455
456
457
458
459 public final Object setAttachment(Object attachment) {
460 return setAttribute("", attachment);
461 }
462
463
464
465
466 public final Object getAttribute(Object key) {
467 return getAttribute(key, null);
468 }
469
470
471
472
473 public final Object getAttribute(Object key, Object defaultValue) {
474 return attributes.getAttribute(this, key, defaultValue);
475 }
476
477
478
479
480 public final Object setAttribute(Object key, Object value) {
481 return attributes.setAttribute(this, key, value);
482 }
483
484
485
486
487 public final Object setAttribute(Object key) {
488 return setAttribute(key, Boolean.TRUE);
489 }
490
491
492
493
494 public final Object setAttributeIfAbsent(Object key, Object value) {
495 return attributes.setAttributeIfAbsent(this, key, value);
496 }
497
498
499
500
501 public final Object setAttributeIfAbsent(Object key) {
502 return setAttributeIfAbsent(key, Boolean.TRUE);
503 }
504
505
506
507
508 public final Object removeAttribute(Object key) {
509 return attributes.removeAttribute(this, key);
510 }
511
512
513
514
515 public final boolean removeAttribute(Object key, Object value) {
516 return attributes.removeAttribute(this, key, value);
517 }
518
519
520
521
522 public final boolean replaceAttribute(Object key, Object oldValue, Object newValue) {
523 return attributes.replaceAttribute(this, key, oldValue, newValue);
524 }
525
526
527
528
529 public final boolean containsAttribute(Object key) {
530 return attributes.containsAttribute(this, key);
531 }
532
533
534
535
536 public final Set<Object> getAttributeKeys() {
537 return attributes.getAttributeKeys(this);
538 }
539
540
541
542
543 public final IoSessionAttributeMap getAttributeMap() {
544 return attributes;
545 }
546
547
548
549
550 public final void setAttributeMap(IoSessionAttributeMap attributes) {
551 this.attributes = attributes;
552 }
553
554
555
556
557
558
559 public final void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) {
560 this.writeRequestQueue =
561 new CloseAwareWriteQueue(writeRequestQueue);
562 }
563
564
565
566
567
568 public final void suspendRead() {
569 readSuspended = true;
570 if (isClosing() || !isConnected()) {
571 return;
572 }
573 getProcessor().updateTrafficControl(this);
574 }
575
576
577
578
579 public final void suspendWrite() {
580 writeSuspended = true;
581 if (isClosing() || !isConnected()) {
582 return;
583 }
584 getProcessor().updateTrafficControl(this);
585 }
586
587
588
589
590 @SuppressWarnings("unchecked")
591 public final void resumeRead() {
592 readSuspended = false;
593 if (isClosing() || !isConnected()) {
594 return;
595 }
596 getProcessor().updateTrafficControl(this);
597 }
598
599
600
601
602 @SuppressWarnings("unchecked")
603 public final void resumeWrite() {
604 writeSuspended = false;
605 if (isClosing() || !isConnected()) {
606 return;
607 }
608 getProcessor().updateTrafficControl(this);
609 }
610
611
612
613
614 public boolean isReadSuspended() {
615 return readSuspended;
616 }
617
618
619
620
621 public boolean isWriteSuspended() {
622 return writeSuspended;
623 }
624
625
626
627
628 public final long getReadBytes() {
629 return readBytes;
630 }
631
632
633
634
635 public final long getWrittenBytes() {
636 return writtenBytes;
637 }
638
639
640
641
642 public final long getReadMessages() {
643 return readMessages;
644 }
645
646
647
648
649 public final long getWrittenMessages() {
650 return writtenMessages;
651 }
652
653
654
655
656 public final double getReadBytesThroughput() {
657 return readBytesThroughput;
658 }
659
660
661
662
663 public final double getWrittenBytesThroughput() {
664 return writtenBytesThroughput;
665 }
666
667
668
669
670 public final double getReadMessagesThroughput() {
671 return readMessagesThroughput;
672 }
673
674
675
676
677 public final double getWrittenMessagesThroughput() {
678 return writtenMessagesThroughput;
679 }
680
681
682
683
684 public final void updateThroughput(long currentTime, boolean force) {
685 int interval = (int) (currentTime - lastThroughputCalculationTime);
686
687 long minInterval = getConfig().getThroughputCalculationIntervalInMillis();
688 if (minInterval == 0 || interval < minInterval) {
689 if (!force) {
690 return;
691 }
692 }
693
694 readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval;
695 writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval;
696 readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval;
697 writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval;
698
699 lastReadBytes = readBytes;
700 lastWrittenBytes = writtenBytes;
701 lastReadMessages = readMessages;
702 lastWrittenMessages = writtenMessages;
703
704 lastThroughputCalculationTime = currentTime;
705 }
706
707
708
709
710 public final long getScheduledWriteBytes() {
711 return scheduledWriteBytes.get();
712 }
713
714
715
716
717 public final int getScheduledWriteMessages() {
718 return scheduledWriteMessages.get();
719 }
720
721
722
723
724 protected void setScheduledWriteBytes(int byteCount){
725 scheduledWriteBytes.set(byteCount);
726 }
727
728
729
730
731 protected void setScheduledWriteMessages(int messages) {
732 scheduledWriteMessages.set(messages);
733 }
734
735
736
737
738 public final void increaseReadBytes(long increment, long currentTime) {
739 if (increment <= 0) {
740 return;
741 }
742
743 readBytes += increment;
744 lastReadTime = currentTime;
745 idleCountForBoth = 0;
746 idleCountForRead = 0;
747
748 if (getService() instanceof AbstractIoService) {
749 ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime);
750 }
751 }
752
753
754
755
756 public final void increaseReadMessages(long currentTime) {
757 readMessages++;
758 lastReadTime = currentTime;
759 idleCountForBoth = 0;
760 idleCountForRead = 0;
761
762 if (getService() instanceof AbstractIoService) {
763 ((AbstractIoService) getService()).getStatistics().increaseReadMessages(currentTime);
764 }
765 }
766
767
768
769
770 public final void increaseWrittenBytes(int increment, long currentTime) {
771 if (increment <= 0) {
772 return;
773 }
774
775 writtenBytes += increment;
776 lastWriteTime = currentTime;
777 idleCountForBoth = 0;
778 idleCountForWrite = 0;
779
780 if (getService() instanceof AbstractIoService) {
781 ((AbstractIoService) getService()).getStatistics().increaseWrittenBytes(increment, currentTime);
782 }
783
784 increaseScheduledWriteBytes(-increment);
785 }
786
787
788
789
790 public final void increaseWrittenMessages(
791 WriteRequest request, long currentTime) {
792 Object message = request.getMessage();
793 if (message instanceof IoBuffer) {
794 IoBuffer b = (IoBuffer) message;
795 if (b.hasRemaining()) {
796 return;
797 }
798 }
799
800 writtenMessages++;
801 lastWriteTime = currentTime;
802 if (getService() instanceof AbstractIoService) {
803 ((AbstractIoService) getService()).getStatistics().increaseWrittenMessages(currentTime);
804 }
805
806 decreaseScheduledWriteMessages();
807 }
808
809
810
811
812 public final void increaseScheduledWriteBytes(int increment) {
813 scheduledWriteBytes.addAndGet(increment);
814 if (getService() instanceof AbstractIoService) {
815 ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteBytes(increment);
816 }
817 }
818
819
820
821
822 public final void increaseScheduledWriteMessages() {
823 scheduledWriteMessages.incrementAndGet();
824 if (getService() instanceof AbstractIoService) {
825 ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteMessages();
826 }
827 }
828
829
830
831
832 private void decreaseScheduledWriteMessages() {
833 scheduledWriteMessages.decrementAndGet();
834 if (getService() instanceof AbstractIoService) {
835 ((AbstractIoService) getService()).getStatistics().decreaseScheduledWriteMessages();
836 }
837 }
838
839
840
841
842 public final void decreaseScheduledBytesAndMessages(WriteRequest request) {
843 Object message = request.getMessage();
844 if (message instanceof IoBuffer) {
845 IoBuffer b = (IoBuffer) message;
846 if (b.hasRemaining()) {
847 increaseScheduledWriteBytes(-((IoBuffer) message).remaining());
848 } else {
849 decreaseScheduledWriteMessages();
850 }
851 } else {
852 decreaseScheduledWriteMessages();
853 }
854 }
855
856
857
858
859 public final WriteRequestQueue getWriteRequestQueue() {
860 if (writeRequestQueue == null) {
861 throw new IllegalStateException();
862 }
863 return writeRequestQueue;
864 }
865
866
867
868
869 public final WriteRequest getCurrentWriteRequest() {
870 return currentWriteRequest;
871 }
872
873
874
875
876 public final Object getCurrentWriteMessage() {
877 WriteRequest req = getCurrentWriteRequest();
878 if (req == null) {
879 return null;
880 }
881 return req.getMessage();
882 }
883
884
885
886
887 public final void setCurrentWriteRequest(WriteRequest currentWriteRequest) {
888 this.currentWriteRequest = currentWriteRequest;
889 }
890
891
892
893
894 public final void increaseReadBufferSize() {
895 int newReadBufferSize = getConfig().getReadBufferSize() << 1;
896 if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) {
897 getConfig().setReadBufferSize(newReadBufferSize);
898 } else {
899 getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize());
900 }
901
902 deferDecreaseReadBuffer = true;
903 }
904
905
906
907
908 public final void decreaseReadBufferSize() {
909 if (deferDecreaseReadBuffer) {
910 deferDecreaseReadBuffer = false;
911 return;
912 }
913
914 if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) {
915 getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1);
916 }
917
918 deferDecreaseReadBuffer = true;
919 }
920
921
922
923
924 public final long getCreationTime() {
925 return creationTime;
926 }
927
928
929
930
931 public final long getLastIoTime() {
932 return Math.max(lastReadTime, lastWriteTime);
933 }
934
935
936
937
938 public final long getLastReadTime() {
939 return lastReadTime;
940 }
941
942
943
944
945 public final long getLastWriteTime() {
946 return lastWriteTime;
947 }
948
949
950
951
952 public final boolean isIdle(IdleStatus status) {
953 if (status == IdleStatus.BOTH_IDLE) {
954 return idleCountForBoth > 0;
955 }
956
957 if (status == IdleStatus.READER_IDLE) {
958 return idleCountForRead > 0;
959 }
960
961 if (status == IdleStatus.WRITER_IDLE) {
962 return idleCountForWrite > 0;
963 }
964
965 throw new IllegalArgumentException("Unknown idle status: " + status);
966 }
967
968
969
970
971 public final boolean isBothIdle() {
972 return isIdle(IdleStatus.BOTH_IDLE);
973 }
974
975
976
977
978 public final boolean isReaderIdle() {
979 return isIdle(IdleStatus.READER_IDLE);
980 }
981
982
983
984
985 public final boolean isWriterIdle() {
986 return isIdle(IdleStatus.WRITER_IDLE);
987 }
988
989
990
991
992 public final int getIdleCount(IdleStatus status) {
993 if (getConfig().getIdleTime(status) == 0) {
994 if (status == IdleStatus.BOTH_IDLE) {
995 idleCountForBoth = 0;
996 }
997
998 if (status == IdleStatus.READER_IDLE) {
999 idleCountForRead = 0;
1000 }
1001
1002 if (status == IdleStatus.WRITER_IDLE) {
1003 idleCountForWrite = 0;
1004 }
1005 }
1006
1007 if (status == IdleStatus.BOTH_IDLE) {
1008 return idleCountForBoth;
1009 }
1010
1011 if (status == IdleStatus.READER_IDLE) {
1012 return idleCountForRead;
1013 }
1014
1015 if (status == IdleStatus.WRITER_IDLE) {
1016 return idleCountForWrite;
1017 }
1018
1019 throw new IllegalArgumentException("Unknown idle status: " + status);
1020 }
1021
1022
1023
1024
1025 public final long getLastIdleTime(IdleStatus status) {
1026 if (status == IdleStatus.BOTH_IDLE) {
1027 return lastIdleTimeForBoth;
1028 }
1029
1030 if (status == IdleStatus.READER_IDLE) {
1031 return lastIdleTimeForRead;
1032 }
1033
1034 if (status == IdleStatus.WRITER_IDLE) {
1035 return lastIdleTimeForWrite;
1036 }
1037
1038 throw new IllegalArgumentException("Unknown idle status: " + status);
1039 }
1040
1041
1042
1043
1044 public final void increaseIdleCount(IdleStatus status, long currentTime) {
1045 if (status == IdleStatus.BOTH_IDLE) {
1046 idleCountForBoth++;
1047 lastIdleTimeForBoth = currentTime;
1048 } else if (status == IdleStatus.READER_IDLE) {
1049 idleCountForRead++;
1050 lastIdleTimeForRead = currentTime;
1051 } else if (status == IdleStatus.WRITER_IDLE) {
1052 idleCountForWrite++;
1053 lastIdleTimeForWrite = currentTime;
1054 } else {
1055 throw new IllegalArgumentException("Unknown idle status: " + status);
1056 }
1057 }
1058
1059
1060
1061
1062 public final int getBothIdleCount() {
1063 return getIdleCount(IdleStatus.BOTH_IDLE);
1064 }
1065
1066
1067
1068
1069 public final long getLastBothIdleTime() {
1070 return getLastIdleTime(IdleStatus.BOTH_IDLE);
1071 }
1072
1073
1074
1075
1076 public final long getLastReaderIdleTime() {
1077 return getLastIdleTime(IdleStatus.READER_IDLE);
1078 }
1079
1080
1081
1082
1083 public final long getLastWriterIdleTime() {
1084 return getLastIdleTime(IdleStatus.WRITER_IDLE);
1085 }
1086
1087
1088
1089
1090 public final int getReaderIdleCount() {
1091 return getIdleCount(IdleStatus.READER_IDLE);
1092 }
1093
1094
1095
1096
1097 public final int getWriterIdleCount() {
1098 return getIdleCount(IdleStatus.WRITER_IDLE);
1099 }
1100
1101
1102
1103
1104 public SocketAddress getServiceAddress() {
1105 IoService service = getService();
1106 if (service instanceof IoAcceptor) {
1107 return ((IoAcceptor) service).getLocalAddress();
1108 }
1109
1110 return getRemoteAddress();
1111 }
1112
1113
1114
1115
1116 @Override
1117 public final int hashCode() {
1118 return super.hashCode();
1119 }
1120
1121
1122
1123
1124
1125 @Override
1126 public final boolean equals(Object o) {
1127 return super.equals(o);
1128 }
1129
1130
1131
1132
1133 @Override
1134 public String toString() {
1135 if (isConnected()||isClosing()) {
1136 if (getService() instanceof IoAcceptor) {
1137 return "(" + getIdAsString() + ": " + getServiceName() + ", server, " +
1138 getRemoteAddress() + " => " + getLocalAddress() + ')';
1139 }
1140
1141 return "(" + getIdAsString() + ": " + getServiceName() + ", client, " +
1142 getLocalAddress() + " => " + getRemoteAddress() + ')';
1143 }
1144
1145 return "Session disconnected ...";
1146 }
1147
1148
1149
1150
1151 private String getIdAsString() {
1152 String id = Long.toHexString(getId()).toUpperCase();
1153
1154
1155
1156 while (id.length() < 8) {
1157 id = '0' + id;
1158 }
1159 id = "0x" + id;
1160
1161 return id;
1162 }
1163
1164
1165
1166
1167 private String getServiceName() {
1168 TransportMetadata tm = getTransportMetadata();
1169 if (tm == null) {
1170 return "null";
1171 }
1172
1173 return tm.getProviderName() + ' ' + tm.getName();
1174 }
1175
1176
1177
1178
1179
1180
1181
1182 public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) {
1183 IoSession s = null;
1184 while (sessions.hasNext()) {
1185 s = sessions.next();
1186 notifyIdleSession(s, currentTime);
1187 }
1188 }
1189
1190
1191
1192
1193
1194
1195
1196 public static void notifyIdleSession(IoSession session, long currentTime) {
1197 notifyIdleSession0(
1198 session, currentTime,
1199 session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
1200 IdleStatus.BOTH_IDLE, Math.max(
1201 session.getLastIoTime(),
1202 session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
1203
1204 notifyIdleSession0(
1205 session, currentTime,
1206 session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
1207 IdleStatus.READER_IDLE, Math.max(
1208 session.getLastReadTime(),
1209 session.getLastIdleTime(IdleStatus.READER_IDLE)));
1210
1211 notifyIdleSession0(
1212 session, currentTime,
1213 session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
1214 IdleStatus.WRITER_IDLE, Math.max(
1215 session.getLastWriteTime(),
1216 session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
1217
1218 notifyWriteTimeout(session, currentTime);
1219 }
1220
1221 private static void notifyIdleSession0(
1222 IoSession session, long currentTime,
1223 long idleTime, IdleStatus status, long lastIoTime) {
1224 if (idleTime > 0 && lastIoTime != 0
1225 && currentTime - lastIoTime >= idleTime) {
1226 session.getFilterChain().fireSessionIdle(status);
1227 }
1228 }
1229
1230 private static void notifyWriteTimeout(
1231 IoSession session, long currentTime) {
1232
1233 long writeTimeout = session.getConfig().getWriteTimeoutInMillis();
1234 if (writeTimeout > 0 &&
1235 currentTime - session.getLastWriteTime() >= writeTimeout &&
1236 !session.getWriteRequestQueue().isEmpty(session)) {
1237 WriteRequest request = session.getCurrentWriteRequest();
1238 if (request != null) {
1239 session.setCurrentWriteRequest(null);
1240 WriteTimeoutException cause = new WriteTimeoutException(request);
1241 request.getFuture().setException(cause);
1242 session.getFilterChain().fireExceptionCaught(cause);
1243
1244 session.close(true);
1245 }
1246 }
1247 }
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257 private class CloseAwareWriteQueue implements WriteRequestQueue {
1258
1259 private final WriteRequestQueue q;
1260
1261
1262
1263
1264 public CloseAwareWriteQueue(WriteRequestQueue q) {
1265 this.q = q;
1266 }
1267
1268
1269
1270
1271 public synchronized WriteRequest poll(IoSession session) {
1272 WriteRequest answer = q.poll(session);
1273
1274 if (answer == CLOSE_REQUEST) {
1275 AbstractIoSession.this.close();
1276 dispose(session);
1277 answer = null;
1278 }
1279
1280 return answer;
1281 }
1282
1283
1284
1285
1286 public void offer(IoSession session, WriteRequest e) {
1287 q.offer(session, e);
1288 }
1289
1290
1291
1292
1293 public boolean isEmpty(IoSession session) {
1294 return q.isEmpty(session);
1295 }
1296
1297
1298
1299
1300 public void clear(IoSession session) {
1301 q.clear(session);
1302 }
1303
1304
1305
1306
1307 public void dispose(IoSession session) {
1308 q.dispose(session);
1309 }
1310 }
1311 }