View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.io.IOException;
23  import java.net.Inet4Address;
24  import java.net.Inet6Address;
25  import java.net.InetAddress;
26  import java.net.InetSocketAddress;
27  import java.net.SocketAddress;
28  import java.nio.channels.ClosedSelectorException;
29  import java.nio.channels.DatagramChannel;
30  import java.nio.channels.SelectionKey;
31  import java.nio.channels.Selector;
32  import java.util.Collections;
33  import java.util.HashMap;
34  import java.util.HashSet;
35  import java.util.Iterator;
36  import java.util.List;
37  import java.util.Map;
38  import java.util.Queue;
39  import java.util.Set;
40  import java.util.concurrent.ConcurrentLinkedQueue;
41  import java.util.concurrent.Executor;
42  import java.util.concurrent.Semaphore;
43  
44  import org.apache.mina.core.RuntimeIoException;
45  import org.apache.mina.core.buffer.IoBuffer;
46  import org.apache.mina.core.service.AbstractIoAcceptor;
47  import org.apache.mina.core.service.IoAcceptor;
48  import org.apache.mina.core.service.IoProcessor;
49  import org.apache.mina.core.service.TransportMetadata;
50  import org.apache.mina.core.session.AbstractIoSession;
51  import org.apache.mina.core.session.ExpiringSessionRecycler;
52  import org.apache.mina.core.session.IoSession;
53  import org.apache.mina.core.session.IoSessionConfig;
54  import org.apache.mina.core.session.IoSessionRecycler;
55  import org.apache.mina.core.write.WriteRequest;
56  import org.apache.mina.core.write.WriteRequestQueue;
57  import org.apache.mina.transport.socket.DatagramAcceptor;
58  import org.apache.mina.transport.socket.DatagramSessionConfig;
59  import org.apache.mina.transport.socket.DefaultDatagramSessionConfig;
60  import org.apache.mina.util.ExceptionMonitor;
61  
62  /**
63   * {@link IoAcceptor} for datagram transport (UDP/IP).
64   *
65   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
66   * @org.apache.xbean.XBean
67   */
68  public final class NioDatagramAcceptor extends AbstractIoAcceptor implements DatagramAcceptor, IoProcessor<NioSession> {
69      /**
70       * A session recycler that is used to retrieve an existing session, unless it's too old.
71       **/
72      private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
73  
74      /**
75       * A timeout used for the select, as we need to get out to deal with idle
76       * sessions
77       */
78      private static final long SELECT_TIMEOUT = 1000L;
79  
80      /** A lock used to keep the selector from being woken up before the Acceptor has started running */
81      private final Semaphore lock = new Semaphore(1);
82  
83      /** A queue used to store the list of pending Binds */
84      private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<>();
85  
86      private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<>();
87  
88      private final Queue<NioSession> flushingSessions = new ConcurrentLinkedQueue<>();
89  
90      private final Map<SocketAddress, DatagramChannel> boundHandles = Collections
91              .synchronizedMap(new HashMap<SocketAddress, DatagramChannel>());
92  
93      private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
94  
95      private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
96  
97      private volatile boolean selectable;
98  
99      /** The worker responsible for accepting incoming requests */
100     private Acceptor acceptor;
101 
102     private long lastIdleCheckTime;
103 
104     /** The Selector used by this acceptor */
105     private volatile Selector selector;
106 
107     /**
108      * Creates a new instance.
109      */
110     public NioDatagramAcceptor() {
111         this(new DefaultDatagramSessionConfig(), null);
112     }
113 
114     /**
115      * Creates a new instance.
116      * 
117      * @param executor The executor to use
118      */
119     public NioDatagramAcceptor(Executor executor) {
120         this(new DefaultDatagramSessionConfig(), executor);
121     }
122 
123     /**
124      * Creates a new instance.
125      */
126     private NioDatagramAcceptor(IoSessionConfig sessionConfig, Executor executor) {
127         super(sessionConfig, executor);
128 
129         try {
130             init();
131             selectable = true;
132         } catch (RuntimeException e) {
133             throw e;
134         } catch (Exception e) {
135             throw new RuntimeIoException("Failed to initialize.", e);
136         } finally {
137             if (!selectable) {
138                 try {
139                     destroy();
140                 } catch (Exception e) {
141                     ExceptionMonitor.getInstance().exceptionCaught(e);
142                 }
143             }
144         }
145     }
146 
147     /**
148      * This private class is used to accept incoming connection from
149      * clients. It's an infinite loop, which can be stopped when all
150      * the registered handles have been removed (unbound).
151      */
152     private class Acceptor implements Runnable {
153         @Override
154         public void run() {
155             int nHandles = 0;
156             lastIdleCheckTime = System.currentTimeMillis();
157 
158             // Release the lock
159             lock.release();
160 
161             while (selectable) {
162                 try {
163                     int selected = select(SELECT_TIMEOUT);
164 
165                     nHandles += registerHandles();
166 
167                     if (nHandles == 0) {
168                         try {
169                             lock.acquire();
170 
171                             if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
172                                 acceptor = null;
173                                 break;
174                             }
175                         } finally {
176                             lock.release();
177                         }
178                     }
179 
180                     if (selected > 0) {
181                         processReadySessions(selectedHandles());
182                     }
183 
184                     long currentTime = System.currentTimeMillis();
185                     flushSessions(currentTime);
186                     nHandles -= unregisterHandles();
187 
188                     notifyIdleSessions(currentTime);
189                 } catch (ClosedSelectorException cse) {
190                     // If the selector has been closed, we can exit the loop
191                     ExceptionMonitor.getInstance().exceptionCaught(cse);
192                     break;
193                 } catch (Exception e) {
194                     ExceptionMonitor.getInstance().exceptionCaught(e);
195 
196                     try {
197                         Thread.sleep(1000);
198                     } catch (InterruptedException e1) {
199                     }
200                 }
201             }
202 
203             if (selectable && isDisposing()) {
204                 selectable = false;
205                 try {
206                     destroy();
207                 } catch (Exception e) {
208                     ExceptionMonitor.getInstance().exceptionCaught(e);
209                 } finally {
210                     disposalFuture.setValue(true);
211                 }
212             }
213         }
214     }
215 
216     private int registerHandles() {
217         for (;;) {
218             AcceptorOperationFuture req = registerQueue.poll();
219 
220             if (req == null) {
221                 break;
222             }
223 
224             Map<SocketAddress, DatagramChannel> newHandles = new HashMap<>();
225             List<SocketAddress> localAddresses = req.getLocalAddresses();
226 
227             try {
228                 for (SocketAddress socketAddress : localAddresses) {
229                     DatagramChannel handle = open(socketAddress);
230                     newHandles.put(localAddress(handle), handle);
231                 }
232 
233                 boundHandles.putAll(newHandles);
234 
235                 getListeners().fireServiceActivated();
236                 req.setDone();
237 
238                 return newHandles.size();
239             } catch (Exception e) {
240                 req.setException(e);
241             } finally {
242                 // Roll back if failed to bind all addresses.
243                 if (req.getException() != null) {
244                     for (DatagramChannel handle : newHandles.values()) {
245                         try {
246                             close(handle);
247                         } catch (Exception e) {
248                             ExceptionMonitor.getInstance().exceptionCaught(e);
249                         }
250                     }
251 
252                     wakeup();
253                 }
254             }
255         }
256 
257         return 0;
258     }
259 
260     private void processReadySessions(Set<SelectionKey> handles) {
261 	final Iterator<SelectionKey> iterator = handles.iterator();
262 
263 	while (iterator.hasNext()) {
264 	    try {
265 		final SelectionKey key = iterator.next();
266 		final DatagramChannel handle = (DatagramChannel) key.channel();
267 
268 		if (key.isValid()) {
269 		    if (key.isReadable()) {
270 			readHandle(handle);
271 		    }
272 
273 		    if (key.isWritable()) {
274 			for (IoSession session : getManagedSessions().values()) {
275 			    final NioSession"../../../../../../org/apache/mina/transport/socket/nio/NioSession.html#NioSession">NioSession x = (NioSession) session;
276 			    if (x.getChannel() == handle) {
277 				scheduleFlush(x);
278 			    }
279 			}
280 		    }
281 		}
282 
283 	    } catch (Exception e) {
284 		ExceptionMonitor.getInstance().exceptionCaught(e);
285 	    } finally {
286 		iterator.remove();
287 	    }
288 	}
289     }
290 
291     private boolean scheduleFlush(NioSession session) {
292         // Set the schedule for flush flag if the session
293         // has not already be added to the flushingSessions
294         // queue
295         if (session.setScheduledForFlush(true)) {
296             flushingSessions.add(session);
297             return true;
298         } else {
299             return false;
300         }
301     }
302 
303     private void readHandle(DatagramChannel handle) throws Exception {
304         IoBuffer readBuf = IoBuffer.allocate(getSessionConfig().getReadBufferSize());
305 
306         SocketAddress remoteAddress = receive(handle, readBuf);
307 
308         if (remoteAddress != null) {
309             IoSession session = newSessionWithoutLock(remoteAddress, localAddress(handle));
310 
311             readBuf.flip();
312 
313             if (!session.isReadSuspended()) {
314                 session.getFilterChain().fireMessageReceived(readBuf);
315             }
316         }
317     }
318 
319     private IoSession newSessionWithoutLock(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
320         DatagramChannel handle = boundHandles.get(localAddress);
321 
322         if (handle == null) {
323             throw new IllegalArgumentException("Unknown local address: " + localAddress);
324         }
325 
326         IoSession session;
327 
328         synchronized (sessionRecycler) {
329             session = sessionRecycler.recycle(remoteAddress);
330 
331             if (session != null) {
332                 return session;
333             }
334 
335             // If a new session needs to be created.
336             NioSession newSession = newSession(this, handle, remoteAddress);
337             getSessionRecycler().put(newSession);
338             session = newSession;
339         }
340 
341         initSession(session, null, null);
342 
343         try {
344             this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
345             getListeners().fireSessionCreated(session);
346         } catch (Exception e) {
347             ExceptionMonitor.getInstance().exceptionCaught(e);
348         }
349 
350         return session;
351     }
352 
353     private void flushSessions(long currentTime) {
354         for (;;) {
355             NioSession session = flushingSessions.poll();
356 
357             if (session == null) {
358                 break;
359             }
360 
361             // Reset the Schedule for flush flag for this session,
362             // as we are flushing it now
363             session.unscheduledForFlush();
364 
365             try {
366                 boolean flushedAll = flush(session, currentTime);
367 
368                 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) && !session.isScheduledForFlush()) {
369                     scheduleFlush(session);
370                 }
371             } catch (Exception e) {
372                 session.getFilterChain().fireExceptionCaught(e);
373             }
374         }
375     }
376 
377     private boolean flush(NioSession session, long currentTime) throws Exception {
378         final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
379         final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
380                 + (session.getConfig().getMaxReadBufferSize() >>> 1);
381 
382         int writtenBytes = 0;
383 
384         try {
385             for (;;) {
386                 WriteRequest req = session.getCurrentWriteRequest();
387 
388                 if (req == null) {
389                     req = writeRequestQueue.poll(session);
390 
391                     if (req == null) {
392                         setInterestedInWrite(session, false);
393                         break;
394                     }
395 
396                     session.setCurrentWriteRequest(req);
397                 }
398 
399                 IoBuffer"../../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer buf = (IoBuffer) req.getMessage();
400 
401                 if (buf.remaining() == 0) {
402                     // Clear and fire event
403                     session.setCurrentWriteRequest(null);
404                     buf.reset();
405                     session.getFilterChain().fireMessageSent(req);
406                     continue;
407                 }
408 
409                 SocketAddress destination = req.getDestination();
410 
411                 if (destination == null) {
412                     destination = session.getRemoteAddress();
413                 }
414 
415                 int localWrittenBytes = send(session, buf, destination);
416 
417                 if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) {
418                     // Kernel buffer is full or wrote too much
419                     setInterestedInWrite(session, true);
420 
421                     return false;
422                 } else {
423                     setInterestedInWrite(session, false);
424 
425                     // Clear and fire event
426                     session.setCurrentWriteRequest(null);
427                     writtenBytes += localWrittenBytes;
428                     buf.reset();
429                     session.getFilterChain().fireMessageSent(req);
430                 }
431             }
432         } finally {
433             session.increaseWrittenBytes(writtenBytes, currentTime);
434         }
435 
436         return true;
437     }
438 
439     private int unregisterHandles() {
440         int nHandles = 0;
441 
442         for (;;) {
443             AcceptorOperationFuture request = cancelQueue.poll();
444             if (request == null) {
445                 break;
446             }
447 
448             // close the channels
449             for (SocketAddress socketAddress : request.getLocalAddresses()) {
450                 DatagramChannel handle = boundHandles.remove(socketAddress);
451 
452                 if (handle == null) {
453                     continue;
454                 }
455 
456                 try {
457                     close(handle);
458                     wakeup(); // wake up again to trigger thread death
459                 } catch (Exception e) {
460                     ExceptionMonitor.getInstance().exceptionCaught(e);
461                 } finally {
462                     nHandles++;
463                 }
464             }
465 
466             request.setDone();
467         }
468 
469         return nHandles;
470     }
471 
472     private void notifyIdleSessions(long currentTime) {
473         // process idle sessions
474         if (currentTime - lastIdleCheckTime >= 1000) {
475             lastIdleCheckTime = currentTime;
476             AbstractIoSession.notifyIdleness(getListeners().getManagedSessions().values().iterator(), currentTime);
477         }
478     }
479 
480     /**
481      * Starts the inner Acceptor thread.
482      */
483     private void startupAcceptor() throws InterruptedException {
484         if (!selectable) {
485             registerQueue.clear();
486             cancelQueue.clear();
487             flushingSessions.clear();
488         }
489 
490         lock.acquire();
491 
492         if (acceptor == null) {
493             acceptor = new Acceptor();
494             executeWorker(acceptor);
495         } else {
496             lock.release();
497         }
498     }
499 
500     protected void init() throws Exception {
501         this.selector = Selector.open();
502     }
503 
504     /**
505      * {@inheritDoc}
506      */
507     @Override
508     public void add(NioSession session) {
509         // Nothing to do for UDP
510     }
511 
512     /**
513      * {@inheritDoc}
514      */
515     @Override
516     protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
517         // Create a bind request as a Future operation. When the selector
518         // have handled the registration, it will signal this future.
519         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
520 
521         // adds the Registration request to the queue for the Workers
522         // to handle
523         registerQueue.add(request);
524 
525         // creates the Acceptor instance and has the local
526         // executor kick it off.
527         startupAcceptor();
528 
529         // As we just started the acceptor, we have to unblock the select()
530         // in order to process the bind request we just have added to the
531         // registerQueue.
532         try {
533             lock.acquire();
534 
535             // Wait a bit to give a chance to the Acceptor thread to do the select()
536             Thread.sleep(10);
537             wakeup();
538         } finally {
539             lock.release();
540         }
541 
542         // Now, we wait until this request is completed.
543         request.awaitUninterruptibly();
544 
545         if (request.getException() != null) {
546             throw request.getException();
547         }
548 
549         // Update the local addresses.
550         // setLocalAddresses() shouldn't be called from the worker thread
551         // because of deadlock.
552         Set<SocketAddress> newLocalAddresses = new HashSet<>();
553 
554         for (DatagramChannel handle : boundHandles.values()) {
555             newLocalAddresses.add(localAddress(handle));
556         }
557 
558         return newLocalAddresses;
559     }
560 
561     protected void close(DatagramChannel handle) throws Exception {
562         SelectionKey key = handle.keyFor(selector);
563 
564         if (key != null) {
565             key.cancel();
566         }
567 
568         handle.disconnect();
569         handle.close();
570     }
571 
572     protected void destroy() throws Exception {
573         if (selector != null) {
574             selector.close();
575         }
576     }
577 
578     /**
579      * {@inheritDoc}
580      */
581     @Override
582     protected void dispose0() throws Exception {
583         unbind();
584         startupAcceptor();
585         wakeup();
586     }
587 
588     /**
589      * {@inheritDoc}
590      */
591     @Override
592     public void flush(NioSession session) {
593         if (scheduleFlush(session)) {
594             wakeup();
595         }
596     }
597 
598     @Override
599     public InetSocketAddress getDefaultLocalAddress() {
600         return (InetSocketAddress) super.getDefaultLocalAddress();
601     }
602 
603     @Override
604     public InetSocketAddress getLocalAddress() {
605         return (InetSocketAddress) super.getLocalAddress();
606     }
607 
608     /**
609      * {@inheritDoc}
610      */
611     @Override
612     public DatagramSessionConfig getSessionConfig() {
613         return (DatagramSessionConfig) sessionConfig;
614     }
615 
616     @Override
617     public final IoSessionRecycler getSessionRecycler() {
618         return sessionRecycler;
619     }
620 
621     @Override
622     public TransportMetadata getTransportMetadata() {
623         return NioDatagramSession.METADATA;
624     }
625 
626     protected boolean isReadable(DatagramChannel handle) {
627         SelectionKey key = handle.keyFor(selector);
628 
629         if ((key == null) || (!key.isValid())) {
630             return false;
631         }
632 
633         return key.isReadable();
634     }
635 
636     protected boolean isWritable(DatagramChannel handle) {
637         SelectionKey key = handle.keyFor(selector);
638 
639         if ((key == null) || (!key.isValid())) {
640             return false;
641         }
642 
643         return key.isWritable();
644     }
645 
646     protected SocketAddress localAddress(DatagramChannel handle) throws Exception {
647         InetSocketAddress inetSocketAddress = (InetSocketAddress) handle.socket().getLocalSocketAddress();
648         InetAddress inetAddress = inetSocketAddress.getAddress();
649 
650         if ((inetAddress instanceof Inet6Address) && (((Inet6Address) inetAddress).isIPv4CompatibleAddress())) {
651             // Ugly hack to workaround a problem on linux : the ANY address is always converted to IPV6
652             // even if the original address was an IPV4 address. We do store the two IPV4 and IPV6
653             // ANY address in the map.
654             byte[] ipV6Address = ((Inet6Address) inetAddress).getAddress();
655             byte[] ipV4Address = new byte[4];
656 
657             System.arraycopy(ipV6Address, 12, ipV4Address, 0, 4);
658 
659             InetAddress inet4Adress = Inet4Address.getByAddress(ipV4Address);
660             return new InetSocketAddress(inet4Adress, inetSocketAddress.getPort());
661         } else {
662             return inetSocketAddress;
663         }
664     }
665 
666     protected NioSession newSession(IoProcessor<NioSession> processor, DatagramChannel handle,
667             SocketAddress remoteAddress) {
668         SelectionKey key = handle.keyFor(selector);
669 
670         if ((key == null) || (!key.isValid())) {
671             return null;
672         }
673 
674         NioDatagramSessionioDatagramSession.html#NioDatagramSession">NioDatagramSession newSession = new NioDatagramSession(this, handle, processor, remoteAddress);
675         newSession.setSelectionKey(key);
676 
677         return newSession;
678     }
679 
680     /**
681      * {@inheritDoc}
682      */
683     @Override
684     public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
685         if (isDisposing()) {
686             throw new IllegalStateException("The Acceptor is being disposed.");
687         }
688 
689         if (remoteAddress == null) {
690             throw new IllegalArgumentException("remoteAddress");
691         }
692 
693         synchronized (bindLock) {
694             if (!isActive()) {
695                 throw new IllegalStateException("Can't create a session from a unbound service.");
696             }
697 
698             try {
699                 return newSessionWithoutLock(remoteAddress, localAddress);
700             } catch (RuntimeException | Error e) {
701                 throw e;
702             } catch (Exception e) {
703                 throw new RuntimeIoException("Failed to create a session.", e);
704             }
705         }
706     }
707 
708     protected DatagramChannel open(SocketAddress localAddress) throws Exception {
709         final DatagramChannel ch = DatagramChannel.open();
710         boolean success = false;
711         try {
712             new NioDatagramSessionConfig(ch).setAll(getSessionConfig());
713             ch.configureBlocking(false);
714 
715             try {
716                 ch.socket().bind(localAddress);
717             } catch (IOException ioe) {
718                 // Add some info regarding the address we try to bind to the
719                 // message
720                 String newMessage = "Error while binding on " + localAddress + "\n" + "original message : "
721                         + ioe.getMessage();
722                 Exception e = new IOException(newMessage);
723                 e.initCause(ioe.getCause());
724 
725                 // And close the channel
726                 ch.close();
727 
728                 throw e;
729             }
730 
731             ch.register(selector, SelectionKey.OP_READ);
732             success = true;
733         } finally {
734             if (!success) {
735                 close(ch);
736             }
737         }
738 
739         return ch;
740     }
741 
742     protected SocketAddress receive(DatagramChannel handle, IoBuffer buffer) throws Exception {
743         return handle.receive(buffer.buf());
744     }
745 
746     /**
747      * {@inheritDoc}
748      */
749     @Override
750     public void remove(NioSession session) {
751         getSessionRecycler().remove(session);
752         getListeners().fireSessionDestroyed(session);
753     }
754 
755     protected int select() throws Exception {
756         return selector.select();
757     }
758 
759     protected int select(long timeout) throws Exception {
760         return selector.select(timeout);
761     }
762 
763     protected Set<SelectionKey> selectedHandles() {
764         return selector.selectedKeys();
765     }
766 
767     protected int send(NioSession session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception {
768         return ((DatagramChannel) session.getChannel()).send(buffer.buf(), remoteAddress);
769     }
770 
771     @Override
772     public void setDefaultLocalAddress(InetSocketAddress localAddress) {
773         setDefaultLocalAddress((SocketAddress) localAddress);
774     }
775 
776     protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception {
777         SelectionKey key = session.getSelectionKey();
778 
779         if (key == null) {
780             return;
781         }
782 
783         int newInterestOps = key.interestOps();
784 
785         if (isInterested) {
786             newInterestOps |= SelectionKey.OP_WRITE;
787         } else {
788             newInterestOps &= ~SelectionKey.OP_WRITE;
789         }
790 
791         key.interestOps(newInterestOps);
792     }
793 
794     @Override
795     public final void setSessionRecycler(IoSessionRecycler sessionRecycler) {
796         synchronized (bindLock) {
797             if (isActive()) {
798                 throw new IllegalStateException("sessionRecycler can't be set while the acceptor is bound.");
799             }
800 
801             if (sessionRecycler == null) {
802                 sessionRecycler = DEFAULT_RECYCLER;
803             }
804 
805             this.sessionRecycler = sessionRecycler;
806         }
807     }
808 
809     /**
810      * {@inheritDoc}
811      */
812     @Override
813     protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
814         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
815 
816         cancelQueue.add(request);
817         startupAcceptor();
818         wakeup();
819 
820         request.awaitUninterruptibly();
821 
822         if (request.getException() != null) {
823             throw request.getException();
824         }
825     }
826 
827     /**
828      * {@inheritDoc}
829      */
830     @Override
831     public void updateTrafficControl(NioSession session) {
832         // Nothing to do
833     }
834 
835     protected void wakeup() {
836         selector.wakeup();
837     }
838 
839     /**
840      * {@inheritDoc}
841      */
842     @Override
843     public void write(NioSession session, WriteRequest writeRequest) {
844         // We will try to write the message directly
845         long currentTime = System.currentTimeMillis();
846         final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
847         final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
848                 + (session.getConfig().getMaxReadBufferSize() >>> 1);
849 
850         int writtenBytes = 0;
851 
852         // Deal with the special case of a Message marker (no bytes in the request)
853         // We just have to return after having calle dthe messageSent event
854         IoBuffer"../../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer buf = (IoBuffer) writeRequest.getMessage();
855 
856         if (buf.remaining() == 0) {
857             // Clear and fire event
858             session.setCurrentWriteRequest(null);
859             buf.reset();
860             session.getFilterChain().fireMessageSent(writeRequest);
861             return;
862         }
863 
864         // Now, write the data
865         try {
866             for (;;) {
867                 if (writeRequest == null) {
868                     writeRequest = writeRequestQueue.poll(session);
869 
870                     if (writeRequest == null) {
871                         setInterestedInWrite(session, false);
872                         break;
873                     }
874 
875                     session.setCurrentWriteRequest(writeRequest);
876                 }
877 
878                 buf = (IoBuffer) writeRequest.getMessage();
879 
880                 if (buf.remaining() == 0) {
881                     // Clear and fire event
882                     session.setCurrentWriteRequest(null);
883                     session.getFilterChain().fireMessageSent(writeRequest);
884                     continue;
885                 }
886 
887                 SocketAddress destination = writeRequest.getDestination();
888 
889                 if (destination == null) {
890                     destination = session.getRemoteAddress();
891                 }
892 
893                 int localWrittenBytes = send(session, buf, destination);
894 
895                 if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) {
896                     // Kernel buffer is full or wrote too much
897                     setInterestedInWrite(session, true);
898 
899                     writeRequestQueue.offer(session, writeRequest);
900                     scheduleFlush(session);
901                     
902                     break;
903                 } else {
904                     setInterestedInWrite(session, false);
905 
906                     // Clear and fire event
907                     session.setCurrentWriteRequest(null);
908                     writtenBytes += localWrittenBytes;
909                     session.getFilterChain().fireMessageSent(writeRequest);
910 
911                     break;
912                 }
913             }
914         } catch (Exception e) {
915             session.getFilterChain().fireExceptionCaught(e);
916         } finally {
917             session.increaseWrittenBytes(writtenBytes, currentTime);
918         }
919     }
920 }