001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.mina.core.service; 021 022import java.util.AbstractSet; 023import java.util.Iterator; 024import java.util.List; 025import java.util.Map; 026import java.util.Set; 027import java.util.concurrent.Executor; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.Executors; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.AtomicInteger; 032 033import org.apache.mina.core.IoUtil; 034import org.apache.mina.core.filterchain.DefaultIoFilterChain; 035import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder; 036import org.apache.mina.core.filterchain.IoFilterChainBuilder; 037import org.apache.mina.core.future.ConnectFuture; 038import org.apache.mina.core.future.DefaultIoFuture; 039import org.apache.mina.core.future.IoFuture; 040import org.apache.mina.core.future.WriteFuture; 041import org.apache.mina.core.session.AbstractIoSession; 042import org.apache.mina.core.session.DefaultIoSessionDataStructureFactory; 043import org.apache.mina.core.session.IdleStatus; 044import org.apache.mina.core.session.IoSession; 045import org.apache.mina.core.session.IoSessionConfig; 046import org.apache.mina.core.session.IoSessionDataStructureFactory; 047import org.apache.mina.core.session.IoSessionInitializationException; 048import org.apache.mina.core.session.IoSessionInitializer; 049import org.apache.mina.util.ExceptionMonitor; 050import org.apache.mina.util.NamePreservingRunnable; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054/** 055 * Base implementation of {@link IoService}s. 056 * 057 * An instance of IoService contains an Executor which will handle the incoming 058 * events. 059 * 060 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 061 */ 062public abstract class AbstractIoService implements IoService { 063 064 private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIoService.class); 065 066 /** 067 * The unique number identifying the Service. It's incremented 068 * for each new IoService created. 069 */ 070 private static final AtomicInteger id = new AtomicInteger(); 071 072 /** 073 * The thread name built from the IoService inherited 074 * instance class name and the IoService Id 075 **/ 076 private final String threadName; 077 078 /** 079 * The associated executor, responsible for handling execution of I/O events. 080 */ 081 private final Executor executor; 082 083 /** 084 * A flag used to indicate that the local executor has been created 085 * inside this instance, and not passed by a caller. 086 * 087 * If the executor is locally created, then it will be an instance 088 * of the ThreadPoolExecutor class. 089 */ 090 private final boolean createdExecutor; 091 092 /** 093 * The IoHandler in charge of managing all the I/O Events. It is 094 */ 095 private IoHandler handler; 096 097 /** 098 * The default {@link IoSessionConfig} which will be used to configure new sessions. 099 */ 100 protected final IoSessionConfig sessionConfig; 101 102 private final IoServiceListener serviceActivationListener = new IoServiceListener() { 103 public void serviceActivated(IoService service) { 104 // Update lastIoTime. 105 AbstractIoService s = (AbstractIoService) service; 106 IoServiceStatistics _stats = s.getStatistics(); 107 _stats.setLastReadTime(s.getActivationTime()); 108 _stats.setLastWriteTime(s.getActivationTime()); 109 _stats.setLastThroughputCalculationTime(s.getActivationTime()); 110 111 } 112 113 public void serviceDeactivated(IoService service) throws Exception { 114 // Empty handler 115 } 116 117 public void serviceIdle(IoService service, IdleStatus idleStatus) throws Exception { 118 // Empty handler 119 } 120 121 public void sessionCreated(IoSession session) throws Exception { 122 // Empty handler 123 } 124 125 public void sessionClosed(IoSession session) throws Exception { 126 // Empty handler 127 } 128 129 public void sessionDestroyed(IoSession session) throws Exception { 130 // Empty handler 131 } 132 }; 133 134 /** 135 * Current filter chain builder. 136 */ 137 private IoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder(); 138 139 private IoSessionDataStructureFactory sessionDataStructureFactory = new DefaultIoSessionDataStructureFactory(); 140 141 /** 142 * Maintains the {@link IoServiceListener}s of this service. 143 */ 144 private final IoServiceListenerSupport listeners; 145 146 /** 147 * A lock object which must be acquired when related resources are 148 * destroyed. 149 */ 150 protected final Object disposalLock = new Object(); 151 152 private volatile boolean disposing; 153 154 private volatile boolean disposed; 155 156 /** 157 * {@inheritDoc} 158 */ 159 private IoServiceStatistics stats = new IoServiceStatistics(this); 160 161 /** 162 * Constructor for {@link AbstractIoService}. You need to provide a default 163 * session configuration and an {@link Executor} for handling I/O events. If 164 * a null {@link Executor} is provided, a default one will be created using 165 * {@link Executors#newCachedThreadPool()}. 166 * 167 * @param sessionConfig 168 * the default configuration for the managed {@link IoSession} 169 * @param executor 170 * the {@link Executor} used for handling execution of I/O 171 * events. Can be <code>null</code>. 172 */ 173 protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) { 174 if (sessionConfig == null) { 175 throw new IllegalArgumentException("sessionConfig"); 176 } 177 178 if (getTransportMetadata() == null) { 179 throw new IllegalArgumentException("TransportMetadata"); 180 } 181 182 if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(sessionConfig.getClass())) { 183 throw new IllegalArgumentException("sessionConfig type: " + sessionConfig.getClass() + " (expected: " 184 + getTransportMetadata().getSessionConfigType() + ")"); 185 } 186 187 // Create the listeners, and add a first listener : a activation listener 188 // for this service, which will give information on the service state. 189 listeners = new IoServiceListenerSupport(this); 190 listeners.add(serviceActivationListener); 191 192 // Stores the given session configuration 193 this.sessionConfig = sessionConfig; 194 195 // Make JVM load the exception monitor before some transports 196 // change the thread context class loader. 197 ExceptionMonitor.getInstance(); 198 199 if (executor == null) { 200 this.executor = Executors.newCachedThreadPool(); 201 createdExecutor = true; 202 } else { 203 this.executor = executor; 204 createdExecutor = false; 205 } 206 207 threadName = getClass().getSimpleName() + '-' + id.incrementAndGet(); 208 } 209 210 /** 211 * {@inheritDoc} 212 */ 213 public final IoFilterChainBuilder getFilterChainBuilder() { 214 return filterChainBuilder; 215 } 216 217 /** 218 * {@inheritDoc} 219 */ 220 public final void setFilterChainBuilder(IoFilterChainBuilder builder) { 221 if (builder == null) { 222 builder = new DefaultIoFilterChainBuilder(); 223 } 224 filterChainBuilder = builder; 225 } 226 227 /** 228 * {@inheritDoc} 229 */ 230 public final DefaultIoFilterChainBuilder getFilterChain() { 231 if (filterChainBuilder instanceof DefaultIoFilterChainBuilder) { 232 return (DefaultIoFilterChainBuilder) filterChainBuilder; 233 } 234 235 throw new IllegalStateException("Current filter chain builder is not a DefaultIoFilterChainBuilder."); 236 } 237 238 /** 239 * {@inheritDoc} 240 */ 241 public final void addListener(IoServiceListener listener) { 242 listeners.add(listener); 243 } 244 245 /** 246 * {@inheritDoc} 247 */ 248 public final void removeListener(IoServiceListener listener) { 249 listeners.remove(listener); 250 } 251 252 /** 253 * {@inheritDoc} 254 */ 255 public final boolean isActive() { 256 return listeners.isActive(); 257 } 258 259 /** 260 * {@inheritDoc} 261 */ 262 public final boolean isDisposing() { 263 return disposing; 264 } 265 266 /** 267 * {@inheritDoc} 268 */ 269 public final boolean isDisposed() { 270 return disposed; 271 } 272 273 /** 274 * {@inheritDoc} 275 */ 276 public final void dispose() { 277 dispose(false); 278 } 279 280 /** 281 * {@inheritDoc} 282 */ 283 public final void dispose(boolean awaitTermination) { 284 if (disposed) { 285 return; 286 } 287 288 synchronized (disposalLock) { 289 if (!disposing) { 290 disposing = true; 291 292 try { 293 dispose0(); 294 } catch (Exception e) { 295 ExceptionMonitor.getInstance().exceptionCaught(e); 296 } 297 } 298 } 299 300 if (createdExecutor) { 301 ExecutorService e = (ExecutorService) executor; 302 e.shutdownNow(); 303 if (awaitTermination) { 304 305 try { 306 LOGGER.debug("awaitTermination on {} called by thread=[{}]", this, Thread.currentThread().getName()); 307 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); 308 LOGGER.debug("awaitTermination on {} finished", this); 309 } catch (InterruptedException e1) { 310 LOGGER.warn("awaitTermination on [{}] was interrupted", this); 311 // Restore the interrupted status 312 Thread.currentThread().interrupt(); 313 } 314 } 315 } 316 disposed = true; 317 } 318 319 /** 320 * Implement this method to release any acquired resources. This method 321 * is invoked only once by {@link #dispose()}. 322 */ 323 protected abstract void dispose0() throws Exception; 324 325 /** 326 * {@inheritDoc} 327 */ 328 public final Map<Long, IoSession> getManagedSessions() { 329 return listeners.getManagedSessions(); 330 } 331 332 /** 333 * {@inheritDoc} 334 */ 335 public final int getManagedSessionCount() { 336 return listeners.getManagedSessionCount(); 337 } 338 339 /** 340 * {@inheritDoc} 341 */ 342 public final IoHandler getHandler() { 343 return handler; 344 } 345 346 /** 347 * {@inheritDoc} 348 */ 349 public final void setHandler(IoHandler handler) { 350 if (handler == null) { 351 throw new IllegalArgumentException("handler cannot be null"); 352 } 353 354 if (isActive()) { 355 throw new IllegalStateException("handler cannot be set while the service is active."); 356 } 357 358 this.handler = handler; 359 } 360 361 /** 362 * {@inheritDoc} 363 */ 364 public final IoSessionDataStructureFactory getSessionDataStructureFactory() { 365 return sessionDataStructureFactory; 366 } 367 368 /** 369 * {@inheritDoc} 370 */ 371 public final void setSessionDataStructureFactory(IoSessionDataStructureFactory sessionDataStructureFactory) { 372 if (sessionDataStructureFactory == null) { 373 throw new IllegalArgumentException("sessionDataStructureFactory"); 374 } 375 376 if (isActive()) { 377 throw new IllegalStateException("sessionDataStructureFactory cannot be set while the service is active."); 378 } 379 380 this.sessionDataStructureFactory = sessionDataStructureFactory; 381 } 382 383 /** 384 * {@inheritDoc} 385 */ 386 public IoServiceStatistics getStatistics() { 387 return stats; 388 } 389 390 /** 391 * {@inheritDoc} 392 */ 393 public final long getActivationTime() { 394 return listeners.getActivationTime(); 395 } 396 397 /** 398 * {@inheritDoc} 399 */ 400 public final Set<WriteFuture> broadcast(Object message) { 401 // Convert to Set. We do not return a List here because only the 402 // direct caller of MessageBroadcaster knows the order of write 403 // operations. 404 final List<WriteFuture> futures = IoUtil.broadcast(message, getManagedSessions().values()); 405 return new AbstractSet<WriteFuture>() { 406 @Override 407 public Iterator<WriteFuture> iterator() { 408 return futures.iterator(); 409 } 410 411 @Override 412 public int size() { 413 return futures.size(); 414 } 415 }; 416 } 417 418 public final IoServiceListenerSupport getListeners() { 419 return listeners; 420 } 421 422 protected final void executeWorker(Runnable worker) { 423 executeWorker(worker, null); 424 } 425 426 protected final void executeWorker(Runnable worker, String suffix) { 427 String actualThreadName = threadName; 428 if (suffix != null) { 429 actualThreadName = actualThreadName + '-' + suffix; 430 } 431 executor.execute(new NamePreservingRunnable(worker, actualThreadName)); 432 } 433 434 // TODO Figure out make it work without causing a compiler error / warning. 435 @SuppressWarnings("unchecked") 436 protected final void initSession(IoSession session, IoFuture future, IoSessionInitializer sessionInitializer) { 437 // Update lastIoTime if needed. 438 if (stats.getLastReadTime() == 0) { 439 stats.setLastReadTime(getActivationTime()); 440 } 441 442 if (stats.getLastWriteTime() == 0) { 443 stats.setLastWriteTime(getActivationTime()); 444 } 445 446 // Every property but attributeMap should be set now. 447 // Now initialize the attributeMap. The reason why we initialize 448 // the attributeMap at last is to make sure all session properties 449 // such as remoteAddress are provided to IoSessionDataStructureFactory. 450 try { 451 ((AbstractIoSession) session).setAttributeMap(session.getService().getSessionDataStructureFactory() 452 .getAttributeMap(session)); 453 } catch (IoSessionInitializationException e) { 454 throw e; 455 } catch (Exception e) { 456 throw new IoSessionInitializationException("Failed to initialize an attributeMap.", e); 457 } 458 459 try { 460 ((AbstractIoSession) session).setWriteRequestQueue(session.getService().getSessionDataStructureFactory() 461 .getWriteRequestQueue(session)); 462 } catch (IoSessionInitializationException e) { 463 throw e; 464 } catch (Exception e) { 465 throw new IoSessionInitializationException("Failed to initialize a writeRequestQueue.", e); 466 } 467 468 if ((future != null) && (future instanceof ConnectFuture)) { 469 // DefaultIoFilterChain will notify the future. (We support ConnectFuture only for now). 470 session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE, future); 471 } 472 473 if (sessionInitializer != null) { 474 sessionInitializer.initializeSession(session, future); 475 } 476 477 finishSessionInitialization0(session, future); 478 } 479 480 /** 481 * Implement this method to perform additional tasks required for session 482 * initialization. Do not call this method directly; 483 * {@link #initSession(IoSession, IoFuture, IoSessionInitializer)} will call 484 * this method instead. 485 */ 486 protected void finishSessionInitialization0(IoSession session, IoFuture future) { 487 // Do nothing. Extended class might add some specific code 488 } 489 490 /** 491 * A specific class used to 492 * @author elecharny 493 * 494 */ 495 protected static class ServiceOperationFuture extends DefaultIoFuture { 496 public ServiceOperationFuture() { 497 super(null); 498 } 499 500 public final boolean isDone() { 501 return getValue() == Boolean.TRUE; 502 } 503 504 public final void setDone() { 505 setValue(Boolean.TRUE); 506 } 507 508 public final Exception getException() { 509 if (getValue() instanceof Exception) { 510 return (Exception) getValue(); 511 } 512 513 return null; 514 } 515 516 public final void setException(Exception exception) { 517 if (exception == null) { 518 throw new IllegalArgumentException("exception"); 519 } 520 521 setValue(exception); 522 } 523 } 524 525 /** 526 * {@inheritDoc} 527 */ 528 public int getScheduledWriteBytes() { 529 return stats.getScheduledWriteBytes(); 530 } 531 532 /** 533 * {@inheritDoc} 534 */ 535 public int getScheduledWriteMessages() { 536 return stats.getScheduledWriteMessages(); 537 } 538 539}