001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.mina.core.service; 021 022import java.util.AbstractSet; 023import java.util.Iterator; 024import java.util.List; 025import java.util.Map; 026import java.util.Set; 027import java.util.concurrent.Executor; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.Executors; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.AtomicInteger; 032 033import org.apache.mina.core.IoUtil; 034import org.apache.mina.core.filterchain.DefaultIoFilterChain; 035import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder; 036import org.apache.mina.core.filterchain.IoFilterChainBuilder; 037import org.apache.mina.core.future.ConnectFuture; 038import org.apache.mina.core.future.DefaultIoFuture; 039import org.apache.mina.core.future.IoFuture; 040import org.apache.mina.core.future.WriteFuture; 041import org.apache.mina.core.session.AbstractIoSession; 042import org.apache.mina.core.session.DefaultIoSessionDataStructureFactory; 043import org.apache.mina.core.session.IdleStatus; 044import org.apache.mina.core.session.IoSession; 045import org.apache.mina.core.session.IoSessionConfig; 046import org.apache.mina.core.session.IoSessionDataStructureFactory; 047import org.apache.mina.core.session.IoSessionInitializationException; 048import org.apache.mina.core.session.IoSessionInitializer; 049import org.apache.mina.util.ExceptionMonitor; 050import org.apache.mina.util.NamePreservingRunnable; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054/** 055 * Base implementation of {@link IoService}s. 056 * 057 * An instance of IoService contains an Executor which will handle the incoming 058 * events. 059 * 060 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 061 */ 062public abstract class AbstractIoService implements IoService { 063 064 private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIoService.class); 065 066 /** 067 * The unique number identifying the Service. It's incremented 068 * for each new IoService created. 069 */ 070 private static final AtomicInteger id = new AtomicInteger(); 071 072 /** 073 * The thread name built from the IoService inherited 074 * instance class name and the IoService Id 075 **/ 076 private final String threadName; 077 078 /** 079 * The associated executor, responsible for handling execution of I/O events. 080 */ 081 private final Executor executor; 082 083 /** 084 * A flag used to indicate that the local executor has been created 085 * inside this instance, and not passed by a caller. 086 * 087 * If the executor is locally created, then it will be an instance 088 * of the ThreadPoolExecutor class. 089 */ 090 private final boolean createdExecutor; 091 092 /** 093 * The IoHandler in charge of managing all the I/O Events. It is 094 */ 095 private IoHandler handler; 096 097 /** 098 * The default {@link IoSessionConfig} which will be used to configure new sessions. 099 */ 100 protected final IoSessionConfig sessionConfig; 101 102 private final IoServiceListener serviceActivationListener = new IoServiceListener() { 103 public void serviceActivated(IoService service) { 104 // Update lastIoTime. 105 AbstractIoService s = (AbstractIoService) service; 106 IoServiceStatistics _stats = s.getStatistics(); 107 _stats.setLastReadTime(s.getActivationTime()); 108 _stats.setLastWriteTime(s.getActivationTime()); 109 _stats.setLastThroughputCalculationTime(s.getActivationTime()); 110 111 } 112 113 public void serviceDeactivated(IoService service) throws Exception { 114 // Empty handler 115 } 116 117 public void serviceIdle(IoService service, IdleStatus idleStatus) throws Exception { 118 // Empty handler 119 } 120 121 public void sessionCreated(IoSession session) throws Exception { 122 // Empty handler 123 } 124 125 public void sessionClosed(IoSession session) throws Exception { 126 // Empty handler 127 } 128 129 public void sessionDestroyed(IoSession session) throws Exception { 130 // Empty handler 131 } 132 }; 133 134 /** 135 * Current filter chain builder. 136 */ 137 private IoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder(); 138 139 private IoSessionDataStructureFactory sessionDataStructureFactory = new DefaultIoSessionDataStructureFactory(); 140 141 /** 142 * Maintains the {@link IoServiceListener}s of this service. 143 */ 144 private final IoServiceListenerSupport listeners; 145 146 /** 147 * A lock object which must be acquired when related resources are 148 * destroyed. 149 */ 150 protected final Object disposalLock = new Object(); 151 152 private volatile boolean disposing; 153 154 private volatile boolean disposed; 155 156 /** 157 * {@inheritDoc} 158 */ 159 private IoServiceStatistics stats = new IoServiceStatistics(this); 160 161 /** 162 * Constructor for {@link AbstractIoService}. You need to provide a default 163 * session configuration and an {@link Executor} for handling I/O events. If 164 * a null {@link Executor} is provided, a default one will be created using 165 * {@link Executors#newCachedThreadPool()}. 166 * 167 * @param sessionConfig 168 * the default configuration for the managed {@link IoSession} 169 * @param executor 170 * the {@link Executor} used for handling execution of I/O 171 * events. Can be <code>null</code>. 172 */ 173 protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) { 174 if (sessionConfig == null) { 175 throw new IllegalArgumentException("sessionConfig"); 176 } 177 178 if (getTransportMetadata() == null) { 179 throw new IllegalArgumentException("TransportMetadata"); 180 } 181 182 if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(sessionConfig.getClass())) { 183 throw new IllegalArgumentException("sessionConfig type: " + sessionConfig.getClass() + " (expected: " 184 + getTransportMetadata().getSessionConfigType() + ")"); 185 } 186 187 // Create the listeners, and add a first listener : a activation listener 188 // for this service, which will give information on the service state. 189 listeners = new IoServiceListenerSupport(this); 190 listeners.add(serviceActivationListener); 191 192 // Stores the given session configuration 193 this.sessionConfig = sessionConfig; 194 195 // Make JVM load the exception monitor before some transports 196 // change the thread context class loader. 197 ExceptionMonitor.getInstance(); 198 199 if (executor == null) { 200 this.executor = Executors.newCachedThreadPool(); 201 createdExecutor = true; 202 } else { 203 this.executor = executor; 204 createdExecutor = false; 205 } 206 207 threadName = getClass().getSimpleName() + '-' + id.incrementAndGet(); 208 } 209 210 /** 211 * {@inheritDoc} 212 */ 213 public final IoFilterChainBuilder getFilterChainBuilder() { 214 return filterChainBuilder; 215 } 216 217 /** 218 * {@inheritDoc} 219 */ 220 public final void setFilterChainBuilder(IoFilterChainBuilder builder) { 221 if (builder == null) { 222 builder = new DefaultIoFilterChainBuilder(); 223 } 224 filterChainBuilder = builder; 225 } 226 227 /** 228 * {@inheritDoc} 229 */ 230 public final DefaultIoFilterChainBuilder getFilterChain() { 231 if (filterChainBuilder instanceof DefaultIoFilterChainBuilder) { 232 return (DefaultIoFilterChainBuilder) filterChainBuilder; 233 } 234 235 throw new IllegalStateException("Current filter chain builder is not a DefaultIoFilterChainBuilder."); 236 } 237 238 /** 239 * {@inheritDoc} 240 */ 241 public final void addListener(IoServiceListener listener) { 242 listeners.add(listener); 243 } 244 245 /** 246 * {@inheritDoc} 247 */ 248 public final void removeListener(IoServiceListener listener) { 249 listeners.remove(listener); 250 } 251 252 /** 253 * {@inheritDoc} 254 */ 255 public final boolean isActive() { 256 return listeners.isActive(); 257 } 258 259 /** 260 * {@inheritDoc} 261 */ 262 public final boolean isDisposing() { 263 return disposing; 264 } 265 266 /** 267 * {@inheritDoc} 268 */ 269 public final boolean isDisposed() { 270 return disposed; 271 } 272 273 /** 274 * {@inheritDoc} 275 */ 276 public final void dispose() { 277 dispose(false); 278 } 279 280 /** 281 * {@inheritDoc} 282 */ 283 public final void dispose(boolean awaitTermination) { 284 if (disposed) { 285 return; 286 } 287 288 synchronized (disposalLock) { 289 if (!disposing) { 290 disposing = true; 291 292 try { 293 dispose0(); 294 } catch (Exception e) { 295 ExceptionMonitor.getInstance().exceptionCaught(e); 296 } 297 } 298 } 299 300 if (createdExecutor) { 301 ExecutorService e = (ExecutorService) executor; 302 e.shutdownNow(); 303 if (awaitTermination) { 304 305 try { 306 LOGGER.debug("awaitTermination on {} called by thread=[{}]", this, Thread.currentThread().getName()); 307 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); 308 LOGGER.debug("awaitTermination on {} finished", this); 309 } catch (InterruptedException e1) { 310 LOGGER.warn("awaitTermination on [{}] was interrupted", this); 311 // Restore the interrupted status 312 Thread.currentThread().interrupt(); 313 } 314 } 315 } 316 disposed = true; 317 } 318 319 /** 320 * Implement this method to release any acquired resources. This method 321 * is invoked only once by {@link #dispose()}. 322 * 323 * @throws Exception If the dispose failed 324 */ 325 protected abstract void dispose0() throws Exception; 326 327 /** 328 * {@inheritDoc} 329 */ 330 public final Map<Long, IoSession> getManagedSessions() { 331 return listeners.getManagedSessions(); 332 } 333 334 /** 335 * {@inheritDoc} 336 */ 337 public final int getManagedSessionCount() { 338 return listeners.getManagedSessionCount(); 339 } 340 341 /** 342 * {@inheritDoc} 343 */ 344 public final IoHandler getHandler() { 345 return handler; 346 } 347 348 /** 349 * {@inheritDoc} 350 */ 351 public final void setHandler(IoHandler handler) { 352 if (handler == null) { 353 throw new IllegalArgumentException("handler cannot be null"); 354 } 355 356 if (isActive()) { 357 throw new IllegalStateException("handler cannot be set while the service is active."); 358 } 359 360 this.handler = handler; 361 } 362 363 /** 364 * {@inheritDoc} 365 */ 366 public final IoSessionDataStructureFactory getSessionDataStructureFactory() { 367 return sessionDataStructureFactory; 368 } 369 370 /** 371 * {@inheritDoc} 372 */ 373 public final void setSessionDataStructureFactory(IoSessionDataStructureFactory sessionDataStructureFactory) { 374 if (sessionDataStructureFactory == null) { 375 throw new IllegalArgumentException("sessionDataStructureFactory"); 376 } 377 378 if (isActive()) { 379 throw new IllegalStateException("sessionDataStructureFactory cannot be set while the service is active."); 380 } 381 382 this.sessionDataStructureFactory = sessionDataStructureFactory; 383 } 384 385 /** 386 * {@inheritDoc} 387 */ 388 public IoServiceStatistics getStatistics() { 389 return stats; 390 } 391 392 /** 393 * {@inheritDoc} 394 */ 395 public final long getActivationTime() { 396 return listeners.getActivationTime(); 397 } 398 399 /** 400 * {@inheritDoc} 401 */ 402 public final Set<WriteFuture> broadcast(Object message) { 403 // Convert to Set. We do not return a List here because only the 404 // direct caller of MessageBroadcaster knows the order of write 405 // operations. 406 final List<WriteFuture> futures = IoUtil.broadcast(message, getManagedSessions().values()); 407 return new AbstractSet<WriteFuture>() { 408 @Override 409 public Iterator<WriteFuture> iterator() { 410 return futures.iterator(); 411 } 412 413 @Override 414 public int size() { 415 return futures.size(); 416 } 417 }; 418 } 419 420 public final IoServiceListenerSupport getListeners() { 421 return listeners; 422 } 423 424 protected final void executeWorker(Runnable worker) { 425 executeWorker(worker, null); 426 } 427 428 protected final void executeWorker(Runnable worker, String suffix) { 429 String actualThreadName = threadName; 430 if (suffix != null) { 431 actualThreadName = actualThreadName + '-' + suffix; 432 } 433 executor.execute(new NamePreservingRunnable(worker, actualThreadName)); 434 } 435 436 // TODO Figure out make it work without causing a compiler error / warning. 437 @SuppressWarnings("unchecked") 438 protected final void initSession(IoSession session, IoFuture future, IoSessionInitializer sessionInitializer) { 439 // Update lastIoTime if needed. 440 if (stats.getLastReadTime() == 0) { 441 stats.setLastReadTime(getActivationTime()); 442 } 443 444 if (stats.getLastWriteTime() == 0) { 445 stats.setLastWriteTime(getActivationTime()); 446 } 447 448 // Every property but attributeMap should be set now. 449 // Now initialize the attributeMap. The reason why we initialize 450 // the attributeMap at last is to make sure all session properties 451 // such as remoteAddress are provided to IoSessionDataStructureFactory. 452 try { 453 ((AbstractIoSession) session).setAttributeMap(session.getService().getSessionDataStructureFactory() 454 .getAttributeMap(session)); 455 } catch (IoSessionInitializationException e) { 456 throw e; 457 } catch (Exception e) { 458 throw new IoSessionInitializationException("Failed to initialize an attributeMap.", e); 459 } 460 461 try { 462 ((AbstractIoSession) session).setWriteRequestQueue(session.getService().getSessionDataStructureFactory() 463 .getWriteRequestQueue(session)); 464 } catch (IoSessionInitializationException e) { 465 throw e; 466 } catch (Exception e) { 467 throw new IoSessionInitializationException("Failed to initialize a writeRequestQueue.", e); 468 } 469 470 if ((future != null) && (future instanceof ConnectFuture)) { 471 // DefaultIoFilterChain will notify the future. (We support ConnectFuture only for now). 472 session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE, future); 473 } 474 475 if (sessionInitializer != null) { 476 sessionInitializer.initializeSession(session, future); 477 } 478 479 finishSessionInitialization0(session, future); 480 } 481 482 /** 483 * Implement this method to perform additional tasks required for session 484 * initialization. Do not call this method directly; 485 * {@link #initSession(IoSession, IoFuture, IoSessionInitializer)} will call 486 * this method instead. 487 * 488 * @param session The session to initialize 489 * @param future The Future to use 490 * 491 */ 492 protected void finishSessionInitialization0(IoSession session, IoFuture future) { 493 // Do nothing. Extended class might add some specific code 494 } 495 496 /** 497 * A specific class used to 498 * @author elecharny 499 * 500 */ 501 protected static class ServiceOperationFuture extends DefaultIoFuture { 502 public ServiceOperationFuture() { 503 super(null); 504 } 505 506 public final boolean isDone() { 507 return getValue() == Boolean.TRUE; 508 } 509 510 public final void setDone() { 511 setValue(Boolean.TRUE); 512 } 513 514 public final Exception getException() { 515 if (getValue() instanceof Exception) { 516 return (Exception) getValue(); 517 } 518 519 return null; 520 } 521 522 public final void setException(Exception exception) { 523 if (exception == null) { 524 throw new IllegalArgumentException("exception"); 525 } 526 527 setValue(exception); 528 } 529 } 530 531 /** 532 * {@inheritDoc} 533 */ 534 public int getScheduledWriteBytes() { 535 return stats.getScheduledWriteBytes(); 536 } 537 538 /** 539 * {@inheritDoc} 540 */ 541 public int getScheduledWriteMessages() { 542 return stats.getScheduledWriteMessages(); 543 } 544 545}