1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 * 19 */ 20 package org.apache.mina.core.polling; 21 22 import java.net.SocketAddress; 23 import java.util.Collections; 24 import java.util.HashMap; 25 import java.util.HashSet; 26 import java.util.Iterator; 27 import java.util.List; 28 import java.util.Map; 29 import java.util.Queue; 30 import java.util.Set; 31 import java.util.concurrent.ConcurrentLinkedQueue; 32 import java.util.concurrent.Executor; 33 import java.util.concurrent.Executors; 34 35 import org.apache.mina.core.RuntimeIoException; 36 import org.apache.mina.core.filterchain.IoFilter; 37 import org.apache.mina.core.future.IoFuture; 38 import org.apache.mina.core.service.AbstractIoAcceptor; 39 import org.apache.mina.core.service.IoAcceptor; 40 import org.apache.mina.core.service.IoHandler; 41 import org.apache.mina.core.service.IoProcessor; 42 import org.apache.mina.core.service.SimpleIoProcessorPool; 43 import org.apache.mina.core.session.AbstractIoSession; 44 import org.apache.mina.core.session.IoSession; 45 import org.apache.mina.core.session.IoSessionConfig; 46 import org.apache.mina.transport.socket.nio.NioSocketAcceptor; 47 import org.apache.mina.util.ExceptionMonitor; 48 49 /** 50 * A base class for implementing transport using a polling strategy. The 51 * underlying sockets will be checked in an active loop and woke up when an 52 * socket needed to be processed. This class handle the logic behind binding, 53 * accepting and disposing the server sockets. An {@link Executor} will be used 54 * for running client accepting and an {@link AbstractPollingIoProcessor} will 55 * be used for processing client I/O operations like reading, writing and 56 * closing. 57 * 58 * All the low level methods for binding, accepting, closing need to be provided 59 * by the subclassing implementation. 60 * 61 * @see NioSocketAcceptor for a example of implementation 62 * 63 * @author The Apache MINA Project (dev@mina.apache.org) 64 */ 65 public abstract class AbstractPollingIoAcceptor<T extends AbstractIoSession, H> 66 extends AbstractIoAcceptor { 67 68 private final IoProcessor<T> processor; 69 70 private final boolean createdProcessor; 71 72 private final Object lock = new Object(); 73 74 private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>(); 75 76 private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>(); 77 78 private final Map<SocketAddress, H> boundHandles = Collections 79 .synchronizedMap(new HashMap<SocketAddress, H>()); 80 81 private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture(); 82 83 /** A flag set when the acceptor has been created and initialized */ 84 private volatile boolean selectable; 85 86 /** The thread responsible of accepting incoming requests */ 87 private Acceptor acceptor; 88 89 /** 90 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default 91 * session configuration, a class of {@link IoProcessor} which will be instantiated in a 92 * {@link SimpleIoProcessorPool} for better scaling in multiprocessor systems. The default 93 * pool size will be used. 94 * 95 * @see SimpleIoProcessorPool 96 * 97 * @param sessionConfig 98 * the default configuration for the managed {@link IoSession} 99 * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession} 100 * type. 101 */ 102 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, 103 Class<? extends IoProcessor<T>> processorClass) { 104 this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), 105 true); 106 } 107 108 /** 109 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default 110 * session configuration, a class of {@link IoProcessor} which will be instantiated in a 111 * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor 112 * systems. 113 * 114 * @see SimpleIoProcessorPool 115 * 116 * @param sessionConfig 117 * the default configuration for the managed {@link IoSession} 118 * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession} 119 * type. 120 * @param processorCount the amount of processor to instantiate for the pool 121 */ 122 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, 123 Class<? extends IoProcessor<T>> processorClass, int processorCount) { 124 this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass, 125 processorCount), true); 126 } 127 128 /** 129 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default 130 * session configuration, a default {@link Executor} will be created using 131 * {@link Executors#newCachedThreadPool()}. 132 * 133 * {@see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)} 134 * 135 * @param sessionConfig 136 * the default configuration for the managed {@link IoSession} 137 * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering 138 * events to the bound {@link IoHandler} and processing the chains of {@link IoFilter} 139 */ 140 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, 141 IoProcessor<T> processor) { 142 this(sessionConfig, null, processor, false); 143 } 144 145 /** 146 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default 147 * session configuration and an {@link Executor} for handling I/O events. If a 148 * null {@link Executor} is provided, a default one will be created using 149 * {@link Executors#newCachedThreadPool()}. 150 * 151 * {@see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)} 152 * 153 * @param sessionConfig 154 * the default configuration for the managed {@link IoSession} 155 * @param executor 156 * the {@link Executor} used for handling asynchronous execution of I/O 157 * events. Can be <code>null</code>. 158 * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering 159 * events to the bound {@link IoHandler} and processing the chains of {@link IoFilter} 160 */ 161 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, 162 Executor executor, IoProcessor<T> processor) { 163 this(sessionConfig, executor, processor, false); 164 } 165 166 /** 167 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default 168 * session configuration and an {@link Executor} for handling I/O events. If a 169 * null {@link Executor} is provided, a default one will be created using 170 * {@link Executors#newCachedThreadPool()}. 171 * 172 * {@see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)} 173 * 174 * @param sessionConfig 175 * the default configuration for the managed {@link IoSession} 176 * @param executor 177 * the {@link Executor} used for handling asynchronous execution of I/O 178 * events. Can be <code>null</code>. 179 * @param processor the {@link IoProcessor} for processing the {@link IoSession} of 180 * this transport, triggering events to the bound {@link IoHandler} and processing 181 * the chains of {@link IoFilter} 182 * @param createdProcessor tagging the processor as automatically created, so it 183 * will be automatically disposed 184 */ 185 private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, 186 Executor executor, IoProcessor<T> processor, 187 boolean createdProcessor) { 188 super(sessionConfig, executor); 189 190 if (processor == null) { 191 throw new NullPointerException("processor"); 192 } 193 194 this.processor = processor; 195 this.createdProcessor = createdProcessor; 196 197 try { 198 // Initialize the selector 199 init(); 200 201 // The selector is now ready, we can switch the 202 // flag to true so that incoming connection can be accepted 203 selectable = true; 204 } catch (RuntimeException e) { 205 throw e; 206 } catch (Exception e) { 207 throw new RuntimeIoException("Failed to initialize.", e); 208 } finally { 209 if (!selectable) { 210 try { 211 destroy(); 212 } catch (Exception e) { 213 ExceptionMonitor.getInstance().exceptionCaught(e); 214 } 215 } 216 } 217 } 218 219 /** 220 * Initialize the polling system, will be called at construction time. 221 * @throws Exception any exception thrown by the underlying system calls 222 */ 223 protected abstract void init() throws Exception; 224 225 /** 226 * Destroy the polling system, will be called when this {@link IoAcceptor} 227 * implementation will be disposed. 228 * @throws Exception any exception thrown by the underlying systems calls 229 */ 230 protected abstract void destroy() throws Exception; 231 232 /** 233 * Check for acceptable connections, interrupt when at least a server is ready for accepting. 234 * All the ready server socket descriptors need to be returned by {@link #selectedHandles()} 235 * @return The number of sockets having got incoming client 236 * @throws Exception any exception thrown by the underlying systems calls 237 */ 238 protected abstract int select() throws Exception; 239 240 /** 241 * Interrupt the {@link #select()} method. Used when the poll set need to be modified. 242 */ 243 protected abstract void wakeup(); 244 245 /** 246 * {@link Iterator} for the set of server sockets found with acceptable incoming connections 247 * during the last {@link #select()} call. 248 * @return the list of server handles ready 249 */ 250 protected abstract Iterator<H> selectedHandles(); 251 252 /** 253 * Open a server socket for a given local address. 254 * @param localAddress the associated local address 255 * @return the opened server socket 256 * @throws Exception any exception thrown by the underlying systems calls 257 */ 258 protected abstract H open(SocketAddress localAddress) throws Exception; 259 260 /** 261 * Get the local address associated with a given server socket 262 * @param handle the server socket 263 * @return the local {@link SocketAddress} associated with this handle 264 * @throws Exception any exception thrown by the underlying systems calls 265 */ 266 protected abstract SocketAddress localAddress(H handle) throws Exception; 267 268 /** 269 * Accept a client connection for a server socket and return a new {@link IoSession} 270 * associated with the given {@link IoProcessor} 271 * @param processor the {@link IoProcessor} to associate with the {@link IoSession} 272 * @param handle the server handle 273 * @return the created {@link IoSession} 274 * @throws Exception any exception thrown by the underlying systems calls 275 */ 276 protected abstract T accept(IoProcessor<T> processor, H handle) 277 throws Exception; 278 279 /** 280 * Close a server socket. 281 * @param handle the server socket 282 * @throws Exception any exception thrown by the underlying systems calls 283 */ 284 protected abstract void close(H handle) throws Exception; 285 286 /** 287 * {@inheritDoc} 288 */ 289 @Override 290 protected IoFuture dispose0() throws Exception { 291 unbind(); 292 if (!disposalFuture.isDone()) { 293 startupAcceptor(); 294 wakeup(); 295 } 296 return disposalFuture; 297 } 298 299 /** 300 * {@inheritDoc} 301 */ 302 @Override 303 protected final Set<SocketAddress> bindInternal( 304 List<? extends SocketAddress> localAddresses) throws Exception { 305 // Create a bind request as a Future operation. When the selector 306 // have handled the registration, it will signal this future. 307 AcceptorOperationFuture request = new AcceptorOperationFuture( 308 localAddresses); 309 310 // adds the Registration request to the queue for the Workers 311 // to handle 312 registerQueue.add(request); 313 314 // creates the Acceptor instance and has the local 315 // executor kick it off. 316 startupAcceptor(); 317 318 // As we just started the acceptor, we have to unblock the select() 319 // in order to process the bind request we just have added to the 320 // registerQueue. 321 wakeup(); 322 323 // Now, we wait until this request is completed. 324 request.awaitUninterruptibly(); 325 326 if (request.getException() != null) { 327 throw request.getException(); 328 } 329 330 // Update the local addresses. 331 // setLocalAddresses() shouldn't be called from the worker thread 332 // because of deadlock. 333 Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>(); 334 335 for (H handle:boundHandles.values()) { 336 newLocalAddresses.add(localAddress(handle)); 337 } 338 339 return newLocalAddresses; 340 } 341 342 /** 343 * This method is called by the doBind() and doUnbind() 344 * methods. If the acceptor is null, the acceptor object will 345 * be created and kicked off by the executor. If the acceptor 346 * object is null, probably already created and this class 347 * is now working, then nothing will happen and the method 348 * will just return. 349 */ 350 private void startupAcceptor() { 351 // If the acceptor is not ready, clear the queues 352 // TODO : they should already be clean : do we have to do that ? 353 if (!selectable) { 354 registerQueue.clear(); 355 cancelQueue.clear(); 356 } 357 358 // start the acceptor if not already started 359 synchronized (lock) { 360 if (acceptor == null) { 361 acceptor = new Acceptor(); 362 executeWorker(acceptor); 363 } 364 } 365 } 366 367 /** 368 * {@inheritDoc} 369 */ 370 @Override 371 protected final void unbind0(List<? extends SocketAddress> localAddresses) 372 throws Exception { 373 AcceptorOperationFuture future = new AcceptorOperationFuture( 374 localAddresses); 375 376 cancelQueue.add(future); 377 startupAcceptor(); 378 wakeup(); 379 380 future.awaitUninterruptibly(); 381 if (future.getException() != null) { 382 throw future.getException(); 383 } 384 } 385 386 /** 387 * This class is called by the startupAcceptor() method and is 388 * placed into a NamePreservingRunnable class. 389 * It's a thread accepting incoming connections from clients. 390 * The loop is stopped when all the bound handlers are unbound. 391 */ 392 private class Acceptor implements Runnable { 393 public void run() { 394 int nHandles = 0; 395 396 while (selectable) { 397 try { 398 // Detect if we have some keys ready to be processed 399 // The select() will be woke up if some new connection 400 // have occurred, or if the selector has been explicitly 401 // woke up 402 int selected = select(); 403 404 // this actually sets the selector to OP_ACCEPT, 405 // and binds to the port on which this class will 406 // listen on 407 nHandles += registerHandles(); 408 409 if (selected > 0) { 410 // We have some connection request, let's process 411 // them here. 412 processHandles(selectedHandles()); 413 } 414 415 // check to see if any cancellation request has been made. 416 nHandles -= unregisterHandles(); 417 418 // Now, if the number of registred handles is 0, we can 419 // quit the loop: we don't have any socket listening 420 // for incoming connection. 421 if (nHandles == 0) { 422 synchronized (lock) { 423 if (registerQueue.isEmpty() 424 && cancelQueue.isEmpty()) { 425 acceptor = null; 426 break; 427 } 428 } 429 } 430 } catch (Throwable e) { 431 ExceptionMonitor.getInstance().exceptionCaught(e); 432 433 try { 434 Thread.sleep(1000); 435 } catch (InterruptedException e1) { 436 ExceptionMonitor.getInstance().exceptionCaught(e1); 437 } 438 } 439 } 440 441 // Cleanup all the processors, and shutdown the acceptor. 442 if (selectable && isDisposing()) { 443 selectable = false; 444 try { 445 if (createdProcessor) { 446 processor.dispose(); 447 } 448 } finally { 449 try { 450 synchronized (disposalLock) { 451 if (isDisposing()) { 452 destroy(); 453 } 454 } 455 } catch (Exception e) { 456 ExceptionMonitor.getInstance().exceptionCaught(e); 457 } finally { 458 disposalFuture.setDone(); 459 } 460 } 461 } 462 } 463 464 /** 465 * This method will process new sessions for the Worker class. All 466 * keys that have had their status updates as per the Selector.selectedKeys() 467 * method will be processed here. Only keys that are ready to accept 468 * connections are handled here. 469 * <p/> 470 * Session objects are created by making new instances of SocketSessionImpl 471 * and passing the session object to the SocketIoProcessor class. 472 */ 473 @SuppressWarnings("unchecked") 474 private void processHandles(Iterator<H> handles) throws Exception { 475 while (handles.hasNext()) { 476 H handle = handles.next(); 477 handles.remove(); 478 479 // Associates a new created connection to a processor, 480 // and get back a session 481 T session = accept(processor, handle); 482 483 if (session == null) { 484 break; 485 } 486 487 initSession(session, null, null); 488 489 // add the session to the SocketIoProcessor 490 session.getProcessor().add(session); 491 } 492 } 493 } 494 495 /** 496 * Sets up the socket communications. Sets items such as: 497 * <p/> 498 * Blocking 499 * Reuse address 500 * Receive buffer size 501 * Bind to listen port 502 * Registers OP_ACCEPT for selector 503 */ 504 private int registerHandles() { 505 for (;;) { 506 // The register queue contains the list of services to manage 507 // in this acceptor. 508 AcceptorOperationFuture future = registerQueue.poll(); 509 510 if (future == null) { 511 return 0; 512 } 513 514 // We create a temporary map to store the bound handles, 515 // as we may have to remove them all if there is an exception 516 // during the sockets opening. 517 Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>(); 518 List<SocketAddress> localAddresses = future.getLocalAddresses(); 519 520 try { 521 // Process all the addresses 522 for (SocketAddress a : localAddresses) { 523 H handle = open(a); 524 newHandles.put(localAddress(handle), handle); 525 } 526 527 // Everything went ok, we can now update the map storing 528 // all the bound sockets. 529 boundHandles.putAll(newHandles); 530 531 // and notify. 532 future.setDone(); 533 return newHandles.size(); 534 } catch (Exception e) { 535 // We store the exception in the future 536 future.setException(e); 537 } finally { 538 // Roll back if failed to bind all addresses. 539 if (future.getException() != null) { 540 for (H handle : newHandles.values()) { 541 try { 542 close(handle); 543 } catch (Exception e) { 544 ExceptionMonitor.getInstance().exceptionCaught(e); 545 } 546 } 547 548 // TODO : add some comment : what is the wakeup() waking up ? 549 wakeup(); 550 } 551 } 552 } 553 } 554 555 /** 556 * This method just checks to see if anything has been placed into the 557 * cancellation queue. The only thing that should be in the cancelQueue 558 * is CancellationRequest objects and the only place this happens is in 559 * the doUnbind() method. 560 */ 561 private int unregisterHandles() { 562 int cancelledHandles = 0; 563 for (;;) { 564 AcceptorOperationFuture future = cancelQueue.poll(); 565 if (future == null) { 566 break; 567 } 568 569 // close the channels 570 for (SocketAddress a : future.getLocalAddresses()) { 571 H handle = boundHandles.remove(a); 572 if (handle == null) { 573 continue; 574 } 575 576 try { 577 close(handle); 578 wakeup(); // wake up again to trigger thread death 579 } catch (Throwable e) { 580 ExceptionMonitor.getInstance().exceptionCaught(e); 581 } finally { 582 cancelledHandles++; 583 } 584 } 585 586 future.setDone(); 587 } 588 589 return cancelledHandles; 590 } 591 592 /** 593 * {@inheritDoc} 594 */ 595 public final IoSession newSession(SocketAddress remoteAddress, 596 SocketAddress localAddress) { 597 throw new UnsupportedOperationException(); 598 } 599 }