001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.mina.core.polling; 021 022import java.net.ConnectException; 023import java.net.SocketAddress; 024import java.nio.channels.ClosedSelectorException; 025import java.util.Iterator; 026import java.util.Queue; 027import java.util.concurrent.ConcurrentLinkedQueue; 028import java.util.concurrent.Executor; 029import java.util.concurrent.Executors; 030import java.util.concurrent.atomic.AtomicReference; 031 032import org.apache.mina.core.RuntimeIoException; 033import org.apache.mina.core.filterchain.IoFilter; 034import org.apache.mina.core.future.ConnectFuture; 035import org.apache.mina.core.future.DefaultConnectFuture; 036import org.apache.mina.core.service.AbstractIoConnector; 037import org.apache.mina.core.service.AbstractIoService; 038import org.apache.mina.core.service.IoConnector; 039import org.apache.mina.core.service.IoHandler; 040import org.apache.mina.core.service.IoProcessor; 041import org.apache.mina.core.service.SimpleIoProcessorPool; 042import org.apache.mina.core.session.AbstractIoSession; 043import org.apache.mina.core.session.IoSession; 044import org.apache.mina.core.session.IoSessionConfig; 045import org.apache.mina.core.session.IoSessionInitializer; 046import org.apache.mina.transport.socket.nio.NioSocketConnector; 047import org.apache.mina.util.ExceptionMonitor; 048 049/** 050 * A base class for implementing client transport using a polling strategy. The 051 * underlying sockets will be checked in an active loop and woke up when an 052 * socket needed to be processed. This class handle the logic behind binding, 053 * connecting and disposing the client sockets. A {@link Executor} will be used 054 * for running client connection, and an {@link AbstractPollingIoProcessor} will 055 * be used for processing connected client I/O operations like reading, writing 056 * and closing. 057 * 058 * All the low level methods for binding, connecting, closing need to be 059 * provided by the subclassing implementation. 060 * 061 * @see NioSocketConnector for a example of implementation 062 * 063 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 064 */ 065public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H> extends AbstractIoConnector { 066 067 private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>(); 068 069 private final Queue<ConnectionRequest> cancelQueue = new ConcurrentLinkedQueue<ConnectionRequest>(); 070 071 private final IoProcessor<T> processor; 072 073 private final boolean createdProcessor; 074 075 private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture(); 076 077 private volatile boolean selectable; 078 079 /** The connector thread */ 080 private final AtomicReference<Connector> connectorRef = new AtomicReference<Connector>(); 081 082 /** 083 * Constructor for {@link AbstractPollingIoConnector}. You need to provide a 084 * default session configuration, a class of {@link IoProcessor} which will 085 * be instantiated in a {@link SimpleIoProcessorPool} for better scaling in 086 * multiprocessor systems. The default pool size will be used. 087 * 088 * @see SimpleIoProcessorPool 089 * 090 * @param sessionConfig 091 * the default configuration for the managed {@link IoSession} 092 * @param processorClass 093 * a {@link Class} of {@link IoProcessor} for the associated 094 * {@link IoSession} type. 095 */ 096 protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass) { 097 this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), true); 098 } 099 100 /** 101 * Constructor for {@link AbstractPollingIoConnector}. You need to provide a 102 * default session configuration, a class of {@link IoProcessor} which will 103 * be instantiated in a {@link SimpleIoProcessorPool} for using multiple 104 * thread for better scaling in multiprocessor systems. 105 * 106 * @see SimpleIoProcessorPool 107 * 108 * @param sessionConfig 109 * the default configuration for the managed {@link IoSession} 110 * @param processorClass 111 * a {@link Class} of {@link IoProcessor} for the associated 112 * {@link IoSession} type. 113 * @param processorCount 114 * the amount of processor to instantiate for the pool 115 */ 116 protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass, 117 int processorCount) { 118 this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass, processorCount), true); 119 } 120 121 /** 122 * Constructor for {@link AbstractPollingIoConnector}. You need to provide a 123 * default session configuration, a default {@link Executor} will be created 124 * using {@link Executors#newCachedThreadPool()}. 125 * 126 * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor) 127 * 128 * @param sessionConfig 129 * the default configuration for the managed {@link IoSession} 130 * @param processor 131 * the {@link IoProcessor} for processing the {@link IoSession} 132 * of this transport, triggering events to the bound 133 * {@link IoHandler} and processing the chains of 134 * {@link IoFilter} 135 */ 136 protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, IoProcessor<T> processor) { 137 this(sessionConfig, null, processor, false); 138 } 139 140 /** 141 * Constructor for {@link AbstractPollingIoConnector}. You need to provide a 142 * default session configuration and an {@link Executor} for handling I/O 143 * events. If null {@link Executor} is provided, a default one will be 144 * created using {@link Executors#newCachedThreadPool()}. 145 * 146 * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor) 147 * 148 * @param sessionConfig 149 * the default configuration for the managed {@link IoSession} 150 * @param executor 151 * the {@link Executor} used for handling asynchronous execution 152 * of I/O events. Can be <code>null</code>. 153 * @param processor 154 * the {@link IoProcessor} for processing the {@link IoSession} 155 * of this transport, triggering events to the bound 156 * {@link IoHandler} and processing the chains of 157 * {@link IoFilter} 158 */ 159 protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor) { 160 this(sessionConfig, executor, processor, false); 161 } 162 163 /** 164 * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a 165 * default session configuration and an {@link Executor} for handling I/O 166 * events. If null {@link Executor} is provided, a default one will be 167 * created using {@link Executors#newCachedThreadPool()}. 168 * 169 * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor) 170 * 171 * @param sessionConfig 172 * the default configuration for the managed {@link IoSession} 173 * @param executor 174 * the {@link Executor} used for handling asynchronous execution 175 * of I/O events. Can be <code>null</code>. 176 * @param processor 177 * the {@link IoProcessor} for processing the {@link IoSession} 178 * of this transport, triggering events to the bound 179 * {@link IoHandler} and processing the chains of 180 * {@link IoFilter} 181 * @param createdProcessor 182 * tagging the processor as automatically created, so it will be 183 * automatically disposed 184 */ 185 private AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor, 186 boolean createdProcessor) { 187 super(sessionConfig, executor); 188 189 if (processor == null) { 190 throw new IllegalArgumentException("processor"); 191 } 192 193 this.processor = processor; 194 this.createdProcessor = createdProcessor; 195 196 try { 197 init(); 198 selectable = true; 199 } catch (RuntimeException e) { 200 throw e; 201 } catch (Exception e) { 202 throw new RuntimeIoException("Failed to initialize.", e); 203 } finally { 204 if (!selectable) { 205 try { 206 destroy(); 207 } catch (Exception e) { 208 ExceptionMonitor.getInstance().exceptionCaught(e); 209 } 210 } 211 } 212 } 213 214 /** 215 * Initialize the polling system, will be called at construction time. 216 * 217 * @throws Exception 218 * any exception thrown by the underlying system calls 219 */ 220 protected abstract void init() throws Exception; 221 222 /** 223 * Destroy the polling system, will be called when this {@link IoConnector} 224 * implementation will be disposed. 225 * 226 * @throws Exception 227 * any exception thrown by the underlying systems calls 228 */ 229 protected abstract void destroy() throws Exception; 230 231 /** 232 * Create a new client socket handle from a local {@link SocketAddress} 233 * 234 * @param localAddress 235 * the socket address for binding the new client socket 236 * @return a new client socket handle 237 * @throws Exception 238 * any exception thrown by the underlying systems calls 239 */ 240 protected abstract H newHandle(SocketAddress localAddress) throws Exception; 241 242 /** 243 * Connect a newly created client socket handle to a remote 244 * {@link SocketAddress}. This operation is non-blocking, so at end of the 245 * call the socket can be still in connection process. 246 * 247 * @param handle the client socket handle 248 * @param remoteAddress the remote address where to connect 249 * @return <tt>true</tt> if a connection was established, <tt>false</tt> if 250 * this client socket is in non-blocking mode and the connection 251 * operation is in progress 252 * @throws Exception If the connect failed 253 */ 254 protected abstract boolean connect(H handle, SocketAddress remoteAddress) throws Exception; 255 256 /** 257 * Finish the connection process of a client socket after it was marked as 258 * ready to process by the {@link #select(int)} call. The socket will be 259 * connected or reported as connection failed. 260 * 261 * @param handle 262 * the client socket handle to finish to connect 263 * @return true if the socket is connected 264 * @throws Exception 265 * any exception thrown by the underlying systems calls 266 */ 267 protected abstract boolean finishConnect(H handle) throws Exception; 268 269 /** 270 * Create a new {@link IoSession} from a connected socket client handle. 271 * Will assign the created {@link IoSession} to the given 272 * {@link IoProcessor} for managing future I/O events. 273 * 274 * @param processor 275 * the processor in charge of this session 276 * @param handle 277 * the newly connected client socket handle 278 * @return a new {@link IoSession} 279 * @throws Exception 280 * any exception thrown by the underlying systems calls 281 */ 282 protected abstract T newSession(IoProcessor<T> processor, H handle) throws Exception; 283 284 /** 285 * Close a client socket. 286 * 287 * @param handle 288 * the client socket 289 * @throws Exception 290 * any exception thrown by the underlying systems calls 291 */ 292 protected abstract void close(H handle) throws Exception; 293 294 /** 295 * Interrupt the {@link #select(int)} method. Used when the poll set need to 296 * be modified. 297 */ 298 protected abstract void wakeup(); 299 300 /** 301 * Check for connected sockets, interrupt when at least a connection is 302 * processed (connected or failed to connect). All the client socket 303 * descriptors processed need to be returned by {@link #selectedHandles()} 304 * 305 * @param timeout The timeout for the select() method 306 * @return The number of socket having received some data 307 * @throws Exception any exception thrown by the underlying systems calls 308 */ 309 protected abstract int select(int timeout) throws Exception; 310 311 /** 312 * {@link Iterator} for the set of client sockets found connected or failed 313 * to connect during the last {@link #select(int)} call. 314 * 315 * @return the list of client socket handles to process 316 */ 317 protected abstract Iterator<H> selectedHandles(); 318 319 /** 320 * {@link Iterator} for all the client sockets polled for connection. 321 * 322 * @return the list of client sockets currently polled for connection 323 */ 324 protected abstract Iterator<H> allHandles(); 325 326 /** 327 * Register a new client socket for connection, add it to connection polling 328 * 329 * @param handle 330 * client socket handle 331 * @param request 332 * the associated {@link ConnectionRequest} 333 * @throws Exception 334 * any exception thrown by the underlying systems calls 335 */ 336 protected abstract void register(H handle, ConnectionRequest request) throws Exception; 337 338 /** 339 * get the {@link ConnectionRequest} for a given client socket handle 340 * 341 * @param handle 342 * the socket client handle 343 * @return the connection request if the socket is connecting otherwise 344 * <code>null</code> 345 */ 346 protected abstract ConnectionRequest getConnectionRequest(H handle); 347 348 /** 349 * {@inheritDoc} 350 */ 351 @Override 352 protected final void dispose0() throws Exception { 353 startupWorker(); 354 wakeup(); 355 } 356 357 /** 358 * {@inheritDoc} 359 */ 360 @Override 361 @SuppressWarnings("unchecked") 362 protected final ConnectFuture connect0(SocketAddress remoteAddress, SocketAddress localAddress, 363 IoSessionInitializer<? extends ConnectFuture> sessionInitializer) { 364 H handle = null; 365 boolean success = false; 366 try { 367 handle = newHandle(localAddress); 368 if (connect(handle, remoteAddress)) { 369 ConnectFuture future = new DefaultConnectFuture(); 370 T session = newSession(processor, handle); 371 initSession(session, future, sessionInitializer); 372 // Forward the remaining process to the IoProcessor. 373 session.getProcessor().add(session); 374 success = true; 375 return future; 376 } 377 378 success = true; 379 } catch (Exception e) { 380 return DefaultConnectFuture.newFailedFuture(e); 381 } finally { 382 if (!success && handle != null) { 383 try { 384 close(handle); 385 } catch (Exception e) { 386 ExceptionMonitor.getInstance().exceptionCaught(e); 387 } 388 } 389 } 390 391 ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer); 392 connectQueue.add(request); 393 startupWorker(); 394 wakeup(); 395 396 return request; 397 } 398 399 private void startupWorker() { 400 if (!selectable) { 401 connectQueue.clear(); 402 cancelQueue.clear(); 403 } 404 405 Connector connector = connectorRef.get(); 406 407 if (connector == null) { 408 connector = new Connector(); 409 410 if (connectorRef.compareAndSet(null, connector)) { 411 executeWorker(connector); 412 } 413 } 414 } 415 416 private int registerNew() { 417 int nHandles = 0; 418 for (;;) { 419 ConnectionRequest req = connectQueue.poll(); 420 if (req == null) { 421 break; 422 } 423 424 H handle = req.handle; 425 try { 426 register(handle, req); 427 nHandles++; 428 } catch (Exception e) { 429 req.setException(e); 430 try { 431 close(handle); 432 } catch (Exception e2) { 433 ExceptionMonitor.getInstance().exceptionCaught(e2); 434 } 435 } 436 } 437 return nHandles; 438 } 439 440 private int cancelKeys() { 441 int nHandles = 0; 442 443 for (;;) { 444 ConnectionRequest req = cancelQueue.poll(); 445 446 if (req == null) { 447 break; 448 } 449 450 H handle = req.handle; 451 452 try { 453 close(handle); 454 } catch (Exception e) { 455 ExceptionMonitor.getInstance().exceptionCaught(e); 456 } finally { 457 nHandles++; 458 } 459 } 460 461 if (nHandles > 0) { 462 wakeup(); 463 } 464 465 return nHandles; 466 } 467 468 /** 469 * Process the incoming connections, creating a new session for each valid 470 * connection. 471 */ 472 private int processConnections(Iterator<H> handlers) { 473 int nHandles = 0; 474 475 // Loop on each connection request 476 while (handlers.hasNext()) { 477 H handle = handlers.next(); 478 handlers.remove(); 479 480 ConnectionRequest connectionRequest = getConnectionRequest(handle); 481 482 if (connectionRequest == null) { 483 continue; 484 } 485 486 boolean success = false; 487 try { 488 if (finishConnect(handle)) { 489 T session = newSession(processor, handle); 490 initSession(session, connectionRequest, connectionRequest.getSessionInitializer()); 491 // Forward the remaining process to the IoProcessor. 492 session.getProcessor().add(session); 493 nHandles++; 494 } 495 success = true; 496 } catch (Exception e) { 497 connectionRequest.setException(e); 498 } finally { 499 if (!success) { 500 // The connection failed, we have to cancel it. 501 cancelQueue.offer(connectionRequest); 502 } 503 } 504 } 505 return nHandles; 506 } 507 508 private void processTimedOutSessions(Iterator<H> handles) { 509 long currentTime = System.currentTimeMillis(); 510 511 while (handles.hasNext()) { 512 H handle = handles.next(); 513 ConnectionRequest connectionRequest = getConnectionRequest(handle); 514 515 if ((connectionRequest != null) && (currentTime >= connectionRequest.deadline)) { 516 connectionRequest.setException(new ConnectException("Connection timed out.")); 517 cancelQueue.offer(connectionRequest); 518 } 519 } 520 } 521 522 private class Connector implements Runnable { 523 524 public void run() { 525 assert (connectorRef.get() == this); 526 527 int nHandles = 0; 528 529 while (selectable) { 530 try { 531 // the timeout for select shall be smaller of the connect 532 // timeout or 1 second... 533 int timeout = (int) Math.min(getConnectTimeoutMillis(), 1000L); 534 int selected = select(timeout); 535 536 nHandles += registerNew(); 537 538 // get a chance to get out of the connector loop, if we 539 // don't have any more handles 540 if (nHandles == 0) { 541 connectorRef.set(null); 542 543 if (connectQueue.isEmpty()) { 544 assert (connectorRef.get() != this); 545 break; 546 } 547 548 if (!connectorRef.compareAndSet(null, this)) { 549 assert (connectorRef.get() != this); 550 break; 551 } 552 553 assert (connectorRef.get() == this); 554 } 555 556 if (selected > 0) { 557 nHandles -= processConnections(selectedHandles()); 558 } 559 560 processTimedOutSessions(allHandles()); 561 562 nHandles -= cancelKeys(); 563 } catch (ClosedSelectorException cse) { 564 // If the selector has been closed, we can exit the loop 565 ExceptionMonitor.getInstance().exceptionCaught(cse); 566 break; 567 } catch (Exception e) { 568 ExceptionMonitor.getInstance().exceptionCaught(e); 569 570 try { 571 Thread.sleep(1000); 572 } catch (InterruptedException e1) { 573 ExceptionMonitor.getInstance().exceptionCaught(e1); 574 } 575 } 576 } 577 578 if (selectable && isDisposing()) { 579 selectable = false; 580 try { 581 if (createdProcessor) { 582 processor.dispose(); 583 } 584 } finally { 585 try { 586 synchronized (disposalLock) { 587 if (isDisposing()) { 588 destroy(); 589 } 590 } 591 } catch (Exception e) { 592 ExceptionMonitor.getInstance().exceptionCaught(e); 593 } finally { 594 disposalFuture.setDone(); 595 } 596 } 597 } 598 } 599 } 600 601 public final class ConnectionRequest extends DefaultConnectFuture { 602 /** The handle associated with this connection request */ 603 private final H handle; 604 605 /** The time up to this connection request will be valid */ 606 private final long deadline; 607 608 /** The callback to call when the session is initialized */ 609 private final IoSessionInitializer<? extends ConnectFuture> sessionInitializer; 610 611 public ConnectionRequest(H handle, IoSessionInitializer<? extends ConnectFuture> callback) { 612 this.handle = handle; 613 long timeout = getConnectTimeoutMillis(); 614 615 if (timeout <= 0L) { 616 this.deadline = Long.MAX_VALUE; 617 } else { 618 this.deadline = System.currentTimeMillis() + timeout; 619 } 620 621 this.sessionInitializer = callback; 622 } 623 624 public H getHandle() { 625 return handle; 626 } 627 628 public long getDeadline() { 629 return deadline; 630 } 631 632 public IoSessionInitializer<? extends ConnectFuture> getSessionInitializer() { 633 return sessionInitializer; 634 } 635 636 @Override 637 public boolean cancel() { 638 if (!isDone()) { 639 boolean justCancelled = super.cancel(); 640 641 // We haven't cancelled the request before, so add the future 642 // in the cancel queue. 643 if (justCancelled) { 644 cancelQueue.add(this); 645 startupWorker(); 646 wakeup(); 647 } 648 } 649 650 return true; 651 } 652 } 653}