View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.future;
21  
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.concurrent.TimeUnit;
25  
26  import org.apache.mina.core.polling.AbstractPollingIoProcessor;
27  import org.apache.mina.core.service.IoProcessor;
28  import org.apache.mina.core.session.IoSession;
29  import org.apache.mina.util.ExceptionMonitor;
30  
31  
32  /**
33   * A default implementation of {@link IoFuture} associated with
34   * an {@link IoSession}.
35   * 
36   * @author The Apache MINA Project (dev@mina.apache.org)
37   * @version $Rev:671827 $, $Date:2008-06-26 09:49:48 +0100 (jeu., 26 juin 2008) $
38   */
39  public class DefaultIoFuture implements IoFuture {
40  
41      /** A number of seconds to wait between two deadlock controls ( 5 seconds ) */
42      private static final int DEAD_LOCK_CHECK_INTERVAL = 5000;
43  
44      private final IoSession session;
45      
46      /** A lock used by the wait() method */
47      private final Object lock;
48      private IoFutureListener<?> firstListener;
49      private List<IoFutureListener<?>> otherListeners;
50      private Object result;
51      private boolean ready;
52      private int waiters;
53  
54      /**
55       * Creates a new instance associated with an {@link IoSession}.
56       *
57       * @param session an {@link IoSession} which is associated with this future
58       */
59      public DefaultIoFuture(IoSession session) {
60          this.session = session;
61          this.lock = this;
62      }
63  
64      /**
65       * {@inheritDoc}
66       */
67      public IoSession getSession() {
68          return session;
69      }
70  
71      /**
72       * @deprecated Replaced with {@link #awaitUninterruptibly()}.
73       */
74      @Deprecated
75      public void join() {
76          awaitUninterruptibly();
77      }
78  
79      /**
80       * @deprecated Replaced with {@link #awaitUninterruptibly(long)}.
81       */
82      @Deprecated
83      public boolean join(long timeoutMillis) {
84          return awaitUninterruptibly(timeoutMillis);
85      }
86  
87      /**
88       * {@inheritDoc}
89       */
90      public IoFuture await() throws InterruptedException {
91          synchronized (lock) {
92              while (!ready) {
93                  waiters++;
94                  try {
95                      // Wait for a notify, or if no notify is called,
96                      // assume that we have a deadlock and exit the 
97                      // loop to check for a potential deadlock.
98                      lock.wait(DEAD_LOCK_CHECK_INTERVAL);
99                  } finally {
100                     waiters--;
101                     if (!ready) {
102                         checkDeadLock();
103                     }
104                 }
105             }
106         }
107         return this;
108     }
109 
110     /**
111      * {@inheritDoc}
112      */
113     public boolean await(long timeout, TimeUnit unit)
114             throws InterruptedException {
115         return await(unit.toMillis(timeout));
116     }
117 
118     /**
119      * {@inheritDoc}
120      */
121     public boolean await(long timeoutMillis) throws InterruptedException {
122         return await0(timeoutMillis, true);
123     }
124 
125     /**
126      * {@inheritDoc}
127      */
128     public IoFuture awaitUninterruptibly() {
129         synchronized (lock) {
130             while (!ready) {
131                 waiters++;
132                 try {
133                     lock.wait(DEAD_LOCK_CHECK_INTERVAL);
134                 } catch (InterruptedException ie) {
135                     // Do nothing : this catch is just mandatory by contract
136                 } finally {
137                     waiters--;
138                     if (!ready) {
139                         checkDeadLock();
140                     }
141                 }
142             }
143         }
144 
145         return this;
146     }
147 
148     /**
149      * {@inheritDoc}
150      */
151     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
152         return awaitUninterruptibly(unit.toMillis(timeout));
153     }
154 
155     /**
156      * {@inheritDoc}
157      */
158     public boolean awaitUninterruptibly(long timeoutMillis) {
159         try {
160             return await0(timeoutMillis, false);
161         } catch (InterruptedException e) {
162             throw new InternalError();
163         }
164     }
165 
166     private boolean await0(long timeoutMillis, boolean interruptable) throws InterruptedException {
167         long startTime = timeoutMillis <= 0 ? 0 : System.currentTimeMillis();
168         long waitTime = timeoutMillis;
169 
170         synchronized (lock) {
171             if (ready) {
172                 return ready;
173             } else if (waitTime <= 0) {
174                 return ready;
175             }
176 
177             waiters++;
178             try {
179                 for (;;) {
180                     try {
181                         lock.wait(Math.min(waitTime, DEAD_LOCK_CHECK_INTERVAL));
182                     } catch (InterruptedException e) {
183                         if (interruptable) {
184                             throw e;
185                         }
186                     }
187 
188                     if (ready) {
189                         return true;
190                     } else {
191                         waitTime = timeoutMillis
192                                 - (System.currentTimeMillis() - startTime);
193                         if (waitTime <= 0) {
194                             return ready;
195                         }
196                     }
197                 }
198             } finally {
199                 waiters--;
200                 if (!ready) {
201                     checkDeadLock();
202                 }
203             }
204         }
205     }
206 
207     
208     /**
209      * 
210      * TODO checkDeadLock.
211      *
212      */
213     private void checkDeadLock() {
214         // Only read / write / connect / write future can cause dead lock. 
215         if (!(this instanceof CloseFuture || this instanceof WriteFuture ||
216               this instanceof ReadFuture || this instanceof ConnectFuture)) {
217             return;
218         }
219         
220         // Get the current thread stackTrace. 
221         // Using Thread.currentThread().getStackTrace() is the best solution,
222         // even if slightly less efficient than doing a new Exception().getStackTrace(),
223         // as internally, it does exactly the same thing. The advantage of using
224         // this solution is that we may benefit some improvement with some
225         // future versions of Java.
226         StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
227 
228         // Simple and quick check.
229         for (StackTraceElement s: stackTrace) {
230             if (AbstractPollingIoProcessor.class.getName().equals(s.getClassName())) {
231                 IllegalStateException e = new IllegalStateException( "t" );
232                 e.getStackTrace();
233                 throw new IllegalStateException(
234                     "DEAD LOCK: " + IoFuture.class.getSimpleName() +
235                     ".await() was invoked from an I/O processor thread.  " +
236                     "Please use " + IoFutureListener.class.getSimpleName() +
237                     " or configure a proper thread model alternatively.");
238             }
239         }
240 
241         // And then more precisely.
242         for (StackTraceElement s: stackTrace) {
243             try {
244                 Class<?> cls = DefaultIoFuture.class.getClassLoader().loadClass(s.getClassName());
245                 if (IoProcessor.class.isAssignableFrom(cls)) {
246                     throw new IllegalStateException(
247                         "DEAD LOCK: " + IoFuture.class.getSimpleName() +
248                         ".await() was invoked from an I/O processor thread.  " +
249                         "Please use " + IoFutureListener.class.getSimpleName() +
250                         " or configure a proper thread model alternatively.");
251                 }
252             } catch (Exception cnfe) {
253                 // Ignore
254             }
255         }
256     }
257 
258     /**
259      * {@inheritDoc}
260      */
261     public boolean isDone() {
262         synchronized (lock) {
263             return ready;
264         }
265     }
266 
267     /**
268      * Sets the result of the asynchronous operation, and mark it as finished.
269      */
270     public void setValue(Object newValue) {
271         synchronized (lock) {
272             // Allow only once.
273             if (ready) {
274                 return;
275             }
276 
277             result = newValue;
278             ready = true;
279             if (waiters > 0) {
280                 lock.notifyAll();
281             }
282         }
283 
284         notifyListeners();
285     }
286 
287     /**
288      * Returns the result of the asynchronous operation.
289      */
290     protected Object getValue() {
291         synchronized (lock) {
292             return result;
293         }
294     }
295 
296     /**
297      * {@inheritDoc}
298      */
299     public IoFuture addListener(IoFutureListener<?> listener) {
300         if (listener == null) {
301             throw new NullPointerException("listener");
302         }
303 
304         boolean notifyNow = false;
305         synchronized (lock) {
306             if (ready) {
307                 notifyNow = true;
308             } else {
309                 if (firstListener == null) {
310                     firstListener = listener;
311                 } else {
312                     if (otherListeners == null) {
313                         otherListeners = new ArrayList<IoFutureListener<?>>(1);
314                     }
315                     otherListeners.add(listener);
316                 }
317             }
318         }
319 
320         if (notifyNow) {
321             notifyListener(listener);
322         }
323         return this;
324     }
325 
326     /**
327      * {@inheritDoc}
328      */
329     public IoFuture removeListener(IoFutureListener<?> listener) {
330         if (listener == null) {
331             throw new NullPointerException("listener");
332         }
333 
334         synchronized (lock) {
335             if (!ready) {
336                 if (listener == firstListener) {
337                     if (otherListeners != null && !otherListeners.isEmpty()) {
338                         firstListener = otherListeners.remove(0);
339                     } else {
340                         firstListener = null;
341                     }
342                 } else if (otherListeners != null) {
343                     otherListeners.remove(listener);
344                 }
345             }
346         }
347 
348         return this;
349     }
350 
351     private void notifyListeners() {
352         // There won't be any visibility problem or concurrent modification
353         // because 'ready' flag will be checked against both addListener and
354         // removeListener calls.
355         if (firstListener != null) {
356             notifyListener(firstListener);
357             firstListener = null;
358 
359             if (otherListeners != null) {
360                 for (IoFutureListener<?> l : otherListeners) {
361                     notifyListener(l);
362                 }
363                 otherListeners = null;
364             }
365         }
366     }
367 
368     @SuppressWarnings("unchecked")
369     private void notifyListener(IoFutureListener l) {
370         try {
371             l.operationComplete(this);
372         } catch (Throwable t) {
373             ExceptionMonitor.getInstance().exceptionCaught(t);
374         }
375     }
376 }