1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.future;
21
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.concurrent.TimeUnit;
25
26 import org.apache.mina.core.polling.AbstractPollingIoProcessor;
27 import org.apache.mina.core.service.IoProcessor;
28 import org.apache.mina.core.session.IoSession;
29 import org.apache.mina.util.ExceptionMonitor;
30
31
32
33
34
35
36
37
38 public class DefaultIoFuture implements IoFuture {
39
40
41 private static final long DEAD_LOCK_CHECK_INTERVAL = 5000L;
42
43
44 private final IoSession session;
45
46
47 private final Object lock;
48 private IoFutureListener<?> firstListener;
49 private List<IoFutureListener<?>> otherListeners;
50 private Object result;
51 private boolean ready;
52 private int waiters;
53
54
55
56
57
58
/**
 * Creates a future that is not yet completed.
 *
 * @param session the session this future is associated with; stored as-is
 *                and returned by {@link #getSession()}
 */
public DefaultIoFuture(IoSession session) {
    // The future itself serves as the monitor for wait/notify.
    this.lock = this;
    this.session = session;
}
63
64
65
66
67 public IoSession getSession() {
68 return session;
69 }
70
71
72
73
74 @Deprecated
75 public void join() {
76 awaitUninterruptibly();
77 }
78
79
80
81
82 @Deprecated
83 public boolean join(long timeoutMillis) {
84 return awaitUninterruptibly(timeoutMillis);
85 }
86
87
88
89
90 public IoFuture await() throws InterruptedException {
91 synchronized (lock) {
92 while (!ready) {
93 waiters++;
94 try {
95
96
97
98 lock.wait(DEAD_LOCK_CHECK_INTERVAL);
99 } finally {
100 waiters--;
101 if (!ready) {
102 checkDeadLock();
103 }
104 }
105 }
106 }
107 return this;
108 }
109
110
111
112
113 public boolean await(long timeout, TimeUnit unit)
114 throws InterruptedException {
115 return await(unit.toMillis(timeout));
116 }
117
118
119
120
121 public boolean await(long timeoutMillis) throws InterruptedException {
122 return await0(timeoutMillis, true);
123 }
124
125
126
127
128 public IoFuture awaitUninterruptibly() {
129 try {
130 await0(Long.MAX_VALUE, false);
131 } catch ( InterruptedException ie) {
132
133 }
134
135 return this;
136 }
137
138
139
140
141 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
142 return awaitUninterruptibly(unit.toMillis(timeout));
143 }
144
145
146
147
148 public boolean awaitUninterruptibly(long timeoutMillis) {
149 try {
150 return await0(timeoutMillis, false);
151 } catch (InterruptedException e) {
152 throw new InternalError();
153 }
154 }
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169 private boolean await0(long timeoutMillis, boolean interruptable) throws InterruptedException {
170 long endTime = System.currentTimeMillis() + timeoutMillis;
171
172 if (endTime < 0) {
173 endTime = Long.MAX_VALUE;
174 }
175
176 synchronized (lock) {
177 if (ready) {
178 return ready;
179 } else if (timeoutMillis <= 0) {
180 return ready;
181 }
182
183 waiters++;
184
185 try {
186 for (;;) {
187 try {
188 long timeOut = Math.min(timeoutMillis, DEAD_LOCK_CHECK_INTERVAL);
189 lock.wait(timeOut);
190 } catch (InterruptedException e) {
191 if (interruptable) {
192 throw e;
193 }
194 }
195
196 if (ready) {
197 return true;
198 }
199
200 if (endTime < System.currentTimeMillis()) {
201 return ready;
202 }
203 }
204 } finally {
205 waiters--;
206 if (!ready) {
207 checkDeadLock();
208 }
209 }
210 }
211 }
212
213
214
215
216
217
218
219 private void checkDeadLock() {
220
221 if (!(this instanceof CloseFuture || this instanceof WriteFuture ||
222 this instanceof ReadFuture || this instanceof ConnectFuture)) {
223 return;
224 }
225
226
227
228
229
230
231
232 StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
233
234
235 for (StackTraceElement s: stackTrace) {
236 if (AbstractPollingIoProcessor.class.getName().equals(s.getClassName())) {
237 IllegalStateException e = new IllegalStateException( "t" );
238 e.getStackTrace();
239 throw new IllegalStateException(
240 "DEAD LOCK: " + IoFuture.class.getSimpleName() +
241 ".await() was invoked from an I/O processor thread. " +
242 "Please use " + IoFutureListener.class.getSimpleName() +
243 " or configure a proper thread model alternatively.");
244 }
245 }
246
247
248 for (StackTraceElement s: stackTrace) {
249 try {
250 Class<?> cls = DefaultIoFuture.class.getClassLoader().loadClass(s.getClassName());
251 if (IoProcessor.class.isAssignableFrom(cls)) {
252 throw new IllegalStateException(
253 "DEAD LOCK: " + IoFuture.class.getSimpleName() +
254 ".await() was invoked from an I/O processor thread. " +
255 "Please use " + IoFutureListener.class.getSimpleName() +
256 " or configure a proper thread model alternatively.");
257 }
258 } catch (Exception cnfe) {
259
260 }
261 }
262 }
263
264
265
266
267 public boolean isDone() {
268 synchronized (lock) {
269 return ready;
270 }
271 }
272
273
274
275
276 public void setValue(Object newValue) {
277 synchronized (lock) {
278
279 if (ready) {
280 return;
281 }
282
283 result = newValue;
284 ready = true;
285 if (waiters > 0) {
286 lock.notifyAll();
287 }
288 }
289
290 notifyListeners();
291 }
292
293
294
295
296 protected Object getValue() {
297 synchronized (lock) {
298 return result;
299 }
300 }
301
302
303
304
305 public IoFuture addListener(IoFutureListener<?> listener) {
306 if (listener == null) {
307 throw new IllegalArgumentException("listener");
308 }
309
310 boolean notifyNow = false;
311 synchronized (lock) {
312 if (ready) {
313 notifyNow = true;
314 } else {
315 if (firstListener == null) {
316 firstListener = listener;
317 } else {
318 if (otherListeners == null) {
319 otherListeners = new ArrayList<IoFutureListener<?>>(1);
320 }
321 otherListeners.add(listener);
322 }
323 }
324 }
325
326 if (notifyNow) {
327 notifyListener(listener);
328 }
329 return this;
330 }
331
332
333
334
335 public IoFuture removeListener(IoFutureListener<?> listener) {
336 if (listener == null) {
337 throw new IllegalArgumentException("listener");
338 }
339
340 synchronized (lock) {
341 if (!ready) {
342 if (listener == firstListener) {
343 if (otherListeners != null && !otherListeners.isEmpty()) {
344 firstListener = otherListeners.remove(0);
345 } else {
346 firstListener = null;
347 }
348 } else if (otherListeners != null) {
349 otherListeners.remove(listener);
350 }
351 }
352 }
353
354 return this;
355 }
356
357 private void notifyListeners() {
358
359
360
361 if (firstListener != null) {
362 notifyListener(firstListener);
363 firstListener = null;
364
365 if (otherListeners != null) {
366 for (IoFutureListener<?> l : otherListeners) {
367 notifyListener(l);
368 }
369 otherListeners = null;
370 }
371 }
372 }
373
374 @SuppressWarnings("unchecked")
375 private void notifyListener(IoFutureListener l) {
376 try {
377 l.operationComplete(this);
378 } catch (Throwable t) {
379 ExceptionMonitor.getInstance().exceptionCaught(t);
380 }
381 }
382 }