001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.mina.core.polling; 021 022import java.net.SocketAddress; 023import java.nio.channels.ClosedSelectorException; 024import java.nio.channels.spi.SelectorProvider; 025import java.util.Collections; 026import java.util.HashMap; 027import java.util.HashSet; 028import java.util.Iterator; 029import java.util.List; 030import java.util.Map; 031import java.util.Queue; 032import java.util.Set; 033import java.util.concurrent.ConcurrentHashMap; 034import java.util.concurrent.ConcurrentLinkedQueue; 035import java.util.concurrent.Executor; 036import java.util.concurrent.Executors; 037import java.util.concurrent.Semaphore; 038import java.util.concurrent.atomic.AtomicReference; 039 040import org.apache.mina.core.RuntimeIoException; 041import org.apache.mina.core.filterchain.IoFilter; 042import org.apache.mina.core.service.AbstractIoAcceptor; 043import org.apache.mina.core.service.AbstractIoService; 044import org.apache.mina.core.service.IoAcceptor; 045import org.apache.mina.core.service.IoHandler; 046import org.apache.mina.core.service.IoProcessor; 047import org.apache.mina.core.service.SimpleIoProcessorPool; 048import org.apache.mina.core.session.AbstractIoSession; 049import org.apache.mina.core.session.IoSession; 050import org.apache.mina.core.session.IoSessionConfig; 051import org.apache.mina.transport.socket.SocketSessionConfig; 052import org.apache.mina.transport.socket.nio.NioSocketAcceptor; 053import org.apache.mina.util.ExceptionMonitor; 054 055/** 056 * A base class for implementing transport using a polling strategy. The 057 * underlying sockets will be checked in an active loop and woke up when an 058 * socket needed to be processed. This class handle the logic behind binding, 059 * accepting and disposing the server sockets. An {@link Executor} will be used 060 * for running client accepting and an {@link AbstractPollingIoProcessor} will 061 * be used for processing client I/O operations like reading, writing and 062 * closing. 063 * 064 * All the low level methods for binding, accepting, closing need to be provided 065 * by the subclassing implementation. 066 * 067 * @see NioSocketAcceptor for a example of implementation 068 * 069 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 070 */ 071public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H> extends AbstractIoAcceptor { 072 /** A lock used to protect the selector to be waked up before it's created */ 073 private final Semaphore lock = new Semaphore(1); 074 075 private final IoProcessor<S> processor; 076 077 private final boolean createdProcessor; 078 079 private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>(); 080 081 private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>(); 082 083 private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>()); 084 085 private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture(); 086 087 /** A flag set when the acceptor has been created and initialized */ 088 private volatile boolean selectable; 089 090 /** The thread responsible of accepting incoming requests */ 091 private AtomicReference<Acceptor> acceptorRef = new AtomicReference<Acceptor>(); 092 093 protected boolean reuseAddress = false; 094 095 /** 096 * Define the number of socket that can wait to be accepted. Default 097 * to 50 (as in the SocketServer default). 098 */ 099 protected int backlog = 50; 100 101 /** 102 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default 103 * session configuration, a class of {@link IoProcessor} which will be instantiated in a 104 * {@link SimpleIoProcessorPool} for better scaling in multiprocessor systems. The default 105 * pool size will be used. 106 * 107 * @see SimpleIoProcessorPool 108 * 109 * @param sessionConfig 110 * the default configuration for the managed {@link IoSession} 111 * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession} 112 * type. 113 */ 114 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) { 115 this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true, null); 116 } 117 118 /** 119 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default 120 * session configuration, a class of {@link IoProcessor} which will be instantiated in a 121 * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor 122 * systems. 123 * 124 * @see SimpleIoProcessorPool 125 * 126 * @param sessionConfig 127 * the default configuration for the managed {@link IoSession} 128 * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession} 129 * type. 130 * @param processorCount the amount of processor to instantiate for the pool 131 */ 132 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass, 133 int processorCount) { 134 this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount), true, null); 135 } 136 137 /** 138 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default 139 * session configuration, a class of {@link IoProcessor} which will be instantiated in a 140 * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor 141 * systems. 142 * 143 * @see SimpleIoProcessorPool 144 * 145 * @param sessionConfig 146 * the default configuration for the managed {@link IoSession} 147 * @param processorClass a {@link Class}�of {@link IoProcessor} for the associated {@link IoSession} 148 * type. 149 * @param processorCount the amount of processor to instantiate for the pool 150 * @param selectorProvider The SelectorProvider to use 151 */ 152 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass, 153 int processorCount, SelectorProvider selectorProvider ) { 154 this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount, selectorProvider), true, selectorProvider); 155 } 156 157 /** 158 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default 159 * session configuration, a default {@link Executor} will be created using 160 * {@link Executors#newCachedThreadPool()}. 161 * 162 * @see AbstractIoService 163 * 164 * @param sessionConfig 165 * the default configuration for the managed {@link IoSession} 166 * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering 167 * events to the bound {@link IoHandler} and processing the chains of {@link IoFilter} 168 */ 169 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, IoProcessor<S> processor) { 170 this(sessionConfig, null, processor, false, null); 171 } 172 173 /** 174 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a 175 * default session configuration and an {@link Executor} for handling I/O 176 * events. If a null {@link Executor} is provided, a default one will be 177 * created using {@link Executors#newCachedThreadPool()}. 178 * 179 * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor) 180 * 181 * @param sessionConfig 182 * the default configuration for the managed {@link IoSession} 183 * @param executor 184 * the {@link Executor} used for handling asynchronous execution 185 * of I/O events. Can be <code>null</code>. 186 * @param processor 187 * the {@link IoProcessor} for processing the {@link IoSession} 188 * of this transport, triggering events to the bound 189 * {@link IoHandler} and processing the chains of 190 * {@link IoFilter} 191 */ 192 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor) { 193 this(sessionConfig, executor, processor, false, null); 194 } 195 196 /** 197 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a 198 * default session configuration and an {@link Executor} for handling I/O 199 * events. If a null {@link Executor} is provided, a default one will be 200 * created using {@link Executors#newCachedThreadPool()}. 201 * 202 * @see AbstractIoService(IoSessionConfig, Executor) 203 * 204 * @param sessionConfig 205 * the default configuration for the managed {@link IoSession} 206 * @param executor 207 * the {@link Executor} used for handling asynchronous execution 208 * of I/O events. Can be <code>null</code>. 209 * @param processor 210 * the {@link IoProcessor} for processing the {@link IoSession} 211 * of this transport, triggering events to the bound 212 * {@link IoHandler} and processing the chains of 213 * {@link IoFilter} 214 * @param createdProcessor 215 * tagging the processor as automatically created, so it will be 216 * automatically disposed 217 */ 218 private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor, 219 boolean createdProcessor, SelectorProvider selectorProvider) { 220 super(sessionConfig, executor); 221 222 if (processor == null) { 223 throw new IllegalArgumentException("processor"); 224 } 225 226 this.processor = processor; 227 this.createdProcessor = createdProcessor; 228 229 try { 230 // Initialize the selector 231 init(selectorProvider); 232 233 // The selector is now ready, we can switch the 234 // flag to true so that incoming connection can be accepted 235 selectable = true; 236 } catch (RuntimeException e) { 237 throw e; 238 } catch (Exception e) { 239 throw new RuntimeIoException("Failed to initialize.", e); 240 } finally { 241 if (!selectable) { 242 try { 243 destroy(); 244 } catch (Exception e) { 245 ExceptionMonitor.getInstance().exceptionCaught(e); 246 } 247 } 248 } 249 } 250 251 /** 252 * Initialize the polling system, will be called at construction time. 253 * @throws Exception any exception thrown by the underlying system calls 254 */ 255 protected abstract void init() throws Exception; 256 257 /** 258 * Initialize the polling system, will be called at construction time. 259 * 260 * @param selectorProvider The Selector Provider that will be used by this polling acceptor 261 * @throws Exception any exception thrown by the underlying system calls 262 */ 263 protected abstract void init(SelectorProvider selectorProvider) throws Exception; 264 265 /** 266 * Destroy the polling system, will be called when this {@link IoAcceptor} 267 * implementation will be disposed. 268 * @throws Exception any exception thrown by the underlying systems calls 269 */ 270 protected abstract void destroy() throws Exception; 271 272 /** 273 * Check for acceptable connections, interrupt when at least a server is ready for accepting. 274 * All the ready server socket descriptors need to be returned by {@link #selectedHandles()} 275 * @return The number of sockets having got incoming client 276 * @throws Exception any exception thrown by the underlying systems calls 277 */ 278 protected abstract int select() throws Exception; 279 280 /** 281 * Interrupt the {@link #select()} method. Used when the poll set need to be modified. 282 */ 283 protected abstract void wakeup(); 284 285 /** 286 * {@link Iterator} for the set of server sockets found with acceptable incoming connections 287 * during the last {@link #select()} call. 288 * @return the list of server handles ready 289 */ 290 protected abstract Iterator<H> selectedHandles(); 291 292 /** 293 * Open a server socket for a given local address. 294 * @param localAddress the associated local address 295 * @return the opened server socket 296 * @throws Exception any exception thrown by the underlying systems calls 297 */ 298 protected abstract H open(SocketAddress localAddress) throws Exception; 299 300 /** 301 * Get the local address associated with a given server socket 302 * @param handle the server socket 303 * @return the local {@link SocketAddress} associated with this handle 304 * @throws Exception any exception thrown by the underlying systems calls 305 */ 306 protected abstract SocketAddress localAddress(H handle) throws Exception; 307 308 /** 309 * Accept a client connection for a server socket and return a new {@link IoSession} 310 * associated with the given {@link IoProcessor} 311 * @param processor the {@link IoProcessor} to associate with the {@link IoSession} 312 * @param handle the server handle 313 * @return the created {@link IoSession} 314 * @throws Exception any exception thrown by the underlying systems calls 315 */ 316 protected abstract S accept(IoProcessor<S> processor, H handle) throws Exception; 317 318 /** 319 * Close a server socket. 320 * @param handle the server socket 321 * @throws Exception any exception thrown by the underlying systems calls 322 */ 323 protected abstract void close(H handle) throws Exception; 324 325 /** 326 * {@inheritDoc} 327 */ 328 @Override 329 protected void dispose0() throws Exception { 330 unbind(); 331 332 startupAcceptor(); 333 wakeup(); 334 } 335 336 /** 337 * {@inheritDoc} 338 */ 339 @Override 340 protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception { 341 // Create a bind request as a Future operation. When the selector 342 // have handled the registration, it will signal this future. 343 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses); 344 345 // adds the Registration request to the queue for the Workers 346 // to handle 347 registerQueue.add(request); 348 349 // creates the Acceptor instance and has the local 350 // executor kick it off. 351 startupAcceptor(); 352 353 // As we just started the acceptor, we have to unblock the select() 354 // in order to process the bind request we just have added to the 355 // registerQueue. 356 try { 357 lock.acquire(); 358 359 // Wait a bit to give a chance to the Acceptor thread to do the select() 360 Thread.sleep(10); 361 wakeup(); 362 } finally { 363 lock.release(); 364 } 365 366 // Now, we wait until this request is completed. 367 request.awaitUninterruptibly(); 368 369 if (request.getException() != null) { 370 throw request.getException(); 371 } 372 373 // Update the local addresses. 374 // setLocalAddresses() shouldn't be called from the worker thread 375 // because of deadlock. 376 Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>(); 377 378 for (H handle : boundHandles.values()) { 379 newLocalAddresses.add(localAddress(handle)); 380 } 381 382 return newLocalAddresses; 383 } 384 385 /** 386 * This method is called by the doBind() and doUnbind() 387 * methods. If the acceptor is null, the acceptor object will 388 * be created and kicked off by the executor. If the acceptor 389 * object is null, probably already created and this class 390 * is now working, then nothing will happen and the method 391 * will just return. 392 */ 393 private void startupAcceptor() throws InterruptedException { 394 // If the acceptor is not ready, clear the queues 395 // TODO : they should already be clean : do we have to do that ? 396 if (!selectable) { 397 registerQueue.clear(); 398 cancelQueue.clear(); 399 } 400 401 // start the acceptor if not already started 402 Acceptor acceptor = acceptorRef.get(); 403 404 if (acceptor == null) { 405 lock.acquire(); 406 acceptor = new Acceptor(); 407 408 if (acceptorRef.compareAndSet(null, acceptor)) { 409 executeWorker(acceptor); 410 } else { 411 lock.release(); 412 } 413 } 414 } 415 416 /** 417 * {@inheritDoc} 418 */ 419 @Override 420 protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception { 421 AcceptorOperationFuture future = new AcceptorOperationFuture(localAddresses); 422 423 cancelQueue.add(future); 424 startupAcceptor(); 425 wakeup(); 426 427 future.awaitUninterruptibly(); 428 if (future.getException() != null) { 429 throw future.getException(); 430 } 431 } 432 433 /** 434 * This class is called by the startupAcceptor() method and is 435 * placed into a NamePreservingRunnable class. 436 * It's a thread accepting incoming connections from clients. 437 * The loop is stopped when all the bound handlers are unbound. 438 */ 439 private class Acceptor implements Runnable { 440 public void run() { 441 assert (acceptorRef.get() == this); 442 443 int nHandles = 0; 444 445 // Release the lock 446 lock.release(); 447 448 while (selectable) { 449 try { 450 // Detect if we have some keys ready to be processed 451 // The select() will be woke up if some new connection 452 // have occurred, or if the selector has been explicitly 453 // woke up 454 int selected = select(); 455 456 // this actually sets the selector to OP_ACCEPT, 457 // and binds to the port on which this class will 458 // listen on 459 nHandles += registerHandles(); 460 461 // Now, if the number of registred handles is 0, we can 462 // quit the loop: we don't have any socket listening 463 // for incoming connection. 464 if (nHandles == 0) { 465 acceptorRef.set(null); 466 467 if (registerQueue.isEmpty() && cancelQueue.isEmpty()) { 468 assert (acceptorRef.get() != this); 469 break; 470 } 471 472 if (!acceptorRef.compareAndSet(null, this)) { 473 assert (acceptorRef.get() != this); 474 break; 475 } 476 477 assert (acceptorRef.get() == this); 478 } 479 480 if (selected > 0) { 481 // We have some connection request, let's process 482 // them here. 483 processHandles(selectedHandles()); 484 } 485 486 // check to see if any cancellation request has been made. 487 nHandles -= unregisterHandles(); 488 } catch (ClosedSelectorException cse) { 489 // If the selector has been closed, we can exit the loop 490 ExceptionMonitor.getInstance().exceptionCaught(cse); 491 break; 492 } catch (Exception e) { 493 ExceptionMonitor.getInstance().exceptionCaught(e); 494 495 try { 496 Thread.sleep(1000); 497 } catch (InterruptedException e1) { 498 ExceptionMonitor.getInstance().exceptionCaught(e1); 499 } 500 } 501 } 502 503 // Cleanup all the processors, and shutdown the acceptor. 504 if (selectable && isDisposing()) { 505 selectable = false; 506 try { 507 if (createdProcessor) { 508 processor.dispose(); 509 } 510 } finally { 511 try { 512 synchronized (disposalLock) { 513 if (isDisposing()) { 514 destroy(); 515 } 516 } 517 } catch (Exception e) { 518 ExceptionMonitor.getInstance().exceptionCaught(e); 519 } finally { 520 disposalFuture.setDone(); 521 } 522 } 523 } 524 } 525 526 /** 527 * This method will process new sessions for the Worker class. All 528 * keys that have had their status updates as per the Selector.selectedKeys() 529 * method will be processed here. Only keys that are ready to accept 530 * connections are handled here. 531 * <p/> 532 * Session objects are created by making new instances of SocketSessionImpl 533 * and passing the session object to the SocketIoProcessor class. 534 */ 535 @SuppressWarnings("unchecked") 536 private void processHandles(Iterator<H> handles) throws Exception { 537 while (handles.hasNext()) { 538 H handle = handles.next(); 539 handles.remove(); 540 541 // Associates a new created connection to a processor, 542 // and get back a session 543 S session = accept(processor, handle); 544 545 if (session == null) { 546 continue; 547 } 548 549 initSession(session, null, null); 550 551 // add the session to the SocketIoProcessor 552 session.getProcessor().add(session); 553 } 554 } 555 } 556 557 /** 558 * Sets up the socket communications. Sets items such as: 559 * <p/> 560 * Blocking 561 * Reuse address 562 * Receive buffer size 563 * Bind to listen port 564 * Registers OP_ACCEPT for selector 565 */ 566 private int registerHandles() { 567 for (;;) { 568 // The register queue contains the list of services to manage 569 // in this acceptor. 570 AcceptorOperationFuture future = registerQueue.poll(); 571 572 if (future == null) { 573 return 0; 574 } 575 576 // We create a temporary map to store the bound handles, 577 // as we may have to remove them all if there is an exception 578 // during the sockets opening. 579 Map<SocketAddress, H> newHandles = new ConcurrentHashMap<SocketAddress, H>(); 580 List<SocketAddress> localAddresses = future.getLocalAddresses(); 581 582 try { 583 // Process all the addresses 584 for (SocketAddress a : localAddresses) { 585 H handle = open(a); 586 newHandles.put(localAddress(handle), handle); 587 } 588 589 // Everything went ok, we can now update the map storing 590 // all the bound sockets. 591 boundHandles.putAll(newHandles); 592 593 // and notify. 594 future.setDone(); 595 return newHandles.size(); 596 } catch (Exception e) { 597 // We store the exception in the future 598 future.setException(e); 599 } finally { 600 // Roll back if failed to bind all addresses. 601 if (future.getException() != null) { 602 for (H handle : newHandles.values()) { 603 try { 604 close(handle); 605 } catch (Exception e) { 606 ExceptionMonitor.getInstance().exceptionCaught(e); 607 } 608 } 609 610 // TODO : add some comment : what is the wakeup() waking up ? 611 wakeup(); 612 } 613 } 614 } 615 } 616 617 /** 618 * This method just checks to see if anything has been placed into the 619 * cancellation queue. The only thing that should be in the cancelQueue 620 * is CancellationRequest objects and the only place this happens is in 621 * the doUnbind() method. 622 */ 623 private int unregisterHandles() { 624 int cancelledHandles = 0; 625 for (;;) { 626 AcceptorOperationFuture future = cancelQueue.poll(); 627 if (future == null) { 628 break; 629 } 630 631 // close the channels 632 for (SocketAddress a : future.getLocalAddresses()) { 633 H handle = boundHandles.remove(a); 634 635 if (handle == null) { 636 continue; 637 } 638 639 try { 640 close(handle); 641 wakeup(); // wake up again to trigger thread death 642 } catch (Exception e) { 643 ExceptionMonitor.getInstance().exceptionCaught(e); 644 } finally { 645 cancelledHandles++; 646 } 647 } 648 649 future.setDone(); 650 } 651 652 return cancelledHandles; 653 } 654 655 /** 656 * {@inheritDoc} 657 */ 658 public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) { 659 throw new UnsupportedOperationException(); 660 } 661 662 /** 663 * @return the backLog 664 */ 665 public int getBacklog() { 666 return backlog; 667 } 668 669 /** 670 * Sets the Backlog parameter 671 * 672 * @param backlog 673 * the backlog variable 674 */ 675 public void setBacklog(int backlog) { 676 synchronized (bindLock) { 677 if (isActive()) { 678 throw new IllegalStateException("backlog can't be set while the acceptor is bound."); 679 } 680 681 this.backlog = backlog; 682 } 683 } 684 685 /** 686 * @return the flag that sets the reuseAddress information 687 */ 688 public boolean isReuseAddress() { 689 return reuseAddress; 690 } 691 692 /** 693 * Set the Reuse Address flag 694 * 695 * @param reuseAddress 696 * The flag to set 697 */ 698 public void setReuseAddress(boolean reuseAddress) { 699 synchronized (bindLock) { 700 if (isActive()) { 701 throw new IllegalStateException("backlog can't be set while the acceptor is bound."); 702 } 703 704 this.reuseAddress = reuseAddress; 705 } 706 } 707 708 /** 709 * {@inheritDoc} 710 */ 711 public SocketSessionConfig getSessionConfig() { 712 return (SocketSessionConfig)sessionConfig; 713 } 714}