001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.core.future;
021
022import java.util.ArrayList;
023import java.util.List;
024import java.util.concurrent.TimeUnit;
025
026import org.apache.mina.core.polling.AbstractPollingIoProcessor;
027import org.apache.mina.core.service.IoProcessor;
028import org.apache.mina.core.session.IoSession;
029import org.apache.mina.util.ExceptionMonitor;
030
031/**
032 * A default implementation of {@link IoFuture} associated with
033 * an {@link IoSession}.
034 * 
035 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
036 */
037public class DefaultIoFuture implements IoFuture {
038
039    /** A number of milliseconds to wait between two deadlock controls ( 5 seconds ) */
040    private static final long DEAD_LOCK_CHECK_INTERVAL = 5000L;
041
042    /** The associated session */
043    private final IoSession session;
044
045    /** A lock used by the wait() method */
046    private final Object lock;
047
048    /** The first listener. This is easier to have this variable
049     * when we most of the time have one single listener */
050    private IoFutureListener<?> firstListener;
051
052    /** All the other listeners, in case we have more than one */
053    private List<IoFutureListener<?>> otherListeners;
054
055    private Object result;
056
057    /** The flag used to determinate if the Future is completed or not */
058    private boolean ready;
059
060    /** A counter for the number of threads waiting on this future */
061    private int waiters;
062
063    /**
064     * Creates a new instance associated with an {@link IoSession}.
065     *
066     * @param session an {@link IoSession} which is associated with this future
067     */
068    public DefaultIoFuture(IoSession session) {
069        this.session = session;
070        this.lock = this;
071    }
072
073    /**
074     * {@inheritDoc}
075     */
076    public IoSession getSession() {
077        return session;
078    }
079
080    /**
081     * @deprecated Replaced with {@link #awaitUninterruptibly()}.
082     */
083    @Deprecated
084    public void join() {
085        awaitUninterruptibly();
086    }
087
088    /**
089     * @deprecated Replaced with {@link #awaitUninterruptibly(long)}.
090     */
091    @Deprecated
092    public boolean join(long timeoutMillis) {
093        return awaitUninterruptibly(timeoutMillis);
094    }
095
096    /**
097     * {@inheritDoc}
098     */
099    public IoFuture await() throws InterruptedException {
100        synchronized (lock) {
101            while (!ready) {
102                waiters++;
103                
104                try {
105                    // Wait for a notify, or if no notify is called,
106                    // assume that we have a deadlock and exit the
107                    // loop to check for a potential deadlock.
108                    lock.wait(DEAD_LOCK_CHECK_INTERVAL);
109                } finally {
110                    waiters--;
111                    
112                    if (!ready) {
113                        checkDeadLock();
114                    }
115                }
116            }
117        }
118        
119        return this;
120    }
121
122    /**
123     * {@inheritDoc}
124     */
125    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
126        return await0(unit.toMillis(timeout), true);
127    }
128
129    /**
130     * {@inheritDoc}
131     */
132    public boolean await(long timeoutMillis) throws InterruptedException {
133        return await0(timeoutMillis, true);
134    }
135
136    /**
137     * {@inheritDoc}
138     */
139    public IoFuture awaitUninterruptibly() {
140        try {
141            await0(Long.MAX_VALUE, false);
142        } catch (InterruptedException ie) {
143            // Do nothing : this catch is just mandatory by contract
144        }
145
146        return this;
147    }
148
149    /**
150     * {@inheritDoc}
151     */
152    public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
153        try {
154            return await0(unit.toMillis(timeout), false);
155        } catch (InterruptedException e) {
156            throw new InternalError();
157        }
158    }
159
160    /**
161     * {@inheritDoc}
162     */
163    public boolean awaitUninterruptibly(long timeoutMillis) {
164        try {
165            return await0(timeoutMillis, false);
166        } catch (InterruptedException e) {
167            throw new InternalError();
168        }
169    }
170
171    /**
172     * Wait for the Future to be ready. If the requested delay is 0 or
173     * negative, this method immediately returns the value of the
174     * 'ready' flag.
175     * Every 5 second, the wait will be suspended to be able to check if
176     * there is a deadlock or not.
177     * 
178     * @param timeoutMillis The delay we will wait for the Future to be ready
179     * @param interruptable Tells if the wait can be interrupted or not
180     * @return <code>true</code> if the Future is ready
181     * @throws InterruptedException If the thread has been interrupted
182     * when it's not allowed.
183     */
184    private boolean await0(long timeoutMillis, boolean interruptable) throws InterruptedException {
185        long endTime = System.currentTimeMillis() + timeoutMillis;
186
187        if (endTime < 0) {
188            endTime = Long.MAX_VALUE;
189        }
190
191        synchronized (lock) {
192            // We can quit if the ready flag is set to true, or if
193            // the timeout is set to 0 or below : we don't wait in this case.
194            if (ready||(timeoutMillis <= 0)) {
195                return ready;
196            }
197
198            // The operation is not completed : we have to wait
199            waiters++;
200
201            try {
202                for (;;) {
203                    try {
204                        long timeOut = Math.min(timeoutMillis, DEAD_LOCK_CHECK_INTERVAL);
205                        
206                        // Wait for the requested period of time,
207                        // but every DEAD_LOCK_CHECK_INTERVAL seconds, we will
208                        // check that we aren't blocked.
209                        lock.wait(timeOut);
210                    } catch (InterruptedException e) {
211                        if (interruptable) {
212                            throw e;
213                        }
214                    }
215
216                    if (ready || (endTime < System.currentTimeMillis())) {
217                        return ready;
218                    } else {
219                        // Take a chance, detect a potential deadlock
220                        checkDeadLock();
221                    }
222                }
223            } finally {
224                // We get here for 3 possible reasons :
225                // 1) We have been notified (the operation has completed a way or another)
226                // 2) We have reached the timeout
227                // 3) The thread has been interrupted
228                // In any case, we decrement the number of waiters, and we get out.
229                waiters--;
230                
231                if (!ready) {
232                    checkDeadLock();
233                }
234            }
235        }
236    }
237
238    /**
239     * Check for a deadlock, ie look into the stack trace that we don't have already an 
240     * instance of the caller.
241     */
242    private void checkDeadLock() {
243        // Only read / write / connect / write future can cause dead lock.
244        if (!(this instanceof CloseFuture || this instanceof WriteFuture || this instanceof ReadFuture || this instanceof ConnectFuture)) {
245            return;
246        }
247
248        // Get the current thread stackTrace.
249        // Using Thread.currentThread().getStackTrace() is the best solution,
250        // even if slightly less efficient than doing a new Exception().getStackTrace(),
251        // as internally, it does exactly the same thing. The advantage of using
252        // this solution is that we may benefit some improvement with some
253        // future versions of Java.
254        StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
255
256        // Simple and quick check.
257        for (StackTraceElement stackElement : stackTrace) {
258            if (AbstractPollingIoProcessor.class.getName().equals(stackElement.getClassName())) {
259                IllegalStateException e = new IllegalStateException("t");
260                e.getStackTrace();
261                throw new IllegalStateException("DEAD LOCK: " + IoFuture.class.getSimpleName()
262                        + ".await() was invoked from an I/O processor thread.  " + "Please use "
263                        + IoFutureListener.class.getSimpleName() + " or configure a proper thread model alternatively.");
264            }
265        }
266
267        // And then more precisely.
268        for (StackTraceElement s : stackTrace) {
269            try {
270                Class<?> cls = DefaultIoFuture.class.getClassLoader().loadClass(s.getClassName());
271                if (IoProcessor.class.isAssignableFrom(cls)) {
272                    throw new IllegalStateException("DEAD LOCK: " + IoFuture.class.getSimpleName()
273                            + ".await() was invoked from an I/O processor thread.  " + "Please use "
274                            + IoFutureListener.class.getSimpleName()
275                            + " or configure a proper thread model alternatively.");
276                }
277            } catch (Exception cnfe) {
278                // Ignore
279            }
280        }
281    }
282
283    /**
284     * {@inheritDoc}
285     */
286    public boolean isDone() {
287        synchronized (lock) {
288            return ready;
289        }
290    }
291
292    /**
293     * Sets the result of the asynchronous operation, and mark it as finished.
294     * 
295     * @param newValue The result to store into the Future
296     * @return {@code true} if the value has been set, {@code false} if
297     * the future already has a value (thus is in ready state)
298     */
299    public boolean setValue(Object newValue) {
300        synchronized (lock) {
301            // Allowed only once.
302            if (ready) {
303                return false;
304            }
305
306            result = newValue;
307            ready = true;
308            
309            // Now, if we have waiters, notofy them that the operation has completed
310            if (waiters > 0) {
311                lock.notifyAll();
312            }
313        }
314
315        // Last, not least, inform the listeners
316        notifyListeners();
317        
318        return true;
319    }
320
321    /**
322     * Returns the result of the asynchronous operation.
323     * 
324     * @return The stored value
325     */
326    protected Object getValue() {
327        synchronized (lock) {
328            return result;
329        }
330    }
331
332    /**
333     * {@inheritDoc}
334     */
335    public IoFuture addListener(IoFutureListener<?> listener) {
336        if (listener == null) {
337            throw new IllegalArgumentException("listener");
338        }
339
340        synchronized (lock) {
341            if (ready) {
342                // Shortcut : if the operation has completed, no need to 
343                // add a new listener, we just have to notify it. The existing
344                // listeners have already been notified anyway, when the 
345                // 'ready' flag has been set.
346                notifyListener(listener);
347            } else {
348                if (firstListener == null) {
349                    firstListener = listener;
350                } else {
351                    if (otherListeners == null) {
352                        otherListeners = new ArrayList<IoFutureListener<?>>(1);
353                    }
354                    
355                    otherListeners.add(listener);
356                }
357            }
358        }
359        
360        return this;
361    }
362
363    /**
364     * {@inheritDoc}
365     */
366    public IoFuture removeListener(IoFutureListener<?> listener) {
367        if (listener == null) {
368            throw new IllegalArgumentException("listener");
369        }
370
371        synchronized (lock) {
372            if (!ready) {
373                if (listener == firstListener) {
374                    if ((otherListeners != null) && !otherListeners.isEmpty()) {
375                        firstListener = otherListeners.remove(0);
376                    } else {
377                        firstListener = null;
378                    }
379                } else if (otherListeners != null) {
380                    otherListeners.remove(listener);
381                }
382            }
383        }
384
385        return this;
386    }
387
388    /**
389     * Notify the listeners, if we have some.
390     */
391    private void notifyListeners() {
392        // There won't be any visibility problem or concurrent modification
393        // because 'ready' flag will be checked against both addListener and
394        // removeListener calls.
395        if (firstListener != null) {
396            notifyListener(firstListener);
397            firstListener = null;
398
399            if (otherListeners != null) {
400                for (IoFutureListener<?> listener : otherListeners) {
401                    notifyListener(listener);
402                }
403                
404                otherListeners = null;
405            }
406        }
407    }
408
409    @SuppressWarnings("unchecked")
410    private void notifyListener(IoFutureListener listener) {
411        try {
412            listener.operationComplete(this);
413        } catch (Exception e) {
414            ExceptionMonitor.getInstance().exceptionCaught(e);
415        }
416    }
417}