1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.polling;
21
22 import java.io.IOException;
23 import java.net.PortUnreachableException;
24 import java.nio.channels.ClosedSelectorException;
25 import java.util.ArrayList;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Queue;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicReference;
35
36 import org.apache.mina.core.buffer.IoBuffer;
37 import org.apache.mina.core.file.FileRegion;
38 import org.apache.mina.core.filterchain.IoFilterChain;
39 import org.apache.mina.core.filterchain.IoFilterChainBuilder;
40 import org.apache.mina.core.future.DefaultIoFuture;
41 import org.apache.mina.core.service.AbstractIoService;
42 import org.apache.mina.core.service.IoProcessor;
43 import org.apache.mina.core.service.IoServiceListenerSupport;
44 import org.apache.mina.core.session.AbstractIoSession;
45 import org.apache.mina.core.session.IoSession;
46 import org.apache.mina.core.session.IoSessionConfig;
47 import org.apache.mina.core.session.SessionState;
48 import org.apache.mina.core.write.WriteRequest;
49 import org.apache.mina.core.write.WriteRequestQueue;
50 import org.apache.mina.core.write.WriteToClosedSessionException;
51 import org.apache.mina.transport.socket.AbstractDatagramSessionConfig;
52 import org.apache.mina.util.ExceptionMonitor;
53 import org.apache.mina.util.NamePreservingRunnable;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57
58
59
60
61
62
63
64
65
66
67
68 public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S> {
69
70 private static final Logger LOG = LoggerFactory.getLogger(IoProcessor.class);
71
72
73
74
75
76 private static final long SELECT_TIMEOUT = 1000L;
77
78
79 private static final ConcurrentHashMap<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<>();
80
81
82 private final String threadName;
83
84
85 private final Executor executor;
86
87
88 private final Queue<S> newSessions = new ConcurrentLinkedQueue<>();
89
90
91 private final Queue<S> removingSessions = new ConcurrentLinkedQueue<>();
92
93
94 private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<>();
95
96
97
98
99
100 private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<>();
101
102
103 private final AtomicReference<Processor> processorRef = new AtomicReference<>();
104
105 private long lastIdleCheckTime;
106
107 private final Object disposalLock = new Object();
108
109 private volatile boolean disposing;
110
111 private volatile boolean disposed;
112
113 private final DefaultIoFuturee.html#DefaultIoFuture">DefaultIoFuture disposalFuture = new DefaultIoFuture(null);
114
115 protected AtomicBoolean wakeupCalled = new AtomicBoolean(false);
116
117
118
119
120
121
122
123
124 protected AbstractPollingIoProcessor(Executor executor) {
125 if (executor == null) {
126 throw new IllegalArgumentException("executor");
127 }
128
129 this.threadName = nextThreadName();
130 this.executor = executor;
131 }
132
133
134
135
136
137
138
139
140
141 private String nextThreadName() {
142 Class<?> cls = getClass();
143 int newThreadId;
144
145 AtomicInteger threadId = threadIds.putIfAbsent(cls, new AtomicInteger(1));
146
147 if (threadId == null) {
148 newThreadId = 1;
149 } else {
150
151 newThreadId = threadId.incrementAndGet();
152 }
153
154
155 return cls.getSimpleName() + '-' + newThreadId;
156 }
157
158
159
160
161 @Override
162 public final boolean isDisposing() {
163 return disposing;
164 }
165
166
167
168
169 @Override
170 public final boolean isDisposed() {
171 return disposed;
172 }
173
174
175
176
177 @Override
178 public final void dispose() {
179 if (disposed || disposing) {
180 return;
181 }
182
183 synchronized (disposalLock) {
184 disposing = true;
185 startupProcessor();
186 }
187
188 disposalFuture.awaitUninterruptibly();
189 disposed = true;
190 }
191
192
193
194
195
196
197
198
199
200 protected abstract void doDispose() throws Exception;
201
202
203
204
205
206
207
208
209
210
211 protected abstract int select(long timeout) throws Exception;
212
213
214
215
216
217
218
219
220 protected abstract int select() throws Exception;
221
222
223
224
225
226
227
228
229 protected abstract boolean isSelectorEmpty();
230
231
232
233
234 protected abstract void wakeup();
235
236
237
238
239
240
241
242 protected abstract Iterator<S> allSessions();
243
244
245
246
247
248
249 protected abstract int allSessionsCount();
250
251
252
253
254
255
256
257 protected abstract Iterator<S> selectedSessions();
258
259
260
261
262
263
264
265
266 protected abstract SessionState getState(S session);
267
268
269
270
271
272
273
274
275 protected abstract boolean isWritable(S session);
276
277
278
279
280
281
282
283
284 protected abstract boolean isReadable(S session);
285
286
287
288
289
290
291
292
293
294
295
296 protected abstract void setInterestedInWrite(S session, boolean isInterested) throws Exception;
297
298
299
300
301
302
303
304
305
306
307
308 protected abstract void setInterestedInRead(S session, boolean isInterested) throws Exception;
309
310
311
312
313
314
315
316
317 protected abstract boolean isInterestedInRead(S session);
318
319
320
321
322
323
324
325
326 protected abstract boolean isInterestedInWrite(S session);
327
328
329
330
331
332
333
334
335
336 protected abstract void init(S session) throws Exception;
337
338
339
340
341
342
343
344
345
346 protected abstract void destroy(S session) throws Exception;
347
348
349
350
351
352
353
354
355
356
357
358
359
360 protected abstract int read(S session, IoBuffer buf) throws Exception;
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377 protected abstract int write(S session, IoBuffer buf, int length) throws IOException;
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395 protected abstract int transferFile(S session, FileRegion region, int length) throws Exception;
396
397
398
399
400 @Override
401 public final void add(S session) {
402 if (disposed || disposing) {
403 throw new IllegalStateException("Already disposed.");
404 }
405
406
407 newSessions.add(session);
408 startupProcessor();
409 }
410
411
412
413
414 @Override
415 public final void remove(S session) {
416 scheduleRemove(session);
417 startupProcessor();
418 }
419
420 private void scheduleRemove(S session) {
421 if (!removingSessions.contains(session)) {
422 removingSessions.add(session);
423 }
424 }
425
426
427
428
429 @Override
430 public void write(S session, WriteRequest writeRequest) {
431 WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
432
433 writeRequestQueue.offer(session, writeRequest);
434
435 if (!session.isWriteSuspended()) {
436 this.flush(session);
437 }
438 }
439
440
441
442
443 @Override
444 public final void flush(S session) {
445
446
447 if (session.setScheduledForFlush(true)) {
448 flushingSessions.add(session);
449 wakeup();
450 }
451 }
452
453
454
455
456
457
458
459 public final void updateTrafficMask(S session) {
460 trafficControllingSessions.add(session);
461 wakeup();
462 }
463
464
465
466
467
468 private void startupProcessor() {
469 Processor processor = processorRef.get();
470
471 if (processor == null) {
472 processor = new Processor();
473
474 if (processorRef.compareAndSet(null, processor)) {
475 executor.execute(new NamePreservingRunnable(processor, threadName));
476 }
477 }
478
479
480
481 wakeup();
482 }
483
484
485
486
487
488
489
490
491
492 protected abstract void registerNewSelector() throws IOException;
493
494
495
496
497
498
499
500
501
502
503 protected abstract boolean isBrokenConnection() throws IOException;
504
505 private void read(S session) {
506 IoSessionConfig config = session.getConfig();
507 int bufferSize = config.getReadBufferSize();
508 IoBuffer buf = IoBuffer.allocate(bufferSize);
509
510 final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
511
512 try {
513 int readBytes = 0;
514 int ret;
515
516 try {
517 if (hasFragmentation) {
518
519 while ((ret = read(session, buf)) > 0) {
520 readBytes += ret;
521
522 if (!buf.hasRemaining()) {
523 break;
524 }
525 }
526 } else {
527 ret = read(session, buf);
528
529 if (ret > 0) {
530 readBytes = ret;
531 }
532 }
533 } finally {
534 buf.flip();
535 }
536
537 if (readBytes > 0) {
538 IoFilterChain filterChain = session.getFilterChain();
539 filterChain.fireMessageReceived(buf);
540 buf = null;
541
542 if (hasFragmentation) {
543 if (readBytes << 1 < config.getReadBufferSize()) {
544 session.decreaseReadBufferSize();
545 } else if (readBytes == config.getReadBufferSize()) {
546 session.increaseReadBufferSize();
547 }
548 }
549 } else {
550
551 buf.free();
552 }
553
554 if (ret < 0) {
555 IoFilterChain filterChain = session.getFilterChain();
556 filterChain.fireInputClosed();
557 }
558 } catch (Exception e) {
559 if ((e instanceof IOException) &&
560 (!(e instanceof PortUnreachableException)
561 || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
562 || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable())) {
563 scheduleRemove(session);
564 }
565
566 IoFilterChain filterChain = session.getFilterChain();
567 filterChain.fireExceptionCaught(e);
568 }
569 }
570
571
572
573
574 @Override
575 public void updateTrafficControl(S session) {
576
577 try {
578 setInterestedInRead(session, !session.isReadSuspended());
579 } catch (Exception e) {
580 IoFilterChain filterChain = session.getFilterChain();
581 filterChain.fireExceptionCaught(e);
582 }
583
584 try {
585 setInterestedInWrite(session,
586 !session.getWriteRequestQueue().isEmpty(session) && !session.isWriteSuspended());
587 } catch (Exception e) {
588 IoFilterChain filterChain = session.getFilterChain();
589 filterChain.fireExceptionCaught(e);
590 }
591 }
592
593
594
595
596
597
598 private class Processor implements Runnable {
599
600
601
602 @Override
603 public void run() {
604 assert processorRef.get() == this;
605
606 lastIdleCheckTime = System.currentTimeMillis();
607 int nbTries = 10;
608
609 for (;;) {
610 try {
611
612
613
614
615 long t0 = System.currentTimeMillis();
616 int selected = select(SELECT_TIMEOUT);
617 long t1 = System.currentTimeMillis();
618 long delta = t1 - t0;
619
620 if (!wakeupCalled.getAndSet(false) && (selected == 0) && (delta < 100)) {
621
622
623 if (isBrokenConnection()) {
624 LOG.warn("Broken connection");
625 } else {
626
627
628
629
630
631
632
633
634
635
636
637
638
639 if (nbTries == 0) {
640 LOG.warn("Create a new selector. Selected is 0, delta = " + delta);
641 registerNewSelector();
642 nbTries = 10;
643 } else {
644 nbTries--;
645 }
646 }
647 } else {
648 nbTries = 10;
649 }
650
651
652 if(handleNewSessions() == 0) {
653
654
655 if (allSessionsCount() == 0) {
656 processorRef.set(null);
657
658 if (newSessions.isEmpty() && isSelectorEmpty()) {
659
660 assert processorRef.get() != this;
661 break;
662 }
663
664 assert processorRef.get() != this;
665
666 if (!processorRef.compareAndSet(null, this)) {
667
668 assert processorRef.get() != this;
669 break;
670 }
671
672 assert processorRef.get() == this;
673 }
674 }
675
676 updateTrafficMask();
677
678
679
680 if (selected > 0) {
681
682
683 process();
684 }
685
686
687 long currentTime = System.currentTimeMillis();
688 flush(currentTime);
689
690
691 notifyIdleSessions(currentTime);
692
693
694 removeSessions();
695
696
697
698 if (isDisposing()) {
699 boolean hasKeys = false;
700
701 for (Iterator<S> i = allSessions(); i.hasNext();) {
702 IoSession session = i.next();
703
704 scheduleRemove((S) session);
705
706 if (session.isActive()) {
707 hasKeys = true;
708 }
709 }
710
711 wakeup();
712 }
713 } catch (ClosedSelectorException cse) {
714
715
716 ExceptionMonitor.getInstance().exceptionCaught(cse);
717 break;
718 } catch (Exception e) {
719 ExceptionMonitor.getInstance().exceptionCaught(e);
720
721 try {
722 Thread.sleep(1000);
723 } catch (InterruptedException e1) {
724 ExceptionMonitor.getInstance().exceptionCaught(e1);
725 }
726 }
727 }
728
729 try {
730 synchronized (disposalLock) {
731 if (disposing) {
732 doDispose();
733 }
734 }
735 } catch (Exception e) {
736 ExceptionMonitor.getInstance().exceptionCaught(e);
737 } finally {
738 disposalFuture.setValue(true);
739 }
740 }
741
742
743
744
745
746
747
748 private int handleNewSessions() {
749 int addedSessions = 0;
750
751 for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {
752 if (addNow(session)) {
753
754 addedSessions++;
755 }
756 }
757
758 return addedSessions;
759 }
760
761 private void notifyIdleSessions(long currentTime) throws Exception {
762
763 if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
764 lastIdleCheckTime = currentTime;
765 AbstractIoSession.notifyIdleness(allSessions(), currentTime);
766 }
767 }
768
769
770
771
772 private void updateTrafficMask() {
773 int queueSize = trafficControllingSessions.size();
774
775 while (queueSize > 0) {
776 S session = trafficControllingSessions.poll();
777
778 if (session == null) {
779
780 return;
781 }
782
783 SessionState state = getState(session);
784
785 switch (state) {
786 case OPENED:
787 updateTrafficControl(session);
788
789 break;
790
791 case CLOSING:
792 break;
793
794 case OPENING:
795
796
797
798
799 trafficControllingSessions.add(session);
800 break;
801
802 default:
803 throw new IllegalStateException(String.valueOf(state));
804 }
805
806
807
808
809
810
811 queueSize--;
812 }
813 }
814
815
816
817
818
819
820
821
822
823 private boolean addNow(S session) {
824 boolean registered = false;
825
826 try {
827 init(session);
828 registered = true;
829
830
831 IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
832 chainBuilder.buildFilterChain(session.getFilterChain());
833
834
835
836
837 IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
838 listeners.fireSessionCreated(session);
839 } catch (Exception e) {
840 ExceptionMonitor.getInstance().exceptionCaught(e);
841
842 try {
843 destroy(session);
844 } catch (Exception e1) {
845 ExceptionMonitor.getInstance().exceptionCaught(e1);
846 } finally {
847 registered = false;
848 }
849 }
850
851 return registered;
852 }
853
854 private int removeSessions() {
855 int removedSessions = 0;
856
857 for (S session = removingSessions.poll(); session != null; session = removingSessions.poll()) {
858 SessionState state = getState(session);
859
860
861 switch (state) {
862 case OPENED:
863
864 if (removeNow(session)) {
865 removedSessions++;
866 }
867
868 break;
869
870 case CLOSING:
871
872
873 removedSessions++;
874 break;
875
876 case OPENING:
877
878
879 newSessions.remove(session);
880
881 if (removeNow(session)) {
882 removedSessions++;
883 }
884
885 break;
886
887 default:
888 throw new IllegalStateException(String.valueOf(state));
889 }
890 }
891
892 return removedSessions;
893 }
894
895
896
897
898 private void flush(long currentTime) {
899 if (flushingSessions.isEmpty()) {
900 return;
901 }
902
903 do {
904 S session = flushingSessions.poll();
905
906
907 if (session == null) {
908
909 break;
910 }
911
912
913
914
915
916 session.unscheduledForFlush();
917
918 SessionState state = getState(session);
919
920 switch (state) {
921 case OPENED:
922 try {
923 boolean flushedAll = flushNow(session, currentTime);
924
925 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session)
926 && !session.isScheduledForFlush()) {
927 scheduleFlush(session);
928 }
929 } catch (Exception e) {
930 scheduleRemove(session);
931 session.closeNow();
932 IoFilterChain filterChain = session.getFilterChain();
933 filterChain.fireExceptionCaught(e);
934 }
935
936 break;
937
938 case CLOSING:
939
940 break;
941
942 case OPENING:
943
944
945
946 scheduleFlush(session);
947 return;
948
949 default:
950 throw new IllegalStateException(String.valueOf(state));
951 }
952
953 } while (!flushingSessions.isEmpty());
954 }
955
956 private boolean flushNow(S session, long currentTime) {
957 if (!session.isConnected()) {
958 scheduleRemove(session);
959 return false;
960 }
961
962 final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
963
964 final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
965
966
967
968
969 final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
970 + (session.getConfig().getMaxReadBufferSize() >>> 1);
971 int writtenBytes = 0;
972 WriteRequest req = null;
973
974 try {
975
976 setInterestedInWrite(session, false);
977
978 do {
979
980 req = session.getCurrentWriteRequest();
981
982 if (req == null) {
983 req = writeRequestQueue.poll(session);
984
985 if (req == null) {
986 break;
987 }
988
989 session.setCurrentWriteRequest(req);
990 }
991
992 int localWrittenBytes;
993 Object message = req.getMessage();
994
995 if (message instanceof IoBuffer) {
996 localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
997 currentTime);
998
999 if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {
1000
1001 setInterestedInWrite(session, true);
1002
1003 return false;
1004 }
1005 } else if (message instanceof FileRegion) {
1006 localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
1007 currentTime);
1008
1009
1010
1011
1012
1013
1014 if ((localWrittenBytes > 0) && (((FileRegion) message).getRemainingBytes() > 0)) {
1015 setInterestedInWrite(session, true);
1016
1017 return false;
1018 }
1019 } else {
1020 throw new IllegalStateException("Don't know how to handle message of type '"
1021 + message.getClass().getName() + "'. Are you missing a protocol encoder?");
1022 }
1023
1024 if (localWrittenBytes == 0) {
1025
1026
1027 if (!req.equals(AbstractIoSession.MESSAGE_SENT_REQUEST)) {
1028 setInterestedInWrite(session, true);
1029 return false;
1030 }
1031 } else {
1032 writtenBytes += localWrittenBytes;
1033
1034 if (writtenBytes >= maxWrittenBytes) {
1035
1036 scheduleFlush(session);
1037 return false;
1038 }
1039 }
1040
1041 if (message instanceof IoBuffer) {
1042 ((IoBuffer) message).free();
1043 }
1044 } while (writtenBytes < maxWrittenBytes);
1045 } catch (Exception e) {
1046 if (req != null) {
1047 req.getFuture().setException(e);
1048 }
1049
1050 IoFilterChain filterChain = session.getFilterChain();
1051 filterChain.fireExceptionCaught(e);
1052 return false;
1053 }
1054
1055 return true;
1056 }
1057
1058 private void scheduleFlush(S session) {
1059
1060
1061 if (session.setScheduledForFlush(true)) {
1062 flushingSessions.add(session);
1063 }
1064 }
1065
1066 private int writeFile(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
1067 throws Exception {
1068 int localWrittenBytes;
1069 FileRegion./../../../org/apache/mina/core/file/FileRegion.html#FileRegion">FileRegion region = (FileRegion) req.getMessage();
1070
1071 if (region.getRemainingBytes() > 0) {
1072 int length;
1073
1074 if (hasFragmentation) {
1075 length = (int) Math.min(region.getRemainingBytes(), maxLength);
1076 } else {
1077 length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
1078 }
1079
1080 localWrittenBytes = transferFile(session, region, length);
1081 region.update(localWrittenBytes);
1082 } else {
1083 localWrittenBytes = 0;
1084 }
1085
1086 session.increaseWrittenBytes(localWrittenBytes, currentTime);
1087
1088 if ((region.getRemainingBytes() <= 0) || (!hasFragmentation && (localWrittenBytes != 0))) {
1089 fireMessageSent(session, req);
1090 }
1091
1092 return localWrittenBytes;
1093 }
1094
1095 private int writeBuffer(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime) throws Exception {
1096 IoBuffer"../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer buf = (IoBuffer) req.getMessage();
1097 int localWrittenBytes = 0;
1098
1099 if (buf.hasRemaining()) {
1100 int length;
1101
1102 if (hasFragmentation) {
1103 length = Math.min(buf.remaining(), maxLength);
1104 } else {
1105 length = buf.remaining();
1106 }
1107
1108 try {
1109 localWrittenBytes = write(session, buf, length);
1110 } catch (IOException ioe) {
1111
1112
1113 buf.free();
1114 session.closeNow();
1115 this.removeNow(session);
1116
1117 return 0;
1118 }
1119
1120 session.increaseWrittenBytes(localWrittenBytes, currentTime);
1121
1122
1123 if (!buf.hasRemaining() || (!hasFragmentation && (localWrittenBytes != 0))) {
1124 this.fireMessageSent(session, req);
1125 }
1126 } else {
1127 this.fireMessageSent(session, req);
1128 }
1129
1130 return localWrittenBytes;
1131 }
1132
1133 private boolean removeNow(S session) {
1134 clearWriteRequestQueue(session);
1135
1136 try {
1137 destroy(session);
1138 return true;
1139 } catch (Exception e) {
1140 IoFilterChain filterChain = session.getFilterChain();
1141 filterChain.fireExceptionCaught(e);
1142 } finally {
1143 try {
1144 ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
1145 } catch (Exception e) {
1146
1147
1148
1149
1150 IoFilterChain filterChain = session.getFilterChain();
1151 filterChain.fireExceptionCaught(e);
1152 } finally {
1153 clearWriteRequestQueue(session);
1154 }
1155 }
1156
1157 return false;
1158 }
1159
1160 private void clearWriteRequestQueue(S session) {
1161 WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
1162 WriteRequest req;
1163
1164 List<WriteRequest> failedRequests = new ArrayList<>();
1165
1166 if ((req = writeRequestQueue.poll(session)) != null) {
1167 Object message = req.getMessage();
1168
1169 if (message instanceof IoBuffer) {
1170 IoBuffer"../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer buf = (IoBuffer) message;
1171
1172
1173
1174 if (buf.hasRemaining()) {
1175 failedRequests.add(req);
1176 } else {
1177 IoFilterChain filterChain = session.getFilterChain();
1178 filterChain.fireMessageSent(req);
1179 }
1180 } else {
1181 failedRequests.add(req);
1182 }
1183
1184
1185 while ((req = writeRequestQueue.poll(session)) != null) {
1186 failedRequests.add(req);
1187 }
1188 }
1189
1190
1191 if (!failedRequests.isEmpty()) {
1192 WriteToClosedSessionExceptionException.html#WriteToClosedSessionException">WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
1193
1194 for (WriteRequest r : failedRequests) {
1195 session.decreaseScheduledBytesAndMessages(r);
1196 r.getFuture().setException(cause);
1197 }
1198
1199 IoFilterChain filterChain = session.getFilterChain();
1200 filterChain.fireExceptionCaught(cause);
1201 }
1202 }
1203
1204 private void fireMessageSent(S session, WriteRequest req) {
1205 session.setCurrentWriteRequest(null);
1206 IoFilterChain filterChain = session.getFilterChain();
1207 filterChain.fireMessageSent(req);
1208 }
1209
1210 private void process() throws Exception {
1211 for (Iterator<S> i = selectedSessions(); i.hasNext();) {
1212 S session = i.next();
1213 process(session);
1214 i.remove();
1215 }
1216 }
1217
1218
1219
1220
1221 private void process(S session) {
1222
1223 if (isReadable(session) && !session.isReadSuspended()) {
1224 read(session);
1225 }
1226
1227
1228 if (isWritable(session) && !session.isWriteSuspended() && session.setScheduledForFlush(true)) {
1229
1230 flushingSessions.add(session);
1231 }
1232 }
1233 }
1234 }