1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.polling;
21
22 import java.io.IOException;
23 import java.net.PortUnreachableException;
24 import java.nio.channels.ClosedSelectorException;
25 import java.util.ArrayList;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Queue;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicReference;
35
36 import org.apache.mina.core.buffer.IoBuffer;
37 import org.apache.mina.core.file.FileRegion;
38 import org.apache.mina.core.filterchain.IoFilterChain;
39 import org.apache.mina.core.filterchain.IoFilterChainBuilder;
40 import org.apache.mina.core.future.DefaultIoFuture;
41 import org.apache.mina.core.service.AbstractIoService;
42 import org.apache.mina.core.service.IoProcessor;
43 import org.apache.mina.core.service.IoServiceListenerSupport;
44 import org.apache.mina.core.session.AbstractIoSession;
45 import org.apache.mina.core.session.IoSession;
46 import org.apache.mina.core.session.IoSessionConfig;
47 import org.apache.mina.core.session.SessionState;
48 import org.apache.mina.core.write.WriteRequest;
49 import org.apache.mina.core.write.WriteRequestQueue;
50 import org.apache.mina.core.write.WriteToClosedSessionException;
51 import org.apache.mina.transport.socket.AbstractDatagramSessionConfig;
52 import org.apache.mina.util.ExceptionMonitor;
53 import org.apache.mina.util.NamePreservingRunnable;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57
58
59
60
61
62
63
64
65
66
67 public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S> {
68
/** Logger for this class; note that it is registered under the {@code IoProcessor} name. */
69 private final static Logger LOG = LoggerFactory.getLogger(IoProcessor.class);
70
71
72
73
74
75
76
/** NOTE(review): not referenced anywhere in the visible part of this class. */
77 private static final int WRITE_SPIN_COUNT = 256;
78
79
80
81
82
/**
 * Timeout handed to {@link #select(long)} by the processing loop. The same value is
 * used as the minimum interval between two idle checks, where it is compared against
 * differences of {@code System.currentTimeMillis()} values.
 */
83 private static final long SELECT_TIMEOUT = 1000L;
84
85
/** Per-class counters from which {@link #nextThreadName()} derives the thread name suffix. */
86 private static final ConcurrentHashMap<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<Class<?>, AtomicInteger>();
87
88
/** Name given to the runnable that executes the processing loop. */
89 private final String threadName;
90
91
/** Executor on which the processing loop is started. */
92 private final Executor executor;
93
94
/** Sessions queued by {@code add()} and drained by {@code handleNewSessions()}. */
95 private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>();
96
97
/** Sessions queued by {@code scheduleRemove()} and drained by {@code removeSessions()}. */
98 private final Queue<S> removingSessions = new ConcurrentLinkedQueue<S>();
99
100
/** Sessions with pending writes, drained by {@code flush(long)}. */
101 private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>();
102
103
104
105
106
/** Sessions queued by {@code updateTrafficMask(S)} and drained by {@code updateTrafficMask()}. */
107 private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<S>();
108
109
/** The currently installed {@code Processor}, or {@code null} when no loop is running. */
110 private final AtomicReference<Processor> processorRef = new AtomicReference<Processor>();
111
/** {@code System.currentTimeMillis()} value recorded at the last idle check. */
112 private long lastIdleCheckTime;
113
/** Guards the transition to "disposing" and the call to {@code doDispose()}. */
114 private final Object disposalLock = new Object();
115
/** Set by {@code dispose()} once disposal has been requested. */
116 private volatile boolean disposing;
117
/** Set by {@code dispose()} once the disposal future has completed. */
118 private volatile boolean disposed;
119
/** Completed by the processing loop when it terminates; {@code dispose()} waits on it. */
120 private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);
121
/**
 * Read and reset by the processing loop. Nothing in this class sets it to {@code true};
 * presumably subclasses do so from {@code wakeup()} -- TODO confirm.
 */
122 protected AtomicBoolean wakeupCalled = new AtomicBoolean(false);
123
124
125
126
127
128
129
130
/**
 * Creates a processor whose processing loop will be started on the given executor.
 * The thread name used for that loop is reserved at construction time.
 *
 * @param executor the executor used to run the processing loop
 * @throws IllegalArgumentException if {@code executor} is {@code null}
 */
protected AbstractPollingIoProcessor(Executor executor) {
    if (null == executor) {
        throw new IllegalArgumentException("executor");
    }

    this.executor = executor;
    this.threadName = nextThreadName();
}
139
140
141
142
143
144
145
146
147
148 private String nextThreadName() {
149 Class<?> cls = getClass();
150 int newThreadId;
151
152 AtomicInteger threadId = threadIds.putIfAbsent(cls, new AtomicInteger(1));
153
154 if (threadId == null) {
155 newThreadId = 1;
156 } else {
157
158 newThreadId = threadId.incrementAndGet();
159 }
160
161
162 return cls.getSimpleName() + '-' + newThreadId;
163 }
164
165
166
167
/**
 * Tells whether disposal has been requested.
 *
 * @return {@code true} once {@code dispose()} has set the disposing flag
 */
168 public final boolean isDisposing() {
169 return disposing;
170 }
171
172
173
174
/**
 * Tells whether disposal has completed.
 *
 * @return {@code true} once {@code dispose()} has finished waiting for the processing loop
 */
175 public final boolean isDisposed() {
176 return disposed;
177 }
178
179
180
181
182 public final void dispose() {
183 if (disposed || disposing) {
184 return;
185 }
186
187 synchronized (disposalLock) {
188 disposing = true;
189 startupProcessor();
190 }
191
192 disposalFuture.awaitUninterruptibly();
193 disposed = true;
194 }
195
196
197
198
199
200
201
/**
 * Hook invoked by the processing loop, while holding the disposal lock, after the loop
 * has ended and disposal was requested. Presumably releases the resources held by the
 * implementation -- confirm against subclasses.
 *
 * @throws Exception if the implementation fails; the caller reports it to the exception monitor
 */
202 protected abstract void doDispose() throws Exception;
203
204
205
206
207
208
209
210
211
212
/**
 * Waits for sessions to become ready. The processing loop calls this with
 * {@code SELECT_TIMEOUT} and processes the selected sessions only when the
 * result is greater than zero.
 *
 * @param timeout the timeout to wait for (the loop passes {@code SELECT_TIMEOUT})
 * @return a count; presumably the number of ready sessions -- TODO confirm
 * @throws Exception if the selection fails
 */
213 protected abstract int select(long timeout) throws Exception;
214
215
216
217
218
219
220
221
/**
 * Variant of {@link #select(long)} without a timeout argument. It is not called from
 * the visible part of this class; presumably it blocks indefinitely -- TODO confirm.
 *
 * @return a count; presumably the number of ready sessions -- TODO confirm
 * @throws Exception if the selection fails
 */
222 protected abstract int select() throws Exception;
223
224
225
226
227
228
229
/**
 * Consulted by the processing loop, together with the new-session queue, to decide
 * whether it may terminate once its session count has dropped to zero.
 *
 * @return {@code true} when the selector manages nothing (presumably no registered session)
 */
230 protected abstract boolean isSelectorEmpty();
231
232
233
234
/**
 * Called after work has been queued for the processing loop (new flush, traffic update,
 * processor start-up, disposal). Presumably interrupts a blocking {@code select} call
 * -- confirm against subclasses.
 */
235 protected abstract void wakeup();
236
237
238
239
240
241
242
/**
 * Supplies the sessions used for idle notification and, on disposal, the sessions that
 * get scheduled for removal.
 *
 * @return an iterator over the sessions known to this processor
 */
243 protected abstract Iterator<S> allSessions();
244
245
246
247
248
249
/**
 * Supplies the sessions to process after a successful select. The caller invokes
 * {@code Iterator.remove()} after handling each element, so the iterator must support it.
 *
 * @return an iterator over the selected sessions
 */
250 protected abstract Iterator<S> selectedSessions();
251
252
253
254
255
256
257
258
/**
 * Reports the state of a session. Callers in this class handle {@code OPENING},
 * {@code OPENED} and {@code CLOSING}, and throw {@code IllegalStateException} for
 * anything else.
 *
 * @param session the session to inspect
 * @return the session state
 */
259 protected abstract SessionState getState(S session);
260
261
262
263
264
265
266
267
/**
 * Checked for each selected session; a writable session that is not write-suspended
 * gets scheduled for flush.
 *
 * @param session the session to inspect
 * @return {@code true} if the session can currently be written to
 */
268 protected abstract boolean isWritable(S session);
269
270
271
272
273
274
275
276
/**
 * Checked for each selected session; a readable session that is not read-suspended
 * gets read immediately.
 *
 * @param session the session to inspect
 * @return {@code true} if the session has data to read
 */
277 protected abstract boolean isReadable(S session);
278
279
280
281
282
283
284
285
286
/**
 * Turns write interest for a session on or off. The flush logic disables it before
 * writing and re-enables it when data could not be fully written.
 *
 * @param session the session to update
 * @param isInterested {@code true} to register write interest, {@code false} to clear it
 * @throws Exception if the update fails
 */
287 protected abstract void setInterestedInWrite(S session, boolean isInterested) throws Exception;
288
289
290
291
292
293
294
295
296
/**
 * Turns read interest for a session on or off. The traffic-control update passes
 * {@code true} exactly when the session is not read-suspended.
 *
 * @param session the session to update
 * @param isInterested {@code true} to register read interest, {@code false} to clear it
 * @throws Exception if the update fails
 */
297 protected abstract void setInterestedInRead(S session, boolean isInterested) throws Exception;
298
299
300
301
302
303
304
305
/**
 * Reports the read interest of a session. Not called from the visible part of this class.
 *
 * @param session the session to inspect
 * @return presumably {@code true} when read interest is registered -- TODO confirm
 */
306 protected abstract boolean isInterestedInRead(S session);
307
308
309
310
311
312
313
314
/**
 * Reports the write interest of a session. Not called from the visible part of this class.
 *
 * @param session the session to inspect
 * @return presumably {@code true} when write interest is registered -- TODO confirm
 */
315 protected abstract boolean isInterestedInWrite(S session);
316
317
318
319
320
321
322
/**
 * First step performed for every newly added session, before its filter chain is built
 * and the session-created event is fired.
 *
 * @param session the session to initialize
 * @throws Exception if initialization fails; the session is then destroyed
 */
323 protected abstract void init(S session) throws Exception;
324
325
326
327
328
329
330
331
332
/**
 * Invoked when a session is removed, and also when adding a session failed.
 *
 * @param session the session to destroy
 * @throws Exception if destruction fails
 */
333 protected abstract void destroy(S session) throws Exception;
334
335
336
337
338
339
340
341
342
343
344
345
346
/**
 * Reads data from the session into the buffer. The caller accumulates positive results
 * as the number of bytes read and schedules the session for removal when the result is
 * negative.
 *
 * @param session the session to read from
 * @param buf the buffer receiving the data
 * @return the number of bytes read; a negative value makes the caller remove the session
 * @throws Exception if the read fails
 */
347 protected abstract int read(S session, IoBuffer buf) throws Exception;
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
/**
 * Writes data from the buffer to the session. The caller treats the result as the
 * number of bytes written and interprets {@code 0} as "nothing could be written now".
 *
 * @param session the session to write to
 * @param buf the buffer holding the data
 * @param length the number of bytes the caller asks to write
 * @return the number of bytes written
 * @throws Exception if the write fails
 */
364 protected abstract int write(S session, IoBuffer buf, int length) throws Exception;
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
/**
 * Transfers part of a file region to the session. The caller passes the result to
 * {@code FileRegion.update} and counts it as written bytes.
 *
 * @param session the session to write to
 * @param region the file region to transfer from
 * @param length the number of bytes the caller asks to transfer
 * @return the number of bytes transferred
 * @throws Exception if the transfer fails
 */
382 protected abstract int transferFile(S session, FileRegion region, int length) throws Exception;
383
384
385
386
387 public final void add(S session) {
388 if (disposed || disposing) {
389 throw new IllegalStateException("Already disposed.");
390 }
391
392
393 newSessions.add(session);
394 startupProcessor();
395 }
396
397
398
399
400 public final void remove(S session) {
401 scheduleRemove(session);
402 startupProcessor();
403 }
404
405 private void scheduleRemove(S session) {
406 removingSessions.add(session);
407 }
408
409
410
411
412 public void write(S session, WriteRequest writeRequest) {
413 WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
414
415 writeRequestQueue.offer(session, writeRequest);
416
417 if (!session.isWriteSuspended()) {
418 this.flush(session);
419 }
420 }
421
422
423
424
425 public final void flush(S session) {
426
427
428 if (session.setScheduledForFlush(true)) {
429 flushingSessions.add(session);
430 wakeup();
431 }
432 }
433
434 private void scheduleFlush(S session) {
435
436
437 if (session.setScheduledForFlush(true)) {
438 flushingSessions.add(session);
439 }
440 }
441
442
443
444
445 public final void updateTrafficMask(S session) {
446 trafficControllingSessions.add(session);
447 wakeup();
448 }
449
450
451
452
453
454 private void startupProcessor() {
455 Processor processor = processorRef.get();
456
457 if (processor == null) {
458 processor = new Processor();
459
460 if (processorRef.compareAndSet(null, processor)) {
461 executor.execute(new NamePreservingRunnable(processor, threadName));
462 }
463 }
464
465
466
467 wakeup();
468 }
469
470
471
472
473
474
475
476
477
/**
 * Invoked by the processing loop when {@code select} returned zero in under 100 ms
 * without a recorded wakeup and the connection is not reported as broken. Presumably
 * replaces the underlying selector with a fresh one -- confirm against subclasses.
 *
 * @throws IOException if the new selector cannot be set up
 */
478 abstract protected void registerNewSelector() throws IOException;
479
480
481
482
483
484
485
486
487
488
/**
 * Invoked by the processing loop when {@code select} returned zero in under 100 ms
 * without a recorded wakeup. When it returns {@code true} the loop logs a warning and
 * retries instead of registering a new selector.
 *
 * @return {@code true} if a broken connection was detected
 * @throws IOException if the check fails
 */
489 abstract protected boolean isBrokenConnection() throws IOException;
490
491
492
493
494
495
496
497 private int handleNewSessions() {
498 int addedSessions = 0;
499
500 for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {
501 if (addNow(session)) {
502
503 addedSessions++;
504 }
505 }
506
507 return addedSessions;
508 }
509
510
511
512
513
514
515
516
517
518
519 private boolean addNow(S session) {
520 boolean registered = false;
521
522 try {
523 init(session);
524 registered = true;
525
526
527 IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
528 chainBuilder.buildFilterChain(session.getFilterChain());
529
530
531
532
533 IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
534 listeners.fireSessionCreated(session);
535 } catch (Throwable e) {
536 ExceptionMonitor.getInstance().exceptionCaught(e);
537
538 try {
539 destroy(session);
540 } catch (Exception e1) {
541 ExceptionMonitor.getInstance().exceptionCaught(e1);
542 } finally {
543 registered = false;
544 }
545 }
546
547 return registered;
548 }
549
550 private int removeSessions() {
551 int removedSessions = 0;
552
553 for (S session = removingSessions.poll(); session != null; session = removingSessions.poll()) {
554 SessionState state = getState(session);
555
556
557 switch (state) {
558 case OPENED:
559
560 if (removeNow(session)) {
561 removedSessions++;
562 }
563
564 break;
565
566 case CLOSING:
567
568 break;
569
570 case OPENING:
571
572
573 newSessions.remove(session);
574
575 if (removeNow(session)) {
576 removedSessions++;
577 }
578
579 break;
580
581 default:
582 throw new IllegalStateException(String.valueOf(state));
583 }
584 }
585
586 return removedSessions;
587 }
588
589 private boolean removeNow(S session) {
590 clearWriteRequestQueue(session);
591
592 try {
593 destroy(session);
594 return true;
595 } catch (Exception e) {
596 IoFilterChain filterChain = session.getFilterChain();
597 filterChain.fireExceptionCaught(e);
598 } finally {
599 clearWriteRequestQueue(session);
600 ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
601 }
602 return false;
603 }
604
605 private void clearWriteRequestQueue(S session) {
606 WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
607 WriteRequest req;
608
609 List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
610
611 if ((req = writeRequestQueue.poll(session)) != null) {
612 Object message = req.getMessage();
613
614 if (message instanceof IoBuffer) {
615 IoBuffer buf = (IoBuffer) message;
616
617
618
619 if (buf.hasRemaining()) {
620 buf.reset();
621 failedRequests.add(req);
622 } else {
623 IoFilterChain filterChain = session.getFilterChain();
624 filterChain.fireMessageSent(req);
625 }
626 } else {
627 failedRequests.add(req);
628 }
629
630
631 while ((req = writeRequestQueue.poll(session)) != null) {
632 failedRequests.add(req);
633 }
634 }
635
636
637 if (!failedRequests.isEmpty()) {
638 WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
639
640 for (WriteRequest r : failedRequests) {
641 session.decreaseScheduledBytesAndMessages(r);
642 r.getFuture().setException(cause);
643 }
644
645 IoFilterChain filterChain = session.getFilterChain();
646 filterChain.fireExceptionCaught(cause);
647 }
648 }
649
650 private void process() throws Exception {
651 for (Iterator<S> i = selectedSessions(); i.hasNext();) {
652 S session = i.next();
653 process(session);
654 i.remove();
655 }
656 }
657
658
659
660
661 private void process(S session) {
662
663 if (isReadable(session) && !session.isReadSuspended()) {
664 read(session);
665 }
666
667
668 if (isWritable(session) && !session.isWriteSuspended()) {
669
670 if (session.setScheduledForFlush(true)) {
671 flushingSessions.add(session);
672 }
673 }
674 }
675
676 private void read(S session) {
677 IoSessionConfig config = session.getConfig();
678 int bufferSize = config.getReadBufferSize();
679 IoBuffer buf = IoBuffer.allocate(bufferSize);
680
681 final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
682
683 try {
684 int readBytes = 0;
685 int ret;
686
687 try {
688 if (hasFragmentation) {
689
690 while ((ret = read(session, buf)) > 0) {
691 readBytes += ret;
692
693 if (!buf.hasRemaining()) {
694 break;
695 }
696 }
697 } else {
698 ret = read(session, buf);
699
700 if (ret > 0) {
701 readBytes = ret;
702 }
703 }
704 } finally {
705 buf.flip();
706 }
707
708 if (readBytes > 0) {
709 IoFilterChain filterChain = session.getFilterChain();
710 filterChain.fireMessageReceived(buf);
711 buf = null;
712
713 if (hasFragmentation) {
714 if (readBytes << 1 < config.getReadBufferSize()) {
715 session.decreaseReadBufferSize();
716 } else if (readBytes == config.getReadBufferSize()) {
717 session.increaseReadBufferSize();
718 }
719 }
720 }
721
722 if (ret < 0) {
723 scheduleRemove(session);
724 }
725 } catch (Throwable e) {
726 if (e instanceof IOException) {
727 if (!(e instanceof PortUnreachableException)
728 || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
729 || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
730 scheduleRemove(session);
731 }
732 }
733
734 IoFilterChain filterChain = session.getFilterChain();
735 filterChain.fireExceptionCaught(e);
736 }
737 }
738
739 private static String byteArrayToHex(byte[] barray) {
740 char[] c = new char[barray.length * 2];
741 int pos = 0;
742
743 for (byte b : barray) {
744 int bb = (b & 0x00FF) >> 4;
745 c[pos++] = (char) (bb > 9 ? bb + 0x37 : bb + 0x30);
746 bb = b & 0x0F;
747 c[pos++] = (char) (bb > 9 ? bb + 0x37 : bb + 0x30);
748 if (pos > 60) {
749 break;
750 }
751 }
752
753 return new String(c);
754 }
755
756 private void notifyIdleSessions(long currentTime) throws Exception {
757
758 if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
759 lastIdleCheckTime = currentTime;
760 AbstractIoSession.notifyIdleness(allSessions(), currentTime);
761 }
762 }
763
764
765
766
767 private void flush(long currentTime) {
768 if (flushingSessions.isEmpty()) {
769 return;
770 }
771
772 do {
773 S session = flushingSessions.poll();
774
775 if (session == null) {
776
777 break;
778 }
779
780
781
782 session.unscheduledForFlush();
783
784 SessionState state = getState(session);
785
786 switch (state) {
787 case OPENED:
788 try {
789 boolean flushedAll = flushNow(session, currentTime);
790
791 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session)
792 && !session.isScheduledForFlush()) {
793 scheduleFlush(session);
794 }
795 } catch (Exception e) {
796 scheduleRemove(session);
797 IoFilterChain filterChain = session.getFilterChain();
798 filterChain.fireExceptionCaught(e);
799 }
800
801 break;
802
803 case CLOSING:
804
805 break;
806
807 case OPENING:
808
809
810
811 scheduleFlush(session);
812 return;
813
814 default:
815 throw new IllegalStateException(String.valueOf(state));
816 }
817
818 } while (!flushingSessions.isEmpty());
819 }
820
/**
 * Writes queued requests of one session until the queue is empty, the write budget for
 * this pass is used up, or the transport stops accepting data.
 *
 * @param session the session to write to
 * @param currentTime the time stamp passed on to the write statistics
 * @return {@code true} only when the loop ended because no request was left to write;
 *         {@code false} when the session is disconnected, a write was partial or wrote
 *         nothing, the budget was reached, or an exception occurred
 */
821 private boolean flushNow(S session, long currentTime) {
822 if (!session.isConnected()) {
823 scheduleRemove(session);
824 return false;
825 }
826
827 final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
828
829 final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
830
831
832
833
// Write budget for this pass: one and a half times the maximum read buffer size.
834 final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
835 + (session.getConfig().getMaxReadBufferSize() >>> 1);
836 int writtenBytes = 0;
837 WriteRequest req = null;
838
839 try {
840
// Write interest is cleared up front and re-enabled below only when data is left over.
841 setInterestedInWrite(session, false);
842
843 do {
844
// Resume the request left over from a previous pass, if any.
845 req = session.getCurrentWriteRequest();
846
847 if (req == null) {
848 req = writeRequestQueue.poll(session);
849
850 if (req == null) {
// Queue drained: leave the loop and return true.
851 break;
852 }
853
854 session.setCurrentWriteRequest(req);
855 }
856
857 int localWrittenBytes = 0;
858 Object message = req.getMessage();
859
860 if (message instanceof IoBuffer) {
861 localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
862 currentTime);
863
864 if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {
865
// Partial write: wait for the next writable event to send the rest.
866 writtenBytes += localWrittenBytes;
867 setInterestedInWrite(session, true);
868 return false;
869 }
870 } else if (message instanceof FileRegion) {
871 localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
872 currentTime);
873
874
875
876
877
878
// Partial transfer: wait for the next writable event to send the rest.
879 if ((localWrittenBytes > 0) && (((FileRegion) message).getRemainingBytes() > 0)) {
880 writtenBytes += localWrittenBytes;
881 setInterestedInWrite(session, true);
882 return false;
883 }
884 } else {
// Thrown inside the try block, so it is handled by the catch clause below.
885 throw new IllegalStateException("Don't know how to handle message of type '"
886 + message.getClass().getName() + "'. Are you missing a protocol encoder?");
887 }
888
889 if (localWrittenBytes == 0) {
890
// Nothing could be written: wait for the next writable event.
891 setInterestedInWrite(session, true);
892 return false;
893 }
894
895 writtenBytes += localWrittenBytes;
896
897 if (writtenBytes >= maxWrittenBytes) {
898
// Budget used up: re-queue the session so that others get their turn.
899 scheduleFlush(session);
900 return false;
901 }
902 } while (writtenBytes < maxWrittenBytes);
903 } catch (Exception e) {
// Fail the request being written, if any, and report the error on the filter chain.
904 if (req != null) {
905 req.getFuture().setException(e);
906 }
907
908 IoFilterChain filterChain = session.getFilterChain();
909 filterChain.fireExceptionCaught(e);
910 return false;
911 }
912
913 return true;
914 }
915
916 private int writeBuffer(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
917 throws Exception {
918 IoBuffer buf = (IoBuffer) req.getMessage();
919 int localWrittenBytes = 0;
920
921 if (buf.hasRemaining()) {
922 int length;
923
924 if (hasFragmentation) {
925 length = Math.min(buf.remaining(), maxLength);
926 } else {
927 length = buf.remaining();
928 }
929
930 try {
931 localWrittenBytes = write(session, buf, length);
932 } catch (IOException ioe) {
933
934
935 session.close(true);
936 }
937
938 }
939
940 session.increaseWrittenBytes(localWrittenBytes, currentTime);
941
942 if (!buf.hasRemaining() || (!hasFragmentation && (localWrittenBytes != 0))) {
943
944 int pos = buf.position();
945 buf.reset();
946
947 fireMessageSent(session, req);
948
949
950 buf.position(pos);
951 }
952
953 return localWrittenBytes;
954 }
955
956 private int writeFile(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
957 throws Exception {
958 int localWrittenBytes;
959 FileRegion region = (FileRegion) req.getMessage();
960
961 if (region.getRemainingBytes() > 0) {
962 int length;
963
964 if (hasFragmentation) {
965 length = (int) Math.min(region.getRemainingBytes(), maxLength);
966 } else {
967 length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
968 }
969
970 localWrittenBytes = transferFile(session, region, length);
971 region.update(localWrittenBytes);
972 } else {
973 localWrittenBytes = 0;
974 }
975
976 session.increaseWrittenBytes(localWrittenBytes, currentTime);
977
978 if ((region.getRemainingBytes() <= 0) || (!hasFragmentation && (localWrittenBytes != 0))) {
979 fireMessageSent(session, req);
980 }
981
982 return localWrittenBytes;
983 }
984
985 private void fireMessageSent(S session, WriteRequest req) {
986 session.setCurrentWriteRequest(null);
987 IoFilterChain filterChain = session.getFilterChain();
988 filterChain.fireMessageSent(req);
989 }
990
991
992
993
994 private void updateTrafficMask() {
995 int queueSize = trafficControllingSessions.size();
996
997 while (queueSize > 0) {
998 S session = trafficControllingSessions.poll();
999
1000 if (session == null) {
1001
1002 return;
1003 }
1004
1005 SessionState state = getState(session);
1006
1007 switch (state) {
1008 case OPENED:
1009 updateTrafficControl(session);
1010
1011 break;
1012
1013 case CLOSING:
1014 break;
1015
1016 case OPENING:
1017
1018
1019
1020
1021 trafficControllingSessions.add(session);
1022 break;
1023
1024 default:
1025 throw new IllegalStateException(String.valueOf(state));
1026 }
1027
1028
1029
1030
1031
1032 queueSize--;
1033 }
1034 }
1035
1036
1037
1038
1039 public void updateTrafficControl(S session) {
1040
1041 try {
1042 setInterestedInRead(session, !session.isReadSuspended());
1043 } catch (Exception e) {
1044 IoFilterChain filterChain = session.getFilterChain();
1045 filterChain.fireExceptionCaught(e);
1046 }
1047
1048 try {
1049 setInterestedInWrite(session,
1050 !session.getWriteRequestQueue().isEmpty(session) && !session.isWriteSuspended());
1051 } catch (Exception e) {
1052 IoFilterChain filterChain = session.getFilterChain();
1053 filterChain.fireExceptionCaught(e);
1054 }
1055 }
1056
1057
1058
1059
1060
1061
1062
/**
 * The processing loop. Each iteration selects, registers new sessions, applies traffic
 * updates, processes selected sessions, flushes writes, removes sessions and notifies
 * idle ones. The loop ends when no session is left and nothing is pending, when another
 * processor has taken over, or when the selector is closed. On exit it runs
 * {@code doDispose()} if disposal was requested and always completes the disposal future.
 */
1063 private class Processor implements Runnable {
1064 public void run() {
1065 assert (processorRef.get() == this);
1066
// Number of sessions registered by this loop and not yet removed.
1067 int nSessions = 0;
1068 lastIdleCheckTime = System.currentTimeMillis();
1069
1070 for (;;) {
1071 try {
1072
1073
1074
1075
// The duration of the select call is measured to detect a select that returns
// immediately with nothing to do.
1076 long t0 = System.currentTimeMillis();
1077 int selected = select(SELECT_TIMEOUT);
1078 long t1 = System.currentTimeMillis();
1079 long delta = (t1 - t0);
1080
// Nothing selected, no wakeup recorded and back in under 100 ms: treated as abnormal.
1081 if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {
1082
1083
1084 if (isBrokenConnection()) {
1085 LOG.warn("Broken connection");
1086
1087
1088
1089 wakeupCalled.getAndSet(false);
1090
1091 continue;
1092 } else {
1093 LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105 registerNewSelector();
1106 }
1107
1108
1109 wakeupCalled.getAndSet(false);
1110
1111
1112 continue;
1113 }
1114
1115
// Register the sessions queued by add().
1116 nSessions += handleNewSessions();
1117
1118 updateTrafficMask();
1119
1120
1121
1122 if (selected > 0) {
1123
1124 process();
1125 }
1126
1127
1128 long currentTime = System.currentTimeMillis();
1129 flush(currentTime);
1130
1131
1132 nSessions -= removeSessions();
1133
1134
1135 notifyIdleSessions(currentTime);
1136
1137
1138
// No session left: release the processor slot, then either stop or try to take the
// slot back if work is still pending.
1139 if (nSessions == 0) {
1140 processorRef.set(null);
1141
1142 if (newSessions.isEmpty() && isSelectorEmpty()) {
1143
1144 assert (processorRef.get() != this);
1145 break;
1146 }
1147
1148 assert (processorRef.get() != this);
1149
// Failing to reclaim the slot means another processor was installed meanwhile.
1150 if (!processorRef.compareAndSet(null, this)) {
1151
1152 assert (processorRef.get() != this);
1153 break;
1154 }
1155
1156 assert (processorRef.get() == this);
1157 }
1158
1159
1160
// On disposal every known session is scheduled for removal; the wakeup makes the
// next select return so that the removals are processed.
1161 if (isDisposing()) {
1162 for (Iterator<S> i = allSessions(); i.hasNext();) {
1163 scheduleRemove(i.next());
1164 }
1165
1166 wakeup();
1167 }
1168 } catch (ClosedSelectorException cse) {
1169
// A closed selector ends the loop.
1170 break;
1171 } catch (Throwable t) {
// Any other failure is reported, and the loop pauses for a second before retrying.
1172 ExceptionMonitor.getInstance().exceptionCaught(t);
1173
1174 try {
1175 Thread.sleep(1000);
1176 } catch (InterruptedException e1) {
1177 ExceptionMonitor.getInstance().exceptionCaught(e1);
1178 }
1179 }
1180 }
1181
1182 try {
1183 synchronized (disposalLock) {
1184 if (disposing) {
1185 doDispose();
1186 }
1187 }
1188 } catch (Throwable t) {
1189 ExceptionMonitor.getInstance().exceptionCaught(t);
1190 } finally {
// Completed in every case so that a thread blocked in dispose() is released.
1191 disposalFuture.setValue(true);
1192 }
1193 }
1194 }
1195 }