001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.mina.core.future; 021 022import java.util.ArrayList; 023import java.util.List; 024import java.util.concurrent.TimeUnit; 025 026import org.apache.mina.core.polling.AbstractPollingIoProcessor; 027import org.apache.mina.core.service.IoProcessor; 028import org.apache.mina.core.session.IoSession; 029import org.apache.mina.util.ExceptionMonitor; 030 031/** 032 * A default implementation of {@link IoFuture} associated with 033 * an {@link IoSession}. 034 * 035 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 036 */ 037public class DefaultIoFuture implements IoFuture { 038 039 /** A number of milliseconds to wait between two deadlock controls ( 5 seconds ) */ 040 private static final long DEAD_LOCK_CHECK_INTERVAL = 5000L; 041 042 /** The associated session */ 043 private final IoSession session; 044 045 /** A lock used by the wait() method */ 046 private final Object lock; 047 048 /** The first listener. This is easier to have this variable 049 * when we most of the time have one single listener */ 050 private IoFutureListener<?> firstListener; 051 052 /** All the other listeners, in case we have more than one */ 053 private List<IoFutureListener<?>> otherListeners; 054 055 private Object result; 056 057 /** The flag used to determinate if the Future is completed or not */ 058 private boolean ready; 059 060 /** A counter for the number of threads waiting on this future */ 061 private int waiters; 062 063 /** 064 * Creates a new instance associated with an {@link IoSession}. 065 * 066 * @param session an {@link IoSession} which is associated with this future 067 */ 068 public DefaultIoFuture(IoSession session) { 069 this.session = session; 070 this.lock = this; 071 } 072 073 /** 074 * {@inheritDoc} 075 */ 076 public IoSession getSession() { 077 return session; 078 } 079 080 /** 081 * @deprecated Replaced with {@link #awaitUninterruptibly()}. 082 */ 083 @Deprecated 084 public void join() { 085 awaitUninterruptibly(); 086 } 087 088 /** 089 * @deprecated Replaced with {@link #awaitUninterruptibly(long)}. 090 */ 091 @Deprecated 092 public boolean join(long timeoutMillis) { 093 return awaitUninterruptibly(timeoutMillis); 094 } 095 096 /** 097 * {@inheritDoc} 098 */ 099 public IoFuture await() throws InterruptedException { 100 synchronized (lock) { 101 while (!ready) { 102 waiters++; 103 104 try { 105 // Wait for a notify, or if no notify is called, 106 // assume that we have a deadlock and exit the 107 // loop to check for a potential deadlock. 108 lock.wait(DEAD_LOCK_CHECK_INTERVAL); 109 } finally { 110 waiters--; 111 112 if (!ready) { 113 checkDeadLock(); 114 } 115 } 116 } 117 } 118 119 return this; 120 } 121 122 /** 123 * {@inheritDoc} 124 */ 125 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { 126 return await0(unit.toMillis(timeout), true); 127 } 128 129 /** 130 * {@inheritDoc} 131 */ 132 public boolean await(long timeoutMillis) throws InterruptedException { 133 return await0(timeoutMillis, true); 134 } 135 136 /** 137 * {@inheritDoc} 138 */ 139 public IoFuture awaitUninterruptibly() { 140 try { 141 await0(Long.MAX_VALUE, false); 142 } catch (InterruptedException ie) { 143 // Do nothing : this catch is just mandatory by contract 144 } 145 146 return this; 147 } 148 149 /** 150 * {@inheritDoc} 151 */ 152 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { 153 try { 154 return await0(unit.toMillis(timeout), false); 155 } catch (InterruptedException e) { 156 throw new InternalError(); 157 } 158 } 159 160 /** 161 * {@inheritDoc} 162 */ 163 public boolean awaitUninterruptibly(long timeoutMillis) { 164 try { 165 return await0(timeoutMillis, false); 166 } catch (InterruptedException e) { 167 throw new InternalError(); 168 } 169 } 170 171 /** 172 * Wait for the Future to be ready. If the requested delay is 0 or 173 * negative, this method immediately returns the value of the 174 * 'ready' flag. 175 * Every 5 second, the wait will be suspended to be able to check if 176 * there is a deadlock or not. 177 * 178 * @param timeoutMillis The delay we will wait for the Future to be ready 179 * @param interruptable Tells if the wait can be interrupted or not 180 * @return <code>true</code> if the Future is ready 181 * @throws InterruptedException If the thread has been interrupted 182 * when it's not allowed. 183 */ 184 private boolean await0(long timeoutMillis, boolean interruptable) throws InterruptedException { 185 long endTime = System.currentTimeMillis() + timeoutMillis; 186 187 if (endTime < 0) { 188 endTime = Long.MAX_VALUE; 189 } 190 191 synchronized (lock) { 192 // We can quit if the ready flag is set to true, or if 193 // the timeout is set to 0 or below : we don't wait in this case. 194 if (ready||(timeoutMillis <= 0)) { 195 return ready; 196 } 197 198 // The operation is not completed : we have to wait 199 waiters++; 200 201 try { 202 for (;;) { 203 try { 204 long timeOut = Math.min(timeoutMillis, DEAD_LOCK_CHECK_INTERVAL); 205 206 // Wait for the requested period of time, 207 // but every DEAD_LOCK_CHECK_INTERVAL seconds, we will 208 // check that we aren't blocked. 209 lock.wait(timeOut); 210 } catch (InterruptedException e) { 211 if (interruptable) { 212 throw e; 213 } 214 } 215 216 if (ready || (endTime < System.currentTimeMillis())) { 217 return ready; 218 } else { 219 // Take a chance, detect a potential deadlock 220 checkDeadLock(); 221 } 222 } 223 } finally { 224 // We get here for 3 possible reasons : 225 // 1) We have been notified (the operation has completed a way or another) 226 // 2) We have reached the timeout 227 // 3) The thread has been interrupted 228 // In any case, we decrement the number of waiters, and we get out. 229 waiters--; 230 231 if (!ready) { 232 checkDeadLock(); 233 } 234 } 235 } 236 } 237 238 /** 239 * Check for a deadlock, ie look into the stack trace that we don't have already an 240 * instance of the caller. 241 */ 242 private void checkDeadLock() { 243 // Only read / write / connect / write future can cause dead lock. 244 if (!(this instanceof CloseFuture || this instanceof WriteFuture || this instanceof ReadFuture || this instanceof ConnectFuture)) { 245 return; 246 } 247 248 // Get the current thread stackTrace. 249 // Using Thread.currentThread().getStackTrace() is the best solution, 250 // even if slightly less efficient than doing a new Exception().getStackTrace(), 251 // as internally, it does exactly the same thing. The advantage of using 252 // this solution is that we may benefit some improvement with some 253 // future versions of Java. 254 StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); 255 256 // Simple and quick check. 257 for (StackTraceElement stackElement : stackTrace) { 258 if (AbstractPollingIoProcessor.class.getName().equals(stackElement.getClassName())) { 259 IllegalStateException e = new IllegalStateException("t"); 260 e.getStackTrace(); 261 throw new IllegalStateException("DEAD LOCK: " + IoFuture.class.getSimpleName() 262 + ".await() was invoked from an I/O processor thread. " + "Please use " 263 + IoFutureListener.class.getSimpleName() + " or configure a proper thread model alternatively."); 264 } 265 } 266 267 // And then more precisely. 268 for (StackTraceElement s : stackTrace) { 269 try { 270 Class<?> cls = DefaultIoFuture.class.getClassLoader().loadClass(s.getClassName()); 271 if (IoProcessor.class.isAssignableFrom(cls)) { 272 throw new IllegalStateException("DEAD LOCK: " + IoFuture.class.getSimpleName() 273 + ".await() was invoked from an I/O processor thread. " + "Please use " 274 + IoFutureListener.class.getSimpleName() 275 + " or configure a proper thread model alternatively."); 276 } 277 } catch (Exception cnfe) { 278 // Ignore 279 } 280 } 281 } 282 283 /** 284 * {@inheritDoc} 285 */ 286 public boolean isDone() { 287 synchronized (lock) { 288 return ready; 289 } 290 } 291 292 /** 293 * Sets the result of the asynchronous operation, and mark it as finished. 294 * 295 * @param newValue The result to store into the Future 296 * @return {@code true} if the value has been set, {@code false} if 297 * the future already has a value (thus is in ready state) 298 */ 299 public boolean setValue(Object newValue) { 300 synchronized (lock) { 301 // Allowed only once. 302 if (ready) { 303 return false; 304 } 305 306 result = newValue; 307 ready = true; 308 309 // Now, if we have waiters, notofy them that the operation has completed 310 if (waiters > 0) { 311 lock.notifyAll(); 312 } 313 } 314 315 // Last, not least, inform the listeners 316 notifyListeners(); 317 318 return true; 319 } 320 321 /** 322 * Returns the result of the asynchronous operation. 323 * 324 * @return The stored value 325 */ 326 protected Object getValue() { 327 synchronized (lock) { 328 return result; 329 } 330 } 331 332 /** 333 * {@inheritDoc} 334 */ 335 public IoFuture addListener(IoFutureListener<?> listener) { 336 if (listener == null) { 337 throw new IllegalArgumentException("listener"); 338 } 339 340 synchronized (lock) { 341 if (ready) { 342 // Shortcut : if the operation has completed, no need to 343 // add a new listener, we just have to notify it. The existing 344 // listeners have already been notified anyway, when the 345 // 'ready' flag has been set. 346 notifyListener(listener); 347 } else { 348 if (firstListener == null) { 349 firstListener = listener; 350 } else { 351 if (otherListeners == null) { 352 otherListeners = new ArrayList<IoFutureListener<?>>(1); 353 } 354 355 otherListeners.add(listener); 356 } 357 } 358 } 359 360 return this; 361 } 362 363 /** 364 * {@inheritDoc} 365 */ 366 public IoFuture removeListener(IoFutureListener<?> listener) { 367 if (listener == null) { 368 throw new IllegalArgumentException("listener"); 369 } 370 371 synchronized (lock) { 372 if (!ready) { 373 if (listener == firstListener) { 374 if ((otherListeners != null) && !otherListeners.isEmpty()) { 375 firstListener = otherListeners.remove(0); 376 } else { 377 firstListener = null; 378 } 379 } else if (otherListeners != null) { 380 otherListeners.remove(listener); 381 } 382 } 383 } 384 385 return this; 386 } 387 388 /** 389 * Notify the listeners, if we have some. 390 */ 391 private void notifyListeners() { 392 // There won't be any visibility problem or concurrent modification 393 // because 'ready' flag will be checked against both addListener and 394 // removeListener calls. 395 if (firstListener != null) { 396 notifyListener(firstListener); 397 firstListener = null; 398 399 if (otherListeners != null) { 400 for (IoFutureListener<?> listener : otherListeners) { 401 notifyListener(listener); 402 } 403 404 otherListeners = null; 405 } 406 } 407 } 408 409 @SuppressWarnings("unchecked") 410 private void notifyListener(IoFutureListener listener) { 411 try { 412 listener.operationComplete(this); 413 } catch (Exception e) { 414 ExceptionMonitor.getInstance().exceptionCaught(e); 415 } 416 } 417}