1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.polling;
21
22 import java.io.IOException;
23 import java.net.PortUnreachableException;
24 import java.nio.channels.ClosedSelectorException;
25 import java.util.ArrayList;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Queue;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicReference;
35
36 import org.apache.mina.core.buffer.IoBuffer;
37 import org.apache.mina.core.file.FileRegion;
38 import org.apache.mina.core.filterchain.IoFilterChain;
39 import org.apache.mina.core.filterchain.IoFilterChainBuilder;
40 import org.apache.mina.core.future.DefaultIoFuture;
41 import org.apache.mina.core.service.AbstractIoService;
42 import org.apache.mina.core.service.IoProcessor;
43 import org.apache.mina.core.service.IoServiceListenerSupport;
44 import org.apache.mina.core.session.AbstractIoSession;
45 import org.apache.mina.core.session.IoSession;
46 import org.apache.mina.core.session.IoSessionConfig;
47 import org.apache.mina.core.session.SessionState;
48 import org.apache.mina.core.write.WriteRequest;
49 import org.apache.mina.core.write.WriteRequestQueue;
50 import org.apache.mina.core.write.WriteToClosedSessionException;
51 import org.apache.mina.transport.socket.AbstractDatagramSessionConfig;
52 import org.apache.mina.util.ExceptionMonitor;
53 import org.apache.mina.util.NamePreservingRunnable;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57
58
59
60
61
62
63
64
65
66
67
68 public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S> {
69
70 private static final Logger LOG = LoggerFactory.getLogger(IoProcessor.class);
71
72
73
74
75
76 private static final long SELECT_TIMEOUT = 1000L;
77
78
79 private static final ConcurrentHashMap<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<>();
80
81
82 private final String threadName;
83
84
85 private final Executor executor;
86
87
88 private final Queue<S> newSessions = new ConcurrentLinkedQueue<>();
89
90
91 private final Queue<S> removingSessions = new ConcurrentLinkedQueue<>();
92
93
94 private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<>();
95
96
97
98
99
100 private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<>();
101
102
103 private final AtomicReference<Processor> processorRef = new AtomicReference<>();
104
105 private long lastIdleCheckTime;
106
107 private final Object disposalLock = new Object();
108
109 private volatile boolean disposing;
110
111 private volatile boolean disposed;
112
113 private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);
114
115 protected AtomicBoolean wakeupCalled = new AtomicBoolean(false);
116
117
118
119
120
121
122
123
/**
 * Creates a processor that will run its polling loop on the given executor.
 *
 * @param executor the executor used to start the processor loop; must not be {@code null}
 * @throws IllegalArgumentException if {@code executor} is {@code null}
 */
protected AbstractPollingIoProcessor(Executor executor) {
    if (executor == null) {
        throw new IllegalArgumentException("executor");
    }

    this.executor = executor;
    // The name is derived from the concrete class plus a per-class sequence number.
    this.threadName = nextThreadName();
}
132
133
134
135
136
137
138
139
140
141 private String nextThreadName() {
142 Class<?> cls = getClass();
143 int newThreadId;
144
145 AtomicInteger threadId = threadIds.putIfAbsent(cls, new AtomicInteger(1));
146
147 if (threadId == null) {
148 newThreadId = 1;
149 } else {
150
151 newThreadId = threadId.incrementAndGet();
152 }
153
154
155 return cls.getSimpleName() + '-' + newThreadId;
156 }
157
158
159
160
161 @Override
162 public final boolean isDisposing() {
163 return disposing;
164 }
165
166
167
168
169 @Override
170 public final boolean isDisposed() {
171 return disposed;
172 }
173
174
175
176
177 @Override
178 public final void dispose() {
179 if (disposed || disposing) {
180 return;
181 }
182
183 synchronized (disposalLock) {
184 disposing = true;
185 startupProcessor();
186 }
187
188 disposalFuture.awaitUninterruptibly();
189 disposed = true;
190 }
191
192
193
194
195
196
197
198
199
200 protected abstract void doDispose() throws Exception;
201
202
203
204
205
206
207
208
209
210
211 protected abstract int select(long timeout) throws Exception;
212
213
214
215
216
217
218
219
220 protected abstract int select() throws Exception;
221
222
223
224
225
226
227
228 protected abstract boolean isSelectorEmpty();
229
230
231
232
233 protected abstract void wakeup();
234
235
236
237
238
239
240
241 protected abstract Iterator<S> allSessions();
242
243
244
245
246
247
248
249 protected abstract Iterator<S> selectedSessions();
250
251
252
253
254
255
256
257 protected abstract SessionState getState(S session);
258
259
260
261
262
263
264
265
266 protected abstract boolean isWritable(S session);
267
268
269
270
271
272
273
274 protected abstract boolean isReadable(S session);
275
276
277
278
279
280
281
282
283 protected abstract void setInterestedInWrite(S session, boolean isInterested) throws Exception;
284
285
286
287
288
289
290
291
292 protected abstract void setInterestedInRead(S session, boolean isInterested) throws Exception;
293
294
295
296
297
298
299
300 protected abstract boolean isInterestedInRead(S session);
301
302
303
304
305
306
307
308 protected abstract boolean isInterestedInWrite(S session);
309
310
311
312
313
314
315
316 protected abstract void init(S session) throws Exception;
317
318
319
320
321
322
323
324 protected abstract void destroy(S session) throws Exception;
325
326
327
328
329
330
331
332
333
334
335 protected abstract int read(S session, IoBuffer buf) throws Exception;
336
337
338
339
340
341
342
343
344
345
346
347
348 protected abstract int write(S session, IoBuffer buf, int length) throws IOException;
349
350
351
352
353
354
355
356
357
358
359
360
361
362 protected abstract int transferFile(S session, FileRegion region, int length) throws Exception;
363
364
365
366
367 @Override
368 public final void add(S session) {
369 if (disposed || disposing) {
370 throw new IllegalStateException("Already disposed.");
371 }
372
373
374 newSessions.add(session);
375 startupProcessor();
376 }
377
378
379
380
381 @Override
382 public final void remove(S session) {
383 scheduleRemove(session);
384 startupProcessor();
385 }
386
387 private void scheduleRemove(S session) {
388 if (!removingSessions.contains(session)) {
389 removingSessions.add(session);
390 }
391 }
392
393
394
395
396 @Override
397 public void write(S session, WriteRequest writeRequest) {
398 WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
399
400 writeRequestQueue.offer(session, writeRequest);
401
402 if (!session.isWriteSuspended()) {
403 this.flush(session);
404 }
405 }
406
407
408
409
410 @Override
411 public final void flush(S session) {
412
413
414 if (session.setScheduledForFlush(true)) {
415 flushingSessions.add(session);
416 wakeup();
417 }
418 }
419
420 private void scheduleFlush(S session) {
421
422
423 if (session.setScheduledForFlush(true)) {
424 flushingSessions.add(session);
425 }
426 }
427
428
429
430
431
432
433 public final void updateTrafficMask(S session) {
434 trafficControllingSessions.add(session);
435 wakeup();
436 }
437
438
439
440
441
442 private void startupProcessor() {
443 Processor processor = processorRef.get();
444
445 if (processor == null) {
446 processor = new Processor();
447
448 if (processorRef.compareAndSet(null, processor)) {
449 executor.execute(new NamePreservingRunnable(processor, threadName));
450 }
451 }
452
453
454
455 wakeup();
456 }
457
458
459
460
461
462
463
464
465 protected abstract void registerNewSelector() throws IOException;
466
467
468
469
470
471
472
473
474
475 protected abstract boolean isBrokenConnection() throws IOException;
476
477
478
479
480
481
482
483 private int handleNewSessions() {
484 int addedSessions = 0;
485
486 for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {
487 if (addNow(session)) {
488
489 addedSessions++;
490 }
491 }
492
493 return addedSessions;
494 }
495
496
497
498
499
500
501
502
503 private boolean addNow(S session) {
504 boolean registered = false;
505
506 try {
507 init(session);
508 registered = true;
509
510
511 IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
512 chainBuilder.buildFilterChain(session.getFilterChain());
513
514
515
516
517 IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
518 listeners.fireSessionCreated(session);
519 } catch (Exception e) {
520 ExceptionMonitor.getInstance().exceptionCaught(e);
521
522 try {
523 destroy(session);
524 } catch (Exception e1) {
525 ExceptionMonitor.getInstance().exceptionCaught(e1);
526 } finally {
527 registered = false;
528 }
529 }
530
531 return registered;
532 }
533
534 private int removeSessions() {
535 int removedSessions = 0;
536
537 for (S session = removingSessions.poll(); session != null;session = removingSessions.poll()) {
538 SessionState state = getState(session);
539
540
541 switch (state) {
542 case OPENED:
543
544 if (removeNow(session)) {
545 removedSessions++;
546 }
547
548 break;
549
550 case CLOSING:
551
552
553 removedSessions++;
554 break;
555
556 case OPENING:
557
558
559 newSessions.remove(session);
560
561 if (removeNow(session)) {
562 removedSessions++;
563 }
564
565 break;
566
567 default:
568 throw new IllegalStateException(String.valueOf(state));
569 }
570 }
571
572 return removedSessions;
573 }
574
575 private boolean removeNow(S session) {
576 clearWriteRequestQueue(session);
577
578 try {
579 destroy(session);
580 return true;
581 } catch (Exception e) {
582 IoFilterChain filterChain = session.getFilterChain();
583 filterChain.fireExceptionCaught(e);
584 } finally {
585 try {
586 clearWriteRequestQueue(session);
587 ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
588 } catch (Exception e) {
589
590
591
592 IoFilterChain filterChain = session.getFilterChain();
593 filterChain.fireExceptionCaught(e);
594 }
595 }
596
597 return false;
598 }
599
600 private void clearWriteRequestQueue(S session) {
601 WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
602 WriteRequest req;
603
604 List<WriteRequest> failedRequests = new ArrayList<>();
605
606 if ((req = writeRequestQueue.poll(session)) != null) {
607 Object message = req.getMessage();
608
609 if (message instanceof IoBuffer) {
610 IoBuffer buf = (IoBuffer) message;
611
612
613
614 if (buf.hasRemaining()) {
615 buf.reset();
616 failedRequests.add(req);
617 } else {
618 IoFilterChain filterChain = session.getFilterChain();
619 filterChain.fireMessageSent(req);
620 }
621 } else {
622 failedRequests.add(req);
623 }
624
625
626 while ((req = writeRequestQueue.poll(session)) != null) {
627 failedRequests.add(req);
628 }
629 }
630
631
632 if (!failedRequests.isEmpty()) {
633 WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
634
635 for (WriteRequest r : failedRequests) {
636 session.decreaseScheduledBytesAndMessages(r);
637 r.getFuture().setException(cause);
638 }
639
640 IoFilterChain filterChain = session.getFilterChain();
641 filterChain.fireExceptionCaught(cause);
642 }
643 }
644
645 private void process() throws Exception {
646 for (Iterator<S> i = selectedSessions(); i.hasNext();) {
647 S session = i.next();
648 process(session);
649 i.remove();
650 }
651 }
652
653
654
655
656 private void process(S session) {
657
658 if (isReadable(session) && !session.isReadSuspended()) {
659 read(session);
660 }
661
662
663 if (isWritable(session) && !session.isWriteSuspended() && session.setScheduledForFlush(true)) {
664
665 flushingSessions.add(session);
666 }
667 }
668
669 private void read(S session) {
670 IoSessionConfig config = session.getConfig();
671 int bufferSize = config.getReadBufferSize();
672 IoBuffer buf = IoBuffer.allocate(bufferSize);
673
674 final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
675
676 try {
677 int readBytes = 0;
678 int ret;
679
680 try {
681 if (hasFragmentation) {
682
683 while ((ret = read(session, buf)) > 0) {
684 readBytes += ret;
685
686 if (!buf.hasRemaining()) {
687 break;
688 }
689 }
690 } else {
691 ret = read(session, buf);
692
693 if (ret > 0) {
694 readBytes = ret;
695 }
696 }
697 } finally {
698 buf.flip();
699 }
700
701 if (readBytes > 0) {
702 IoFilterChain filterChain = session.getFilterChain();
703 filterChain.fireMessageReceived(buf);
704 buf = null;
705
706 if (hasFragmentation) {
707 if (readBytes << 1 < config.getReadBufferSize()) {
708 session.decreaseReadBufferSize();
709 } else if (readBytes == config.getReadBufferSize()) {
710 session.increaseReadBufferSize();
711 }
712 }
713 }
714
715 if (ret < 0) {
716 IoFilterChain filterChain = session.getFilterChain();
717 filterChain.fireInputClosed();
718 }
719 } catch (Exception e) {
720 if (e instanceof IOException) {
721 if (!(e instanceof PortUnreachableException)
722 || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
723 || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
724 scheduleRemove(session);
725 }
726 }
727
728 IoFilterChain filterChain = session.getFilterChain();
729 filterChain.fireExceptionCaught(e);
730 }
731 }
732
733 private void notifyIdleSessions(long currentTime) throws Exception {
734
735 if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
736 lastIdleCheckTime = currentTime;
737 AbstractIoSession.notifyIdleness(allSessions(), currentTime);
738 }
739 }
740
741
742
743
744 private void flush(long currentTime) {
745 if (flushingSessions.isEmpty()) {
746 return;
747 }
748
749 do {
750 S session = flushingSessions.poll();
751
752
753 if (session == null) {
754
755 break;
756 }
757
758
759
760 session.unscheduledForFlush();
761
762 SessionState state = getState(session);
763
764 switch (state) {
765 case OPENED:
766 try {
767 boolean flushedAll = flushNow(session, currentTime);
768
769 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session)
770 && !session.isScheduledForFlush()) {
771 scheduleFlush(session);
772 }
773 } catch (Exception e) {
774 scheduleRemove(session);
775 session.closeNow();
776 IoFilterChain filterChain = session.getFilterChain();
777 filterChain.fireExceptionCaught(e);
778 }
779
780 break;
781
782 case CLOSING:
783
784 break;
785
786 case OPENING:
787
788
789
790 scheduleFlush(session);
791 return;
792
793 default:
794 throw new IllegalStateException(String.valueOf(state));
795 }
796
797 } while (!flushingSessions.isEmpty());
798 }
799
800 private boolean flushNow(S session, long currentTime) {
801 if (!session.isConnected()) {
802 scheduleRemove(session);
803 return false;
804 }
805
806 final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
807
808 final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
809
810
811
812
813 final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
814 + (session.getConfig().getMaxReadBufferSize() >>> 1);
815 int writtenBytes = 0;
816 WriteRequest req = null;
817
818 try {
819
820 setInterestedInWrite(session, false);
821
822 do {
823
824 req = session.getCurrentWriteRequest();
825
826 if (req == null) {
827 req = writeRequestQueue.poll(session);
828
829 if (req == null) {
830 break;
831 }
832
833 session.setCurrentWriteRequest(req);
834 }
835
836 int localWrittenBytes;
837 Object message = req.getMessage();
838
839 if (message instanceof IoBuffer) {
840 localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
841 currentTime);
842
843 if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {
844
845 writtenBytes += localWrittenBytes;
846 setInterestedInWrite(session, true);
847 return false;
848 }
849 } else if (message instanceof FileRegion) {
850 localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
851 currentTime);
852
853
854
855
856
857
858 if ((localWrittenBytes > 0) && (((FileRegion) message).getRemainingBytes() > 0)) {
859 writtenBytes += localWrittenBytes;
860 setInterestedInWrite(session, true);
861 return false;
862 }
863 } else {
864 throw new IllegalStateException("Don't know how to handle message of type '"
865 + message.getClass().getName() + "'. Are you missing a protocol encoder?");
866 }
867
868 if (localWrittenBytes == 0) {
869
870
871 if (!req.equals(AbstractIoSession.MESSAGE_SENT_REQUEST)) {
872 setInterestedInWrite(session, true);
873 return false;
874 }
875 } else {
876 writtenBytes += localWrittenBytes;
877
878 if (writtenBytes >= maxWrittenBytes) {
879
880 scheduleFlush(session);
881 return false;
882 }
883 }
884
885 if (message instanceof IoBuffer) {
886 ((IoBuffer) message).free();
887 }
888 } while (writtenBytes < maxWrittenBytes);
889 } catch (Exception e) {
890 if (req != null) {
891 req.getFuture().setException(e);
892 }
893
894 IoFilterChain filterChain = session.getFilterChain();
895 filterChain.fireExceptionCaught(e);
896 return false;
897 }
898
899 return true;
900 }
901
902 private int writeBuffer(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
903 throws Exception {
904 IoBuffer buf = (IoBuffer) req.getMessage();
905 int localWrittenBytes = 0;
906
907 if (buf.hasRemaining()) {
908 int length;
909
910 if (hasFragmentation) {
911 length = Math.min(buf.remaining(), maxLength);
912 } else {
913 length = buf.remaining();
914 }
915
916 try {
917 localWrittenBytes = write(session, buf, length);
918 } catch (IOException ioe) {
919
920
921 buf.free();
922 session.closeNow();
923 removeNow(session);
924
925 return 0;
926 }
927 }
928
929 session.increaseWrittenBytes(localWrittenBytes, currentTime);
930
931
932 if (!buf.hasRemaining() || (!hasFragmentation && (localWrittenBytes != 0))) {
933
934 Object originalMessage = req.getOriginalRequest().getMessage();
935
936 if (originalMessage instanceof IoBuffer) {
937 buf = ((IoBuffer)req.getOriginalRequest().getMessage());
938
939 int pos = buf.position();
940 buf.reset();
941 fireMessageSent(session, req);
942
943 buf.position(pos);
944 } else {
945 fireMessageSent(session, req);
946 }
947 }
948
949 return localWrittenBytes;
950 }
951
952 private int writeFile(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
953 throws Exception {
954 int localWrittenBytes;
955 FileRegion region = (FileRegion) req.getMessage();
956
957 if (region.getRemainingBytes() > 0) {
958 int length;
959
960 if (hasFragmentation) {
961 length = (int) Math.min(region.getRemainingBytes(), maxLength);
962 } else {
963 length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
964 }
965
966 localWrittenBytes = transferFile(session, region, length);
967 region.update(localWrittenBytes);
968 } else {
969 localWrittenBytes = 0;
970 }
971
972 session.increaseWrittenBytes(localWrittenBytes, currentTime);
973
974 if ((region.getRemainingBytes() <= 0) || (!hasFragmentation && (localWrittenBytes != 0))) {
975 fireMessageSent(session, req);
976 }
977
978 return localWrittenBytes;
979 }
980
981 private void fireMessageSent(S session, WriteRequest req) {
982 session.setCurrentWriteRequest(null);
983 IoFilterChain filterChain = session.getFilterChain();
984 filterChain.fireMessageSent(req);
985 }
986
987
988
989
990 private void updateTrafficMask() {
991 int queueSize = trafficControllingSessions.size();
992
993 while (queueSize > 0) {
994 S session = trafficControllingSessions.poll();
995
996 if (session == null) {
997
998 return;
999 }
1000
1001 SessionState state = getState(session);
1002
1003 switch (state) {
1004 case OPENED:
1005 updateTrafficControl(session);
1006
1007 break;
1008
1009 case CLOSING:
1010 break;
1011
1012 case OPENING:
1013
1014
1015
1016
1017 trafficControllingSessions.add(session);
1018 break;
1019
1020 default:
1021 throw new IllegalStateException(String.valueOf(state));
1022 }
1023
1024
1025
1026
1027
1028
1029 queueSize--;
1030 }
1031 }
1032
1033
1034
1035
1036 @Override
1037 public void updateTrafficControl(S session) {
1038
1039 try {
1040 setInterestedInRead(session, !session.isReadSuspended());
1041 } catch (Exception e) {
1042 IoFilterChain filterChain = session.getFilterChain();
1043 filterChain.fireExceptionCaught(e);
1044 }
1045
1046 try {
1047 setInterestedInWrite(session,
1048 !session.getWriteRequestQueue().isEmpty(session) && !session.isWriteSuspended());
1049 } catch (Exception e) {
1050 IoFilterChain filterChain = session.getFilterChain();
1051 filterChain.fireExceptionCaught(e);
1052 }
1053 }
1054
1055
1056
1057
1058
1059
1060 private class Processor implements Runnable {
1061 public void run() {
1062 assert (processorRef.get() == this);
1063
1064 int nSessions = 0;
1065 lastIdleCheckTime = System.currentTimeMillis();
1066 int nbTries = 10;
1067
1068 for (;;) {
1069 try {
1070
1071
1072
1073
1074 long t0 = System.currentTimeMillis();
1075 int selected = select(SELECT_TIMEOUT);
1076 long t1 = System.currentTimeMillis();
1077 long delta = t1 - t0;
1078
1079 if (!wakeupCalled.getAndSet(false) && (selected == 0) && (delta < 100)) {
1080
1081
1082 if (isBrokenConnection()) {
1083 LOG.warn("Broken connection");
1084 } else {
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098 if (nbTries == 0) {
1099 LOG.warn("Create a new selector. Selected is 0, delta = " + delta);
1100 registerNewSelector();
1101 nbTries = 10;
1102 } else {
1103 nbTries--;
1104 }
1105 }
1106 } else {
1107 nbTries = 10;
1108 }
1109
1110
1111 nSessions += handleNewSessions();
1112
1113 updateTrafficMask();
1114
1115
1116
1117 if (selected > 0) {
1118
1119
1120 process();
1121 }
1122
1123
1124 long currentTime = System.currentTimeMillis();
1125 flush(currentTime);
1126
1127
1128 nSessions -= removeSessions();
1129
1130
1131 notifyIdleSessions(currentTime);
1132
1133
1134
1135 if (nSessions == 0) {
1136 processorRef.set(null);
1137
1138 if (newSessions.isEmpty() && isSelectorEmpty()) {
1139
1140 assert (processorRef.get() != this);
1141 break;
1142 }
1143
1144 assert (processorRef.get() != this);
1145
1146 if (!processorRef.compareAndSet(null, this)) {
1147
1148 assert (processorRef.get() != this);
1149 break;
1150 }
1151
1152 assert (processorRef.get() == this);
1153 }
1154
1155
1156
1157 if (isDisposing()) {
1158 boolean hasKeys = false;
1159
1160 for (Iterator<S> i = allSessions(); i.hasNext();) {
1161 IoSession session = i.next();
1162
1163 if (session.isActive()) {
1164 scheduleRemove((S)session);
1165 hasKeys = true;
1166 }
1167 }
1168
1169 if (hasKeys) {
1170 wakeup();
1171 }
1172 }
1173 } catch (ClosedSelectorException cse) {
1174
1175
1176 ExceptionMonitor.getInstance().exceptionCaught(cse);
1177 break;
1178 } catch (Exception e) {
1179 ExceptionMonitor.getInstance().exceptionCaught(e);
1180
1181 try {
1182 Thread.sleep(1000);
1183 } catch (InterruptedException e1) {
1184 ExceptionMonitor.getInstance().exceptionCaught(e1);
1185 }
1186 }
1187 }
1188
1189 try {
1190 synchronized (disposalLock) {
1191 if (disposing) {
1192 doDispose();
1193 }
1194 }
1195 } catch (Exception e) {
1196 ExceptionMonitor.getInstance().exceptionCaught(e);
1197 } finally {
1198 disposalFuture.setValue(true);
1199 }
1200 }
1201 }
1202 }