View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.future;
21  
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.concurrent.TimeUnit;
25  
26  import org.apache.mina.core.polling.AbstractPollingIoProcessor;
27  import org.apache.mina.core.service.IoProcessor;
28  import org.apache.mina.core.session.IoSession;
29  import org.apache.mina.util.ExceptionMonitor;
30  
31  
32  /**
33   * A default implementation of {@link IoFuture} associated with
34   * an {@link IoSession}.
35   * 
36   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
37   */
38  public class DefaultIoFuture implements IoFuture {
39  
40      /** A number of seconds to wait between two deadlock controls ( 5 seconds ) */
41      private static final long DEAD_LOCK_CHECK_INTERVAL = 5000L;
42  
43      /** The associated session */
44      private final IoSession session;
45      
46      /** A lock used by the wait() method */
47      private final Object lock;
48      private IoFutureListener<?> firstListener;
49      private List<IoFutureListener<?>> otherListeners;
50      private Object result;
51      private boolean ready;
52      private int waiters;
53  
54      /**
55       * Creates a new instance associated with an {@link IoSession}.
56       *
57       * @param session an {@link IoSession} which is associated with this future
58       */
59      public DefaultIoFuture(IoSession session) {
60          this.session = session;
61          this.lock = this;
62      }
63  
64      /**
65       * {@inheritDoc}
66       */
67      public IoSession getSession() {
68          return session;
69      }
70  
71      /**
72       * @deprecated Replaced with {@link #awaitUninterruptibly()}.
73       */
74      @Deprecated
75      public void join() {
76          awaitUninterruptibly();
77      }
78  
79      /**
80       * @deprecated Replaced with {@link #awaitUninterruptibly(long)}.
81       */
82      @Deprecated
83      public boolean join(long timeoutMillis) {
84          return awaitUninterruptibly(timeoutMillis);
85      }
86  
87      /**
88       * {@inheritDoc}
89       */
90      public IoFuture await() throws InterruptedException {
91          synchronized (lock) {
92              while (!ready) {
93                  waiters++;
94                  try {
95                      // Wait for a notify, or if no notify is called,
96                      // assume that we have a deadlock and exit the 
97                      // loop to check for a potential deadlock.
98                      lock.wait(DEAD_LOCK_CHECK_INTERVAL);
99                  } finally {
100                     waiters--;
101                     if (!ready) {
102                         checkDeadLock();
103                     }
104                 }
105             }
106         }
107         return this;
108     }
109 
110     /**
111      * {@inheritDoc}
112      */
113     public boolean await(long timeout, TimeUnit unit)
114             throws InterruptedException {
115         return await(unit.toMillis(timeout));
116     }
117 
118     /**
119      * {@inheritDoc}
120      */
121     public boolean await(long timeoutMillis) throws InterruptedException {
122         return await0(timeoutMillis, true);
123     }
124 
125     /**
126      * {@inheritDoc}
127      */
128     public IoFuture awaitUninterruptibly() {
129         try {
130             await0(Long.MAX_VALUE, false);
131         } catch ( InterruptedException ie) {
132             // Do nothing : this catch is just mandatory by contract
133         }
134         
135         return this;
136     }
137 
138     /**
139      * {@inheritDoc}
140      */
141     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
142         return awaitUninterruptibly(unit.toMillis(timeout));
143     }
144 
145     /**
146      * {@inheritDoc}
147      */
148     public boolean awaitUninterruptibly(long timeoutMillis) {
149         try {
150             return await0(timeoutMillis, false);
151         } catch (InterruptedException e) {
152             throw new InternalError();
153         }
154     }
155 
156     /**
157      * Wait for the Future to be ready. If the requested delay is 0 or 
158      * negative, this method immediately returns the value of the 
159      * 'ready' flag. 
160      * Every 5 second, the wait will be suspended to be able to check if 
161      * there is a deadlock or not.
162      * 
163      * @param timeoutMillis The delay we will wait for the Future to be ready
164      * @param interruptable Tells if the wait can be interrupted or not
165      * @return <code>true</code> if the Future is ready
166      * @throws InterruptedException If the thread has been interrupted
167      * when it's not allowed.
168      */
169     private boolean await0(long timeoutMillis, boolean interruptable) throws InterruptedException {
170         long endTime = System.currentTimeMillis() + timeoutMillis;
171         
172         if (endTime < 0) {
173             endTime = Long.MAX_VALUE;
174         }
175 
176         synchronized (lock) {
177             if (ready) {
178                 return ready;
179             } else if (timeoutMillis <= 0) {
180                 return ready;
181             }
182 
183             waiters++;
184             
185             try {
186                 for (;;) {
187                     try {
188                         long timeOut = Math.min(timeoutMillis, DEAD_LOCK_CHECK_INTERVAL);
189                         lock.wait(timeOut);
190                     } catch (InterruptedException e) {
191                         if (interruptable) {
192                             throw e;
193                         }
194                     }
195 
196                     if (ready) {
197                         return true;
198                     }
199                     
200                     if (endTime < System.currentTimeMillis()) {
201                         return ready;
202                     }
203                 }
204             } finally {
205                 waiters--;
206                 if (!ready) {
207                     checkDeadLock();
208                 }
209             }
210         }
211     }
212 
213     
214     /**
215      * 
216      * TODO checkDeadLock.
217      *
218      */
219     private void checkDeadLock() {
220         // Only read / write / connect / write future can cause dead lock. 
221         if (!(this instanceof CloseFuture || this instanceof WriteFuture ||
222               this instanceof ReadFuture || this instanceof ConnectFuture)) {
223             return;
224         }
225         
226         // Get the current thread stackTrace. 
227         // Using Thread.currentThread().getStackTrace() is the best solution,
228         // even if slightly less efficient than doing a new Exception().getStackTrace(),
229         // as internally, it does exactly the same thing. The advantage of using
230         // this solution is that we may benefit some improvement with some
231         // future versions of Java.
232         StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
233 
234         // Simple and quick check.
235         for (StackTraceElement s: stackTrace) {
236             if (AbstractPollingIoProcessor.class.getName().equals(s.getClassName())) {
237                 IllegalStateException e = new IllegalStateException( "t" );
238                 e.getStackTrace();
239                 throw new IllegalStateException(
240                     "DEAD LOCK: " + IoFuture.class.getSimpleName() +
241                     ".await() was invoked from an I/O processor thread.  " +
242                     "Please use " + IoFutureListener.class.getSimpleName() +
243                     " or configure a proper thread model alternatively.");
244             }
245         }
246 
247         // And then more precisely.
248         for (StackTraceElement s: stackTrace) {
249             try {
250                 Class<?> cls = DefaultIoFuture.class.getClassLoader().loadClass(s.getClassName());
251                 if (IoProcessor.class.isAssignableFrom(cls)) {
252                     throw new IllegalStateException(
253                         "DEAD LOCK: " + IoFuture.class.getSimpleName() +
254                         ".await() was invoked from an I/O processor thread.  " +
255                         "Please use " + IoFutureListener.class.getSimpleName() +
256                         " or configure a proper thread model alternatively.");
257                 }
258             } catch (Exception cnfe) {
259                 // Ignore
260             }
261         }
262     }
263 
264     /**
265      * {@inheritDoc}
266      */
267     public boolean isDone() {
268         synchronized (lock) {
269             return ready;
270         }
271     }
272 
273     /**
274      * Sets the result of the asynchronous operation, and mark it as finished.
275      */
276     public void setValue(Object newValue) {
277         synchronized (lock) {
278             // Allow only once.
279             if (ready) {
280                 return;
281             }
282 
283             result = newValue;
284             ready = true;
285             if (waiters > 0) {
286                 lock.notifyAll();
287             }
288         }
289 
290         notifyListeners();
291     }
292 
293     /**
294      * Returns the result of the asynchronous operation.
295      */
296     protected Object getValue() {
297         synchronized (lock) {
298             return result;
299         }
300     }
301 
302     /**
303      * {@inheritDoc}
304      */
305     public IoFuture addListener(IoFutureListener<?> listener) {
306         if (listener == null) {
307             throw new IllegalArgumentException("listener");
308         }
309 
310         boolean notifyNow = false;
311         synchronized (lock) {
312             if (ready) {
313                 notifyNow = true;
314             } else {
315                 if (firstListener == null) {
316                     firstListener = listener;
317                 } else {
318                     if (otherListeners == null) {
319                         otherListeners = new ArrayList<IoFutureListener<?>>(1);
320                     }
321                     otherListeners.add(listener);
322                 }
323             }
324         }
325 
326         if (notifyNow) {
327             notifyListener(listener);
328         }
329         return this;
330     }
331 
332     /**
333      * {@inheritDoc}
334      */
335     public IoFuture removeListener(IoFutureListener<?> listener) {
336         if (listener == null) {
337             throw new IllegalArgumentException("listener");
338         }
339 
340         synchronized (lock) {
341             if (!ready) {
342                 if (listener == firstListener) {
343                     if (otherListeners != null && !otherListeners.isEmpty()) {
344                         firstListener = otherListeners.remove(0);
345                     } else {
346                         firstListener = null;
347                     }
348                 } else if (otherListeners != null) {
349                     otherListeners.remove(listener);
350                 }
351             }
352         }
353 
354         return this;
355     }
356 
357     private void notifyListeners() {
358         // There won't be any visibility problem or concurrent modification
359         // because 'ready' flag will be checked against both addListener and
360         // removeListener calls.
361         if (firstListener != null) {
362             notifyListener(firstListener);
363             firstListener = null;
364 
365             if (otherListeners != null) {
366                 for (IoFutureListener<?> l : otherListeners) {
367                     notifyListener(l);
368                 }
369                 otherListeners = null;
370             }
371         }
372     }
373 
374     @SuppressWarnings("unchecked")
375     private void notifyListener(IoFutureListener l) {
376         try {
377             l.operationComplete(this);
378         } catch (Throwable t) {
379             ExceptionMonitor.getInstance().exceptionCaught(t);
380         }
381     }
382 }