View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.future;
21  
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.concurrent.TimeUnit;
25  
26  import org.apache.mina.core.polling.AbstractPollingIoProcessor;
27  import org.apache.mina.core.service.IoProcessor;
28  import org.apache.mina.core.session.IoSession;
29  import org.apache.mina.util.ExceptionMonitor;
30  
31  
32  /**
33   * A default implementation of {@link IoFuture} associated with
34   * an {@link IoSession}.
35   * 
36   * @author The Apache MINA Project (dev@mina.apache.org)
37   */
38  public class DefaultIoFuture implements IoFuture {
39  
40      /** A number of seconds to wait between two deadlock controls ( 5 seconds ) */
41      private static final long DEAD_LOCK_CHECK_INTERVAL = 5000L;
42  
43      /** The associated session */
44      private final IoSession session;
45      
46      /** A lock used by the wait() method */
47      private final Object lock;
48      private IoFutureListener<?> firstListener;
49      private List<IoFutureListener<?>> otherListeners;
50      private Object result;
51      private boolean ready;
52      private int waiters;
53  
54      /**
55       * Creates a new instance associated with an {@link IoSession}.
56       *
57       * @param session an {@link IoSession} which is associated with this future
58       */
59      public DefaultIoFuture(IoSession session) {
60          this.session = session;
61          this.lock = this;
62      }
63  
64      /**
65       * {@inheritDoc}
66       */
67      public IoSession getSession() {
68          return session;
69      }
70  
71      /**
72       * @deprecated Replaced with {@link #awaitUninterruptibly()}.
73       */
74      @Deprecated
75      public void join() {
76          awaitUninterruptibly();
77      }
78  
79      /**
80       * @deprecated Replaced with {@link #awaitUninterruptibly(long)}.
81       */
82      @Deprecated
83      public boolean join(long timeoutMillis) {
84          return awaitUninterruptibly(timeoutMillis);
85      }
86  
87      /**
88       * {@inheritDoc}
89       */
90      public IoFuture await() throws InterruptedException {
91          synchronized (lock) {
92              while (!ready) {
93                  waiters++;
94                  try {
95                      // Wait for a notify, or if no notify is called,
96                      // assume that we have a deadlock and exit the 
97                      // loop to check for a potential deadlock.
98                      lock.wait(DEAD_LOCK_CHECK_INTERVAL);
99                  } finally {
100                     waiters--;
101                     if (!ready) {
102                         checkDeadLock();
103                     }
104                 }
105             }
106         }
107         return this;
108     }
109 
110     /**
111      * {@inheritDoc}
112      */
113     public boolean await(long timeout, TimeUnit unit)
114             throws InterruptedException {
115         return await(unit.toMillis(timeout));
116     }
117 
118     /**
119      * {@inheritDoc}
120      */
121     public boolean await(long timeoutMillis) throws InterruptedException {
122         return await0(timeoutMillis, true);
123     }
124 
125     /**
126      * {@inheritDoc}
127      */
128     public IoFuture awaitUninterruptibly() {
129         try {
130             await0(Long.MAX_VALUE, false);
131         } catch ( InterruptedException ie) {
132             // Do nothing : this catch is just mandatory by contract
133         }
134         
135         return this;
136     }
137 
138     /**
139      * {@inheritDoc}
140      */
141     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
142         return awaitUninterruptibly(unit.toMillis(timeout));
143     }
144 
145     /**
146      * {@inheritDoc}
147      */
148     public boolean awaitUninterruptibly(long timeoutMillis) {
149         try {
150             return await0(timeoutMillis, false);
151         } catch (InterruptedException e) {
152             throw new InternalError();
153         }
154     }
155 
156     /**
157      * Wait for the Future to be ready. If the requested delay is 0 or 
158      * negative, this method immediately returns the value of the 
159      * 'ready' flag. 
160      * Every 5 second, the wait will be suspended to be able to check if 
161      * there is a deadlock or not.
162      * 
163      * @param timeoutMillis The delay we will wait for the Future to be ready
164      * @param interruptable Tells if the wait can be interrupted or not
165      * @return <code>true</code> if the Future is ready
166      * @throws InterruptedException If the thread has been interrupted
167      * when it's not allowed.
168      */
169     private boolean await0(long timeoutMillis, boolean interruptable) throws InterruptedException {
170         long endTime = System.currentTimeMillis() + timeoutMillis;
171 
172         synchronized (lock) {
173             if (ready) {
174                 return ready;
175             } else if (timeoutMillis <= 0) {
176                 return ready;
177             }
178 
179             waiters++;
180             try {
181                 for (;;) {
182                     try {
183                         long timeOut = Math.min(timeoutMillis, DEAD_LOCK_CHECK_INTERVAL);
184                         lock.wait(timeOut);
185                     } catch (InterruptedException e) {
186                         if (interruptable) {
187                             throw e;
188                         }
189                     }
190 
191                     if (ready) {
192                         return true;
193                     } else {
194                         if (endTime < System.currentTimeMillis()) {
195                             return ready;
196                         }
197                     }
198                 }
199             } finally {
200                 waiters--;
201                 if (!ready) {
202                     checkDeadLock();
203                 }
204             }
205         }
206     }
207 
208     
209     /**
210      * 
211      * TODO checkDeadLock.
212      *
213      */
214     private void checkDeadLock() {
215         // Only read / write / connect / write future can cause dead lock. 
216         if (!(this instanceof CloseFuture || this instanceof WriteFuture ||
217               this instanceof ReadFuture || this instanceof ConnectFuture)) {
218             return;
219         }
220         
221         // Get the current thread stackTrace. 
222         // Using Thread.currentThread().getStackTrace() is the best solution,
223         // even if slightly less efficient than doing a new Exception().getStackTrace(),
224         // as internally, it does exactly the same thing. The advantage of using
225         // this solution is that we may benefit some improvement with some
226         // future versions of Java.
227         StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
228 
229         // Simple and quick check.
230         for (StackTraceElement s: stackTrace) {
231             if (AbstractPollingIoProcessor.class.getName().equals(s.getClassName())) {
232                 IllegalStateException e = new IllegalStateException( "t" );
233                 e.getStackTrace();
234                 throw new IllegalStateException(
235                     "DEAD LOCK: " + IoFuture.class.getSimpleName() +
236                     ".await() was invoked from an I/O processor thread.  " +
237                     "Please use " + IoFutureListener.class.getSimpleName() +
238                     " or configure a proper thread model alternatively.");
239             }
240         }
241 
242         // And then more precisely.
243         for (StackTraceElement s: stackTrace) {
244             try {
245                 Class<?> cls = DefaultIoFuture.class.getClassLoader().loadClass(s.getClassName());
246                 if (IoProcessor.class.isAssignableFrom(cls)) {
247                     throw new IllegalStateException(
248                         "DEAD LOCK: " + IoFuture.class.getSimpleName() +
249                         ".await() was invoked from an I/O processor thread.  " +
250                         "Please use " + IoFutureListener.class.getSimpleName() +
251                         " or configure a proper thread model alternatively.");
252                 }
253             } catch (Exception cnfe) {
254                 // Ignore
255             }
256         }
257     }
258 
259     /**
260      * {@inheritDoc}
261      */
262     public boolean isDone() {
263         synchronized (lock) {
264             return ready;
265         }
266     }
267 
268     /**
269      * Sets the result of the asynchronous operation, and mark it as finished.
270      */
271     public void setValue(Object newValue) {
272         synchronized (lock) {
273             // Allow only once.
274             if (ready) {
275                 return;
276             }
277 
278             result = newValue;
279             ready = true;
280             if (waiters > 0) {
281                 lock.notifyAll();
282             }
283         }
284 
285         notifyListeners();
286     }
287 
288     /**
289      * Returns the result of the asynchronous operation.
290      */
291     protected Object getValue() {
292         synchronized (lock) {
293             return result;
294         }
295     }
296 
297     /**
298      * {@inheritDoc}
299      */
300     public IoFuture addListener(IoFutureListener<?> listener) {
301         if (listener == null) {
302             throw new NullPointerException("listener");
303         }
304 
305         boolean notifyNow = false;
306         synchronized (lock) {
307             if (ready) {
308                 notifyNow = true;
309             } else {
310                 if (firstListener == null) {
311                     firstListener = listener;
312                 } else {
313                     if (otherListeners == null) {
314                         otherListeners = new ArrayList<IoFutureListener<?>>(1);
315                     }
316                     otherListeners.add(listener);
317                 }
318             }
319         }
320 
321         if (notifyNow) {
322             notifyListener(listener);
323         }
324         return this;
325     }
326 
327     /**
328      * {@inheritDoc}
329      */
330     public IoFuture removeListener(IoFutureListener<?> listener) {
331         if (listener == null) {
332             throw new NullPointerException("listener");
333         }
334 
335         synchronized (lock) {
336             if (!ready) {
337                 if (listener == firstListener) {
338                     if (otherListeners != null && !otherListeners.isEmpty()) {
339                         firstListener = otherListeners.remove(0);
340                     } else {
341                         firstListener = null;
342                     }
343                 } else if (otherListeners != null) {
344                     otherListeners.remove(listener);
345                 }
346             }
347         }
348 
349         return this;
350     }
351 
352     private void notifyListeners() {
353         // There won't be any visibility problem or concurrent modification
354         // because 'ready' flag will be checked against both addListener and
355         // removeListener calls.
356         if (firstListener != null) {
357             notifyListener(firstListener);
358             firstListener = null;
359 
360             if (otherListeners != null) {
361                 for (IoFutureListener<?> l : otherListeners) {
362                     notifyListener(l);
363                 }
364                 otherListeners = null;
365             }
366         }
367     }
368 
369     @SuppressWarnings("unchecked")
370     private void notifyListener(IoFutureListener l) {
371         try {
372             l.operationComplete(this);
373         } catch (Throwable t) {
374             ExceptionMonitor.getInstance().exceptionCaught(t);
375         }
376     }
377 }