1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 * 19 */ 20 package org.apache.mina.core.polling; 21 22 import java.net.SocketAddress; 23 import java.nio.channels.ClosedSelectorException; 24 import java.nio.channels.spi.SelectorProvider; 25 import java.util.Collections; 26 import java.util.HashMap; 27 import java.util.HashSet; 28 import java.util.Iterator; 29 import java.util.List; 30 import java.util.Map; 31 import java.util.Queue; 32 import java.util.Set; 33 import java.util.concurrent.ConcurrentHashMap; 34 import java.util.concurrent.ConcurrentLinkedQueue; 35 import java.util.concurrent.Executor; 36 import java.util.concurrent.Executors; 37 import java.util.concurrent.Semaphore; 38 import java.util.concurrent.atomic.AtomicReference; 39 40 import org.apache.mina.core.RuntimeIoException; 41 import org.apache.mina.core.filterchain.IoFilter; 42 import org.apache.mina.core.service.AbstractIoAcceptor; 43 import org.apache.mina.core.service.AbstractIoService; 44 import org.apache.mina.core.service.IoAcceptor; 45 import org.apache.mina.core.service.IoHandler; 46 import org.apache.mina.core.service.IoProcessor; 47 import org.apache.mina.core.service.SimpleIoProcessorPool; 48 import org.apache.mina.core.session.AbstractIoSession; 49 import org.apache.mina.core.session.IoSession; 50 import org.apache.mina.core.session.IoSessionConfig; 51 import org.apache.mina.transport.socket.SocketSessionConfig; 52 import org.apache.mina.transport.socket.nio.NioSocketAcceptor; 53 import org.apache.mina.util.ExceptionMonitor; 54 55 /** 56 * A base class for implementing transport using a polling strategy. The 57 * underlying sockets will be checked in an active loop and woke up when an 58 * socket needed to be processed. This class handle the logic behind binding, 59 * accepting and disposing the server sockets. An {@link Executor} will be used 60 * for running client accepting and an {@link AbstractPollingIoProcessor} will 61 * be used for processing client I/O operations like reading, writing and 62 * closing. 63 * 64 * All the low level methods for binding, accepting, closing need to be provided 65 * by the subclassing implementation. 66 * 67 * @see NioSocketAcceptor for a example of implementation 68 * @param <H> The type of IoHandler 69 * @param <S> The type of IoSession 70 * 71 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 72 */ 73 public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H> extends AbstractIoAcceptor { 74 /** A lock used to protect the selector to be waked up before it's created */ 75 private final Semaphore lock = new Semaphore(1); 76 77 private final IoProcessor<S> processor; 78 79 private final boolean createdProcessor; 80 81 private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<>(); 82 83 private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<>(); 84 85 private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>()); 86 87 private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture(); 88 89 /** A flag set when the acceptor has been created and initialized */ 90 private volatile boolean selectable; 91 92 /** The thread responsible of accepting incoming requests */ 93 private AtomicReference<Acceptor> acceptorRef = new AtomicReference<>(); 94 95 protected boolean reuseAddress = false; 96 97 /** 98 * Define the number of socket that can wait to be accepted. Default 99 * to 50 (as in the SocketServer default). 100 */ 101 protected int backlog = 50; 102 103 /** 104 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default 105 * session configuration, a class of {@link IoProcessor} which will be instantiated in a 106 * {@link SimpleIoProcessorPool} for better scaling in multiprocessor systems. The default 107 * pool size will be used. 108 * 109 * @see SimpleIoProcessorPool 110 * 111 * @param sessionConfig 112 * the default configuration for the managed {@link IoSession} 113 * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession} 114 * type. 115 */ 116 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) { 117 this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true, null); 118 } 119 120 /** 121 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default 122 * session configuration, a class of {@link IoProcessor} which will be instantiated in a 123 * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor 124 * systems. 125 * 126 * @see SimpleIoProcessorPool 127 * 128 * @param sessionConfig 129 * the default configuration for the managed {@link IoSession} 130 * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession} 131 * type. 132 * @param processorCount the amount of processor to instantiate for the pool 133 */ 134 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass, 135 int processorCount) { 136 this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount), true, null); 137 } 138 139 /** 140 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default 141 * session configuration, a class of {@link IoProcessor} which will be instantiated in a 142 * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor 143 * systems. 144 * 145 * @see SimpleIoProcessorPool 146 * 147 * @param sessionConfig 148 * the default configuration for the managed {@link IoSession} 149 * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession} 150 * type. 151 * @param processorCount the amount of processor to instantiate for the pool 152 * @param selectorProvider The SelectorProvider to use 153 */ 154 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass, 155 int processorCount, SelectorProvider selectorProvider ) { 156 this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount, selectorProvider), true, selectorProvider); 157 } 158 159 /** 160 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default 161 * session configuration, a default {@link Executor} will be created using 162 * {@link Executors#newCachedThreadPool()}. 163 * 164 * @see AbstractIoService 165 * 166 * @param sessionConfig 167 * the default configuration for the managed {@link IoSession} 168 * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering 169 * events to the bound {@link IoHandler} and processing the chains of {@link IoFilter} 170 */ 171 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, IoProcessor<S> processor) { 172 this(sessionConfig, null, processor, false, null); 173 } 174 175 /** 176 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a 177 * default session configuration and an {@link Executor} for handling I/O 178 * events. If a null {@link Executor} is provided, a default one will be 179 * created using {@link Executors#newCachedThreadPool()}. 180 * 181 * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor) 182 * 183 * @param sessionConfig 184 * the default configuration for the managed {@link IoSession} 185 * @param executor 186 * the {@link Executor} used for handling asynchronous execution 187 * of I/O events. Can be <code>null</code>. 188 * @param processor 189 * the {@link IoProcessor} for processing the {@link IoSession} 190 * of this transport, triggering events to the bound 191 * {@link IoHandler} and processing the chains of 192 * {@link IoFilter} 193 */ 194 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor) { 195 this(sessionConfig, executor, processor, false, null); 196 } 197 198 /** 199 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a 200 * default session configuration and an {@link Executor} for handling I/O 201 * events. If a null {@link Executor} is provided, a default one will be 202 * created using {@link Executors#newCachedThreadPool()}. 203 * 204 * @see #AbstractIoService(IoSessionConfig, Executor) 205 * 206 * @param sessionConfig 207 * the default configuration for the managed {@link IoSession} 208 * @param executor 209 * the {@link Executor} used for handling asynchronous execution 210 * of I/O events. Can be <code>null</code>. 211 * @param processor 212 * the {@link IoProcessor} for processing the {@link IoSession} 213 * of this transport, triggering events to the bound 214 * {@link IoHandler} and processing the chains of 215 * {@link IoFilter} 216 * @param createdProcessor 217 * tagging the processor as automatically created, so it will be 218 * automatically disposed 219 */ 220 private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor, 221 boolean createdProcessor, SelectorProvider selectorProvider) { 222 super(sessionConfig, executor); 223 224 if (processor == null) { 225 throw new IllegalArgumentException("processor"); 226 } 227 228 this.processor = processor; 229 this.createdProcessor = createdProcessor; 230 231 try { 232 // Initialize the selector 233 init(selectorProvider); 234 235 // The selector is now ready, we can switch the 236 // flag to true so that incoming connection can be accepted 237 selectable = true; 238 } catch (RuntimeException e) { 239 throw e; 240 } catch (Exception e) { 241 throw new RuntimeIoException("Failed to initialize.", e); 242 } finally { 243 if (!selectable) { 244 try { 245 destroy(); 246 } catch (Exception e) { 247 ExceptionMonitor.getInstance().exceptionCaught(e); 248 } 249 } 250 } 251 } 252 253 /** 254 * Initialize the polling system, will be called at construction time. 255 * @throws Exception any exception thrown by the underlying system calls 256 */ 257 protected abstract void init() throws Exception; 258 259 /** 260 * Initialize the polling system, will be called at construction time. 261 * 262 * @param selectorProvider The Selector Provider that will be used by this polling acceptor 263 * @throws Exception any exception thrown by the underlying system calls 264 */ 265 protected abstract void init(SelectorProvider selectorProvider) throws Exception; 266 267 /** 268 * Destroy the polling system, will be called when this {@link IoAcceptor} 269 * implementation will be disposed. 270 * @throws Exception any exception thrown by the underlying systems calls 271 */ 272 protected abstract void destroy() throws Exception; 273 274 /** 275 * Check for acceptable connections, interrupt when at least a server is ready for accepting. 276 * All the ready server socket descriptors need to be returned by {@link #selectedHandles()} 277 * @return The number of sockets having got incoming client 278 * @throws Exception any exception thrown by the underlying systems calls 279 */ 280 protected abstract int select() throws Exception; 281 282 /** 283 * Interrupt the {@link #select()} method. Used when the poll set need to be modified. 284 */ 285 protected abstract void wakeup(); 286 287 /** 288 * {@link Iterator} for the set of server sockets found with acceptable incoming connections 289 * during the last {@link #select()} call. 290 * @return the list of server handles ready 291 */ 292 protected abstract Iterator<H> selectedHandles(); 293 294 /** 295 * Open a server socket for a given local address. 296 * @param localAddress the associated local address 297 * @return the opened server socket 298 * @throws Exception any exception thrown by the underlying systems calls 299 */ 300 protected abstract H open(SocketAddress localAddress) throws Exception; 301 302 /** 303 * Get the local address associated with a given server socket 304 * @param handle the server socket 305 * @return the local {@link SocketAddress} associated with this handle 306 * @throws Exception any exception thrown by the underlying systems calls 307 */ 308 protected abstract SocketAddress localAddress(H handle) throws Exception; 309 310 /** 311 * Accept a client connection for a server socket and return a new {@link IoSession} 312 * associated with the given {@link IoProcessor} 313 * @param processor the {@link IoProcessor} to associate with the {@link IoSession} 314 * @param handle the server handle 315 * @return the created {@link IoSession} 316 * @throws Exception any exception thrown by the underlying systems calls 317 */ 318 protected abstract S accept(IoProcessor<S> processor, H handle) throws Exception; 319 320 /** 321 * Close a server socket. 322 * @param handle the server socket 323 * @throws Exception any exception thrown by the underlying systems calls 324 */ 325 protected abstract void close(H handle) throws Exception; 326 327 /** 328 * {@inheritDoc} 329 */ 330 @Override 331 protected void dispose0() throws Exception { 332 unbind(); 333 334 startupAcceptor(); 335 wakeup(); 336 } 337 338 /** 339 * {@inheritDoc} 340 */ 341 @Override 342 protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception { 343 // Create a bind request as a Future operation. When the selector 344 // have handled the registration, it will signal this future. 345 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses); 346 347 // adds the Registration request to the queue for the Workers 348 // to handle 349 registerQueue.add(request); 350 351 // creates the Acceptor instance and has the local 352 // executor kick it off. 353 startupAcceptor(); 354 355 // As we just started the acceptor, we have to unblock the select() 356 // in order to process the bind request we just have added to the 357 // registerQueue. 358 try { 359 lock.acquire(); 360 361 wakeup(); 362 } finally { 363 lock.release(); 364 } 365 366 // Now, we wait until this request is completed. 367 request.awaitUninterruptibly(); 368 369 if (request.getException() != null) { 370 throw request.getException(); 371 } 372 373 // Update the local addresses. 374 // setLocalAddresses() shouldn't be called from the worker thread 375 // because of deadlock. 376 Set<SocketAddress> newLocalAddresses = new HashSet<>(); 377 378 for (H handle : boundHandles.values()) { 379 newLocalAddresses.add(localAddress(handle)); 380 } 381 382 return newLocalAddresses; 383 } 384 385 /** 386 * This method is called by the doBind() and doUnbind() 387 * methods. If the acceptor is null, the acceptor object will 388 * be created and kicked off by the executor. If the acceptor 389 * object is null, probably already created and this class 390 * is now working, then nothing will happen and the method 391 * will just return. 392 */ 393 private void startupAcceptor() throws InterruptedException { 394 // If the acceptor is not ready, clear the queues 395 // TODO : they should already be clean : do we have to do that ? 396 if (!selectable) { 397 registerQueue.clear(); 398 cancelQueue.clear(); 399 } 400 401 // start the acceptor if not already started 402 Acceptor acceptor = acceptorRef.get(); 403 404 if (acceptor == null) { 405 lock.acquire(); 406 acceptor = new Acceptor(); 407 408 if (acceptorRef.compareAndSet(null, acceptor)) { 409 executeWorker(acceptor); 410 } else { 411 lock.release(); 412 } 413 } 414 } 415 416 /** 417 * {@inheritDoc} 418 */ 419 @Override 420 protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception { 421 AcceptorOperationFuture future = new AcceptorOperationFuture(localAddresses); 422 423 cancelQueue.add(future); 424 startupAcceptor(); 425 wakeup(); 426 427 future.awaitUninterruptibly(); 428 if (future.getException() != null) { 429 throw future.getException(); 430 } 431 } 432 433 /** 434 * This class is called by the startupAcceptor() method and is 435 * placed into a NamePreservingRunnable class. 436 * It's a thread accepting incoming connections from clients. 437 * The loop is stopped when all the bound handlers are unbound. 438 */ 439 private class Acceptor implements Runnable { 440 /** 441 * {@inheritDoc} 442 */ 443 @Override 444 public void run() { 445 assert acceptorRef.get() == this; 446 447 int nHandles = 0; 448 449 // Release the lock 450 lock.release(); 451 452 while (selectable) { 453 try { 454 // Process the bound sockets to this acceptor. 455 // this actually sets the selector to OP_ACCEPT, 456 // and binds to the port on which this class will 457 // listen on. We do that before the select because 458 // the registerQueue containing the new handler is 459 // already updated at this point. 460 nHandles += registerHandles(); 461 462 // Detect if we have some keys ready to be processed 463 // The select() will be woke up if some new connection 464 // have occurred, or if the selector has been explicitly 465 // woke up 466 int selected = select(); 467 468 // Now, if the number of registered handles is 0, we can 469 // quit the loop: we don't have any socket listening 470 // for incoming connection. 471 if (nHandles == 0) { 472 acceptorRef.set(null); 473 474 if (registerQueue.isEmpty() && cancelQueue.isEmpty()) { 475 assert acceptorRef.get() != this; 476 break; 477 } 478 479 if (!acceptorRef.compareAndSet(null, this)) { 480 assert acceptorRef.get() != this; 481 break; 482 } 483 484 assert acceptorRef.get() == this; 485 } 486 487 if (selected > 0) { 488 // We have some connection request, let's process 489 // them here. 490 processHandles(selectedHandles()); 491 } 492 493 // check to see if any cancellation request has been made. 494 nHandles -= unregisterHandles(); 495 } catch (ClosedSelectorException cse) { 496 // If the selector has been closed, we can exit the loop 497 ExceptionMonitor.getInstance().exceptionCaught(cse); 498 break; 499 } catch (Exception e) { 500 ExceptionMonitor.getInstance().exceptionCaught(e); 501 502 try { 503 Thread.sleep(1000); 504 } catch (InterruptedException e1) { 505 ExceptionMonitor.getInstance().exceptionCaught(e1); 506 } 507 } 508 } 509 510 // Cleanup all the processors, and shutdown the acceptor. 511 if (selectable && isDisposing()) { 512 selectable = false; 513 try { 514 if (createdProcessor) { 515 processor.dispose(); 516 } 517 } finally { 518 try { 519 synchronized (disposalLock) { 520 if (isDisposing()) { 521 destroy(); 522 } 523 } 524 } catch (Exception e) { 525 ExceptionMonitor.getInstance().exceptionCaught(e); 526 } finally { 527 disposalFuture.setDone(); 528 } 529 } 530 } 531 } 532 533 /** 534 * This method will process new sessions for the Worker class. All 535 * keys that have had their status updates as per the Selector.selectedKeys() 536 * method will be processed here. Only keys that are ready to accept 537 * connections are handled here. 538 * <p/> 539 * Session objects are created by making new instances of SocketSessionImpl 540 * and passing the session object to the SocketIoProcessor class. 541 */ 542 @SuppressWarnings("unchecked") 543 private void processHandles(Iterator<H> handles) throws Exception { 544 while (handles.hasNext()) { 545 H handle = handles.next(); 546 handles.remove(); 547 548 // Associates a new created connection to a processor, 549 // and get back a session 550 S session = accept(processor, handle); 551 552 if (session == null) { 553 continue; 554 } 555 556 initSession(session, null, null); 557 558 // add the session to the SocketIoProcessor 559 session.getProcessor().add(session); 560 } 561 } 562 563 /** 564 * Sets up the socket communications. Sets items such as: 565 * <p/> 566 * Blocking 567 * Reuse address 568 * Receive buffer size 569 * Bind to listen port 570 * Registers OP_ACCEPT for selector 571 */ 572 private int registerHandles() { 573 for (;;) { 574 // The register queue contains the list of services to manage 575 // in this acceptor. 576 AcceptorOperationFuture future = registerQueue.poll(); 577 578 if (future == null) { 579 return 0; 580 } 581 582 // We create a temporary map to store the bound handles, 583 // as we may have to remove them all if there is an exception 584 // during the sockets opening. 585 Map<SocketAddress, H> newHandles = new ConcurrentHashMap<>(); 586 List<SocketAddress> localAddresses = future.getLocalAddresses(); 587 588 try { 589 // Process all the addresses 590 for (SocketAddress a : localAddresses) { 591 H handle = open(a); 592 newHandles.put(localAddress(handle), handle); 593 } 594 595 // Everything went ok, we can now update the map storing 596 // all the bound sockets. 597 boundHandles.putAll(newHandles); 598 599 // and notify. 600 future.setDone(); 601 602 return newHandles.size(); 603 } catch (Exception e) { 604 // We store the exception in the future 605 future.setException(e); 606 } finally { 607 // Roll back if failed to bind all addresses. 608 if (future.getException() != null) { 609 for (H handle : newHandles.values()) { 610 try { 611 close(handle); 612 } catch (Exception e) { 613 ExceptionMonitor.getInstance().exceptionCaught(e); 614 } 615 } 616 617 // Wake up the selector to be sure we will process the newly bound handle 618 // and not block forever in the select() 619 wakeup(); 620 } 621 } 622 } 623 } 624 625 /** 626 * This method just checks to see if anything has been placed into the 627 * cancellation queue. The only thing that should be in the cancelQueue 628 * is CancellationRequest objects and the only place this happens is in 629 * the doUnbind() method. 630 */ 631 private int unregisterHandles() { 632 int cancelledHandles = 0; 633 for (;;) { 634 AcceptorOperationFuture future = cancelQueue.poll(); 635 if (future == null) { 636 break; 637 } 638 639 // close the channels 640 for (SocketAddress a : future.getLocalAddresses()) { 641 H handle = boundHandles.remove(a); 642 643 if (handle == null) { 644 continue; 645 } 646 647 try { 648 close(handle); 649 wakeup(); // wake up again to trigger thread death 650 } catch (Exception e) { 651 ExceptionMonitor.getInstance().exceptionCaught(e); 652 } finally { 653 cancelledHandles++; 654 } 655 } 656 657 future.setDone(); 658 } 659 660 return cancelledHandles; 661 } 662 } 663 664 /** 665 * {@inheritDoc} 666 */ 667 @Override 668 public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) { 669 throw new UnsupportedOperationException(); 670 } 671 672 /** 673 * @return the backLog 674 */ 675 public int getBacklog() { 676 return backlog; 677 } 678 679 /** 680 * Sets the Backlog parameter 681 * 682 * @param backlog 683 * the backlog variable 684 */ 685 public void setBacklog(int backlog) { 686 synchronized (bindLock) { 687 if (isActive()) { 688 throw new IllegalStateException("backlog can't be set while the acceptor is bound."); 689 } 690 691 this.backlog = backlog; 692 } 693 } 694 695 /** 696 * @return the flag that sets the reuseAddress information 697 */ 698 public boolean isReuseAddress() { 699 return reuseAddress; 700 } 701 702 /** 703 * Set the Reuse Address flag 704 * 705 * @param reuseAddress 706 * The flag to set 707 */ 708 public void setReuseAddress(boolean reuseAddress) { 709 synchronized (bindLock) { 710 if (isActive()) { 711 throw new IllegalStateException("backlog can't be set while the acceptor is bound."); 712 } 713 714 this.reuseAddress = reuseAddress; 715 } 716 } 717 718 /** 719 * {@inheritDoc} 720 */ 721 @Override 722 public SocketSessionConfig getSessionConfig() { 723 return (SocketSessionConfig)sessionConfig; 724 } 725 }