001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.transport.socket.nio;
021
022import java.io.IOException;
023import java.net.Inet4Address;
024import java.net.Inet6Address;
025import java.net.InetAddress;
026import java.net.InetSocketAddress;
027import java.net.SocketAddress;
028import java.nio.channels.ClosedSelectorException;
029import java.nio.channels.DatagramChannel;
030import java.nio.channels.SelectionKey;
031import java.nio.channels.Selector;
032import java.util.Collections;
033import java.util.HashMap;
034import java.util.HashSet;
035import java.util.Iterator;
036import java.util.List;
037import java.util.Map;
038import java.util.Queue;
039import java.util.Set;
040import java.util.concurrent.ConcurrentLinkedQueue;
041import java.util.concurrent.Executor;
042import java.util.concurrent.Semaphore;
043
044import org.apache.mina.core.RuntimeIoException;
045import org.apache.mina.core.buffer.IoBuffer;
046import org.apache.mina.core.service.AbstractIoAcceptor;
047import org.apache.mina.core.service.IoAcceptor;
048import org.apache.mina.core.service.IoProcessor;
049import org.apache.mina.core.service.TransportMetadata;
050import org.apache.mina.core.session.AbstractIoSession;
051import org.apache.mina.core.session.ExpiringSessionRecycler;
052import org.apache.mina.core.session.IoSession;
053import org.apache.mina.core.session.IoSessionConfig;
054import org.apache.mina.core.session.IoSessionRecycler;
055import org.apache.mina.core.write.WriteRequest;
056import org.apache.mina.core.write.WriteRequestQueue;
057import org.apache.mina.transport.socket.DatagramAcceptor;
058import org.apache.mina.transport.socket.DatagramSessionConfig;
059import org.apache.mina.transport.socket.DefaultDatagramSessionConfig;
060import org.apache.mina.util.ExceptionMonitor;
061
062/**
063 * {@link IoAcceptor} for datagram transport (UDP/IP).
064 *
065 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
066 * @org.apache.xbean.XBean
067 */
068public final class NioDatagramAcceptor extends AbstractIoAcceptor implements DatagramAcceptor, IoProcessor<NioSession> {
069    /**
070     * A session recycler that is used to retrieve an existing session, unless it's too old.
071     **/
072    private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
073
074    /**
075     * A timeout used for the select, as we need to get out to deal with idle
076     * sessions
077     */
078    private static final long SELECT_TIMEOUT = 1000L;
079
080    /** A lock used to protect the selector to be waked up before it's created */
081    private final Semaphore lock = new Semaphore(1);
082
083    /** A queue used to store the list of pending Binds */
084    private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
085
086    private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
087
088    private final Queue<NioSession> flushingSessions = new ConcurrentLinkedQueue<NioSession>();
089
090    private final Map<SocketAddress, DatagramChannel> boundHandles = Collections
091            .synchronizedMap(new HashMap<SocketAddress, DatagramChannel>());
092
093    private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
094
095    private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
096
097    private volatile boolean selectable;
098
099    /** The thread responsible of accepting incoming requests */
100    private Acceptor acceptor;
101
102    private long lastIdleCheckTime;
103
104    /** The Selector used by this acceptor */
105    private volatile Selector selector;
106
107    /**
108     * Creates a new instance.
109     */
110    public NioDatagramAcceptor() {
111        this(new DefaultDatagramSessionConfig(), null);
112    }
113
114    /**
115     * Creates a new instance.
116     * 
117     * @param executor The executor to use
118     */
119    public NioDatagramAcceptor(Executor executor) {
120        this(new DefaultDatagramSessionConfig(), executor);
121    }
122
123    /**
124     * Creates a new instance.
125     */
126    private NioDatagramAcceptor(IoSessionConfig sessionConfig, Executor executor) {
127        super(sessionConfig, executor);
128
129        try {
130            init();
131            selectable = true;
132        } catch (RuntimeException e) {
133            throw e;
134        } catch (Exception e) {
135            throw new RuntimeIoException("Failed to initialize.", e);
136        } finally {
137            if (!selectable) {
138                try {
139                    destroy();
140                } catch (Exception e) {
141                    ExceptionMonitor.getInstance().exceptionCaught(e);
142                }
143            }
144        }
145    }
146
147    /**
148     * This private class is used to accept incoming connection from
149     * clients. It's an infinite loop, which can be stopped when all
150     * the registered handles have been removed (unbound).
151     */
152    private class Acceptor implements Runnable {
153        public void run() {
154            int nHandles = 0;
155            lastIdleCheckTime = System.currentTimeMillis();
156
157            // Release the lock
158            lock.release();
159
160            while (selectable) {
161                try {
162                    int selected = select(SELECT_TIMEOUT);
163
164                    nHandles += registerHandles();
165
166                    if (nHandles == 0) {
167                        try {
168                            lock.acquire();
169
170                            if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
171                                acceptor = null;
172                                break;
173                            }
174                        } finally {
175                            lock.release();
176                        }
177                    }
178
179                    if (selected > 0) {
180                        processReadySessions(selectedHandles());
181                    }
182
183                    long currentTime = System.currentTimeMillis();
184                    flushSessions(currentTime);
185                    nHandles -= unregisterHandles();
186
187                    notifyIdleSessions(currentTime);
188                } catch (ClosedSelectorException cse) {
189                    // If the selector has been closed, we can exit the loop
190                    ExceptionMonitor.getInstance().exceptionCaught(cse);
191                    break;
192                } catch (Exception e) {
193                    ExceptionMonitor.getInstance().exceptionCaught(e);
194
195                    try {
196                        Thread.sleep(1000);
197                    } catch (InterruptedException e1) {
198                    }
199                }
200            }
201
202            if (selectable && isDisposing()) {
203                selectable = false;
204                try {
205                    destroy();
206                } catch (Exception e) {
207                    ExceptionMonitor.getInstance().exceptionCaught(e);
208                } finally {
209                    disposalFuture.setValue(true);
210                }
211            }
212        }
213    }
214
215    private int registerHandles() {
216        for (;;) {
217            AcceptorOperationFuture req = registerQueue.poll();
218
219            if (req == null) {
220                break;
221            }
222
223            Map<SocketAddress, DatagramChannel> newHandles = new HashMap<SocketAddress, DatagramChannel>();
224            List<SocketAddress> localAddresses = req.getLocalAddresses();
225
226            try {
227                for (SocketAddress socketAddress : localAddresses) {
228                    DatagramChannel handle = open(socketAddress);
229                    newHandles.put(localAddress(handle), handle);
230                }
231
232                boundHandles.putAll(newHandles);
233
234                getListeners().fireServiceActivated();
235                req.setDone();
236
237                return newHandles.size();
238            } catch (Exception e) {
239                req.setException(e);
240            } finally {
241                // Roll back if failed to bind all addresses.
242                if (req.getException() != null) {
243                    for (DatagramChannel handle : newHandles.values()) {
244                        try {
245                            close(handle);
246                        } catch (Exception e) {
247                            ExceptionMonitor.getInstance().exceptionCaught(e);
248                        }
249                    }
250
251                    wakeup();
252                }
253            }
254        }
255
256        return 0;
257    }
258
259    private void processReadySessions(Set<SelectionKey> handles) {
260        Iterator<SelectionKey> iterator = handles.iterator();
261
262        while (iterator.hasNext()) {
263            SelectionKey key = iterator.next();
264            DatagramChannel handle = (DatagramChannel) key.channel();
265            iterator.remove();
266
267            try {
268                if (key.isValid() && key.isReadable()) {
269                    readHandle(handle);
270                }
271
272                if (key.isValid() && key.isWritable()) {
273                    for (IoSession session : getManagedSessions().values()) {
274                        scheduleFlush((NioSession) session);
275                    }
276                }
277            } catch (Exception e) {
278                ExceptionMonitor.getInstance().exceptionCaught(e);
279            }
280        }
281    }
282
283    private boolean scheduleFlush(NioSession session) {
284        // Set the schedule for flush flag if the session
285        // has not already be added to the flushingSessions
286        // queue
287        if (session.setScheduledForFlush(true)) {
288            flushingSessions.add(session);
289            return true;
290        } else {
291            return false;
292        }
293    }
294
295    private void readHandle(DatagramChannel handle) throws Exception {
296        IoBuffer readBuf = IoBuffer.allocate(getSessionConfig().getReadBufferSize());
297
298        SocketAddress remoteAddress = receive(handle, readBuf);
299
300        if (remoteAddress != null) {
301            IoSession session = newSessionWithoutLock(remoteAddress, localAddress(handle));
302
303            readBuf.flip();
304
305            session.getFilterChain().fireMessageReceived(readBuf);
306        }
307    }
308
309    private IoSession newSessionWithoutLock(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
310        DatagramChannel handle = boundHandles.get(localAddress);
311
312        if (handle == null) {
313            throw new IllegalArgumentException("Unknown local address: " + localAddress);
314        }
315
316        IoSession session;
317
318        synchronized (sessionRecycler) {
319            session = sessionRecycler.recycle(remoteAddress);
320
321            if (session != null) {
322                return session;
323            }
324
325            // If a new session needs to be created.
326            NioSession newSession = newSession(this, handle, remoteAddress);
327            getSessionRecycler().put(newSession);
328            session = newSession;
329        }
330
331        initSession(session, null, null);
332
333        try {
334            this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
335            getListeners().fireSessionCreated(session);
336        } catch (Exception e) {
337            ExceptionMonitor.getInstance().exceptionCaught(e);
338        }
339
340        return session;
341    }
342
343    private void flushSessions(long currentTime) {
344        for (;;) {
345            NioSession session = flushingSessions.poll();
346
347            if (session == null) {
348                break;
349            }
350
351            // Reset the Schedule for flush flag for this session,
352            // as we are flushing it now
353            session.unscheduledForFlush();
354
355            try {
356                boolean flushedAll = flush(session, currentTime);
357
358                if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) && !session.isScheduledForFlush()) {
359                    scheduleFlush(session);
360                }
361            } catch (Exception e) {
362                session.getFilterChain().fireExceptionCaught(e);
363            }
364        }
365    }
366
367    private boolean flush(NioSession session, long currentTime) throws Exception {
368        final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
369        final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
370                + (session.getConfig().getMaxReadBufferSize() >>> 1);
371
372        int writtenBytes = 0;
373
374        try {
375            for (;;) {
376                WriteRequest req = session.getCurrentWriteRequest();
377
378                if (req == null) {
379                    req = writeRequestQueue.poll(session);
380
381                    if (req == null) {
382                        setInterestedInWrite(session, false);
383                        break;
384                    }
385
386                    session.setCurrentWriteRequest(req);
387                }
388
389                IoBuffer buf = (IoBuffer) req.getMessage();
390
391                if (buf.remaining() == 0) {
392                    // Clear and fire event
393                    session.setCurrentWriteRequest(null);
394                    buf.reset();
395                    session.getFilterChain().fireMessageSent(req);
396                    continue;
397                }
398
399                SocketAddress destination = req.getDestination();
400
401                if (destination == null) {
402                    destination = session.getRemoteAddress();
403                }
404
405                int localWrittenBytes = send(session, buf, destination);
406
407                if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) {
408                    // Kernel buffer is full or wrote too much
409                    setInterestedInWrite(session, true);
410
411                    return false;
412                } else {
413                    setInterestedInWrite(session, false);
414
415                    // Clear and fire event
416                    session.setCurrentWriteRequest(null);
417                    writtenBytes += localWrittenBytes;
418                    buf.reset();
419                    session.getFilterChain().fireMessageSent(req);
420                }
421            }
422        } finally {
423            session.increaseWrittenBytes(writtenBytes, currentTime);
424        }
425
426        return true;
427    }
428
429    private int unregisterHandles() {
430        int nHandles = 0;
431
432        for (;;) {
433            AcceptorOperationFuture request = cancelQueue.poll();
434            if (request == null) {
435                break;
436            }
437
438            // close the channels
439            for (SocketAddress socketAddress : request.getLocalAddresses()) {
440                DatagramChannel handle = boundHandles.remove(socketAddress);
441
442                if (handle == null) {
443                    continue;
444                }
445
446                try {
447                    close(handle);
448                    wakeup(); // wake up again to trigger thread death
449                } catch (Exception e) {
450                    ExceptionMonitor.getInstance().exceptionCaught(e);
451                } finally {
452                    nHandles++;
453                }
454            }
455
456            request.setDone();
457        }
458
459        return nHandles;
460    }
461
462    private void notifyIdleSessions(long currentTime) {
463        // process idle sessions
464        if (currentTime - lastIdleCheckTime >= 1000) {
465            lastIdleCheckTime = currentTime;
466            AbstractIoSession.notifyIdleness(getListeners().getManagedSessions().values().iterator(), currentTime);
467        }
468    }
469
470    /**
471     * Starts the inner Acceptor thread.
472     */
473    private void startupAcceptor() throws InterruptedException {
474        if (!selectable) {
475            registerQueue.clear();
476            cancelQueue.clear();
477            flushingSessions.clear();
478        }
479
480        lock.acquire();
481
482        if (acceptor == null) {
483            acceptor = new Acceptor();
484            executeWorker(acceptor);
485        } else {
486            lock.release();
487        }
488    }
489
490    protected void init() throws Exception {
491        this.selector = Selector.open();
492    }
493
494    /**
495     * {@inheritDoc}
496     */
497    public void add(NioSession session) {
498        // Nothing to do for UDP
499    }
500
501    /**
502     * {@inheritDoc}
503     */
504    @Override
505    protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
506        // Create a bind request as a Future operation. When the selector
507        // have handled the registration, it will signal this future.
508        AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
509
510        // adds the Registration request to the queue for the Workers
511        // to handle
512        registerQueue.add(request);
513
514        // creates the Acceptor instance and has the local
515        // executor kick it off.
516        startupAcceptor();
517
518        // As we just started the acceptor, we have to unblock the select()
519        // in order to process the bind request we just have added to the
520        // registerQueue.
521        try {
522            lock.acquire();
523
524            // Wait a bit to give a chance to the Acceptor thread to do the select()
525            Thread.sleep(10);
526            wakeup();
527        } finally {
528            lock.release();
529        }
530
531        // Now, we wait until this request is completed.
532        request.awaitUninterruptibly();
533
534        if (request.getException() != null) {
535            throw request.getException();
536        }
537
538        // Update the local addresses.
539        // setLocalAddresses() shouldn't be called from the worker thread
540        // because of deadlock.
541        Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
542
543        for (DatagramChannel handle : boundHandles.values()) {
544            newLocalAddresses.add(localAddress(handle));
545        }
546
547        return newLocalAddresses;
548    }
549
550    protected void close(DatagramChannel handle) throws Exception {
551        SelectionKey key = handle.keyFor(selector);
552
553        if (key != null) {
554            key.cancel();
555        }
556
557        handle.disconnect();
558        handle.close();
559    }
560
561    protected void destroy() throws Exception {
562        if (selector != null) {
563            selector.close();
564        }
565    }
566
567    /**
568     * {@inheritDoc}
569     */
570    @Override
571    protected void dispose0() throws Exception {
572        unbind();
573        startupAcceptor();
574        wakeup();
575    }
576
577    /**
578     * {@inheritDoc}
579     */
580    public void flush(NioSession session) {
581        if (scheduleFlush(session)) {
582            wakeup();
583        }
584    }
585
586    @Override
587    public InetSocketAddress getDefaultLocalAddress() {
588        return (InetSocketAddress) super.getDefaultLocalAddress();
589    }
590
591    @Override
592    public InetSocketAddress getLocalAddress() {
593        return (InetSocketAddress) super.getLocalAddress();
594    }
595
596    /**
597     * {@inheritDoc}
598     */
599    public DatagramSessionConfig getSessionConfig() {
600        return (DatagramSessionConfig) sessionConfig;
601    }
602
603    public final IoSessionRecycler getSessionRecycler() {
604        return sessionRecycler;
605    }
606
607    public TransportMetadata getTransportMetadata() {
608        return NioDatagramSession.METADATA;
609    }
610
611    protected boolean isReadable(DatagramChannel handle) {
612        SelectionKey key = handle.keyFor(selector);
613
614        if ((key == null) || (!key.isValid())) {
615            return false;
616        }
617
618        return key.isReadable();
619    }
620
621    protected boolean isWritable(DatagramChannel handle) {
622        SelectionKey key = handle.keyFor(selector);
623
624        if ((key == null) || (!key.isValid())) {
625            return false;
626        }
627
628        return key.isWritable();
629    }
630
631    protected SocketAddress localAddress(DatagramChannel handle) throws Exception {
632        InetSocketAddress inetSocketAddress = (InetSocketAddress) handle.socket().getLocalSocketAddress();
633        InetAddress inetAddress = inetSocketAddress.getAddress();
634
635        if ((inetAddress instanceof Inet6Address) && (((Inet6Address) inetAddress).isIPv4CompatibleAddress())) {
636            // Ugly hack to workaround a problem on linux : the ANY address is always converted to IPV6
637            // even if the original address was an IPV4 address. We do store the two IPV4 and IPV6
638            // ANY address in the map.
639            byte[] ipV6Address = ((Inet6Address) inetAddress).getAddress();
640            byte[] ipV4Address = new byte[4];
641
642            System.arraycopy(ipV6Address, 12, ipV4Address, 0, 4);
643
644            InetAddress inet4Adress = Inet4Address.getByAddress(ipV4Address);
645            return new InetSocketAddress(inet4Adress, inetSocketAddress.getPort());
646        } else {
647            return inetSocketAddress;
648        }
649    }
650
651    protected NioSession newSession(IoProcessor<NioSession> processor, DatagramChannel handle,
652            SocketAddress remoteAddress) {
653        SelectionKey key = handle.keyFor(selector);
654
655        if ((key == null) || (!key.isValid())) {
656            return null;
657        }
658
659        NioDatagramSession newSession = new NioDatagramSession(this, handle, processor, remoteAddress);
660        newSession.setSelectionKey(key);
661
662        return newSession;
663    }
664
665    /**
666     * {@inheritDoc}
667     */
668    public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
669        if (isDisposing()) {
670            throw new IllegalStateException("The Acceptor is being disposed.");
671        }
672
673        if (remoteAddress == null) {
674            throw new IllegalArgumentException("remoteAddress");
675        }
676
677        synchronized (bindLock) {
678            if (!isActive()) {
679                throw new IllegalStateException("Can't create a session from a unbound service.");
680            }
681
682            try {
683                return newSessionWithoutLock(remoteAddress, localAddress);
684            } catch (RuntimeException e) {
685                throw e;
686            } catch (Error e) {
687                throw e;
688            } catch (Exception e) {
689                throw new RuntimeIoException("Failed to create a session.", e);
690            }
691        }
692    }
693
694    protected DatagramChannel open(SocketAddress localAddress) throws Exception {
695        final DatagramChannel ch = DatagramChannel.open();
696        boolean success = false;
697        try {
698            new NioDatagramSessionConfig(ch).setAll(getSessionConfig());
699            ch.configureBlocking(false);
700
701            try {
702                ch.socket().bind(localAddress);
703            } catch (IOException ioe) {
704                // Add some info regarding the address we try to bind to the
705                // message
706                String newMessage = "Error while binding on " + localAddress + "\n" + "original message : "
707                        + ioe.getMessage();
708                Exception e = new IOException(newMessage);
709                e.initCause(ioe.getCause());
710
711                // And close the channel
712                ch.close();
713
714                throw e;
715            }
716
717            ch.register(selector, SelectionKey.OP_READ);
718            success = true;
719        } finally {
720            if (!success) {
721                close(ch);
722            }
723        }
724
725        return ch;
726    }
727
728    protected SocketAddress receive(DatagramChannel handle, IoBuffer buffer) throws Exception {
729        return handle.receive(buffer.buf());
730    }
731
732    /**
733     * {@inheritDoc}
734     */
735    public void remove(NioSession session) {
736        getSessionRecycler().remove(session);
737        getListeners().fireSessionDestroyed(session);
738    }
739
740    protected int select() throws Exception {
741        return selector.select();
742    }
743
744    protected int select(long timeout) throws Exception {
745        return selector.select(timeout);
746    }
747
748    protected Set<SelectionKey> selectedHandles() {
749        return selector.selectedKeys();
750    }
751
752    protected int send(NioSession session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception {
753        return ((DatagramChannel) session.getChannel()).send(buffer.buf(), remoteAddress);
754    }
755
756    public void setDefaultLocalAddress(InetSocketAddress localAddress) {
757        setDefaultLocalAddress((SocketAddress) localAddress);
758    }
759
760    protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception {
761        SelectionKey key = session.getSelectionKey();
762
763        if (key == null) {
764            return;
765        }
766
767        int newInterestOps = key.interestOps();
768
769        if (isInterested) {
770            newInterestOps |= SelectionKey.OP_WRITE;
771        } else {
772            newInterestOps &= ~SelectionKey.OP_WRITE;
773        }
774
775        key.interestOps(newInterestOps);
776    }
777
778    public final void setSessionRecycler(IoSessionRecycler sessionRecycler) {
779        synchronized (bindLock) {
780            if (isActive()) {
781                throw new IllegalStateException("sessionRecycler can't be set while the acceptor is bound.");
782            }
783
784            if (sessionRecycler == null) {
785                sessionRecycler = DEFAULT_RECYCLER;
786            }
787
788            this.sessionRecycler = sessionRecycler;
789        }
790    }
791
792    /**
793     * {@inheritDoc}
794     */
795    @Override
796    protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
797        AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
798
799        cancelQueue.add(request);
800        startupAcceptor();
801        wakeup();
802
803        request.awaitUninterruptibly();
804
805        if (request.getException() != null) {
806            throw request.getException();
807        }
808    }
809
810    /**
811     * {@inheritDoc}
812     */
813    public void updateTrafficControl(NioSession session) {
814        throw new UnsupportedOperationException();
815    }
816
817    protected void wakeup() {
818        selector.wakeup();
819    }
820
821    /**
822     * {@inheritDoc}
823     */
824    public void write(NioSession session, WriteRequest writeRequest) {
825        // We will try to write the message directly
826        long currentTime = System.currentTimeMillis();
827        final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
828        final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
829                + (session.getConfig().getMaxReadBufferSize() >>> 1);
830
831        int writtenBytes = 0;
832
833        // Deal with the special case of a Message marker (no bytes in the request)
834        // We just have to return after having calle dthe messageSent event
835        IoBuffer buf = (IoBuffer) writeRequest.getMessage();
836
837        if (buf.remaining() == 0) {
838            // Clear and fire event
839            session.setCurrentWriteRequest(null);
840            buf.reset();
841            session.getFilterChain().fireMessageSent(writeRequest);
842            return;
843        }
844
845        // Now, write the data
846        try {
847            for (;;) {
848                if (writeRequest == null) {
849                    writeRequest = writeRequestQueue.poll(session);
850
851                    if (writeRequest == null) {
852                        setInterestedInWrite(session, false);
853                        break;
854                    }
855
856                    session.setCurrentWriteRequest(writeRequest);
857                }
858
859                buf = (IoBuffer) writeRequest.getMessage();
860
861                if (buf.remaining() == 0) {
862                    // Clear and fire event
863                    session.setCurrentWriteRequest(null);
864                    buf.reset();
865                    session.getFilterChain().fireMessageSent(writeRequest);
866                    continue;
867                }
868
869                SocketAddress destination = writeRequest.getDestination();
870
871                if (destination == null) {
872                    destination = session.getRemoteAddress();
873                }
874
875                int localWrittenBytes = send(session, buf, destination);
876
877                if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) {
878                    // Kernel buffer is full or wrote too much
879                    setInterestedInWrite(session, true);
880
881                    session.getWriteRequestQueue().offer(session, writeRequest);
882                    scheduleFlush(session);
883                } else {
884                    setInterestedInWrite(session, false);
885
886                    // Clear and fire event
887                    session.setCurrentWriteRequest(null);
888                    writtenBytes += localWrittenBytes;
889                    buf.reset();
890                    session.getFilterChain().fireMessageSent(writeRequest);
891
892                    break;
893                }
894            }
895        } catch (Exception e) {
896            session.getFilterChain().fireExceptionCaught(e);
897        } finally {
898            session.increaseWrittenBytes(writtenBytes, currentTime);
899        }
900    }
901}