1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.polling;
21
22 import java.io.IOException;
23 import java.net.PortUnreachableException;
24 import java.nio.channels.ClosedSelectorException;
25 import java.util.ArrayList;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Queue;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.atomic.AtomicBoolean;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicReference;
35
36 import org.apache.mina.core.buffer.IoBuffer;
37 import org.apache.mina.core.file.FileRegion;
38 import org.apache.mina.core.filterchain.IoFilterChain;
39 import org.apache.mina.core.filterchain.IoFilterChainBuilder;
40 import org.apache.mina.core.future.DefaultIoFuture;
41 import org.apache.mina.core.service.AbstractIoService;
42 import org.apache.mina.core.service.IoProcessor;
43 import org.apache.mina.core.service.IoServiceListenerSupport;
44 import org.apache.mina.core.session.AbstractIoSession;
45 import org.apache.mina.core.session.IoSession;
46 import org.apache.mina.core.session.IoSessionConfig;
47 import org.apache.mina.core.session.SessionState;
48 import org.apache.mina.core.write.WriteRequest;
49 import org.apache.mina.core.write.WriteRequestQueue;
50 import org.apache.mina.core.write.WriteToClosedSessionException;
51 import org.apache.mina.transport.socket.AbstractDatagramSessionConfig;
52 import org.apache.mina.util.ExceptionMonitor;
53 import org.apache.mina.util.NamePreservingRunnable;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57
58
59
60
61
62
63
64
65
66
67
68 public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S> {
69
/** Logger used by the processing loop; note it is registered under the {@code IoProcessor} category. */
private final static Logger LOG = LoggerFactory.getLogger(IoProcessor.class);

/**
 * Timeout handed to {@link #select(long)} by the processing loop. It is also
 * the minimum number of milliseconds between two idle checks (compared against
 * {@code System.currentTimeMillis()} deltas in {@code notifyIdleSessions}).
 */
private static final long SELECT_TIMEOUT = 1000L;

/** Per-concrete-class counters used by {@code nextThreadName()} to number processor threads. */
private static final ConcurrentHashMap<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<Class<?>, AtomicInteger>();

/** Name passed to the {@code NamePreservingRunnable} that wraps the processing loop. */
private final String threadName;

/** Executor on which the processing loop is started. */
private final Executor executor;

/** Sessions queued by {@code add()} and drained by {@code handleNewSessions()}. */
private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>();

/** Sessions queued by {@code scheduleRemove()} and drained by {@code removeSessions()}. */
private final Queue<S> removingSessions = new ConcurrentLinkedQueue<S>();

/** Sessions queued for a flush and drained by {@code flush(long)}. */
private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>();

/**
 * Sessions queued by {@code updateTrafficMask(S)} whose read/write interest
 * must be re-evaluated; drained by the no-arg {@code updateTrafficMask()}.
 */
private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<S>();

/** The processing loop currently installed, or {@code null} when none is installed. */
private final AtomicReference<Processor> processorRef = new AtomicReference<Processor>();

/** Timestamp (from {@code System.currentTimeMillis()}) of the last idle check. */
private long lastIdleCheckTime;

/** Guards the transition to the disposing state and the call to {@code doDispose()}. */
private final Object disposalLock = new Object();

/** Set once {@code dispose()} has been requested. */
private volatile boolean disposing;

/** Set once {@code dispose()} has observed completion of {@code disposalFuture}. */
private volatile boolean disposed;

/** Completed by the processing loop when it terminates. */
private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);

/**
 * Flag read and cleared by the processing loop after each select.
 * NOTE(review): presumably set to {@code true} by subclass {@code wakeup()}
 * implementations - confirm against subclasses.
 */
protected AtomicBoolean wakeupCalled = new AtomicBoolean(false);
116
117
118
119
120
121
122
123
/**
 * Creates a processor whose processing loop will be run on the given executor.
 *
 * @param executor the executor used to run the processing loop; must not be {@code null}
 * @throws IllegalArgumentException if {@code executor} is {@code null}
 */
protected AbstractPollingIoProcessor(Executor executor) {
    if (null == executor) {
        throw new IllegalArgumentException("executor");
    }

    this.executor = executor;
    // The thread name is derived from the concrete class plus a per-class counter.
    this.threadName = nextThreadName();
}
132
133
134
135
136
137
138
139
140
141 private String nextThreadName() {
142 Class<?> cls = getClass();
143 int newThreadId;
144
145 AtomicInteger threadId = threadIds.putIfAbsent(cls, new AtomicInteger(1));
146
147 if (threadId == null) {
148 newThreadId = 1;
149 } else {
150
151 newThreadId = threadId.incrementAndGet();
152 }
153
154
155 return cls.getSimpleName() + '-' + newThreadId;
156 }
157
158
159
160
/**
 * @return {@code true} once {@link #dispose()} has been requested
 */
public final boolean isDisposing() {
    return disposing;
}
164
165
166
167
/**
 * @return {@code true} once {@link #dispose()} has finished waiting for the
 *         processing loop to terminate
 */
public final boolean isDisposed() {
    return disposed;
}
171
172
173
174
175 public final void dispose() {
176 if (disposed || disposing) {
177 return;
178 }
179
180 synchronized (disposalLock) {
181 disposing = true;
182 startupProcessor();
183 }
184
185 disposalFuture.awaitUninterruptibly();
186 disposed = true;
187 }
188
189
190
191
192
193
194
195
196
/**
 * Releases the resources held by the concrete processor. Invoked by the
 * processing loop after it has exited, while holding the disposal lock, and
 * only when disposal has been requested.
 *
 * @throws Exception if the resources cannot be released; the processing loop
 *         forwards it to the {@code ExceptionMonitor}
 */
protected abstract void doDispose() throws Exception;

/**
 * Polls for ready sessions, waiting at most {@code timeout}. The processing
 * loop calls this with {@code SELECT_TIMEOUT} and processes the selected
 * sessions only when the returned value is greater than zero.
 *
 * @param timeout the maximum time to wait (presumably milliseconds - confirm in subclasses)
 * @return the number of ready sessions
 * @throws Exception if the poll fails
 */
protected abstract int select(long timeout) throws Exception;

/**
 * Polls for ready sessions without a timeout argument. Not invoked from
 * within this class.
 *
 * @return the number of ready sessions
 * @throws Exception if the poll fails
 */
protected abstract int select() throws Exception;

/**
 * Tells whether the underlying selector has no registered sessions. The
 * processing loop consults this, together with the new-session queue, to
 * decide whether it may terminate once its session count reaches zero.
 *
 * @return {@code true} if nothing is registered
 */
protected abstract boolean isSelectorEmpty();

/**
 * Called after work has been queued (flush, traffic-mask update, processor
 * start-up) so that the processing loop can pick it up.
 * NOTE(review): expected to interrupt a blocking select - confirm in subclasses.
 */
protected abstract void wakeup();
231
232
233
234
235
236
237
/**
 * Returns an iterator over every session managed by this processor. Used
 * for idle notification and, while disposing, to schedule the removal of
 * every active session.
 *
 * @return an iterator over all managed sessions
 */
protected abstract Iterator<S> allSessions();

/**
 * Returns an iterator over the sessions found ready by the last select.
 * The processing loop calls {@link Iterator#remove()} after handling each
 * element, so the returned iterator must support removal.
 *
 * @return an iterator over the ready sessions
 */
protected abstract Iterator<S> selectedSessions();

/**
 * Returns the lifecycle state of the session. Callers in this class handle
 * {@code OPENING}, {@code OPENED} and {@code CLOSING}, and throw
 * {@link IllegalStateException} for any other value.
 *
 * @param session the session to inspect
 * @return the session state
 */
protected abstract SessionState getState(S session);

/**
 * Tells whether the session is ready for writing; when it is (and writes
 * are not suspended) the session gets scheduled for a flush.
 *
 * @param session the session to inspect
 * @return {@code true} if the session is writable
 */
protected abstract boolean isWritable(S session);

/**
 * Tells whether the session is ready for reading; when it is (and reads
 * are not suspended) the processing loop reads from it.
 *
 * @param session the session to inspect
 * @return {@code true} if the session is readable
 */
protected abstract boolean isReadable(S session);

/**
 * Registers or unregisters the session's interest in write readiness.
 *
 * @param session the session to update
 * @param isInterested {@code true} to register interest, {@code false} to clear it
 * @throws Exception if the interest cannot be changed
 */
protected abstract void setInterestedInWrite(S session, boolean isInterested) throws Exception;

/**
 * Registers or unregisters the session's interest in read readiness.
 *
 * @param session the session to update
 * @param isInterested {@code true} to register interest, {@code false} to clear it
 * @throws Exception if the interest cannot be changed
 */
protected abstract void setInterestedInRead(S session, boolean isInterested) throws Exception;

/**
 * Tells whether the session is currently registered for read readiness.
 * Not invoked from within this class.
 *
 * @param session the session to inspect
 * @return {@code true} if read interest is registered
 */
protected abstract boolean isInterestedInRead(S session);

/**
 * Tells whether the session is currently registered for write readiness.
 * Not invoked from within this class.
 *
 * @param session the session to inspect
 * @return {@code true} if write interest is registered
 */
protected abstract boolean isInterestedInWrite(S session);
305
306
307
308
309
310
311
/**
 * Initializes a newly added session. Called from the processing loop before
 * the filter chain is built; if it throws, the session is destroyed and not
 * counted as added.
 *
 * @param session the session to initialize
 * @throws Exception if initialization fails
 */
protected abstract void init(S session) throws Exception;

/**
 * Destroys the underlying resources of a session. Called when a session is
 * removed, when its initialization failed, and when a buffer write fails
 * with an {@code IOException}.
 *
 * @param session the session to destroy
 * @throws Exception if destruction fails
 */
protected abstract void destroy(S session) throws Exception;

/**
 * Reads data from the session into the buffer.
 *
 * @param session the session to read from
 * @param buf the buffer receiving the data
 * @return the number of bytes read; this class accumulates positive values,
 *         stops reading on zero, and fires an input-closed event on a
 *         negative value
 * @throws Exception if the read fails
 */
protected abstract int read(S session, IoBuffer buf) throws Exception;

/**
 * Writes data from the buffer to the session.
 *
 * @param session the session to write to
 * @param buf the buffer holding the data
 * @param length the maximum number of bytes to write (the caller passes the
 *        buffer's remaining bytes, capped by a per-flush budget for
 *        transports with fragmentation)
 * @return the number of bytes written; zero makes the caller re-register
 *         write interest and stop flushing
 * @throws Exception if the write fails
 */
protected abstract int write(S session, IoBuffer buf, int length) throws Exception;

/**
 * Transfers part of a file region to the session.
 *
 * @param session the session to write to
 * @param region the file region being sent
 * @param length the maximum number of bytes to transfer
 * @return the number of bytes transferred; the caller passes this value to
 *         {@code region.update(...)}
 * @throws Exception if the transfer fails
 */
protected abstract int transferFile(S session, FileRegion region, int length) throws Exception;
359
360
361
362
363 public final void add(S session) {
364 if (disposed || disposing) {
365 throw new IllegalStateException("Already disposed.");
366 }
367
368
369 newSessions.add(session);
370 startupProcessor();
371 }
372
373
374
375
/**
 * Queues a session for removal and makes sure a processing loop is running
 * to carry it out.
 *
 * @param session the session to remove
 */
public final void remove(S session) {
    scheduleRemove(session);
    startupProcessor();
}
380
381 private void scheduleRemove(S session) {
382 if (!removingSessions.contains(session)) {
383 removingSessions.add(session);
384 }
385 }
386
387
388
389
390 public void write(S session, WriteRequest writeRequest) {
391 WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
392
393 writeRequestQueue.offer(session, writeRequest);
394
395 if (!session.isWriteSuspended()) {
396 this.flush(session);
397 }
398 }
399
400
401
402
403 public final void flush(S session) {
404
405
406 if (session.setScheduledForFlush(true)) {
407 flushingSessions.add(session);
408 wakeup();
409 }
410 }
411
412 private void scheduleFlush(S session) {
413
414
415 if (session.setScheduledForFlush(true)) {
416 flushingSessions.add(session);
417 }
418 }
419
420
421
422
423
424
/**
 * Queues the session so that the processing loop re-evaluates its read and
 * write interest, then wakes the loop up.
 *
 * @param session the session whose traffic mask changed
 */
public final void updateTrafficMask(S session) {
    trafficControllingSessions.add(session);
    wakeup();
}
429
430
431
432
433
434 private void startupProcessor() {
435 Processor processor = processorRef.get();
436
437 if (processor == null) {
438 processor = new Processor();
439
440 if (processorRef.compareAndSet(null, processor)) {
441 executor.execute(new NamePreservingRunnable(processor, threadName));
442 }
443 }
444
445
446
447 wakeup();
448 }
449
450
451
452
453
454
455
456
/**
 * Replaces the current selector with a new one. The processing loop calls
 * this when a select returned zero in under 100 ms without a wakeup having
 * been recorded and {@link #isBrokenConnection()} returned {@code false}.
 *
 * @throws IOException if the new selector cannot be set up
 */
abstract protected void registerNewSelector() throws IOException;

/**
 * Checks whether a broken connection explains a select that returned
 * immediately with nothing selected. When this returns {@code true} the
 * processing loop only logs a warning and does not replace the selector.
 *
 * @return {@code true} if a broken connection was detected
 * @throws IOException if the check fails
 */
abstract protected boolean isBrokenConnection() throws IOException;
468
469
470
471
472
473
474
475 private int handleNewSessions() {
476 int addedSessions = 0;
477
478 for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {
479 if (addNow(session)) {
480
481 addedSessions++;
482 }
483 }
484
485 return addedSessions;
486 }
487
488
489
490
491
492
493
494
495 private boolean addNow(S session) {
496 boolean registered = false;
497
498 try {
499 init(session);
500 registered = true;
501
502
503 IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
504 chainBuilder.buildFilterChain(session.getFilterChain());
505
506
507
508
509 IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
510 listeners.fireSessionCreated(session);
511 } catch (Exception e) {
512 ExceptionMonitor.getInstance().exceptionCaught(e);
513
514 try {
515 destroy(session);
516 } catch (Exception e1) {
517 ExceptionMonitor.getInstance().exceptionCaught(e1);
518 } finally {
519 registered = false;
520 }
521 }
522
523 return registered;
524 }
525
526 private int removeSessions() {
527 int removedSessions = 0;
528
529 for (S session = removingSessions.poll(); session != null;session = removingSessions.poll()) {
530 SessionState state = getState(session);
531
532
533 switch (state) {
534 case OPENED:
535
536 if (removeNow(session)) {
537 removedSessions++;
538 }
539
540 break;
541
542 case CLOSING:
543
544
545 removedSessions++;
546 break;
547
548 case OPENING:
549
550
551 newSessions.remove(session);
552
553 if (removeNow(session)) {
554 removedSessions++;
555 }
556
557 break;
558
559 default:
560 throw new IllegalStateException(String.valueOf(state));
561 }
562 }
563
564 return removedSessions;
565 }
566
567 private boolean removeNow(S session) {
568 clearWriteRequestQueue(session);
569
570 try {
571 destroy(session);
572 return true;
573 } catch (Exception e) {
574 IoFilterChain filterChain = session.getFilterChain();
575 filterChain.fireExceptionCaught(e);
576 } finally {
577 try {
578 clearWriteRequestQueue(session);
579 ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
580 } catch (Exception e) {
581
582
583
584 IoFilterChain filterChain = session.getFilterChain();
585 filterChain.fireExceptionCaught(e);
586 }
587 }
588
589 return false;
590 }
591
592 private void clearWriteRequestQueue(S session) {
593 WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
594 WriteRequest req;
595
596 List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
597
598 if ((req = writeRequestQueue.poll(session)) != null) {
599 Object message = req.getMessage();
600
601 if (message instanceof IoBuffer) {
602 IoBuffer buf = (IoBuffer) message;
603
604
605
606 if (buf.hasRemaining()) {
607 buf.reset();
608 failedRequests.add(req);
609 } else {
610 IoFilterChain filterChain = session.getFilterChain();
611 filterChain.fireMessageSent(req);
612 }
613 } else {
614 failedRequests.add(req);
615 }
616
617
618 while ((req = writeRequestQueue.poll(session)) != null) {
619 failedRequests.add(req);
620 }
621 }
622
623
624 if (!failedRequests.isEmpty()) {
625 WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
626
627 for (WriteRequest r : failedRequests) {
628 session.decreaseScheduledBytesAndMessages(r);
629 r.getFuture().setException(cause);
630 }
631
632 IoFilterChain filterChain = session.getFilterChain();
633 filterChain.fireExceptionCaught(cause);
634 }
635 }
636
637 private void process() throws Exception {
638 for (Iterator<S> i = selectedSessions(); i.hasNext();) {
639 S session = i.next();
640 process(session);
641 i.remove();
642 }
643 }
644
645
646
647
648 private void process(S session) {
649
650 if (isReadable(session) && !session.isReadSuspended()) {
651 read(session);
652 }
653
654
655 if (isWritable(session) && !session.isWriteSuspended()) {
656
657 if (session.setScheduledForFlush(true)) {
658 flushingSessions.add(session);
659 }
660 }
661 }
662
/**
 * Reads available data from the session into a freshly allocated buffer of
 * the configured read-buffer size and fires a message-received event when
 * at least one byte was read. Fires an input-closed event when the last
 * read returned a negative value. Exceptions are forwarded to the filter
 * chain; {@code IOException}s additionally schedule the session for
 * removal, except for a {@code PortUnreachableException} on a datagram
 * session configured not to close on port-unreachable.
 *
 * @param session the session to read from
 */
private void read(S session) {
    IoSessionConfig config = session.getConfig();
    int bufferSize = config.getReadBufferSize();
    IoBuffer buf = IoBuffer.allocate(bufferSize);

    final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();

    try {
        int readBytes = 0;
        int ret;

        try {
            if (hasFragmentation) {
                // Keep reading until nothing more is returned or the buffer is full.
                while ((ret = read(session, buf)) > 0) {
                    readBytes += ret;

                    if (!buf.hasRemaining()) {
                        break;
                    }
                }
            } else {
                // Without fragmentation a single read is performed.
                ret = read(session, buf);

                if (ret > 0) {
                    readBytes = ret;
                }
            }
        } finally {
            // Always flip, even when the read threw.
            buf.flip();
        }

        if (readBytes > 0) {
            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireMessageReceived(buf);
            // The buffer now belongs to the chain; drop our reference.
            buf = null;

            if (hasFragmentation) {
                // Adapt the read-buffer size: shrink when less than half of it
                // was used, grow when it was filled completely.
                if (readBytes << 1 < config.getReadBufferSize()) {
                    session.decreaseReadBufferSize();
                } else if (readBytes == config.getReadBufferSize()) {
                    session.increaseReadBufferSize();
                }
            }
        }

        if (ret < 0) {
            // A negative result from the last read is reported as input closed.
            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireInputClosed();
        }
    } catch (Exception e) {
        if (e instanceof IOException) {
            // Remove the session unless this is a PortUnreachableException on a
            // datagram session whose config says not to close in that case.
            if (!(e instanceof PortUnreachableException)
                    || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
                    || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
                scheduleRemove(session);
            }
        }

        IoFilterChain filterChain = session.getFilterChain();
        filterChain.fireExceptionCaught(e);
    }
}
727
728 private void notifyIdleSessions(long currentTime) throws Exception {
729
730 if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
731 lastIdleCheckTime = currentTime;
732 AbstractIoSession.notifyIdleness(allSessions(), currentTime);
733 }
734 }
735
736
737
738
739 private void flush(long currentTime) {
740 if (flushingSessions.isEmpty()) {
741 return;
742 }
743
744 do {
745 S session = flushingSessions.poll();
746
747
748 if (session == null) {
749
750 break;
751 }
752
753
754
755 session.unscheduledForFlush();
756
757 SessionState state = getState(session);
758
759 switch (state) {
760 case OPENED:
761 try {
762 boolean flushedAll = flushNow(session, currentTime);
763
764 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session)
765 && !session.isScheduledForFlush()) {
766 scheduleFlush(session);
767 }
768 } catch (Exception e) {
769 scheduleRemove(session);
770 session.close(true);
771 IoFilterChain filterChain = session.getFilterChain();
772 filterChain.fireExceptionCaught(e);
773 }
774
775 break;
776
777 case CLOSING:
778
779 break;
780
781 case OPENING:
782
783
784
785 scheduleFlush(session);
786 return;
787
788 default:
789 throw new IllegalStateException(String.valueOf(state));
790 }
791
792 } while (!flushingSessions.isEmpty());
793 }
794
/**
 * Writes as many pending requests of the session as possible, within a
 * per-call byte budget of 1.5 times the configured maximum read-buffer size.
 *
 * @param session the session to flush
 * @param currentTime the current time, passed on to the write statistics
 * @return {@code true} when the loop ended because the queue was drained
 *         (or the budget check of the loop condition ended it);
 *         {@code false} when the session is not connected, a message was
 *         only partially written, nothing could be written, the budget was
 *         reached, or an exception occurred
 */
private boolean flushNow(S session, long currentTime) {
    if (!session.isConnected()) {
        // A disconnected session cannot be written to: get rid of it.
        scheduleRemove(session);
        return false;
    }

    final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();

    final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();

    // Byte budget for this call: max read-buffer size plus half of it.
    final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
            + (session.getConfig().getMaxReadBufferSize() >>> 1);
    int writtenBytes = 0;
    WriteRequest req = null;

    try {
        // Clear write interest; it is re-registered below whenever needed.
        setInterestedInWrite(session, false);

        do {
            // Resume the request left over from a previous call, if any.
            req = session.getCurrentWriteRequest();

            if (req == null) {
                req = writeRequestQueue.poll(session);

                if (req == null) {
                    // Nothing left to write.
                    break;
                }

                session.setCurrentWriteRequest(req);
            }

            int localWrittenBytes = 0;
            Object message = req.getMessage();

            if (message instanceof IoBuffer) {
                localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
                        currentTime);

                if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {
                    // Partially written buffer: wait for the next write-ready event.
                    writtenBytes += localWrittenBytes;
                    setInterestedInWrite(session, true);
                    return false;
                }
            } else if (message instanceof FileRegion) {
                localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
                        currentTime);

                // Partially transferred region: wait for the next write-ready event.
                if ((localWrittenBytes > 0) && (((FileRegion) message).getRemainingBytes() > 0)) {
                    writtenBytes += localWrittenBytes;
                    setInterestedInWrite(session, true);
                    return false;
                }
            } else {
                throw new IllegalStateException("Don't know how to handle message of type '"
                        + message.getClass().getName() + "'. Are you missing a protocol encoder?");
            }

            if (localWrittenBytes == 0) {
                // Nothing was written: wait for the next write-ready event.
                setInterestedInWrite(session, true);
                return false;
            }

            writtenBytes += localWrittenBytes;

            if (writtenBytes >= maxWrittenBytes) {
                // Budget used up: let other sessions run and come back later.
                scheduleFlush(session);
                return false;
            }

            if (message instanceof IoBuffer) {
                ((IoBuffer) message).free();
            }
        } while (writtenBytes < maxWrittenBytes);
    } catch (Exception e) {
        if (req != null) {
            // Fail the request that was being written.
            req.getFuture().setException(e);
        }

        IoFilterChain filterChain = session.getFilterChain();
        filterChain.fireExceptionCaught(e);
        return false;
    }

    return true;
}
893
/**
 * Writes the {@code IoBuffer} carried by a write request to the session.
 * An {@code IOException} from the underlying write frees the buffer, closes
 * and destroys the session, and makes this method return 0.
 *
 * @param session the session to write to
 * @param req the request whose message is an {@code IoBuffer}
 * @param hasFragmentation whether the transport fragments messages; when
 *        {@code true} the write is capped to {@code maxLength}
 * @param maxLength the remaining byte budget of the current flush
 * @param currentTime the current time, used for the write statistics
 * @return the number of bytes written
 * @throws Exception if the write fails with something other than an
 *         {@code IOException}, or if destroying the session fails
 */
private int writeBuffer(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
        throws Exception {
    IoBuffer buf = (IoBuffer) req.getMessage();
    int localWrittenBytes = 0;

    if (buf.hasRemaining()) {
        int length;

        if (hasFragmentation) {
            length = Math.min(buf.remaining(), maxLength);
        } else {
            length = buf.remaining();
        }

        try {
            localWrittenBytes = write(session, buf, length);
        } catch (IOException ioe) {
            // The write failed: release the buffer and tear the session down.
            buf.free();
            session.close(true);
            destroy(session);

            return 0;
        }

    }

    session.increaseWrittenBytes(localWrittenBytes, currentTime);

    // The request is complete when the buffer is exhausted, or - for
    // transports without fragmentation - as soon as anything was written.
    if (!buf.hasRemaining() || (!hasFragmentation && (localWrittenBytes != 0))) {
        // Rewind to the mark while the message-sent event is fired.
        int pos = buf.position();
        buf.reset();

        fireMessageSent(session, req);

        // Restore the position the write left the buffer at.
        buf.position(pos);
    }

    return localWrittenBytes;
}
937
938 private int writeFile(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
939 throws Exception {
940 int localWrittenBytes;
941 FileRegion region = (FileRegion) req.getMessage();
942
943 if (region.getRemainingBytes() > 0) {
944 int length;
945
946 if (hasFragmentation) {
947 length = (int) Math.min(region.getRemainingBytes(), maxLength);
948 } else {
949 length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
950 }
951
952 localWrittenBytes = transferFile(session, region, length);
953 region.update(localWrittenBytes);
954 } else {
955 localWrittenBytes = 0;
956 }
957
958 session.increaseWrittenBytes(localWrittenBytes, currentTime);
959
960 if ((region.getRemainingBytes() <= 0) || (!hasFragmentation && (localWrittenBytes != 0))) {
961 fireMessageSent(session, req);
962 }
963
964 return localWrittenBytes;
965 }
966
967 private void fireMessageSent(S session, WriteRequest req) {
968 session.setCurrentWriteRequest(null);
969 IoFilterChain filterChain = session.getFilterChain();
970 filterChain.fireMessageSent(req);
971 }
972
973
974
975
976 private void updateTrafficMask() {
977 int queueSize = trafficControllingSessions.size();
978
979 while (queueSize > 0) {
980 S session = trafficControllingSessions.poll();
981
982 if (session == null) {
983
984 return;
985 }
986
987 SessionState state = getState(session);
988
989 switch (state) {
990 case OPENED:
991 updateTrafficControl(session);
992
993 break;
994
995 case CLOSING:
996 break;
997
998 case OPENING:
999
1000
1001
1002
1003 trafficControllingSessions.add(session);
1004 break;
1005
1006 default:
1007 throw new IllegalStateException(String.valueOf(state));
1008 }
1009
1010
1011
1012
1013
1014
1015 queueSize--;
1016 }
1017 }
1018
1019
1020
1021
1022 public void updateTrafficControl(S session) {
1023
1024 try {
1025 setInterestedInRead(session, !session.isReadSuspended());
1026 } catch (Exception e) {
1027 IoFilterChain filterChain = session.getFilterChain();
1028 filterChain.fireExceptionCaught(e);
1029 }
1030
1031 try {
1032 setInterestedInWrite(session,
1033 !session.getWriteRequestQueue().isEmpty(session) && !session.isWriteSuspended());
1034 } catch (Exception e) {
1035 IoFilterChain filterChain = session.getFilterChain();
1036 filterChain.fireExceptionCaught(e);
1037 }
1038 }
1039
1040
1041
1042
1043
1044
1045 private class Processor implements Runnable {
1046 public void run() {
1047 assert (processorRef.get() == this);
1048
1049 int nSessions = 0;
1050 lastIdleCheckTime = System.currentTimeMillis();
1051
1052 for (;;) {
1053 try {
1054
1055
1056
1057
1058 long t0 = System.currentTimeMillis();
1059 int selected = select(SELECT_TIMEOUT);
1060 long t1 = System.currentTimeMillis();
1061 long delta = (t1 - t0);
1062
1063 if (!wakeupCalled.getAndSet(false) && (selected == 0) && (delta < 100)) {
1064
1065
1066 if (isBrokenConnection()) {
1067 LOG.warn("Broken connection");
1068 } else {
1069 LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083 registerNewSelector();
1084 }
1085 }
1086
1087
1088 nSessions += handleNewSessions();
1089
1090 updateTrafficMask();
1091
1092
1093
1094 if (selected > 0) {
1095
1096
1097 process();
1098 }
1099
1100
1101 long currentTime = System.currentTimeMillis();
1102 flush(currentTime);
1103
1104
1105 nSessions -= removeSessions();
1106
1107
1108 notifyIdleSessions(currentTime);
1109
1110
1111
1112 if (nSessions == 0) {
1113 processorRef.set(null);
1114
1115 if (newSessions.isEmpty() && isSelectorEmpty()) {
1116
1117 assert (processorRef.get() != this);
1118 break;
1119 }
1120
1121 assert (processorRef.get() != this);
1122
1123 if (!processorRef.compareAndSet(null, this)) {
1124
1125 assert (processorRef.get() != this);
1126 break;
1127 }
1128
1129 assert (processorRef.get() == this);
1130 }
1131
1132
1133
1134 if (isDisposing()) {
1135 boolean hasKeys = false;
1136
1137 for (Iterator<S> i = allSessions(); i.hasNext();) {
1138 IoSession session = i.next();
1139
1140 if (session.isActive()) {
1141 scheduleRemove((S)session);
1142 hasKeys = true;
1143 }
1144 }
1145
1146 if (hasKeys) {
1147 wakeup();
1148 }
1149 }
1150 } catch (ClosedSelectorException cse) {
1151
1152
1153 ExceptionMonitor.getInstance().exceptionCaught(cse);
1154 break;
1155 } catch (Exception e) {
1156 ExceptionMonitor.getInstance().exceptionCaught(e);
1157
1158 try {
1159 Thread.sleep(1000);
1160 } catch (InterruptedException e1) {
1161 ExceptionMonitor.getInstance().exceptionCaught(e1);
1162 }
1163 }
1164 }
1165
1166 try {
1167 synchronized (disposalLock) {
1168 if (disposing) {
1169 doDispose();
1170 }
1171 }
1172 } catch (Exception e) {
1173 ExceptionMonitor.getInstance().exceptionCaught(e);
1174 } finally {
1175 disposalFuture.setValue(true);
1176 }
1177 }
1178 }
1179 }