1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.future;
21
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.concurrent.TimeUnit;
25
26 import org.apache.mina.core.polling.AbstractPollingIoProcessor;
27 import org.apache.mina.core.service.IoProcessor;
28 import org.apache.mina.core.session.IoSession;
29 import org.apache.mina.util.ExceptionMonitor;
30
31
32
33
34
35
36
37
38
39 public class DefaultIoFuture implements IoFuture {
40
41
42 private static final int DEAD_LOCK_CHECK_INTERVAL = 5000;
43
44 private final IoSession session;
45
46
47 private final Object lock;
48 private IoFutureListener<?> firstListener;
49 private List<IoFutureListener<?>> otherListeners;
50 private Object result;
51 private boolean ready;
52 private int waiters;
53
54
55
56
57
58
59 public DefaultIoFuture(IoSession session) {
60 this.session = session;
61 this.lock = this;
62 }
63
64
65
66
67 public IoSession getSession() {
68 return session;
69 }
70
71
72
73
74 @Deprecated
75 public void join() {
76 awaitUninterruptibly();
77 }
78
79
80
81
82 @Deprecated
83 public boolean join(long timeoutMillis) {
84 return awaitUninterruptibly(timeoutMillis);
85 }
86
87
88
89
90 public IoFuture await() throws InterruptedException {
91 synchronized (lock) {
92 while (!ready) {
93 waiters++;
94 try {
95
96
97
98 lock.wait(DEAD_LOCK_CHECK_INTERVAL);
99 } finally {
100 waiters--;
101 if (!ready) {
102 checkDeadLock();
103 }
104 }
105 }
106 }
107 return this;
108 }
109
110
111
112
113 public boolean await(long timeout, TimeUnit unit)
114 throws InterruptedException {
115 return await(unit.toMillis(timeout));
116 }
117
118
119
120
121 public boolean await(long timeoutMillis) throws InterruptedException {
122 return await0(timeoutMillis, true);
123 }
124
125
126
127
128 public IoFuture awaitUninterruptibly() {
129 synchronized (lock) {
130 while (!ready) {
131 waiters++;
132 try {
133 lock.wait(DEAD_LOCK_CHECK_INTERVAL);
134 } catch (InterruptedException ie) {
135
136 } finally {
137 waiters--;
138 if (!ready) {
139 checkDeadLock();
140 }
141 }
142 }
143 }
144
145 return this;
146 }
147
148
149
150
151 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
152 return awaitUninterruptibly(unit.toMillis(timeout));
153 }
154
155
156
157
158 public boolean awaitUninterruptibly(long timeoutMillis) {
159 try {
160 return await0(timeoutMillis, false);
161 } catch (InterruptedException e) {
162 throw new InternalError();
163 }
164 }
165
166 private boolean await0(long timeoutMillis, boolean interruptable) throws InterruptedException {
167 long startTime = timeoutMillis <= 0 ? 0 : System.currentTimeMillis();
168 long waitTime = timeoutMillis;
169
170 synchronized (lock) {
171 if (ready) {
172 return ready;
173 } else if (waitTime <= 0) {
174 return ready;
175 }
176
177 waiters++;
178 try {
179 for (;;) {
180 try {
181 lock.wait(Math.min(waitTime, DEAD_LOCK_CHECK_INTERVAL));
182 } catch (InterruptedException e) {
183 if (interruptable) {
184 throw e;
185 }
186 }
187
188 if (ready) {
189 return true;
190 } else {
191 waitTime = timeoutMillis
192 - (System.currentTimeMillis() - startTime);
193 if (waitTime <= 0) {
194 return ready;
195 }
196 }
197 }
198 } finally {
199 waiters--;
200 if (!ready) {
201 checkDeadLock();
202 }
203 }
204 }
205 }
206
207
208
209
210
211
212
213 private void checkDeadLock() {
214
215 if (!(this instanceof CloseFuture || this instanceof WriteFuture ||
216 this instanceof ReadFuture || this instanceof ConnectFuture)) {
217 return;
218 }
219
220
221
222
223
224
225
226 StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
227
228
229 for (StackTraceElement s: stackTrace) {
230 if (AbstractPollingIoProcessor.class.getName().equals(s.getClassName())) {
231 IllegalStateException e = new IllegalStateException( "t" );
232 e.getStackTrace();
233 throw new IllegalStateException(
234 "DEAD LOCK: " + IoFuture.class.getSimpleName() +
235 ".await() was invoked from an I/O processor thread. " +
236 "Please use " + IoFutureListener.class.getSimpleName() +
237 " or configure a proper thread model alternatively.");
238 }
239 }
240
241
242 for (StackTraceElement s: stackTrace) {
243 try {
244 Class<?> cls = DefaultIoFuture.class.getClassLoader().loadClass(s.getClassName());
245 if (IoProcessor.class.isAssignableFrom(cls)) {
246 throw new IllegalStateException(
247 "DEAD LOCK: " + IoFuture.class.getSimpleName() +
248 ".await() was invoked from an I/O processor thread. " +
249 "Please use " + IoFutureListener.class.getSimpleName() +
250 " or configure a proper thread model alternatively.");
251 }
252 } catch (Exception cnfe) {
253
254 }
255 }
256 }
257
258
259
260
261 public boolean isDone() {
262 synchronized (lock) {
263 return ready;
264 }
265 }
266
267
268
269
270 public void setValue(Object newValue) {
271 synchronized (lock) {
272
273 if (ready) {
274 return;
275 }
276
277 result = newValue;
278 ready = true;
279 if (waiters > 0) {
280 lock.notifyAll();
281 }
282 }
283
284 notifyListeners();
285 }
286
287
288
289
290 protected Object getValue() {
291 synchronized (lock) {
292 return result;
293 }
294 }
295
296
297
298
299 public IoFuture addListener(IoFutureListener<?> listener) {
300 if (listener == null) {
301 throw new NullPointerException("listener");
302 }
303
304 boolean notifyNow = false;
305 synchronized (lock) {
306 if (ready) {
307 notifyNow = true;
308 } else {
309 if (firstListener == null) {
310 firstListener = listener;
311 } else {
312 if (otherListeners == null) {
313 otherListeners = new ArrayList<IoFutureListener<?>>(1);
314 }
315 otherListeners.add(listener);
316 }
317 }
318 }
319
320 if (notifyNow) {
321 notifyListener(listener);
322 }
323 return this;
324 }
325
326
327
328
329 public IoFuture removeListener(IoFutureListener<?> listener) {
330 if (listener == null) {
331 throw new NullPointerException("listener");
332 }
333
334 synchronized (lock) {
335 if (!ready) {
336 if (listener == firstListener) {
337 if (otherListeners != null && !otherListeners.isEmpty()) {
338 firstListener = otherListeners.remove(0);
339 } else {
340 firstListener = null;
341 }
342 } else if (otherListeners != null) {
343 otherListeners.remove(listener);
344 }
345 }
346 }
347
348 return this;
349 }
350
351 private void notifyListeners() {
352
353
354
355 if (firstListener != null) {
356 notifyListener(firstListener);
357 firstListener = null;
358
359 if (otherListeners != null) {
360 for (IoFutureListener<?> l : otherListeners) {
361 notifyListener(l);
362 }
363 otherListeners = null;
364 }
365 }
366 }
367
368 @SuppressWarnings("unchecked")
369 private void notifyListener(IoFutureListener l) {
370 try {
371 l.operationComplete(this);
372 } catch (Throwable t) {
373 ExceptionMonitor.getInstance().exceptionCaught(t);
374 }
375 }
376 }