001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.mina.transport.socket.nio; 021 022import java.io.IOException; 023import java.net.Inet4Address; 024import java.net.Inet6Address; 025import java.net.InetAddress; 026import java.net.InetSocketAddress; 027import java.net.SocketAddress; 028import java.nio.channels.ClosedSelectorException; 029import java.nio.channels.DatagramChannel; 030import java.nio.channels.SelectionKey; 031import java.nio.channels.Selector; 032import java.util.Collections; 033import java.util.HashMap; 034import java.util.HashSet; 035import java.util.Iterator; 036import java.util.List; 037import java.util.Map; 038import java.util.Queue; 039import java.util.Set; 040import java.util.concurrent.ConcurrentLinkedQueue; 041import java.util.concurrent.Executor; 042import java.util.concurrent.Semaphore; 043 044import org.apache.mina.core.RuntimeIoException; 045import org.apache.mina.core.buffer.IoBuffer; 046import org.apache.mina.core.service.AbstractIoAcceptor; 047import org.apache.mina.core.service.IoAcceptor; 048import org.apache.mina.core.service.IoProcessor; 049import org.apache.mina.core.service.TransportMetadata; 050import org.apache.mina.core.session.AbstractIoSession; 051import org.apache.mina.core.session.ExpiringSessionRecycler; 052import org.apache.mina.core.session.IoSession; 053import org.apache.mina.core.session.IoSessionConfig; 054import org.apache.mina.core.session.IoSessionRecycler; 055import org.apache.mina.core.write.WriteRequest; 056import org.apache.mina.core.write.WriteRequestQueue; 057import org.apache.mina.transport.socket.DatagramAcceptor; 058import org.apache.mina.transport.socket.DatagramSessionConfig; 059import org.apache.mina.transport.socket.DefaultDatagramSessionConfig; 060import org.apache.mina.util.ExceptionMonitor; 061 062/** 063 * {@link IoAcceptor} for datagram transport (UDP/IP). 064 * 065 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 066 * @org.apache.xbean.XBean 067 */ 068public final class NioDatagramAcceptor extends AbstractIoAcceptor implements DatagramAcceptor, IoProcessor<NioSession> { 069 /** 070 * A session recycler that is used to retrieve an existing session, unless it's too old. 071 **/ 072 private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler(); 073 074 /** 075 * A timeout used for the select, as we need to get out to deal with idle 076 * sessions 077 */ 078 private static final long SELECT_TIMEOUT = 1000L; 079 080 /** A lock used to protect the selector to be waked up before it's created */ 081 private final Semaphore lock = new Semaphore(1); 082 083 /** A queue used to store the list of pending Binds */ 084 private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>(); 085 086 private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>(); 087 088 private final Queue<NioSession> flushingSessions = new ConcurrentLinkedQueue<NioSession>(); 089 090 private final Map<SocketAddress, DatagramChannel> boundHandles = Collections 091 .synchronizedMap(new HashMap<SocketAddress, DatagramChannel>()); 092 093 private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER; 094 095 private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture(); 096 097 private volatile boolean selectable; 098 099 /** The thread responsible of accepting incoming requests */ 100 private Acceptor acceptor; 101 102 private long lastIdleCheckTime; 103 104 /** The Selector used by this acceptor */ 105 private volatile Selector selector; 106 107 /** 108 * Creates a new instance. 109 */ 110 public NioDatagramAcceptor() { 111 this(new DefaultDatagramSessionConfig(), null); 112 } 113 114 /** 115 * Creates a new instance. 116 * 117 * @param executor The executor to use 118 */ 119 public NioDatagramAcceptor(Executor executor) { 120 this(new DefaultDatagramSessionConfig(), executor); 121 } 122 123 /** 124 * Creates a new instance. 125 */ 126 private NioDatagramAcceptor(IoSessionConfig sessionConfig, Executor executor) { 127 super(sessionConfig, executor); 128 129 try { 130 init(); 131 selectable = true; 132 } catch (RuntimeException e) { 133 throw e; 134 } catch (Exception e) { 135 throw new RuntimeIoException("Failed to initialize.", e); 136 } finally { 137 if (!selectable) { 138 try { 139 destroy(); 140 } catch (Exception e) { 141 ExceptionMonitor.getInstance().exceptionCaught(e); 142 } 143 } 144 } 145 } 146 147 /** 148 * This private class is used to accept incoming connection from 149 * clients. It's an infinite loop, which can be stopped when all 150 * the registered handles have been removed (unbound). 151 */ 152 private class Acceptor implements Runnable { 153 public void run() { 154 int nHandles = 0; 155 lastIdleCheckTime = System.currentTimeMillis(); 156 157 // Release the lock 158 lock.release(); 159 160 while (selectable) { 161 try { 162 int selected = select(SELECT_TIMEOUT); 163 164 nHandles += registerHandles(); 165 166 if (nHandles == 0) { 167 try { 168 lock.acquire(); 169 170 if (registerQueue.isEmpty() && cancelQueue.isEmpty()) { 171 acceptor = null; 172 break; 173 } 174 } finally { 175 lock.release(); 176 } 177 } 178 179 if (selected > 0) { 180 processReadySessions(selectedHandles()); 181 } 182 183 long currentTime = System.currentTimeMillis(); 184 flushSessions(currentTime); 185 nHandles -= unregisterHandles(); 186 187 notifyIdleSessions(currentTime); 188 } catch (ClosedSelectorException cse) { 189 // If the selector has been closed, we can exit the loop 190 ExceptionMonitor.getInstance().exceptionCaught(cse); 191 break; 192 } catch (Exception e) { 193 ExceptionMonitor.getInstance().exceptionCaught(e); 194 195 try { 196 Thread.sleep(1000); 197 } catch (InterruptedException e1) { 198 } 199 } 200 } 201 202 if (selectable && isDisposing()) { 203 selectable = false; 204 try { 205 destroy(); 206 } catch (Exception e) { 207 ExceptionMonitor.getInstance().exceptionCaught(e); 208 } finally { 209 disposalFuture.setValue(true); 210 } 211 } 212 } 213 } 214 215 private int registerHandles() { 216 for (;;) { 217 AcceptorOperationFuture req = registerQueue.poll(); 218 219 if (req == null) { 220 break; 221 } 222 223 Map<SocketAddress, DatagramChannel> newHandles = new HashMap<SocketAddress, DatagramChannel>(); 224 List<SocketAddress> localAddresses = req.getLocalAddresses(); 225 226 try { 227 for (SocketAddress socketAddress : localAddresses) { 228 DatagramChannel handle = open(socketAddress); 229 newHandles.put(localAddress(handle), handle); 230 } 231 232 boundHandles.putAll(newHandles); 233 234 getListeners().fireServiceActivated(); 235 req.setDone(); 236 237 return newHandles.size(); 238 } catch (Exception e) { 239 req.setException(e); 240 } finally { 241 // Roll back if failed to bind all addresses. 242 if (req.getException() != null) { 243 for (DatagramChannel handle : newHandles.values()) { 244 try { 245 close(handle); 246 } catch (Exception e) { 247 ExceptionMonitor.getInstance().exceptionCaught(e); 248 } 249 } 250 251 wakeup(); 252 } 253 } 254 } 255 256 return 0; 257 } 258 259 private void processReadySessions(Set<SelectionKey> handles) { 260 Iterator<SelectionKey> iterator = handles.iterator(); 261 262 while (iterator.hasNext()) { 263 SelectionKey key = iterator.next(); 264 DatagramChannel handle = (DatagramChannel) key.channel(); 265 iterator.remove(); 266 267 try { 268 if (key.isValid() && key.isReadable()) { 269 readHandle(handle); 270 } 271 272 if (key.isValid() && key.isWritable()) { 273 for (IoSession session : getManagedSessions().values()) { 274 scheduleFlush((NioSession) session); 275 } 276 } 277 } catch (Exception e) { 278 ExceptionMonitor.getInstance().exceptionCaught(e); 279 } 280 } 281 } 282 283 private boolean scheduleFlush(NioSession session) { 284 // Set the schedule for flush flag if the session 285 // has not already be added to the flushingSessions 286 // queue 287 if (session.setScheduledForFlush(true)) { 288 flushingSessions.add(session); 289 return true; 290 } else { 291 return false; 292 } 293 } 294 295 private void readHandle(DatagramChannel handle) throws Exception { 296 IoBuffer readBuf = IoBuffer.allocate(getSessionConfig().getReadBufferSize()); 297 298 SocketAddress remoteAddress = receive(handle, readBuf); 299 300 if (remoteAddress != null) { 301 IoSession session = newSessionWithoutLock(remoteAddress, localAddress(handle)); 302 303 readBuf.flip(); 304 305 session.getFilterChain().fireMessageReceived(readBuf); 306 } 307 } 308 309 private IoSession newSessionWithoutLock(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { 310 DatagramChannel handle = boundHandles.get(localAddress); 311 312 if (handle == null) { 313 throw new IllegalArgumentException("Unknown local address: " + localAddress); 314 } 315 316 IoSession session; 317 318 synchronized (sessionRecycler) { 319 session = sessionRecycler.recycle(remoteAddress); 320 321 if (session != null) { 322 return session; 323 } 324 325 // If a new session needs to be created. 326 NioSession newSession = newSession(this, handle, remoteAddress); 327 getSessionRecycler().put(newSession); 328 session = newSession; 329 } 330 331 initSession(session, null, null); 332 333 try { 334 this.getFilterChainBuilder().buildFilterChain(session.getFilterChain()); 335 getListeners().fireSessionCreated(session); 336 } catch (Exception e) { 337 ExceptionMonitor.getInstance().exceptionCaught(e); 338 } 339 340 return session; 341 } 342 343 private void flushSessions(long currentTime) { 344 for (;;) { 345 NioSession session = flushingSessions.poll(); 346 347 if (session == null) { 348 break; 349 } 350 351 // Reset the Schedule for flush flag for this session, 352 // as we are flushing it now 353 session.unscheduledForFlush(); 354 355 try { 356 boolean flushedAll = flush(session, currentTime); 357 358 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) && !session.isScheduledForFlush()) { 359 scheduleFlush(session); 360 } 361 } catch (Exception e) { 362 session.getFilterChain().fireExceptionCaught(e); 363 } 364 } 365 } 366 367 private boolean flush(NioSession session, long currentTime) throws Exception { 368 final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue(); 369 final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize() 370 + (session.getConfig().getMaxReadBufferSize() >>> 1); 371 372 int writtenBytes = 0; 373 374 try { 375 for (;;) { 376 WriteRequest req = session.getCurrentWriteRequest(); 377 378 if (req == null) { 379 req = writeRequestQueue.poll(session); 380 381 if (req == null) { 382 setInterestedInWrite(session, false); 383 break; 384 } 385 386 session.setCurrentWriteRequest(req); 387 } 388 389 IoBuffer buf = (IoBuffer) req.getMessage(); 390 391 if (buf.remaining() == 0) { 392 // Clear and fire event 393 session.setCurrentWriteRequest(null); 394 buf.reset(); 395 session.getFilterChain().fireMessageSent(req); 396 continue; 397 } 398 399 SocketAddress destination = req.getDestination(); 400 401 if (destination == null) { 402 destination = session.getRemoteAddress(); 403 } 404 405 int localWrittenBytes = send(session, buf, destination); 406 407 if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) { 408 // Kernel buffer is full or wrote too much 409 setInterestedInWrite(session, true); 410 411 return false; 412 } else { 413 setInterestedInWrite(session, false); 414 415 // Clear and fire event 416 session.setCurrentWriteRequest(null); 417 writtenBytes += localWrittenBytes; 418 buf.reset(); 419 session.getFilterChain().fireMessageSent(req); 420 } 421 } 422 } finally { 423 session.increaseWrittenBytes(writtenBytes, currentTime); 424 } 425 426 return true; 427 } 428 429 private int unregisterHandles() { 430 int nHandles = 0; 431 432 for (;;) { 433 AcceptorOperationFuture request = cancelQueue.poll(); 434 if (request == null) { 435 break; 436 } 437 438 // close the channels 439 for (SocketAddress socketAddress : request.getLocalAddresses()) { 440 DatagramChannel handle = boundHandles.remove(socketAddress); 441 442 if (handle == null) { 443 continue; 444 } 445 446 try { 447 close(handle); 448 wakeup(); // wake up again to trigger thread death 449 } catch (Exception e) { 450 ExceptionMonitor.getInstance().exceptionCaught(e); 451 } finally { 452 nHandles++; 453 } 454 } 455 456 request.setDone(); 457 } 458 459 return nHandles; 460 } 461 462 private void notifyIdleSessions(long currentTime) { 463 // process idle sessions 464 if (currentTime - lastIdleCheckTime >= 1000) { 465 lastIdleCheckTime = currentTime; 466 AbstractIoSession.notifyIdleness(getListeners().getManagedSessions().values().iterator(), currentTime); 467 } 468 } 469 470 /** 471 * Starts the inner Acceptor thread. 472 */ 473 private void startupAcceptor() throws InterruptedException { 474 if (!selectable) { 475 registerQueue.clear(); 476 cancelQueue.clear(); 477 flushingSessions.clear(); 478 } 479 480 lock.acquire(); 481 482 if (acceptor == null) { 483 acceptor = new Acceptor(); 484 executeWorker(acceptor); 485 } else { 486 lock.release(); 487 } 488 } 489 490 protected void init() throws Exception { 491 this.selector = Selector.open(); 492 } 493 494 /** 495 * {@inheritDoc} 496 */ 497 public void add(NioSession session) { 498 // Nothing to do for UDP 499 } 500 501 /** 502 * {@inheritDoc} 503 */ 504 @Override 505 protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception { 506 // Create a bind request as a Future operation. When the selector 507 // have handled the registration, it will signal this future. 508 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses); 509 510 // adds the Registration request to the queue for the Workers 511 // to handle 512 registerQueue.add(request); 513 514 // creates the Acceptor instance and has the local 515 // executor kick it off. 516 startupAcceptor(); 517 518 // As we just started the acceptor, we have to unblock the select() 519 // in order to process the bind request we just have added to the 520 // registerQueue. 521 try { 522 lock.acquire(); 523 524 // Wait a bit to give a chance to the Acceptor thread to do the select() 525 Thread.sleep(10); 526 wakeup(); 527 } finally { 528 lock.release(); 529 } 530 531 // Now, we wait until this request is completed. 532 request.awaitUninterruptibly(); 533 534 if (request.getException() != null) { 535 throw request.getException(); 536 } 537 538 // Update the local addresses. 539 // setLocalAddresses() shouldn't be called from the worker thread 540 // because of deadlock. 541 Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>(); 542 543 for (DatagramChannel handle : boundHandles.values()) { 544 newLocalAddresses.add(localAddress(handle)); 545 } 546 547 return newLocalAddresses; 548 } 549 550 protected void close(DatagramChannel handle) throws Exception { 551 SelectionKey key = handle.keyFor(selector); 552 553 if (key != null) { 554 key.cancel(); 555 } 556 557 handle.disconnect(); 558 handle.close(); 559 } 560 561 protected void destroy() throws Exception { 562 if (selector != null) { 563 selector.close(); 564 } 565 } 566 567 /** 568 * {@inheritDoc} 569 */ 570 @Override 571 protected void dispose0() throws Exception { 572 unbind(); 573 startupAcceptor(); 574 wakeup(); 575 } 576 577 /** 578 * {@inheritDoc} 579 */ 580 public void flush(NioSession session) { 581 if (scheduleFlush(session)) { 582 wakeup(); 583 } 584 } 585 586 @Override 587 public InetSocketAddress getDefaultLocalAddress() { 588 return (InetSocketAddress) super.getDefaultLocalAddress(); 589 } 590 591 @Override 592 public InetSocketAddress getLocalAddress() { 593 return (InetSocketAddress) super.getLocalAddress(); 594 } 595 596 /** 597 * {@inheritDoc} 598 */ 599 public DatagramSessionConfig getSessionConfig() { 600 return (DatagramSessionConfig) sessionConfig; 601 } 602 603 public final IoSessionRecycler getSessionRecycler() { 604 return sessionRecycler; 605 } 606 607 public TransportMetadata getTransportMetadata() { 608 return NioDatagramSession.METADATA; 609 } 610 611 protected boolean isReadable(DatagramChannel handle) { 612 SelectionKey key = handle.keyFor(selector); 613 614 if ((key == null) || (!key.isValid())) { 615 return false; 616 } 617 618 return key.isReadable(); 619 } 620 621 protected boolean isWritable(DatagramChannel handle) { 622 SelectionKey key = handle.keyFor(selector); 623 624 if ((key == null) || (!key.isValid())) { 625 return false; 626 } 627 628 return key.isWritable(); 629 } 630 631 protected SocketAddress localAddress(DatagramChannel handle) throws Exception { 632 InetSocketAddress inetSocketAddress = (InetSocketAddress) handle.socket().getLocalSocketAddress(); 633 InetAddress inetAddress = inetSocketAddress.getAddress(); 634 635 if ((inetAddress instanceof Inet6Address) && (((Inet6Address) inetAddress).isIPv4CompatibleAddress())) { 636 // Ugly hack to workaround a problem on linux : the ANY address is always converted to IPV6 637 // even if the original address was an IPV4 address. We do store the two IPV4 and IPV6 638 // ANY address in the map. 639 byte[] ipV6Address = ((Inet6Address) inetAddress).getAddress(); 640 byte[] ipV4Address = new byte[4]; 641 642 System.arraycopy(ipV6Address, 12, ipV4Address, 0, 4); 643 644 InetAddress inet4Adress = Inet4Address.getByAddress(ipV4Address); 645 return new InetSocketAddress(inet4Adress, inetSocketAddress.getPort()); 646 } else { 647 return inetSocketAddress; 648 } 649 } 650 651 protected NioSession newSession(IoProcessor<NioSession> processor, DatagramChannel handle, 652 SocketAddress remoteAddress) { 653 SelectionKey key = handle.keyFor(selector); 654 655 if ((key == null) || (!key.isValid())) { 656 return null; 657 } 658 659 NioDatagramSession newSession = new NioDatagramSession(this, handle, processor, remoteAddress); 660 newSession.setSelectionKey(key); 661 662 return newSession; 663 } 664 665 /** 666 * {@inheritDoc} 667 */ 668 public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) { 669 if (isDisposing()) { 670 throw new IllegalStateException("The Acceptor is being disposed."); 671 } 672 673 if (remoteAddress == null) { 674 throw new IllegalArgumentException("remoteAddress"); 675 } 676 677 synchronized (bindLock) { 678 if (!isActive()) { 679 throw new IllegalStateException("Can't create a session from a unbound service."); 680 } 681 682 try { 683 return newSessionWithoutLock(remoteAddress, localAddress); 684 } catch (RuntimeException e) { 685 throw e; 686 } catch (Error e) { 687 throw e; 688 } catch (Exception e) { 689 throw new RuntimeIoException("Failed to create a session.", e); 690 } 691 } 692 } 693 694 protected DatagramChannel open(SocketAddress localAddress) throws Exception { 695 final DatagramChannel ch = DatagramChannel.open(); 696 boolean success = false; 697 try { 698 new NioDatagramSessionConfig(ch).setAll(getSessionConfig()); 699 ch.configureBlocking(false); 700 701 try { 702 ch.socket().bind(localAddress); 703 } catch (IOException ioe) { 704 // Add some info regarding the address we try to bind to the 705 // message 706 String newMessage = "Error while binding on " + localAddress + "\n" + "original message : " 707 + ioe.getMessage(); 708 Exception e = new IOException(newMessage); 709 e.initCause(ioe.getCause()); 710 711 // And close the channel 712 ch.close(); 713 714 throw e; 715 } 716 717 ch.register(selector, SelectionKey.OP_READ); 718 success = true; 719 } finally { 720 if (!success) { 721 close(ch); 722 } 723 } 724 725 return ch; 726 } 727 728 protected SocketAddress receive(DatagramChannel handle, IoBuffer buffer) throws Exception { 729 return handle.receive(buffer.buf()); 730 } 731 732 /** 733 * {@inheritDoc} 734 */ 735 public void remove(NioSession session) { 736 getSessionRecycler().remove(session); 737 getListeners().fireSessionDestroyed(session); 738 } 739 740 protected int select() throws Exception { 741 return selector.select(); 742 } 743 744 protected int select(long timeout) throws Exception { 745 return selector.select(timeout); 746 } 747 748 protected Set<SelectionKey> selectedHandles() { 749 return selector.selectedKeys(); 750 } 751 752 protected int send(NioSession session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception { 753 return ((DatagramChannel) session.getChannel()).send(buffer.buf(), remoteAddress); 754 } 755 756 public void setDefaultLocalAddress(InetSocketAddress localAddress) { 757 setDefaultLocalAddress((SocketAddress) localAddress); 758 } 759 760 protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception { 761 SelectionKey key = session.getSelectionKey(); 762 763 if (key == null) { 764 return; 765 } 766 767 int newInterestOps = key.interestOps(); 768 769 if (isInterested) { 770 newInterestOps |= SelectionKey.OP_WRITE; 771 } else { 772 newInterestOps &= ~SelectionKey.OP_WRITE; 773 } 774 775 key.interestOps(newInterestOps); 776 } 777 778 public final void setSessionRecycler(IoSessionRecycler sessionRecycler) { 779 synchronized (bindLock) { 780 if (isActive()) { 781 throw new IllegalStateException("sessionRecycler can't be set while the acceptor is bound."); 782 } 783 784 if (sessionRecycler == null) { 785 sessionRecycler = DEFAULT_RECYCLER; 786 } 787 788 this.sessionRecycler = sessionRecycler; 789 } 790 } 791 792 /** 793 * {@inheritDoc} 794 */ 795 @Override 796 protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception { 797 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses); 798 799 cancelQueue.add(request); 800 startupAcceptor(); 801 wakeup(); 802 803 request.awaitUninterruptibly(); 804 805 if (request.getException() != null) { 806 throw request.getException(); 807 } 808 } 809 810 /** 811 * {@inheritDoc} 812 */ 813 public void updateTrafficControl(NioSession session) { 814 throw new UnsupportedOperationException(); 815 } 816 817 protected void wakeup() { 818 selector.wakeup(); 819 } 820 821 /** 822 * {@inheritDoc} 823 */ 824 public void write(NioSession session, WriteRequest writeRequest) { 825 // We will try to write the message directly 826 long currentTime = System.currentTimeMillis(); 827 final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue(); 828 final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize() 829 + (session.getConfig().getMaxReadBufferSize() >>> 1); 830 831 int writtenBytes = 0; 832 833 // Deal with the special case of a Message marker (no bytes in the request) 834 // We just have to return after having calle dthe messageSent event 835 IoBuffer buf = (IoBuffer) writeRequest.getMessage(); 836 837 if (buf.remaining() == 0) { 838 // Clear and fire event 839 session.setCurrentWriteRequest(null); 840 buf.reset(); 841 session.getFilterChain().fireMessageSent(writeRequest); 842 return; 843 } 844 845 // Now, write the data 846 try { 847 for (;;) { 848 if (writeRequest == null) { 849 writeRequest = writeRequestQueue.poll(session); 850 851 if (writeRequest == null) { 852 setInterestedInWrite(session, false); 853 break; 854 } 855 856 session.setCurrentWriteRequest(writeRequest); 857 } 858 859 buf = (IoBuffer) writeRequest.getMessage(); 860 861 if (buf.remaining() == 0) { 862 // Clear and fire event 863 session.setCurrentWriteRequest(null); 864 buf.reset(); 865 session.getFilterChain().fireMessageSent(writeRequest); 866 continue; 867 } 868 869 SocketAddress destination = writeRequest.getDestination(); 870 871 if (destination == null) { 872 destination = session.getRemoteAddress(); 873 } 874 875 int localWrittenBytes = send(session, buf, destination); 876 877 if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) { 878 // Kernel buffer is full or wrote too much 879 setInterestedInWrite(session, true); 880 881 session.getWriteRequestQueue().offer(session, writeRequest); 882 scheduleFlush(session); 883 } else { 884 setInterestedInWrite(session, false); 885 886 // Clear and fire event 887 session.setCurrentWriteRequest(null); 888 writtenBytes += localWrittenBytes; 889 buf.reset(); 890 session.getFilterChain().fireMessageSent(writeRequest); 891 892 break; 893 } 894 } 895 } catch (Exception e) { 896 session.getFilterChain().fireExceptionCaught(e); 897 } finally { 898 session.increaseWrittenBytes(writtenBytes, currentTime); 899 } 900 } 901}