1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.future;
21
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.concurrent.TimeUnit;
25
26 import org.apache.mina.core.polling.AbstractPollingIoProcessor;
27 import org.apache.mina.core.service.IoProcessor;
28 import org.apache.mina.core.session.IoSession;
29 import org.apache.mina.util.ExceptionMonitor;
30
31
32
33
34
35
36
37
38 public class DefaultIoFuture implements IoFuture {
39
40
41 private static final long DEAD_LOCK_CHECK_INTERVAL = 5000L;
42
43
44 private final IoSession session;
45
46
47 private final Object lock;
48 private IoFutureListener<?> firstListener;
49 private List<IoFutureListener<?>> otherListeners;
50 private Object result;
51 private boolean ready;
52 private int waiters;
53
54
55
56
57
58
59 public DefaultIoFuture(IoSession session) {
60 this.session = session;
61 this.lock = this;
62 }
63
64
65
66
67 public IoSession getSession() {
68 return session;
69 }
70
71
72
73
74 @Deprecated
75 public void join() {
76 awaitUninterruptibly();
77 }
78
79
80
81
82 @Deprecated
83 public boolean join(long timeoutMillis) {
84 return awaitUninterruptibly(timeoutMillis);
85 }
86
87
88
89
90 public IoFuture await() throws InterruptedException {
91 synchronized (lock) {
92 while (!ready) {
93 waiters++;
94 try {
95
96
97
98 lock.wait(DEAD_LOCK_CHECK_INTERVAL);
99 } finally {
100 waiters--;
101 if (!ready) {
102 checkDeadLock();
103 }
104 }
105 }
106 }
107 return this;
108 }
109
110
111
112
113 public boolean await(long timeout, TimeUnit unit)
114 throws InterruptedException {
115 return await(unit.toMillis(timeout));
116 }
117
118
119
120
121 public boolean await(long timeoutMillis) throws InterruptedException {
122 return await0(timeoutMillis, true);
123 }
124
125
126
127
128 public IoFuture awaitUninterruptibly() {
129 try {
130 await0(Long.MAX_VALUE, false);
131 } catch ( InterruptedException ie) {
132
133 }
134
135 return this;
136 }
137
138
139
140
141 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
142 return awaitUninterruptibly(unit.toMillis(timeout));
143 }
144
145
146
147
148 public boolean awaitUninterruptibly(long timeoutMillis) {
149 try {
150 return await0(timeoutMillis, false);
151 } catch (InterruptedException e) {
152 throw new InternalError();
153 }
154 }
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169 private boolean await0(long timeoutMillis, boolean interruptable) throws InterruptedException {
170 long endTime = System.currentTimeMillis() + timeoutMillis;
171
172 synchronized (lock) {
173 if (ready) {
174 return ready;
175 } else if (timeoutMillis <= 0) {
176 return ready;
177 }
178
179 waiters++;
180 try {
181 for (;;) {
182 try {
183 long timeOut = Math.min(timeoutMillis, DEAD_LOCK_CHECK_INTERVAL);
184 lock.wait(timeOut);
185 } catch (InterruptedException e) {
186 if (interruptable) {
187 throw e;
188 }
189 }
190
191 if (ready) {
192 return true;
193 } else {
194 if (endTime < System.currentTimeMillis()) {
195 return ready;
196 }
197 }
198 }
199 } finally {
200 waiters--;
201 if (!ready) {
202 checkDeadLock();
203 }
204 }
205 }
206 }
207
208
209
210
211
212
213
214 private void checkDeadLock() {
215
216 if (!(this instanceof CloseFuture || this instanceof WriteFuture ||
217 this instanceof ReadFuture || this instanceof ConnectFuture)) {
218 return;
219 }
220
221
222
223
224
225
226
227 StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
228
229
230 for (StackTraceElement s: stackTrace) {
231 if (AbstractPollingIoProcessor.class.getName().equals(s.getClassName())) {
232 IllegalStateException e = new IllegalStateException( "t" );
233 e.getStackTrace();
234 throw new IllegalStateException(
235 "DEAD LOCK: " + IoFuture.class.getSimpleName() +
236 ".await() was invoked from an I/O processor thread. " +
237 "Please use " + IoFutureListener.class.getSimpleName() +
238 " or configure a proper thread model alternatively.");
239 }
240 }
241
242
243 for (StackTraceElement s: stackTrace) {
244 try {
245 Class<?> cls = DefaultIoFuture.class.getClassLoader().loadClass(s.getClassName());
246 if (IoProcessor.class.isAssignableFrom(cls)) {
247 throw new IllegalStateException(
248 "DEAD LOCK: " + IoFuture.class.getSimpleName() +
249 ".await() was invoked from an I/O processor thread. " +
250 "Please use " + IoFutureListener.class.getSimpleName() +
251 " or configure a proper thread model alternatively.");
252 }
253 } catch (Exception cnfe) {
254
255 }
256 }
257 }
258
259
260
261
262 public boolean isDone() {
263 synchronized (lock) {
264 return ready;
265 }
266 }
267
268
269
270
271 public void setValue(Object newValue) {
272 synchronized (lock) {
273
274 if (ready) {
275 return;
276 }
277
278 result = newValue;
279 ready = true;
280 if (waiters > 0) {
281 lock.notifyAll();
282 }
283 }
284
285 notifyListeners();
286 }
287
288
289
290
291 protected Object getValue() {
292 synchronized (lock) {
293 return result;
294 }
295 }
296
297
298
299
300 public IoFuture addListener(IoFutureListener<?> listener) {
301 if (listener == null) {
302 throw new NullPointerException("listener");
303 }
304
305 boolean notifyNow = false;
306 synchronized (lock) {
307 if (ready) {
308 notifyNow = true;
309 } else {
310 if (firstListener == null) {
311 firstListener = listener;
312 } else {
313 if (otherListeners == null) {
314 otherListeners = new ArrayList<IoFutureListener<?>>(1);
315 }
316 otherListeners.add(listener);
317 }
318 }
319 }
320
321 if (notifyNow) {
322 notifyListener(listener);
323 }
324 return this;
325 }
326
327
328
329
330 public IoFuture removeListener(IoFutureListener<?> listener) {
331 if (listener == null) {
332 throw new NullPointerException("listener");
333 }
334
335 synchronized (lock) {
336 if (!ready) {
337 if (listener == firstListener) {
338 if (otherListeners != null && !otherListeners.isEmpty()) {
339 firstListener = otherListeners.remove(0);
340 } else {
341 firstListener = null;
342 }
343 } else if (otherListeners != null) {
344 otherListeners.remove(listener);
345 }
346 }
347 }
348
349 return this;
350 }
351
352 private void notifyListeners() {
353
354
355
356 if (firstListener != null) {
357 notifyListener(firstListener);
358 firstListener = null;
359
360 if (otherListeners != null) {
361 for (IoFutureListener<?> l : otherListeners) {
362 notifyListener(l);
363 }
364 otherListeners = null;
365 }
366 }
367 }
368
369 @SuppressWarnings("unchecked")
370 private void notifyListener(IoFutureListener l) {
371 try {
372 l.operationComplete(this);
373 } catch (Throwable t) {
374 ExceptionMonitor.getInstance().exceptionCaught(t);
375 }
376 }
377 }