001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.mina.core.future; 021 022import java.util.ArrayList; 023import java.util.List; 024import java.util.concurrent.TimeUnit; 025 026import org.apache.mina.core.polling.AbstractPollingIoProcessor; 027import org.apache.mina.core.service.IoProcessor; 028import org.apache.mina.core.session.IoSession; 029import org.apache.mina.util.ExceptionMonitor; 030 031/** 032 * A default implementation of {@link IoFuture} associated with 033 * an {@link IoSession}. 034 * 035 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 036 */ 037public class DefaultIoFuture implements IoFuture { 038 039 /** A number of milliseconds to wait between two deadlock controls ( 5 seconds ) */ 040 private static final long DEAD_LOCK_CHECK_INTERVAL = 5000L; 041 042 /** The associated session */ 043 private final IoSession session; 044 045 /** A lock used by the wait() method */ 046 private final Object lock; 047 048 /** The first listener. This is easier to have this variable 049 * when we most of the time have one single listener */ 050 private IoFutureListener<?> firstListener; 051 052 /** All the other listeners, in case we have more than one */ 053 private List<IoFutureListener<?>> otherListeners; 054 055 private Object result; 056 057 /** The flag used to determinate if the Future is completed or not */ 058 private boolean ready; 059 060 /** A counter for the number of threads waiting on this future */ 061 private int waiters; 062 063 /** 064 * Creates a new instance associated with an {@link IoSession}. 065 * 066 * @param session an {@link IoSession} which is associated with this future 067 */ 068 public DefaultIoFuture(IoSession session) { 069 this.session = session; 070 this.lock = this; 071 } 072 073 /** 074 * {@inheritDoc} 075 */ 076 public IoSession getSession() { 077 return session; 078 } 079 080 /** 081 * @deprecated Replaced with {@link #awaitUninterruptibly()}. 082 */ 083 @Deprecated 084 public void join() { 085 awaitUninterruptibly(); 086 } 087 088 /** 089 * @deprecated Replaced with {@link #awaitUninterruptibly(long)}. 090 */ 091 @Deprecated 092 public boolean join(long timeoutMillis) { 093 return awaitUninterruptibly(timeoutMillis); 094 } 095 096 /** 097 * {@inheritDoc} 098 */ 099 public IoFuture await() throws InterruptedException { 100 synchronized (lock) { 101 while (!ready) { 102 waiters++; 103 104 try { 105 // Wait for a notify, or if no notify is called, 106 // assume that we have a deadlock and exit the 107 // loop to check for a potential deadlock. 108 lock.wait(DEAD_LOCK_CHECK_INTERVAL); 109 } finally { 110 waiters--; 111 112 if (!ready) { 113 checkDeadLock(); 114 } 115 } 116 } 117 } 118 119 return this; 120 } 121 122 /** 123 * {@inheritDoc} 124 */ 125 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { 126 return await0(unit.toMillis(timeout), true); 127 } 128 129 /** 130 * {@inheritDoc} 131 */ 132 public boolean await(long timeoutMillis) throws InterruptedException { 133 return await0(timeoutMillis, true); 134 } 135 136 /** 137 * {@inheritDoc} 138 */ 139 public IoFuture awaitUninterruptibly() { 140 try { 141 await0(Long.MAX_VALUE, false); 142 } catch (InterruptedException ie) { 143 // Do nothing : this catch is just mandatory by contract 144 } 145 146 return this; 147 } 148 149 /** 150 * {@inheritDoc} 151 */ 152 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { 153 try { 154 return await0(unit.toMillis(timeout), false); 155 } catch (InterruptedException e) { 156 throw new InternalError(); 157 } 158 } 159 160 /** 161 * {@inheritDoc} 162 */ 163 public boolean awaitUninterruptibly(long timeoutMillis) { 164 try { 165 return await0(timeoutMillis, false); 166 } catch (InterruptedException e) { 167 throw new InternalError(); 168 } 169 } 170 171 /** 172 * Wait for the Future to be ready. If the requested delay is 0 or 173 * negative, this method immediately returns the value of the 174 * 'ready' flag. 175 * Every 5 second, the wait will be suspended to be able to check if 176 * there is a deadlock or not. 177 * 178 * @param timeoutMillis The delay we will wait for the Future to be ready 179 * @param interruptable Tells if the wait can be interrupted or not 180 * @return <tt>true</tt> if the Future is ready 181 * @throws InterruptedException If the thread has been interrupted 182 * when it's not allowed. 183 */ 184 private boolean await0(long timeoutMillis, boolean interruptable) throws InterruptedException { 185 long endTime = System.currentTimeMillis() + timeoutMillis; 186 187 if (endTime < 0) { 188 endTime = Long.MAX_VALUE; 189 } 190 191 synchronized (lock) { 192 // We can quit if the ready flag is set to true, or if 193 // the timeout is set to 0 or below : we don't wait in this case. 194 if (ready||(timeoutMillis <= 0)) { 195 return ready; 196 } 197 198 // The operation is not completed : we have to wait 199 waiters++; 200 201 try { 202 for (;;) { 203 try { 204 long timeOut = Math.min(timeoutMillis, DEAD_LOCK_CHECK_INTERVAL); 205 206 // Wait for the requested period of time, 207 // but every DEAD_LOCK_CHECK_INTERVAL seconds, we will 208 // check that we aren't blocked. 209 lock.wait(timeOut); 210 } catch (InterruptedException e) { 211 if (interruptable) { 212 throw e; 213 } 214 } 215 216 if (ready || (endTime < System.currentTimeMillis())) { 217 return ready; 218 } else { 219 // Take a chance, detect a potential deadlock 220 checkDeadLock(); 221 } 222 } 223 } finally { 224 // We get here for 3 possible reasons : 225 // 1) We have been notified (the operation has completed a way or another) 226 // 2) We have reached the timeout 227 // 3) The thread has been interrupted 228 // In any case, we decrement the number of waiters, and we get out. 229 waiters--; 230 231 if (!ready) { 232 checkDeadLock(); 233 } 234 } 235 } 236 } 237 238 /** 239 * Check for a deadlock, ie look into the stack trace that we don't have already an 240 * instance of the caller. 241 */ 242 private void checkDeadLock() { 243 // Only read / write / connect / write future can cause dead lock. 244 if (!(this instanceof CloseFuture || this instanceof WriteFuture || this instanceof ReadFuture || this instanceof ConnectFuture)) { 245 return; 246 } 247 248 // Get the current thread stackTrace. 249 // Using Thread.currentThread().getStackTrace() is the best solution, 250 // even if slightly less efficient than doing a new Exception().getStackTrace(), 251 // as internally, it does exactly the same thing. The advantage of using 252 // this solution is that we may benefit some improvement with some 253 // future versions of Java. 254 StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); 255 256 // Simple and quick check. 257 for (StackTraceElement stackElement : stackTrace) { 258 if (AbstractPollingIoProcessor.class.getName().equals(stackElement.getClassName())) { 259 IllegalStateException e = new IllegalStateException("t"); 260 e.getStackTrace(); 261 throw new IllegalStateException("DEAD LOCK: " + IoFuture.class.getSimpleName() 262 + ".await() was invoked from an I/O processor thread. " + "Please use " 263 + IoFutureListener.class.getSimpleName() + " or configure a proper thread model alternatively."); 264 } 265 } 266 267 // And then more precisely. 268 for (StackTraceElement s : stackTrace) { 269 try { 270 Class<?> cls = DefaultIoFuture.class.getClassLoader().loadClass(s.getClassName()); 271 272 if (IoProcessor.class.isAssignableFrom(cls)) { 273 throw new IllegalStateException("DEAD LOCK: " + IoFuture.class.getSimpleName() 274 + ".await() was invoked from an I/O processor thread. " + "Please use " 275 + IoFutureListener.class.getSimpleName() 276 + " or configure a proper thread model alternatively."); 277 } 278 } catch (ClassNotFoundException cnfe) { 279 // Ignore 280 } 281 } 282 } 283 284 /** 285 * {@inheritDoc} 286 */ 287 public boolean isDone() { 288 synchronized (lock) { 289 return ready; 290 } 291 } 292 293 /** 294 * Sets the result of the asynchronous operation, and mark it as finished. 295 * 296 * @param newValue The result to store into the Future 297 * @return {@code true} if the value has been set, {@code false} if 298 * the future already has a value (thus is in ready state) 299 */ 300 public boolean setValue(Object newValue) { 301 synchronized (lock) { 302 // Allowed only once. 303 if (ready) { 304 return false; 305 } 306 307 result = newValue; 308 ready = true; 309 310 // Now, if we have waiters, notify them that the operation has completed 311 if (waiters > 0) { 312 lock.notifyAll(); 313 } 314 } 315 316 // Last, not least, inform the listeners 317 notifyListeners(); 318 319 return true; 320 } 321 322 /** 323 * @return the result of the asynchronous operation. 324 */ 325 protected Object getValue() { 326 synchronized (lock) { 327 return result; 328 } 329 } 330 331 /** 332 * {@inheritDoc} 333 */ 334 public IoFuture addListener(IoFutureListener<?> listener) { 335 if (listener == null) { 336 throw new IllegalArgumentException("listener"); 337 } 338 339 synchronized (lock) { 340 if (ready) { 341 // Shortcut : if the operation has completed, no need to 342 // add a new listener, we just have to notify it. The existing 343 // listeners have already been notified anyway, when the 344 // 'ready' flag has been set. 345 notifyListener(listener); 346 } else { 347 if (firstListener == null) { 348 firstListener = listener; 349 } else { 350 if (otherListeners == null) { 351 otherListeners = new ArrayList<IoFutureListener<?>>(1); 352 } 353 354 otherListeners.add(listener); 355 } 356 } 357 } 358 359 return this; 360 } 361 362 /** 363 * {@inheritDoc} 364 */ 365 public IoFuture removeListener(IoFutureListener<?> listener) { 366 if (listener == null) { 367 throw new IllegalArgumentException("listener"); 368 } 369 370 synchronized (lock) { 371 if (!ready) { 372 if (listener == firstListener) { 373 if ((otherListeners != null) && !otherListeners.isEmpty()) { 374 firstListener = otherListeners.remove(0); 375 } else { 376 firstListener = null; 377 } 378 } else if (otherListeners != null) { 379 otherListeners.remove(listener); 380 } 381 } 382 } 383 384 return this; 385 } 386 387 /** 388 * Notify the listeners, if we have some. 389 */ 390 private void notifyListeners() { 391 // There won't be any visibility problem or concurrent modification 392 // because 'ready' flag will be checked against both addListener and 393 // removeListener calls. 394 if (firstListener != null) { 395 notifyListener(firstListener); 396 firstListener = null; 397 398 if (otherListeners != null) { 399 for (IoFutureListener<?> listener : otherListeners) { 400 notifyListener(listener); 401 } 402 403 otherListeners = null; 404 } 405 } 406 } 407 408 @SuppressWarnings("unchecked") 409 private void notifyListener(IoFutureListener listener) { 410 try { 411 listener.operationComplete(this); 412 } catch (Exception e) { 413 ExceptionMonitor.getInstance().exceptionCaught(e); 414 } 415 } 416}