/*
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 *
 */
package org.apache.mina.core.polling;

import java.net.SocketAddress;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.spi.SelectorProvider;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.mina.core.RuntimeIoException;
import org.apache.mina.core.filterchain.IoFilter;
import org.apache.mina.core.service.AbstractIoAcceptor;
import org.apache.mina.core.service.AbstractIoService;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.service.IoProcessor;
import org.apache.mina.core.service.SimpleIoProcessorPool;
048import org.apache.mina.core.session.AbstractIoSession; 049import org.apache.mina.core.session.IoSession; 050import org.apache.mina.core.session.IoSessionConfig; 051import org.apache.mina.transport.socket.SocketSessionConfig; 052import org.apache.mina.transport.socket.nio.NioSocketAcceptor; 053import org.apache.mina.util.ExceptionMonitor; 054 055/** 056 * A base class for implementing transport using a polling strategy. The 057 * underlying sockets will be checked in an active loop and woke up when an 058 * socket needed to be processed. This class handle the logic behind binding, 059 * accepting and disposing the server sockets. An {@link Executor} will be used 060 * for running client accepting and an {@link AbstractPollingIoProcessor} will 061 * be used for processing client I/O operations like reading, writing and 062 * closing. 063 * 064 * All the low level methods for binding, accepting, closing need to be provided 065 * by the subclassing implementation. 066 * 067 * @see NioSocketAcceptor for a example of implementation 068 * 069 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 070 */ 071public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H> extends AbstractIoAcceptor { 072 /** A lock used to protect the selector to be waked up before it's created */ 073 private final Semaphore lock = new Semaphore(1); 074 075 private final IoProcessor<S> processor; 076 077 private final boolean createdProcessor; 078 079 private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<>(); 080 081 private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<>(); 082 083 private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>()); 084 085 private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture(); 086 087 /** A flag set when the acceptor has been created and initialized */ 088 private volatile boolean 
selectable; 089 090 /** The thread responsible of accepting incoming requests */ 091 private AtomicReference<Acceptor> acceptorRef = new AtomicReference<>(); 092 093 protected boolean reuseAddress = false; 094 095 /** 096 * Define the number of socket that can wait to be accepted. Default 097 * to 50 (as in the SocketServer default). 098 */ 099 protected int backlog = 50; 100 101 /** 102 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default 103 * session configuration, a class of {@link IoProcessor} which will be instantiated in a 104 * {@link SimpleIoProcessorPool} for better scaling in multiprocessor systems. The default 105 * pool size will be used. 106 * 107 * @see SimpleIoProcessorPool 108 * 109 * @param sessionConfig 110 * the default configuration for the managed {@link IoSession} 111 * @param processorClass a {@link Class}?of {@link IoProcessor} for the associated {@link IoSession} 112 * type. 113 */ 114 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) { 115 this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true, null); 116 } 117 118 /** 119 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default 120 * session configuration, a class of {@link IoProcessor} which will be instantiated in a 121 * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor 122 * systems. 123 * 124 * @see SimpleIoProcessorPool 125 * 126 * @param sessionConfig 127 * the default configuration for the managed {@link IoSession} 128 * @param processorClass a {@link Class}?of {@link IoProcessor} for the associated {@link IoSession} 129 * type. 130 * @param processorCount the amount of processor to instantiate for the pool 131 */ 132 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? 
extends IoProcessor<S>> processorClass, 133 int processorCount) { 134 this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount), true, null); 135 } 136 137 /** 138 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default 139 * session configuration, a class of {@link IoProcessor} which will be instantiated in a 140 * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor 141 * systems. 142 * 143 * @see SimpleIoProcessorPool 144 * 145 * @param sessionConfig 146 * the default configuration for the managed {@link IoSession} 147 * @param processorClass a {@link Class}?of {@link IoProcessor} for the associated {@link IoSession} 148 * type. 149 * @param processorCount the amount of processor to instantiate for the pool 150 * @param selectorProvider The SelectorProvider to use 151 */ 152 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass, 153 int processorCount, SelectorProvider selectorProvider ) { 154 this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount, selectorProvider), true, selectorProvider); 155 } 156 157 /** 158 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default 159 * session configuration, a default {@link Executor} will be created using 160 * {@link Executors#newCachedThreadPool()}. 
161 * 162 * @see AbstractIoService 163 * 164 * @param sessionConfig 165 * the default configuration for the managed {@link IoSession} 166 * @param processor the {@link IoProcessor}?for processing the {@link IoSession} of this transport, triggering 167 * events to the bound {@link IoHandler} and processing the chains of {@link IoFilter} 168 */ 169 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, IoProcessor<S> processor) { 170 this(sessionConfig, null, processor, false, null); 171 } 172 173 /** 174 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a 175 * default session configuration and an {@link Executor} for handling I/O 176 * events. If a null {@link Executor} is provided, a default one will be 177 * created using {@link Executors#newCachedThreadPool()}. 178 * 179 * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor) 180 * 181 * @param sessionConfig 182 * the default configuration for the managed {@link IoSession} 183 * @param executor 184 * the {@link Executor} used for handling asynchronous execution 185 * of I/O events. Can be <code>null</code>. 186 * @param processor 187 * the {@link IoProcessor}?for processing the {@link IoSession} 188 * of this transport, triggering events to the bound 189 * {@link IoHandler} and processing the chains of 190 * {@link IoFilter} 191 */ 192 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor) { 193 this(sessionConfig, executor, processor, false, null); 194 } 195 196 /** 197 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a 198 * default session configuration and an {@link Executor} for handling I/O 199 * events. If a null {@link Executor} is provided, a default one will be 200 * created using {@link Executors#newCachedThreadPool()}. 
201 * 202 * @see AbstractIoService(IoSessionConfig, Executor) 203 * 204 * @param sessionConfig 205 * the default configuration for the managed {@link IoSession} 206 * @param executor 207 * the {@link Executor} used for handling asynchronous execution 208 * of I/O events. Can be <code>null</code>. 209 * @param processor 210 * the {@link IoProcessor}?for processing the {@link IoSession} 211 * of this transport, triggering events to the bound 212 * {@link IoHandler} and processing the chains of 213 * {@link IoFilter} 214 * @param createdProcessor 215 * tagging the processor as automatically created, so it will be 216 * automatically disposed 217 */ 218 private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor, 219 boolean createdProcessor, SelectorProvider selectorProvider) { 220 super(sessionConfig, executor); 221 222 if (processor == null) { 223 throw new IllegalArgumentException("processor"); 224 } 225 226 this.processor = processor; 227 this.createdProcessor = createdProcessor; 228 229 try { 230 // Initialize the selector 231 init(selectorProvider); 232 233 // The selector is now ready, we can switch the 234 // flag to true so that incoming connection can be accepted 235 selectable = true; 236 } catch (RuntimeException e) { 237 throw e; 238 } catch (Exception e) { 239 throw new RuntimeIoException("Failed to initialize.", e); 240 } finally { 241 if (!selectable) { 242 try { 243 destroy(); 244 } catch (Exception e) { 245 ExceptionMonitor.getInstance().exceptionCaught(e); 246 } 247 } 248 } 249 } 250 251 /** 252 * Initialize the polling system, will be called at construction time. 253 * @throws Exception any exception thrown by the underlying system calls 254 */ 255 protected abstract void init() throws Exception; 256 257 /** 258 * Initialize the polling system, will be called at construction time. 
259 * 260 * @param selectorProvider The Selector Provider that will be used by this polling acceptor 261 * @throws Exception any exception thrown by the underlying system calls 262 */ 263 protected abstract void init(SelectorProvider selectorProvider) throws Exception; 264 265 /** 266 * Destroy the polling system, will be called when this {@link IoAcceptor} 267 * implementation will be disposed. 268 * @throws Exception any exception thrown by the underlying systems calls 269 */ 270 protected abstract void destroy() throws Exception; 271 272 /** 273 * Check for acceptable connections, interrupt when at least a server is ready for accepting. 274 * All the ready server socket descriptors need to be returned by {@link #selectedHandles()} 275 * @return The number of sockets having got incoming client 276 * @throws Exception any exception thrown by the underlying systems calls 277 */ 278 protected abstract int select() throws Exception; 279 280 /** 281 * Interrupt the {@link #select()} method. Used when the poll set need to be modified. 282 */ 283 protected abstract void wakeup(); 284 285 /** 286 * {@link Iterator} for the set of server sockets found with acceptable incoming connections 287 * during the last {@link #select()} call. 288 * @return the list of server handles ready 289 */ 290 protected abstract Iterator<H> selectedHandles(); 291 292 /** 293 * Open a server socket for a given local address. 
294 * @param localAddress the associated local address 295 * @return the opened server socket 296 * @throws Exception any exception thrown by the underlying systems calls 297 */ 298 protected abstract H open(SocketAddress localAddress) throws Exception; 299 300 /** 301 * Get the local address associated with a given server socket 302 * @param handle the server socket 303 * @return the local {@link SocketAddress} associated with this handle 304 * @throws Exception any exception thrown by the underlying systems calls 305 */ 306 protected abstract SocketAddress localAddress(H handle) throws Exception; 307 308 /** 309 * Accept a client connection for a server socket and return a new {@link IoSession} 310 * associated with the given {@link IoProcessor} 311 * @param processor the {@link IoProcessor} to associate with the {@link IoSession} 312 * @param handle the server handle 313 * @return the created {@link IoSession} 314 * @throws Exception any exception thrown by the underlying systems calls 315 */ 316 protected abstract S accept(IoProcessor<S> processor, H handle) throws Exception; 317 318 /** 319 * Close a server socket. 320 * @param handle the server socket 321 * @throws Exception any exception thrown by the underlying systems calls 322 */ 323 protected abstract void close(H handle) throws Exception; 324 325 /** 326 * {@inheritDoc} 327 */ 328 @Override 329 protected void dispose0() throws Exception { 330 unbind(); 331 332 startupAcceptor(); 333 wakeup(); 334 } 335 336 /** 337 * {@inheritDoc} 338 */ 339 @Override 340 protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception { 341 // Create a bind request as a Future operation. When the selector 342 // have handled the registration, it will signal this future. 
343 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses); 344 345 // adds the Registration request to the queue for the Workers 346 // to handle 347 registerQueue.add(request); 348 349 // creates the Acceptor instance and has the local 350 // executor kick it off. 351 startupAcceptor(); 352 353 // As we just started the acceptor, we have to unblock the select() 354 // in order to process the bind request we just have added to the 355 // registerQueue. 356 try { 357 lock.acquire(); 358 359 wakeup(); 360 } finally { 361 lock.release(); 362 } 363 364 // Now, we wait until this request is completed. 365 request.awaitUninterruptibly(); 366 367 if (request.getException() != null) { 368 throw request.getException(); 369 } 370 371 // Update the local addresses. 372 // setLocalAddresses() shouldn't be called from the worker thread 373 // because of deadlock. 374 Set<SocketAddress> newLocalAddresses = new HashSet<>(); 375 376 for (H handle : boundHandles.values()) { 377 newLocalAddresses.add(localAddress(handle)); 378 } 379 380 return newLocalAddresses; 381 } 382 383 /** 384 * This method is called by the doBind() and doUnbind() 385 * methods. If the acceptor is null, the acceptor object will 386 * be created and kicked off by the executor. If the acceptor 387 * object is null, probably already created and this class 388 * is now working, then nothing will happen and the method 389 * will just return. 390 */ 391 private void startupAcceptor() throws InterruptedException { 392 // If the acceptor is not ready, clear the queues 393 // TODO : they should already be clean : do we have to do that ? 
394 if (!selectable) { 395 registerQueue.clear(); 396 cancelQueue.clear(); 397 } 398 399 // start the acceptor if not already started 400 Acceptor acceptor = acceptorRef.get(); 401 402 if (acceptor == null) { 403 lock.acquire(); 404 acceptor = new Acceptor(); 405 406 if (acceptorRef.compareAndSet(null, acceptor)) { 407 executeWorker(acceptor); 408 } else { 409 lock.release(); 410 } 411 } 412 } 413 414 /** 415 * {@inheritDoc} 416 */ 417 @Override 418 protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception { 419 AcceptorOperationFuture future = new AcceptorOperationFuture(localAddresses); 420 421 cancelQueue.add(future); 422 startupAcceptor(); 423 wakeup(); 424 425 future.awaitUninterruptibly(); 426 if (future.getException() != null) { 427 throw future.getException(); 428 } 429 } 430 431 /** 432 * This class is called by the startupAcceptor() method and is 433 * placed into a NamePreservingRunnable class. 434 * It's a thread accepting incoming connections from clients. 435 * The loop is stopped when all the bound handlers are unbound. 436 */ 437 private class Acceptor implements Runnable { 438 public void run() { 439 assert (acceptorRef.get() == this); 440 441 int nHandles = 0; 442 443 // Release the lock 444 lock.release(); 445 446 while (selectable) { 447 try { 448 // Process the bound sockets to this acceptor. 449 // this actually sets the selector to OP_ACCEPT, 450 // and binds to the port on which this class will 451 // listen on. We do that before the select because 452 // the registerQueue containing the new handler is 453 // already updated at this point. 
454 nHandles += registerHandles(); 455 456 // Detect if we have some keys ready to be processed 457 // The select() will be woke up if some new connection 458 // have occurred, or if the selector has been explicitly 459 // woke up 460 int selected = select(); 461 462 // Now, if the number of registred handles is 0, we can 463 // quit the loop: we don't have any socket listening 464 // for incoming connection. 465 if (nHandles == 0) { 466 acceptorRef.set(null); 467 468 if (registerQueue.isEmpty() && cancelQueue.isEmpty()) { 469 assert (acceptorRef.get() != this); 470 break; 471 } 472 473 if (!acceptorRef.compareAndSet(null, this)) { 474 assert (acceptorRef.get() != this); 475 break; 476 } 477 478 assert (acceptorRef.get() == this); 479 } 480 481 if (selected > 0) { 482 // We have some connection request, let's process 483 // them here. 484 processHandles(selectedHandles()); 485 } 486 487 // check to see if any cancellation request has been made. 488 nHandles -= unregisterHandles(); 489 } catch (ClosedSelectorException cse) { 490 // If the selector has been closed, we can exit the loop 491 ExceptionMonitor.getInstance().exceptionCaught(cse); 492 break; 493 } catch (Exception e) { 494 ExceptionMonitor.getInstance().exceptionCaught(e); 495 496 try { 497 Thread.sleep(1000); 498 } catch (InterruptedException e1) { 499 ExceptionMonitor.getInstance().exceptionCaught(e1); 500 } 501 } 502 } 503 504 // Cleanup all the processors, and shutdown the acceptor. 505 if (selectable && isDisposing()) { 506 selectable = false; 507 try { 508 if (createdProcessor) { 509 processor.dispose(); 510 } 511 } finally { 512 try { 513 synchronized (disposalLock) { 514 if (isDisposing()) { 515 destroy(); 516 } 517 } 518 } catch (Exception e) { 519 ExceptionMonitor.getInstance().exceptionCaught(e); 520 } finally { 521 disposalFuture.setDone(); 522 } 523 } 524 } 525 } 526 527 /** 528 * This method will process new sessions for the Worker class. 
All 529 * keys that have had their status updates as per the Selector.selectedKeys() 530 * method will be processed here. Only keys that are ready to accept 531 * connections are handled here. 532 * <p/> 533 * Session objects are created by making new instances of SocketSessionImpl 534 * and passing the session object to the SocketIoProcessor class. 535 */ 536 @SuppressWarnings("unchecked") 537 private void processHandles(Iterator<H> handles) throws Exception { 538 while (handles.hasNext()) { 539 H handle = handles.next(); 540 handles.remove(); 541 542 // Associates a new created connection to a processor, 543 // and get back a session 544 S session = accept(processor, handle); 545 546 if (session == null) { 547 continue; 548 } 549 550 initSession(session, null, null); 551 552 // add the session to the SocketIoProcessor 553 session.getProcessor().add(session); 554 } 555 } 556 } 557 558 /** 559 * Sets up the socket communications. Sets items such as: 560 * <p/> 561 * Blocking 562 * Reuse address 563 * Receive buffer size 564 * Bind to listen port 565 * Registers OP_ACCEPT for selector 566 */ 567 private int registerHandles() { 568 for (;;) { 569 // The register queue contains the list of services to manage 570 // in this acceptor. 571 AcceptorOperationFuture future = registerQueue.poll(); 572 573 if (future == null) { 574 return 0; 575 } 576 577 // We create a temporary map to store the bound handles, 578 // as we may have to remove them all if there is an exception 579 // during the sockets opening. 580 Map<SocketAddress, H> newHandles = new ConcurrentHashMap<>(); 581 List<SocketAddress> localAddresses = future.getLocalAddresses(); 582 583 try { 584 // Process all the addresses 585 for (SocketAddress a : localAddresses) { 586 H handle = open(a); 587 newHandles.put(localAddress(handle), handle); 588 } 589 590 // Everything went ok, we can now update the map storing 591 // all the bound sockets. 592 boundHandles.putAll(newHandles); 593 594 // and notify. 
595 future.setDone(); 596 597 return newHandles.size(); 598 } catch (Exception e) { 599 // We store the exception in the future 600 future.setException(e); 601 } finally { 602 // Roll back if failed to bind all addresses. 603 if (future.getException() != null) { 604 for (H handle : newHandles.values()) { 605 try { 606 close(handle); 607 } catch (Exception e) { 608 ExceptionMonitor.getInstance().exceptionCaught(e); 609 } 610 } 611 612 // Wake up the selector to be sure we will process the newly bound handle 613 // and not block forever in the select() 614 wakeup(); 615 } 616 } 617 } 618 } 619 620 /** 621 * This method just checks to see if anything has been placed into the 622 * cancellation queue. The only thing that should be in the cancelQueue 623 * is CancellationRequest objects and the only place this happens is in 624 * the doUnbind() method. 625 */ 626 private int unregisterHandles() { 627 int cancelledHandles = 0; 628 for (;;) { 629 AcceptorOperationFuture future = cancelQueue.poll(); 630 if (future == null) { 631 break; 632 } 633 634 // close the channels 635 for (SocketAddress a : future.getLocalAddresses()) { 636 H handle = boundHandles.remove(a); 637 638 if (handle == null) { 639 continue; 640 } 641 642 try { 643 close(handle); 644 wakeup(); // wake up again to trigger thread death 645 } catch (Exception e) { 646 ExceptionMonitor.getInstance().exceptionCaught(e); 647 } finally { 648 cancelledHandles++; 649 } 650 } 651 652 future.setDone(); 653 } 654 655 return cancelledHandles; 656 } 657 658 /** 659 * {@inheritDoc} 660 */ 661 @Override 662 public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) { 663 throw new UnsupportedOperationException(); 664 } 665 666 /** 667 * @return the backLog 668 */ 669 public int getBacklog() { 670 return backlog; 671 } 672 673 /** 674 * Sets the Backlog parameter 675 * 676 * @param backlog 677 * the backlog variable 678 */ 679 public void setBacklog(int backlog) { 680 synchronized (bindLock) 
{ 681 if (isActive()) { 682 throw new IllegalStateException("backlog can't be set while the acceptor is bound."); 683 } 684 685 this.backlog = backlog; 686 } 687 } 688 689 /** 690 * @return the flag that sets the reuseAddress information 691 */ 692 public boolean isReuseAddress() { 693 return reuseAddress; 694 } 695 696 /** 697 * Set the Reuse Address flag 698 * 699 * @param reuseAddress 700 * The flag to set 701 */ 702 public void setReuseAddress(boolean reuseAddress) { 703 synchronized (bindLock) { 704 if (isActive()) { 705 throw new IllegalStateException("backlog can't be set while the acceptor is bound."); 706 } 707 708 this.reuseAddress = reuseAddress; 709 } 710 } 711 712 /** 713 * {@inheritDoc} 714 */ 715 @Override 716 public SocketSessionConfig getSessionConfig() { 717 return (SocketSessionConfig)sessionConfig; 718 } 719}