/*
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 *
 */
package org.apache.mina.core.polling;

import java.io.IOException;
import java.net.PortUnreachableException;
import java.nio.channels.ClosedSelectorException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.file.FileRegion;
import org.apache.mina.core.filterchain.IoFilterChain;
import org.apache.mina.core.filterchain.IoFilterChainBuilder;
import org.apache.mina.core.future.DefaultIoFuture;
import org.apache.mina.core.service.AbstractIoService;
import org.apache.mina.core.service.IoProcessor;
import org.apache.mina.core.service.IoServiceListenerSupport;
import org.apache.mina.core.session.AbstractIoSession;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.session.IoSessionConfig;
import org.apache.mina.core.session.SessionState;
import org.apache.mina.core.write.WriteRequest;
import org.apache.mina.core.write.WriteRequestQueue;
import org.apache.mina.core.write.WriteToClosedSessionException;
import org.apache.mina.transport.socket.AbstractDatagramSessionConfig;
import org.apache.mina.util.ExceptionMonitor;
import org.apache.mina.util.NamePreservingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An abstract implementation of {@link IoProcessor} which helps transport
 * developers to write an {@link IoProcessor} easily. This class is in charge of
 * actively polling a set of {@link IoSession}s and triggering events when some
 * I/O operation is possible.
 *
 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
 *
 * @param <S>
 *            the type of the {@link IoSession} this processor can handle
 */
public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S> {
    /** A logger for this class */
    private static final Logger LOG = LoggerFactory.getLogger(IoProcessor.class);

    /**
     * A timeout used for the select, as we need to get out to deal with idle
     * sessions
     */
    private static final long SELECT_TIMEOUT = 1000L;

    /** A map containing the last Thread ID for each class */
    private static final ConcurrentHashMap<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<Class<?>, AtomicInteger>();

    /** This IoProcessor instance name */
    private final String threadName;

    /** The executor to use when we need to start the inner Processor */
    private final Executor executor;

    /** A Session queue containing the newly created sessions */
    private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>();

    /** A queue used to store the sessions to be removed */
    private final Queue<S> removingSessions = new ConcurrentLinkedQueue<S>();

    /** A queue used to store the sessions to be flushed */
    private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>();

    /**
     * A queue used to store the sessions which have a trafficControl to be
     * updated
     */
    private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<S>();

    /** The processor thread : it handles the incoming messages */
    private final AtomicReference<Processor> processorRef = new AtomicReference<Processor>();

    /** The time (in milliseconds) at which the idle sessions were last checked */
    private long lastIdleCheckTime;

    /** The lock held while flagging the disposal and while running {@link #doDispose()} */
    private final Object disposalLock = new Object();

    /** Set as soon as {@link #dispose()} has been requested */
    private volatile boolean disposing;

    /** Set once {@link #dispose()} has seen the disposal future completed */
    private volatile boolean disposed;

    /** The future completed by the inner Processor when it leaves its loop */
    private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);

    /**
     * A flag read and reset by the inner Processor after each select() : when
     * it is set, an immediate return of the select() is not considered as a
     * sign of a spinning selector. Presumably set by the {@link #wakeup()}
     * implementations (to be confirmed against the subclasses).
     */
    protected AtomicBoolean wakeupCalled = new AtomicBoolean(false);

    /**
     * Create an {@link AbstractPollingIoProcessor} with the given
     * {@link Executor} for handling I/Os events.
     *
     * @param executor
     *            the {@link Executor} for handling I/O events
     * @throws IllegalArgumentException
     *             if the given executor is <tt>null</tt>
     */
    protected AbstractPollingIoProcessor(Executor executor) {
        if (executor == null) {
            throw new IllegalArgumentException("executor");
        }

        this.threadName = nextThreadName();
        this.executor = executor;
    }

    /**
     * Compute the thread ID for this class instance. As we may have different
     * classes, we store the last ID number into a Map associating the class
     * to the last assigned ID.
     *
     * @return a name for the current thread, based on the class name and an
     *         incremental value, starting at 1.
     */
    private String nextThreadName() {
        Class<?> cls = getClass();
        int newThreadId;

        AtomicInteger threadId = threadIds.putIfAbsent(cls, new AtomicInteger(1));

        if (threadId == null) {
            // We are the first instance of this class : the counter we have
            // just stored already holds the ID 1.
            newThreadId = 1;
        } else {
            // Just increment the last ID, and get it.
            newThreadId = threadId.incrementAndGet();
        }

        // Now we can compute the name for this thread
        return cls.getSimpleName() + '-' + newThreadId;
    }

    /**
     * {@inheritDoc}
     */
    public final boolean isDisposing() {
        return disposing;
    }

    /**
     * {@inheritDoc}
     */
    public final boolean isDisposed() {
        return disposed;
    }

    /**
     * {@inheritDoc}
     */
    public final void dispose() {
        if (disposed || disposing) {
            return;
        }

        synchronized (disposalLock) {
            disposing = true;

            // Make sure a Processor is running : it is the one which will
            // call doDispose() and complete the disposal future.
            startupProcessor();
        }

        disposalFuture.awaitUninterruptibly();
        disposed = true;
    }

    /**
     * Dispose the resources used by this {@link IoProcessor} for polling the
     * client connections. The implementing class doDispose method will be
     * called.
     *
     * @throws Exception
     *             if some low level IO error occurs
     */
    protected abstract void doDispose() throws Exception;

    /**
     * poll those sessions for the given timeout
     *
     * @param timeout
     *            milliseconds before the call timeout if no event appear
     * @return The number of session ready for read or for write
     * @throws Exception
     *             if some low level IO error occurs
     */
    protected abstract int select(long timeout) throws Exception;

    /**
     * poll those sessions forever
     *
     * @return The number of session ready for read or for write
     * @throws Exception
     *             if some low level IO error occurs
     */
    protected abstract int select() throws Exception;

    /**
     * Say if the list of {@link IoSession} polled by this {@link IoProcessor}
     * is empty
     *
     * @return <tt>true</tt> if no session is managed by this
     *         {@link IoProcessor}
     */
    protected abstract boolean isSelectorEmpty();

    /**
     * Interrupt the {@link #select(long)} call.
     */
    protected abstract void wakeup();

    /**
     * Get an {@link Iterator} for the list of {@link IoSession} polled by this
     * {@link IoProcessor}
     *
     * @return {@link Iterator} of {@link IoSession}
     */
    protected abstract Iterator<S> allSessions();

    /**
     * Get an {@link Iterator} for the list of {@link IoSession} found selected
     * by the last call of {@link #select(long)}
     *
     * @return {@link Iterator} of {@link IoSession} ready for I/Os operation
     */
    protected abstract Iterator<S> selectedSessions();

    /**
     * Get the state of a session (One of OPENING, OPENED, CLOSING)
     *
     * @param session the {@link IoSession} to inspect
     * @return the state of the session
     */
    protected abstract SessionState getState(S session);

    /**
     * Tells if the session is ready for writing
     *
     * @param session the queried session
     * @return <tt>true</tt> if ready, <tt>false</tt> if not ready
     */
    protected abstract boolean isWritable(S session);

    /**
     * Tells if the session is ready for reading
     *
     * @param session the queried session
     * @return <tt>true</tt> if ready, <tt>false</tt> if not ready
     */
    protected abstract boolean isReadable(S session);

    /**
     * Set the session to be informed when a write event should be processed
     *
     * @param session the session for which we want to be interested in write events
     * @param isInterested <tt>true</tt> for registering, <tt>false</tt> for removing
     * @throws Exception If there was a problem while registering the session
     */
    protected abstract void setInterestedInWrite(S session, boolean isInterested) throws Exception;

    /**
     * Set the session to be informed when a read event should be processed
     *
     * @param session the session for which we want to be interested in read events
     * @param isInterested <tt>true</tt> for registering, <tt>false</tt> for removing
     * @throws Exception If there was a problem while registering the session
     */
    protected abstract void setInterestedInRead(S session, boolean isInterested) throws Exception;

    /**
     * Tells if this session is registered for reading
     *
     * @param session the queried session
     * @return <tt>true</tt> if registered for reading
     */
    protected abstract boolean isInterestedInRead(S session);

    /**
     * Tells if this session is registered for writing
     *
     * @param session the queried session
     * @return <tt>true</tt> if registered for writing
     */
    protected abstract boolean isInterestedInWrite(S session);

    /**
     * Initialize the polling of a session. Add it to the polling process.
     *
     * @param session the {@link IoSession} to add to the polling
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract void init(S session) throws Exception;

    /**
     * Destroy the underlying client socket handle
     *
     * @param session the {@link IoSession}
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract void destroy(S session) throws Exception;

    /**
     * Reads a sequence of bytes from a {@link IoSession} into the given
     * {@link IoBuffer}. Is called when the session was found ready for reading.
     *
     * @param session the session to read
     * @param buf the buffer to fill
     * @return the number of bytes read
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract int read(S session, IoBuffer buf) throws Exception;

    /**
     * Write a sequence of bytes to a {@link IoSession}, meant to be called when
     * a session was found ready for writing.
     *
     * @param session the session to write
     * @param buf the buffer to write
     * @param length the number of bytes to write can be superior to the number of
     *            bytes remaining in the buffer
     * @return the number of bytes written
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract int write(S session, IoBuffer buf, int length) throws Exception;

    /**
     * Write a part of a file to a {@link IoSession}, if the underlying API
     * isn't supporting system calls like sendfile(), you can throw a
     * {@link UnsupportedOperationException} so the file will be sent using
     * usual {@link #write(AbstractIoSession, IoBuffer, int)} call.
     *
     * @param session the session to write
     * @param region the file region to write
     * @param length the length of the portion to send
     * @return the number of written bytes
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract int transferFile(S session, FileRegion region, int length) throws Exception;

    /**
     * {@inheritDoc}
     */
    public final void add(S session) {
        if (disposed || disposing) {
            throw new IllegalStateException("Already disposed.");
        }

        // Adds the session to the newSession queue and starts the worker
        newSessions.add(session);
        startupProcessor();
    }

    /**
     * {@inheritDoc}
     */
    public final void remove(S session) {
        scheduleRemove(session);
        startupProcessor();
    }

    /**
     * Push a session into the queue of sessions to remove, unless it is
     * already there. The selector is not woken up.
     *
     * @param session the session to remove
     */
    private void scheduleRemove(S session) {
        if (!removingSessions.contains(session)) {
            removingSessions.add(session);
        }
    }

    /**
     * {@inheritDoc}
     */
    public void write(S session, WriteRequest writeRequest) {
        WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();

        writeRequestQueue.offer(session, writeRequest);

        if (!session.isWriteSuspended()) {
            this.flush(session);
        }
    }

    /**
     * {@inheritDoc}
     */
    public final void flush(S session) {
        // add the session to the queue if it's not already
        // in the queue, then wake up the select()
        if (session.setScheduledForFlush(true)) {
            flushingSessions.add(session);
            wakeup();
        }
    }

    /**
     * Same as {@link #flush(AbstractIoSession)}, except that the select() is
     * not woken up.
     *
     * @param session the session to flush
     */
    private void scheduleFlush(S session) {
        // add the session to the queue if it's not already
        // in the queue
        if (session.setScheduledForFlush(true)) {
            flushingSessions.add(session);
        }
    }

    /**
     * Registers a session whose traffic mask has to be updated, and wakes up
     * the select() so that the update is applied by the inner Processor.
     *
     * @param session the session to update
     */
    public final void updateTrafficMask(S session) {
        trafficControllingSessions.add(session);
        wakeup();
    }

    /**
     * Starts the inner Processor, asking the executor to pick a thread in its
     * pool. The Runnable will be renamed
     */
    private void startupProcessor() {
        Processor processor = processorRef.get();

        if (processor == null) {
            processor = new Processor();

            // Only the caller which succeeds in installing its Processor
            // submits it to the executor.
            if (processorRef.compareAndSet(null, processor)) {
                executor.execute(new NamePreservingRunnable(processor, threadName));
            }
        }

        // Just stop the select() and start it again, so that the processor
        // can be activated immediately.
        wakeup();
    }

    /**
     * In the case we are using the java select() method, this method is used to
     * trash the buggy selector and create a new one, registering all the sockets
     * on it.
     *
     * @throws IOException If we got an exception
     */
    protected abstract void registerNewSelector() throws IOException;

    /**
     * Check that the select() has not exited immediately just because of a
     * broken connection. In this case, this is a standard case, and we just
     * have to loop.
     *
     * @return <tt>true</tt> if a connection has been brutally closed.
     * @throws IOException If we got an exception
     */
    protected abstract boolean isBrokenConnection() throws IOException;

    /**
     * Loops over the new sessions queue and returns the number of
     * sessions which are effectively created
     *
     * @return The number of new sessions
     */
    private int handleNewSessions() {
        int addedSessions = 0;

        for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {
            if (addNow(session)) {
                // A new session has been created
                addedSessions++;
            }
        }

        return addedSessions;
    }

    /**
     * Process a new session : - initialize it - create its chain - fire the
     * CREATED listeners if any
     *
     * @param session The session to create
     * @return <tt>true</tt> if the session has been registered
     */
    private boolean addNow(S session) {
        boolean registered = false;

        try {
            init(session);
            registered = true;

            // Build the filter chain of this session.
            IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
            chainBuilder.buildFilterChain(session.getFilterChain());

            // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
            // in AbstractIoFilterChain.fireSessionOpened().
            // Propagate the SESSION_CREATED event up to the chain
            IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
            listeners.fireSessionCreated(session);
        } catch (Exception e) {
            ExceptionMonitor.getInstance().exceptionCaught(e);

            try {
                destroy(session);
            } catch (Exception e1) {
                ExceptionMonitor.getInstance().exceptionCaught(e1);
            } finally {
                // Whatever happened during the cleanup, the session is not
                // counted as registered.
                registered = false;
            }
        }

        return registered;
    }

    /**
     * Drains the queue of sessions to remove, handling each of them
     * accordingly to its state.
     *
     * @return The number of sessions counted as removed
     * @throws IllegalStateException if a session is in an unknown state
     */
    private int removeSessions() {
        int removedSessions = 0;

        for (S session = removingSessions.poll(); session != null; session = removingSessions.poll()) {
            SessionState state = getState(session);

            // Now deal with the removal accordingly to the session's state
            switch (state) {
            case OPENED:
                // Try to remove this session
                if (removeNow(session)) {
                    removedSessions++;
                }

                break;

            case CLOSING:
                // Skip if channel is already closed
                // In any case, remove the session from the queue
                removedSessions++;
                break;

            case OPENING:
                // Remove session from the newSessions queue and
                // remove it
                newSessions.remove(session);

                if (removeNow(session)) {
                    removedSessions++;
                }

                break;

            default:
                throw new IllegalStateException(String.valueOf(state));
            }
        }

        return removedSessions;
    }

    /**
     * Destroys a session : its pending write requests are failed, the
     * underlying handle is destroyed and the DESTROYED listeners are fired.
     *
     * @param session The session to remove
     * @return <tt>true</tt> if {@link #destroy(AbstractIoSession)} completed
     *         without throwing an exception
     */
    private boolean removeNow(S session) {
        clearWriteRequestQueue(session);

        try {
            destroy(session);
            return true;
        } catch (Exception e) {
            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireExceptionCaught(e);
        } finally {
            try {
                // Clear the queue a second time : this runs whether the
                // destroy() above succeeded or not.
                clearWriteRequestQueue(session);
                ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
            } catch (Exception e) {
                // The session was either destroyed or not at this point.
                // We do not want any exception thrown from this "cleanup" code to change
                // the return value by bubbling up.
                IoFilterChain filterChain = session.getFilterChain();
                filterChain.fireExceptionCaught(e);
            }
        }

        return false;
    }

    /**
     * Empties the write request queue of a session. The requests which have
     * not been written are failed with a {@link WriteToClosedSessionException},
     * which is also propagated to the filter chain.
     *
     * @param session The session whose write request queue has to be cleared
     */
    private void clearWriteRequestQueue(S session) {
        WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
        WriteRequest req;

        List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();

        if ((req = writeRequestQueue.poll(session)) != null) {
            Object message = req.getMessage();

            if (message instanceof IoBuffer) {
                IoBuffer buf = (IoBuffer) message;

                // The first unwritten empty buffer must be
                // forwarded to the filter chain.
                if (buf.hasRemaining()) {
                    buf.reset();
                    failedRequests.add(req);
                } else {
                    IoFilterChain filterChain = session.getFilterChain();
                    filterChain.fireMessageSent(req);
                }
            } else {
                failedRequests.add(req);
            }

            // Discard others.
            while ((req = writeRequestQueue.poll(session)) != null) {
                failedRequests.add(req);
            }
        }

        // Create an exception and notify.
        if (!failedRequests.isEmpty()) {
            WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);

            for (WriteRequest r : failedRequests) {
                session.decreaseScheduledBytesAndMessages(r);
                r.getFuture().setException(cause);
            }

            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireExceptionCaught(cause);
        }
    }

    /**
     * Processes every session selected by the last select(), removing each of
     * them from the selected set once handled.
     *
     * @throws Exception if the processing of a session failed
     */
    private void process() throws Exception {
        for (Iterator<S> i = selectedSessions(); i.hasNext();) {
            S session = i.next();
            process(session);
            i.remove();
        }
    }

    /**
     * Deal with session ready for the read or write operations, or both.
     *
     * @param session The selected session
     */
    private void process(S session) {
        // Process Reads
        if (isReadable(session) && !session.isReadSuspended()) {
            read(session);
        }

        // Process writes
        if (isWritable(session) && !session.isWriteSuspended()) {
            // add the session to the queue, if it's not already there
            if (session.setScheduledForFlush(true)) {
                flushingSessions.add(session);
            }
        }
    }

    /**
     * Reads the available data of a session into a newly allocated buffer and
     * propagates it to the filter chain. The exceptions are propagated to the
     * filter chain too.
     *
     * @param session The session to read
     */
    private void read(S session) {
        IoSessionConfig config = session.getConfig();
        int bufferSize = config.getReadBufferSize();
        IoBuffer buf = IoBuffer.allocate(bufferSize);

        final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();

        try {
            int readBytes = 0;
            int ret;

            try {
                if (hasFragmentation) {

                    // Keep reading until nothing more is returned or the
                    // buffer is full
                    while ((ret = read(session, buf)) > 0) {
                        readBytes += ret;

                        if (!buf.hasRemaining()) {
                            break;
                        }
                    }
                } else {
                    ret = read(session, buf);

                    if (ret > 0) {
                        readBytes = ret;
                    }
                }
            } finally {
                buf.flip();
            }

            if (readBytes > 0) {
                IoFilterChain filterChain = session.getFilterChain();
                filterChain.fireMessageReceived(buf);
                buf = null;

                if (hasFragmentation) {
                    // Adapt the read buffer size : shrink it when less than
                    // half of it has been used, grow it when it was filled.
                    if (readBytes << 1 < config.getReadBufferSize()) {
                        session.decreaseReadBufferSize();
                    } else if (readBytes == config.getReadBufferSize()) {
                        session.increaseReadBufferSize();
                    }
                }
            }

            if (ret < 0) {
                // The last read returned a negative value : the session is
                // not removed here, we only inform the chain that the input
                // is closed.
                IoFilterChain filterChain = session.getFilterChain();
                filterChain.fireInputClosed();
            }
        } catch (Exception e) {
            if (e instanceof IOException) {
                // Remove the session on any IOException, except for a
                // PortUnreachableException on a datagram session which is
                // configured not to be closed in this case.
                if (!(e instanceof PortUnreachableException)
                        || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
                        || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
                    scheduleRemove(session);
                }
            }

            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireExceptionCaught(e);
        }
    }

    /**
     * Fires the idle events, at most once per {@link #SELECT_TIMEOUT} period.
     *
     * @param currentTime The current time, in milliseconds
     * @throws Exception if the idle notification failed
     */
    private void notifyIdleSessions(long currentTime) throws Exception {
        // process idle sessions
        if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
            lastIdleCheckTime = currentTime;
            AbstractIoSession.notifyIdleness(allSessions(), currentTime);
        }
    }

    /**
     * Write all the pending messages
     *
     * @param currentTime The current time, in milliseconds
     * @throws IllegalStateException if a session is in an unknown state
     */
    private void flush(long currentTime) {
        if (flushingSessions.isEmpty()) {
            return;
        }

        do {
            S session = flushingSessions.poll();

            if (session == null) {
                // Just in case ... It should not happen.
                break;
            }

            // Reset the Schedule for flush flag for this session,
            // as we are flushing it now
            session.unscheduledForFlush();

            SessionState state = getState(session);

            switch (state) {
            case OPENED:
                try {
                    boolean flushedAll = flushNow(session, currentTime);

                    // Some requests may have been queued in the meantime :
                    // schedule the session again if nobody did it.
                    if (flushedAll && !session.getWriteRequestQueue().isEmpty(session)
                            && !session.isScheduledForFlush()) {
                        scheduleFlush(session);
                    }
                } catch (Exception e) {
                    scheduleRemove(session);
                    session.close(true);
                    IoFilterChain filterChain = session.getFilterChain();
                    filterChain.fireExceptionCaught(e);
                }

                break;

            case CLOSING:
                // Skip if the channel is already closed.
                break;

            case OPENING:
                // Retry later if session is not yet fully initialized.
                // (In case that Session.write() is called before addSession()
                // is processed)
                scheduleFlush(session);
                return;

            default:
                throw new IllegalStateException(String.valueOf(state));
            }

        } while (!flushingSessions.isEmpty());
    }

    /**
     * Writes as many pending requests of a session as possible, within the
     * limit of a maximum number of bytes.
     *
     * @param session The session to flush
     * @param currentTime The current time, in milliseconds
     * @return <tt>true</tt> if the loop ended normally (no more request, or the
     *         limit has been reached by the last fully handled request),
     *         <tt>false</tt> if the flush has been interrupted (session not
     *         connected, partial write, kernel buffer full, limit reached or
     *         exception)
     */
    private boolean flushNow(S session, long currentTime) {
        if (!session.isConnected()) {
            scheduleRemove(session);
            return false;
        }

        final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();

        final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();

        // Set limitation for the number of written bytes for read-write
        // fairness. I used maxReadBufferSize * 3 / 2, which yields best
        // performance in my experience while not breaking fairness much.
        final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
                + (session.getConfig().getMaxReadBufferSize() >>> 1);
        int writtenBytes = 0;
        WriteRequest req = null;

        try {
            // Clear OP_WRITE
            setInterestedInWrite(session, false);

            do {
                // Check for pending writes.
                req = session.getCurrentWriteRequest();

                if (req == null) {
                    req = writeRequestQueue.poll(session);

                    if (req == null) {
                        break;
                    }

                    session.setCurrentWriteRequest(req);
                }

                int localWrittenBytes = 0;
                Object message = req.getMessage();

                if (message instanceof IoBuffer) {
                    localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
                            currentTime);

                    if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {
                        // the buffer isn't empty, we re-interest it in writing
                        writtenBytes += localWrittenBytes;
                        setInterestedInWrite(session, true);
                        return false;
                    }
                } else if (message instanceof FileRegion) {
                    localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
                            currentTime);

                    // Fix for Java bug on Linux
                    // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
                    // If there's still data to be written in the FileRegion,
                    // return 0 indicating that we need
                    // to pause until writing may resume.
                    if ((localWrittenBytes > 0) && (((FileRegion) message).getRemainingBytes() > 0)) {
                        writtenBytes += localWrittenBytes;
                        setInterestedInWrite(session, true);
                        return false;
                    }
                } else {
                    throw new IllegalStateException("Don't know how to handle message of type '"
                            + message.getClass().getName() + "'.  Are you missing a protocol encoder?");
                }

                if (localWrittenBytes == 0) {
                    // Kernel buffer is full.
                    setInterestedInWrite(session, true);
                    return false;
                }

                writtenBytes += localWrittenBytes;

                if (writtenBytes >= maxWrittenBytes) {
                    // Wrote too much
                    scheduleFlush(session);
                    return false;
                }

                if (message instanceof IoBuffer) {
                    ((IoBuffer) message).free();
                }
            } while (writtenBytes < maxWrittenBytes);
        } catch (Exception e) {
            if (req != null) {
                req.getFuture().setException(e);
            }

            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireExceptionCaught(e);
            return false;
        }

        return true;
    }

    /**
     * Writes the {@link IoBuffer} carried by a write request.
     *
     * @param session The session to write to
     * @param req The write request, whose message is an {@link IoBuffer}
     * @param hasFragmentation Whether the transport has fragmentation : if so,
     *            the written length is limited to <tt>maxLength</tt>
     * @param maxLength The maximum number of bytes to write
     * @param currentTime The current time, in milliseconds
     * @return The number of written bytes, 0 if an {@link IOException} occurred
     *         while writing
     * @throws Exception any other exception thrown by the underlying system calls
     */
    private int writeBuffer(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
            throws Exception {
        IoBuffer buf = (IoBuffer) req.getMessage();
        int localWrittenBytes = 0;

        if (buf.hasRemaining()) {
            int length;

            if (hasFragmentation) {
                length = Math.min(buf.remaining(), maxLength);
            } else {
                length = buf.remaining();
            }

            try {
                localWrittenBytes = write(session, buf, length);
            } catch (IOException ioe) {
                // We have had an issue while trying to send data to the
                // peer : let's close the session.
                buf.free();
                session.close(true);
                destroy(session);

                return 0;
            }

        }

        session.increaseWrittenBytes(localWrittenBytes, currentTime);

        if (!buf.hasRemaining() || (!hasFragmentation && (localWrittenBytes != 0))) {
            // Buffer has been sent, clear the current request.
            int pos = buf.position();
            buf.reset();

            fireMessageSent(session, req);

            // And set it back to its position
            buf.position(pos);
        }

        return localWrittenBytes;
    }

    /**
     * Writes the {@link FileRegion} carried by a write request.
     *
     * @param session The session to write to
     * @param req The write request, whose message is a {@link FileRegion}
     * @param hasFragmentation Whether the transport has fragmentation : if so,
     *            the written length is limited to <tt>maxLength</tt>
     * @param maxLength The maximum number of bytes to write
     * @param currentTime The current time, in milliseconds
     * @return The number of written bytes
     * @throws Exception any exception thrown by the underlying system calls
     */
    private int writeFile(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
            throws Exception {
        int localWrittenBytes;
        FileRegion region = (FileRegion) req.getMessage();

        if (region.getRemainingBytes() > 0) {
            int length;

            if (hasFragmentation) {
                length = (int) Math.min(region.getRemainingBytes(), maxLength);
            } else {
                length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
            }

            localWrittenBytes = transferFile(session, region, length);
            region.update(localWrittenBytes);
        } else {
            localWrittenBytes = 0;
        }

        session.increaseWrittenBytes(localWrittenBytes, currentTime);

        if ((region.getRemainingBytes() <= 0) || (!hasFragmentation && (localWrittenBytes != 0))) {
            fireMessageSent(session, req);
        }

        return localWrittenBytes;
    }

    /**
     * Clears the current write request of a session, and fires the
     * MESSAGE_SENT event for the given request.
     *
     * @param session The session which wrote the message
     * @param req The request which has been written
     */
    private void fireMessageSent(S session, WriteRequest req) {
        session.setCurrentWriteRequest(null);
        IoFilterChain filterChain = session.getFilterChain();
        filterChain.fireMessageSent(req);
    }

    /**
     * Update the trafficControl for all the session.
     *
     * @throws IllegalStateException if a session is in an unknown state
     */
    private void updateTrafficMask() {
        int queueSize = trafficControllingSessions.size();

        while (queueSize > 0) {
            S session = trafficControllingSessions.poll();

            if (session == null) {
                // We are done with this queue.
                return;
            }

            SessionState state = getState(session);

            switch (state) {
            case OPENED:
                updateTrafficControl(session);

                break;

            case CLOSING:
                break;

            case OPENING:
                // Retry later if session is not yet fully initialized.
                // (In case that Session.suspend??() or session.resume??() is
                // called before addSession() is processed)
                // We just put back the session at the end of the queue.
                trafficControllingSessions.add(session);
                break;

            default:
                throw new IllegalStateException(String.valueOf(state));
            }

            // As we have handled one session, decrement the number of
            // remaining sessions. The OPENING session will be processed
            // with the next select(), as the queue size has been decreased,
            // even if the session has been pushed at the end of the queue
            queueSize--;
        }
    }

    /**
     * {@inheritDoc}
     */
    public void updateTrafficControl(S session) {
        // Read interest : follows the read suspension flag
        try {
            setInterestedInRead(session, !session.isReadSuspended());
        } catch (Exception e) {
            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireExceptionCaught(e);
        }

        // Write interest : only if there is something to write and the
        // writes are not suspended
        try {
            setInterestedInWrite(session,
                    !session.getWriteRequestQueue().isEmpty(session) && !session.isWriteSuspended());
        } catch (Exception e) {
            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireExceptionCaught(e);
        }
    }

    /**
     * The main loop. This is the place in charge to poll the Selector, and to
     * process the active sessions. Each iteration :
     * <ul>
     * <li>calls select() with a timeout</li>
     * <li>replaces the selector if it seems to be spinning</li>
     * <li>handles the newly created sessions</li>
     * <li>updates the traffic masks</li>
     * <li>processes the selected sessions</li>
     * <li>flushes the pending write requests</li>
     * <li>removes the sessions scheduled for removal</li>
     * <li>notifies the idle sessions</li>
     * </ul>
     */
    private class Processor implements Runnable {
        public void run() {
            assert (processorRef.get() == this);

            int nSessions = 0;
            lastIdleCheckTime = System.currentTimeMillis();

            for (;;) {
                try {
                    // This select has a timeout so that we can manage
                    // idle session when we get out of the select every
                    // second. (note : this is a hack to avoid creating
                    // a dedicated thread).
                    long t0 = System.currentTimeMillis();
                    int selected = select(SELECT_TIMEOUT);
                    long t1 = System.currentTimeMillis();
                    long delta = (t1 - t0);

                    if (!wakeupCalled.getAndSet(false) && (selected == 0) && (delta < 100)) {
                        // Last chance : the select() may have been
                        // interrupted because we have had a closed channel.
                        if (isBrokenConnection()) {
                            LOG.warn("Broken connection");
                        } else {
                            LOG.warn("Create a new selector. Selected is 0, delta = {}", delta);
                            // Ok, we are hit by the nasty epoll
                            // spinning.
                            // Basically, there is a race condition
                            // which causes a closing file descriptor not to be
                            // considered as available as a selected channel,
                            // but it stopped the select. The next time we will
                            // call select(), it will exit immediately for the
                            // same reason, and do so forever, consuming 100%
                            // CPU.
                            // We have to destroy the selector, and
                            // register all the socket on a new one.
                            registerNewSelector();
                        }
                    }

                    // Manage newly created session first
                    nSessions += handleNewSessions();

                    updateTrafficMask();

                    // Now, if we have had some incoming or outgoing events,
                    // deal with them
                    if (selected > 0) {
                        // LOG.debug("Processing ..."); // This log hurts one of
                        // the MDCFilter test...
                        process();
                    }

                    // Write the pending requests
                    long currentTime = System.currentTimeMillis();
                    flush(currentTime);

                    // And manage removed sessions
                    nSessions -= removeSessions();

                    // Last, not least, send Idle events to the idle sessions
                    notifyIdleSessions(currentTime);

                    // Get a chance to exit the infinite loop if there are no
                    // more sessions on this Processor
                    if (nSessions == 0) {
                        processorRef.set(null);

                        if (newSessions.isEmpty() && isSelectorEmpty()) {
                            // newSessions.add() precedes startupProcessor
                            assert (processorRef.get() != this);
                            break;
                        }

                        assert (processorRef.get() != this);

                        if (!processorRef.compareAndSet(null, this)) {
                            // startupProcessor won race, so must exit processor
                            assert (processorRef.get() != this);
                            break;
                        }

                        assert (processorRef.get() == this);
                    }

                    // Disconnect all sessions immediately if disposal has been
                    // requested so that we exit this loop eventually.
                    if (isDisposing()) {
                        boolean hasKeys = false;

                        for (Iterator<S> i = allSessions(); i.hasNext();) {
                            S session = i.next();

                            if (session.isActive()) {
                                scheduleRemove(session);
                                hasKeys = true;
                            }
                        }

                        if (hasKeys) {
                            wakeup();
                        }
                    }
                } catch (ClosedSelectorException cse) {
                    // If the selector has been closed, we can exit the loop
                    // But first, dump a stack trace
                    ExceptionMonitor.getInstance().exceptionCaught(cse);
                    break;
                } catch (Exception e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);

                    // Wait a bit before looping again
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        // Reported only : the loop goes on
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }

            try {
                synchronized (disposalLock) {
                    if (disposing) {
                        doDispose();
                    }
                }
            } catch (Exception e) {
                ExceptionMonitor.getInstance().exceptionCaught(e);
            } finally {
                // Release the thread waiting in dispose(), if any
                disposalFuture.setValue(true);
            }
        }
    }
}