1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.session;
21
22 import java.io.File;
23 import java.io.FileInputStream;
24 import java.io.IOException;
25 import java.net.SocketAddress;
26 import java.nio.channels.FileChannel;
27 import java.util.Queue;
28 import java.util.Set;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.concurrent.atomic.AtomicInteger;
31
32 import org.apache.mina.core.buffer.IoBuffer;
33 import org.apache.mina.core.file.DefaultFileRegion;
34 import org.apache.mina.core.future.CloseFuture;
35 import org.apache.mina.core.future.DefaultCloseFuture;
36 import org.apache.mina.core.future.DefaultReadFuture;
37 import org.apache.mina.core.future.DefaultWriteFuture;
38 import org.apache.mina.core.future.IoFutureListener;
39 import org.apache.mina.core.future.ReadFuture;
40 import org.apache.mina.core.future.WriteFuture;
41 import org.apache.mina.core.service.AbstractIoService;
42 import org.apache.mina.core.service.IoAcceptor;
43 import org.apache.mina.core.service.IoProcessor;
44 import org.apache.mina.core.service.IoService;
45 import org.apache.mina.core.service.TransportMetadata;
46 import org.apache.mina.core.write.DefaultWriteRequest;
47 import org.apache.mina.core.write.WriteRequest;
48 import org.apache.mina.core.write.WriteRequestQueue;
49 import org.apache.mina.core.write.WriteToClosedSessionException;
50 import org.apache.mina.util.CircularQueue;
51 import org.apache.mina.util.ExceptionMonitor;
52
53
54
55
56
57
58
59
60 public abstract class AbstractIoSession implements IoSession {
61
62 private static final AttributeKey READY_READ_FUTURES_KEY =
63 new AttributeKey(AbstractIoSession.class, "readyReadFutures");
64
65 private static final AttributeKey WAITING_READ_FUTURES_KEY =
66 new AttributeKey(AbstractIoSession.class, "waitingReadFutures");
67
68 private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER =
69 new IoFutureListener<CloseFuture>() {
70 public void operationComplete(CloseFuture future) {
71 AbstractIoSession s = (AbstractIoSession) future.getSession();
72 s.scheduledWriteBytes.set(0);
73 s.scheduledWriteMessages.set(0);
74 s.readBytesThroughput = 0;
75 s.readMessagesThroughput = 0;
76 s.writtenBytesThroughput = 0;
77 s.writtenMessagesThroughput = 0;
78 }
79 };
80
81
82
83
84
85 private static final WriteRequest CLOSE_REQUEST =
86 new DefaultWriteRequest(new Object());
87
88 private final Object lock = new Object();
89
90 private IoSessionAttributeMap attributes;
91 private WriteRequestQueue writeRequestQueue;
92 private WriteRequest currentWriteRequest;
93
94
95 private final long creationTime;
96
97
98
99
100 private final CloseFuture closeFuture = new DefaultCloseFuture(this);
101
102 private volatile boolean closing;
103 private volatile TrafficMask trafficMask = TrafficMask.ALL;
104
105
106 private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
107 private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
108 private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
109
110 private long readBytes;
111 private long writtenBytes;
112 private long readMessages;
113 private long writtenMessages;
114 private long lastReadTime;
115 private long lastWriteTime;
116
117 private long lastThroughputCalculationTime;
118 private long lastReadBytes;
119 private long lastWrittenBytes;
120 private long lastReadMessages;
121 private long lastWrittenMessages;
122 private double readBytesThroughput;
123 private double writtenBytesThroughput;
124 private double readMessagesThroughput;
125 private double writtenMessagesThroughput;
126
127 private int idleCountForBoth;
128 private int idleCountForRead;
129 private int idleCountForWrite;
130
131 private long lastIdleTimeForBoth;
132 private long lastIdleTimeForRead;
133 private long lastIdleTimeForWrite;
134
135 private boolean deferDecreaseReadBuffer = true;
136
137
138
139
140 protected AbstractIoSession() {
141
142 long currentTime = System.currentTimeMillis();
143 creationTime = currentTime;
144 lastThroughputCalculationTime = currentTime;
145 lastReadTime = currentTime;
146 lastWriteTime = currentTime;
147 lastIdleTimeForBoth = currentTime;
148 lastIdleTimeForRead = currentTime;
149 lastIdleTimeForWrite = currentTime;
150
151
152 closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
153 }
154
155
156
157
158
159
160
161 public final long getId() {
162 return hashCode() & 0xFFFFFFFFL;
163 }
164
165
166
167
168 public abstract IoProcessor getProcessor();
169
170
171
172
173 public final boolean isConnected() {
174 return !closeFuture.isClosed();
175 }
176
177
178
179
180 public final boolean isClosing() {
181 return closing || closeFuture.isClosed();
182 }
183
184
185
186
187 public final CloseFuture getCloseFuture() {
188 return closeFuture;
189 }
190
191
192
193
194 public final boolean isScheduledForFlush() {
195 return scheduledForFlush.get();
196 }
197
198
199
200
201 public final boolean setScheduledForFlush(boolean flag) {
202 if (flag) {
203 return scheduledForFlush.compareAndSet(false, true);
204 } else {
205 scheduledForFlush.set(false);
206 return true;
207 }
208 }
209
210
211
212
213 public final CloseFuture close(boolean rightNow) {
214 if (rightNow) {
215 return close();
216 } else {
217 return closeOnFlush();
218 }
219 }
220
221
222
223
224 public final CloseFuture close() {
225 synchronized (lock) {
226 if (isClosing()) {
227 return closeFuture;
228 } else {
229 closing = true;
230 }
231 }
232
233 getFilterChain().fireFilterClose();
234 return closeFuture;
235 }
236
237
238
239
240 public final CloseFuture closeOnFlush() {
241 getWriteRequestQueue().offer(this, CLOSE_REQUEST);
242 getProcessor().flush(this);
243 return closeFuture;
244 }
245
246
247
248
249 public final ReadFuture read() {
250 if (!getConfig().isUseReadOperation()) {
251 throw new IllegalStateException("useReadOperation is not enabled.");
252 }
253
254 Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
255 ReadFuture future;
256 synchronized (readyReadFutures) {
257 future = readyReadFutures.poll();
258 if (future != null) {
259 if (future.isClosed()) {
260
261 readyReadFutures.offer(future);
262 }
263 } else {
264 future = new DefaultReadFuture(this);
265 getWaitingReadFutures().offer(future);
266 }
267 }
268
269 return future;
270 }
271
272
273
274
275 public final void offerReadFuture(Object message) {
276 newReadFuture().setRead(message);
277 }
278
279
280
281
282 public final void offerFailedReadFuture(Throwable exception) {
283 newReadFuture().setException(exception);
284 }
285
286
287
288
289 public final void offerClosedReadFuture() {
290 Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
291 synchronized (readyReadFutures) {
292 newReadFuture().setClosed();
293 }
294 }
295
296
297
298
299 private ReadFuture newReadFuture() {
300 Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
301 Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures();
302 ReadFuture future;
303 synchronized (readyReadFutures) {
304 future = waitingReadFutures.poll();
305 if (future == null) {
306 future = new DefaultReadFuture(this);
307 readyReadFutures.offer(future);
308 }
309 }
310 return future;
311 }
312
313
314
315
316 private Queue<ReadFuture> getReadyReadFutures() {
317 Queue<ReadFuture> readyReadFutures =
318 (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
319 if (readyReadFutures == null) {
320 readyReadFutures = new CircularQueue<ReadFuture>();
321
322 Queue<ReadFuture> oldReadyReadFutures =
323 (Queue<ReadFuture>) setAttributeIfAbsent(
324 READY_READ_FUTURES_KEY, readyReadFutures);
325 if (oldReadyReadFutures != null) {
326 readyReadFutures = oldReadyReadFutures;
327 }
328 }
329 return readyReadFutures;
330 }
331
332
333
334
335 private Queue<ReadFuture> getWaitingReadFutures() {
336 Queue<ReadFuture> waitingReadyReadFutures =
337 (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES_KEY);
338 if (waitingReadyReadFutures == null) {
339 waitingReadyReadFutures = new CircularQueue<ReadFuture>();
340
341 Queue<ReadFuture> oldWaitingReadyReadFutures =
342 (Queue<ReadFuture>) setAttributeIfAbsent(
343 WAITING_READ_FUTURES_KEY, waitingReadyReadFutures);
344 if (oldWaitingReadyReadFutures != null) {
345 waitingReadyReadFutures = oldWaitingReadyReadFutures;
346 }
347 }
348 return waitingReadyReadFutures;
349 }
350
351
352
353
354 public final WriteFuture write(Object message) {
355 return write(message, null);
356 }
357
358
359
360
361 public final WriteFuture write(Object message, SocketAddress remoteAddress) {
362 if (message == null) {
363 throw new NullPointerException("message");
364 }
365
366 if (!getTransportMetadata().isConnectionless() &&
367 remoteAddress != null) {
368 throw new UnsupportedOperationException();
369 }
370
371 if (isClosing() || !isConnected()) {
372 WriteFuture future = new DefaultWriteFuture(this);
373 WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
374 future.setException(new WriteToClosedSessionException(request));
375 return future;
376 }
377
378 FileChannel openedFileChannel = null;
379 try {
380 if (message instanceof IoBuffer
381 && !((IoBuffer) message).hasRemaining()) {
382 throw new IllegalArgumentException(
383 "message is empty. Forgot to call flip()?");
384 } else if (message instanceof FileChannel) {
385 FileChannel fileChannel = (FileChannel) message;
386 message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
387 } else if (message instanceof File) {
388 File file = (File) message;
389 openedFileChannel = new FileInputStream(file).getChannel();
390 message = new DefaultFileRegion(openedFileChannel, 0, openedFileChannel.size());
391 }
392 } catch (IOException e) {
393 ExceptionMonitor.getInstance().exceptionCaught(e);
394 return DefaultWriteFuture.newNotWrittenFuture(this, e);
395 }
396
397 WriteFuture future = new DefaultWriteFuture(this);
398 getFilterChain().fireFilterWrite(
399 new DefaultWriteRequest(message, future, remoteAddress));
400
401 if (openedFileChannel != null) {
402
403 final FileChannel finalChannel = openedFileChannel;
404 future.addListener(new IoFutureListener<WriteFuture>() {
405 public void operationComplete(WriteFuture future) {
406 try {
407 finalChannel.close();
408 } catch (IOException e) {
409 ExceptionMonitor.getInstance().exceptionCaught(e);
410 }
411 }
412 });
413 }
414
415 return future;
416 }
417
418
419
420
421 public final Object getAttachment() {
422 return getAttribute("");
423 }
424
425
426
427
428 public final Object setAttachment(Object attachment) {
429 return setAttribute("", attachment);
430 }
431
432
433
434
435 public final Object getAttribute(Object key) {
436 return getAttribute(key, null);
437 }
438
439
440
441
442 public final Object getAttribute(Object key, Object defaultValue) {
443 return attributes.getAttribute(this, key, defaultValue);
444 }
445
446
447
448
449 public final Object setAttribute(Object key, Object value) {
450 return attributes.setAttribute(this, key, value);
451 }
452
453
454
455
456 public final Object setAttribute(Object key) {
457 return setAttribute(key, Boolean.TRUE);
458 }
459
460
461
462
463 public final Object setAttributeIfAbsent(Object key, Object value) {
464 return attributes.setAttributeIfAbsent(this, key, value);
465 }
466
467
468
469
470 public final Object setAttributeIfAbsent(Object key) {
471 return setAttributeIfAbsent(key, Boolean.TRUE);
472 }
473
474
475
476
477 public final Object removeAttribute(Object key) {
478 return attributes.removeAttribute(this, key);
479 }
480
481
482
483
484 public final boolean removeAttribute(Object key, Object value) {
485 return attributes.removeAttribute(this, key, value);
486 }
487
488
489
490
491 public final boolean replaceAttribute(Object key, Object oldValue, Object newValue) {
492 return attributes.replaceAttribute(this, key, oldValue, newValue);
493 }
494
495
496
497
498 public final boolean containsAttribute(Object key) {
499 return attributes.containsAttribute(this, key);
500 }
501
502
503
504
505 public final Set<Object> getAttributeKeys() {
506 return attributes.getAttributeKeys(this);
507 }
508
509
510
511
512 public final IoSessionAttributeMap getAttributeMap() {
513 return attributes;
514 }
515
516
517
518
519 public final void setAttributeMap(IoSessionAttributeMap attributes) {
520 this.attributes = attributes;
521 }
522
523
524
525
526 public final void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) {
527 this.writeRequestQueue =
528 new CloseRequestAwareWriteRequestQueue(writeRequestQueue);
529 }
530
531
532
533
534 public final TrafficMask getTrafficMask() {
535 return trafficMask;
536 }
537
538
539
540
541 public final void setTrafficMask(TrafficMask trafficMask) {
542 if (trafficMask == null) {
543 throw new NullPointerException("trafficMask");
544 }
545
546 if (isClosing() || !isConnected()) {
547 return;
548 }
549
550 getFilterChain().fireFilterSetTrafficMask(trafficMask);
551 }
552
553
554
555
556 public final void setTrafficMaskNow(TrafficMask trafficMask) {
557 this.trafficMask = trafficMask;
558 }
559
560
561
562
563 public final void suspendRead() {
564 setTrafficMask(getTrafficMask().and(TrafficMask.READ.not()));
565 }
566
567
568
569
570 public final void suspendWrite() {
571 setTrafficMask(getTrafficMask().and(TrafficMask.WRITE.not()));
572 }
573
574
575
576
577 public final void resumeRead() {
578 setTrafficMask(getTrafficMask().or(TrafficMask.READ));
579 }
580
581
582
583
584 public final void resumeWrite() {
585 setTrafficMask(getTrafficMask().or(TrafficMask.WRITE));
586 }
587
588
589
590
591 public final long getReadBytes() {
592 return readBytes;
593 }
594
595
596
597
598 public final long getWrittenBytes() {
599 return writtenBytes;
600 }
601
602
603
604
605 public final long getReadMessages() {
606 return readMessages;
607 }
608
609
610
611
612 public final long getWrittenMessages() {
613 return writtenMessages;
614 }
615
616
617
618
619 public final double getReadBytesThroughput() {
620 return readBytesThroughput;
621 }
622
623
624
625
626 public final double getWrittenBytesThroughput() {
627 return writtenBytesThroughput;
628 }
629
630
631
632
633 public final double getReadMessagesThroughput() {
634 return readMessagesThroughput;
635 }
636
637
638
639
640 public final double getWrittenMessagesThroughput() {
641 return writtenMessagesThroughput;
642 }
643
644
645
646
647
648
649
650
651
652
653
654
655 public final void updateThroughput(long currentTime, boolean force) {
656 int interval = (int) (currentTime - lastThroughputCalculationTime);
657
658 long minInterval = getConfig().getThroughputCalculationIntervalInMillis();
659 if (minInterval == 0 || interval < minInterval) {
660 if (!force) {
661 return;
662 }
663 }
664
665 readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval;
666 writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval;
667 readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval;
668 writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval;
669
670 lastReadBytes = readBytes;
671 lastWrittenBytes = writtenBytes;
672 lastReadMessages = readMessages;
673 lastWrittenMessages = writtenMessages;
674
675 lastThroughputCalculationTime = currentTime;
676 }
677
678
679
680
681 public final long getScheduledWriteBytes() {
682 return scheduledWriteBytes.get();
683 }
684
685
686
687
688 public final int getScheduledWriteMessages() {
689 return scheduledWriteMessages.get();
690 }
691
692
693
694
695 protected void setScheduledWriteBytes(int byteCount){
696 scheduledWriteBytes.set(byteCount);
697 }
698
699
700
701
702 protected void setScheduledWriteMessages(int messages) {
703 scheduledWriteMessages.set(messages);
704 }
705
706
707
708
709 public final void increaseReadBytes(long increment, long currentTime) {
710 if (increment <= 0) {
711 return;
712 }
713
714 readBytes += increment;
715 lastReadTime = currentTime;
716 idleCountForBoth = 0;
717 idleCountForRead = 0;
718
719 if (getService() instanceof AbstractIoService) {
720 ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime);
721 }
722 }
723
724
725
726
727 public final void increaseReadMessages(long currentTime) {
728 readMessages++;
729 lastReadTime = currentTime;
730 idleCountForBoth = 0;
731 idleCountForRead = 0;
732
733 if (getService() instanceof AbstractIoService) {
734 ((AbstractIoService) getService()).getStatistics().increaseReadMessages(currentTime);
735 }
736 }
737
738
739
740
741 public final void increaseWrittenBytes(int increment, long currentTime) {
742 if (increment <= 0) {
743 return;
744 }
745
746 writtenBytes += increment;
747 lastWriteTime = currentTime;
748 idleCountForBoth = 0;
749 idleCountForWrite = 0;
750
751 if (getService() instanceof AbstractIoService) {
752 ((AbstractIoService) getService()).getStatistics().increaseWrittenBytes(increment, currentTime);
753 }
754
755 increaseScheduledWriteBytes(-increment);
756 }
757
758
759
760
761 public final void increaseWrittenMessages(
762 WriteRequest request, long currentTime) {
763 Object message = request.getMessage();
764 if (message instanceof IoBuffer) {
765 IoBuffer b = (IoBuffer) message;
766 if (b.hasRemaining()) {
767 return;
768 }
769 }
770
771 writtenMessages++;
772 lastWriteTime = currentTime;
773 if (getService() instanceof AbstractIoService) {
774 ((AbstractIoService) getService()).getStatistics().increaseWrittenMessages(currentTime);
775 }
776
777 decreaseScheduledWriteMessages();
778 }
779
780
781
782
783 public final void increaseScheduledWriteBytes(int increment) {
784 scheduledWriteBytes.addAndGet(increment);
785 if (getService() instanceof AbstractIoService) {
786 ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteBytes(increment);
787 }
788 }
789
790
791
792
793 public final void increaseScheduledWriteMessages() {
794 scheduledWriteMessages.incrementAndGet();
795 if (getService() instanceof AbstractIoService) {
796 ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteMessages();
797 }
798 }
799
800
801
802
803 private void decreaseScheduledWriteMessages() {
804 scheduledWriteMessages.decrementAndGet();
805 if (getService() instanceof AbstractIoService) {
806 ((AbstractIoService) getService()).getStatistics().decreaseScheduledWriteMessages();
807 }
808 }
809
810
811
812
813 public final void decreaseScheduledBytesAndMessages(WriteRequest request) {
814 Object message = request.getMessage();
815 if (message instanceof IoBuffer) {
816 IoBuffer b = (IoBuffer) message;
817 if (b.hasRemaining()) {
818 increaseScheduledWriteBytes(-((IoBuffer) message).remaining());
819 } else {
820 decreaseScheduledWriteMessages();
821 }
822 } else {
823 decreaseScheduledWriteMessages();
824 }
825 }
826
827
828
829
830 public final WriteRequestQueue getWriteRequestQueue() {
831 if (writeRequestQueue == null) {
832 throw new IllegalStateException();
833 }
834 return writeRequestQueue;
835 }
836
837
838
839
840 public final WriteRequest getCurrentWriteRequest() {
841 return currentWriteRequest;
842 }
843
844
845
846
847 public final Object getCurrentWriteMessage() {
848 WriteRequest req = getCurrentWriteRequest();
849 if (req == null) {
850 return null;
851 }
852 return req.getMessage();
853 }
854
855
856
857
858 public final void setCurrentWriteRequest(WriteRequest currentWriteRequest) {
859 this.currentWriteRequest = currentWriteRequest;
860 }
861
862
863
864
865 public final void increaseReadBufferSize() {
866 int newReadBufferSize = getConfig().getReadBufferSize() << 1;
867 if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) {
868 getConfig().setReadBufferSize(newReadBufferSize);
869 } else {
870 getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize());
871 }
872
873 deferDecreaseReadBuffer = true;
874 }
875
876
877
878
879 public final void decreaseReadBufferSize() {
880 if (deferDecreaseReadBuffer) {
881 deferDecreaseReadBuffer = false;
882 return;
883 }
884
885 if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) {
886 getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1);
887 }
888
889 deferDecreaseReadBuffer = true;
890 }
891
892
893
894
895 public final long getCreationTime() {
896 return creationTime;
897 }
898
899
900
901
902 public final long getLastIoTime() {
903 return Math.max(lastReadTime, lastWriteTime);
904 }
905
906
907
908
909 public final long getLastReadTime() {
910 return lastReadTime;
911 }
912
913
914
915
916 public final long getLastWriteTime() {
917 return lastWriteTime;
918 }
919
920
921
922
923 public final boolean isIdle(IdleStatus status) {
924 if (status == IdleStatus.BOTH_IDLE) {
925 return idleCountForBoth > 0;
926 }
927
928 if (status == IdleStatus.READER_IDLE) {
929 return idleCountForRead > 0;
930 }
931
932 if (status == IdleStatus.WRITER_IDLE) {
933 return idleCountForWrite > 0;
934 }
935
936 throw new IllegalArgumentException("Unknown idle status: " + status);
937 }
938
939
940
941
942 public final boolean isBothIdle() {
943 return isIdle(IdleStatus.BOTH_IDLE);
944 }
945
946
947
948
949 public final boolean isReaderIdle() {
950 return isIdle(IdleStatus.READER_IDLE);
951 }
952
953
954
955
956 public final boolean isWriterIdle() {
957 return isIdle(IdleStatus.WRITER_IDLE);
958 }
959
960
961
962
963 public final int getIdleCount(IdleStatus status) {
964 if (getConfig().getIdleTime(status) == 0) {
965 if (status == IdleStatus.BOTH_IDLE) {
966 idleCountForBoth = 0;
967 }
968
969 if (status == IdleStatus.READER_IDLE) {
970 idleCountForRead = 0;
971 }
972
973 if (status == IdleStatus.WRITER_IDLE) {
974 idleCountForWrite = 0;
975 }
976 }
977
978 if (status == IdleStatus.BOTH_IDLE) {
979 return idleCountForBoth;
980 }
981
982 if (status == IdleStatus.READER_IDLE) {
983 return idleCountForRead;
984 }
985
986 if (status == IdleStatus.WRITER_IDLE) {
987 return idleCountForWrite;
988 }
989
990 throw new IllegalArgumentException("Unknown idle status: " + status);
991 }
992
993
994
995
996 public final long getLastIdleTime(IdleStatus status) {
997 if (status == IdleStatus.BOTH_IDLE) {
998 return lastIdleTimeForBoth;
999 }
1000
1001 if (status == IdleStatus.READER_IDLE) {
1002 return lastIdleTimeForRead;
1003 }
1004
1005 if (status == IdleStatus.WRITER_IDLE) {
1006 return lastIdleTimeForWrite;
1007 }
1008
1009 throw new IllegalArgumentException("Unknown idle status: " + status);
1010 }
1011
1012
1013
1014
1015 public final void increaseIdleCount(IdleStatus status, long currentTime) {
1016 if (status == IdleStatus.BOTH_IDLE) {
1017 idleCountForBoth++;
1018 lastIdleTimeForBoth = currentTime;
1019 } else if (status == IdleStatus.READER_IDLE) {
1020 idleCountForRead++;
1021 lastIdleTimeForRead = currentTime;
1022 } else if (status == IdleStatus.WRITER_IDLE) {
1023 idleCountForWrite++;
1024 lastIdleTimeForWrite = currentTime;
1025 } else {
1026 throw new IllegalArgumentException("Unknown idle status: " + status);
1027 }
1028 }
1029
1030
1031
1032
1033 public final int getBothIdleCount() {
1034 return getIdleCount(IdleStatus.BOTH_IDLE);
1035 }
1036
1037
1038
1039
1040 public final long getLastBothIdleTime() {
1041 return getLastIdleTime(IdleStatus.BOTH_IDLE);
1042 }
1043
1044
1045
1046
1047 public final long getLastReaderIdleTime() {
1048 return getLastIdleTime(IdleStatus.READER_IDLE);
1049 }
1050
1051
1052
1053
1054 public final long getLastWriterIdleTime() {
1055 return getLastIdleTime(IdleStatus.WRITER_IDLE);
1056 }
1057
1058
1059
1060
1061 public final int getReaderIdleCount() {
1062 return getIdleCount(IdleStatus.READER_IDLE);
1063 }
1064
1065
1066
1067
1068 public final int getWriterIdleCount() {
1069 return getIdleCount(IdleStatus.WRITER_IDLE);
1070 }
1071
1072
1073
1074
1075 public SocketAddress getServiceAddress() {
1076 IoService service = getService();
1077 if (service instanceof IoAcceptor) {
1078 return ((IoAcceptor) service).getLocalAddress();
1079 } else {
1080 return getRemoteAddress();
1081 }
1082 }
1083
1084
1085
1086
1087 @Override
1088 public final int hashCode() {
1089 return super.hashCode();
1090 }
1091
1092
1093
1094
1095
1096 @Override
1097 public final boolean equals(Object o) {
1098 return super.equals(o);
1099 }
1100
1101
1102
1103
1104 @Override
1105 public String toString() {
1106 if (getService() instanceof IoAcceptor) {
1107 return "(" + getIdAsString() + ": " + getServiceName() + ", server, " +
1108 getRemoteAddress() + " => " + getLocalAddress() + ')';
1109 } else {
1110 return "(" + getIdAsString() + ": " + getServiceName() + ", client, " +
1111 getLocalAddress() + " => " + getRemoteAddress() + ')';
1112 }
1113 }
1114
1115
1116
1117
1118 private String getIdAsString() {
1119 String id = Long.toHexString(getId()).toUpperCase();
1120
1121
1122
1123 while (id.length() < 8) {
1124 id = '0' + id;
1125 }
1126 id = "0x" + id;
1127
1128 return id;
1129 }
1130
1131
1132
1133
1134 private String getServiceName() {
1135 TransportMetadata tm = getTransportMetadata();
1136 if (tm == null) {
1137 return "null";
1138 } else {
1139 return tm.getProviderName() + ' ' + tm.getName();
1140 }
1141 }
1142
1143
1144
1145
1146 private class CloseRequestAwareWriteRequestQueue implements WriteRequestQueue {
1147
1148 private final WriteRequestQueue q;
1149
1150 public CloseRequestAwareWriteRequestQueue(WriteRequestQueue q) {
1151 this.q = q;
1152 }
1153
1154 public synchronized WriteRequest poll(IoSession session) {
1155 WriteRequest answer = q.poll(session);
1156 if (answer == CLOSE_REQUEST) {
1157 AbstractIoSession.this.close();
1158 dispose(session);
1159 answer = null;
1160 }
1161 return answer;
1162 }
1163
1164 public void offer(IoSession session, WriteRequest e) {
1165 q.offer(session, e);
1166 }
1167
1168 public boolean isEmpty(IoSession session) {
1169 return q.isEmpty(session);
1170 }
1171
1172 public void clear(IoSession session) {
1173 q.clear(session);
1174 }
1175
1176 public void dispose(IoSession session) {
1177 q.dispose(session);
1178 }
1179 }
1180 }