View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.io.IOException;
23  import java.net.PortUnreachableException;
24  import java.nio.channels.ClosedSelectorException;
25  import java.util.ArrayList;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Queue;
29  import java.util.concurrent.ConcurrentHashMap;
30  import java.util.concurrent.ConcurrentLinkedQueue;
31  import java.util.concurrent.Executor;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  import java.util.concurrent.atomic.AtomicInteger;
34  import java.util.concurrent.atomic.AtomicReference;
35  
36  import org.apache.mina.core.buffer.IoBuffer;
37  import org.apache.mina.core.file.FileRegion;
38  import org.apache.mina.core.filterchain.IoFilterChain;
39  import org.apache.mina.core.filterchain.IoFilterChainBuilder;
40  import org.apache.mina.core.future.DefaultIoFuture;
41  import org.apache.mina.core.service.AbstractIoService;
42  import org.apache.mina.core.service.IoProcessor;
43  import org.apache.mina.core.service.IoServiceListenerSupport;
44  import org.apache.mina.core.session.AbstractIoSession;
45  import org.apache.mina.core.session.IoSession;
46  import org.apache.mina.core.session.IoSessionConfig;
47  import org.apache.mina.core.session.SessionState;
48  import org.apache.mina.core.write.WriteRequest;
49  import org.apache.mina.core.write.WriteRequestQueue;
50  import org.apache.mina.core.write.WriteToClosedSessionException;
51  import org.apache.mina.transport.socket.AbstractDatagramSessionConfig;
52  import org.apache.mina.util.ExceptionMonitor;
53  import org.apache.mina.util.NamePreservingRunnable;
54  import org.slf4j.Logger;
55  import org.slf4j.LoggerFactory;
56  
57  /**
58   * An abstract implementation of {@link IoProcessor} which helps transport
59   * developers to write an {@link IoProcessor} easily. This class is in charge of
60   * active polling a set of {@link IoSession} and trigger events when some I/O
61   * operation is possible.
62   * 
63   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
64   * 
65   * @param <S>
66   *            the type of the {@link IoSession} this processor can handle
67   */
68  public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S> {
    /** A logger for this class */
    private static final Logger LOG = LoggerFactory.getLogger(IoProcessor.class);

    /**
     * A timeout used for the select, as we need to get out to deal with idle
     * sessions. The same value is used by notifyIdleSessions(long) as the
     * minimum delay between two idle checks.
     */
    private static final long SELECT_TIMEOUT = 1000L;

    /** A map containing the last Thread ID for each class */
    private static final ConcurrentHashMap<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<>();

    /** This IoProcessor instance name, computed once by nextThreadName() */
    private final String threadName;

    /** The executor to use when we need to start the inner Processor */
    private final Executor executor;

    /** A Session queue containing the newly created sessions */
    private final Queue<S> newSessions = new ConcurrentLinkedQueue<>();

    /** A queue used to store the sessions to be removed */
    private final Queue<S> removingSessions = new ConcurrentLinkedQueue<>();

    /** A queue used to store the sessions to be flushed */
    private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<>();

    /**
     * A queue used to store the sessions which have a trafficControl to be
     * updated
     */
    private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<>();

    /**
     * Holds the Processor currently in charge of this instance, or
     * <tt>null</tt> when none has been started. startupProcessor() installs a
     * new one with a compare-and-set.
     */
    private final AtomicReference<Processor> processorRef = new AtomicReference<>();

    /** The time of the last idle check, as recorded by notifyIdleSessions(long) */
    private long lastIdleCheckTime;

    /** The lock held by dispose() while it raises the disposing flag and starts the processor */
    private final Object disposalLock = new Object();

    /** Set to <tt>true</tt> by dispose() as soon as a disposal has been requested */
    private volatile boolean disposing;

    /** Set to <tt>true</tt> by dispose() once the wait on the disposal future is over */
    private volatile boolean disposed;

    /**
     * The future dispose() waits on.
     * NOTE(review): presumably completed by the Processor, whose code is not
     * visible here - confirm.
     */
    private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);

    /**
     * NOTE(review): not read or written in the visible part of this class;
     * presumably used by subclasses to track wakeup() requests - confirm.
     */
    protected AtomicBoolean wakeupCalled = new AtomicBoolean(false);
116 
117     /**
118      * Create an {@link AbstractPollingIoProcessor} with the given
119      * {@link Executor} for handling I/Os events.
120      * 
121      * @param executor
122      *            the {@link Executor} for handling I/O events
123      */
124     protected AbstractPollingIoProcessor(Executor executor) {
125         if (executor == null) {
126             throw new IllegalArgumentException("executor");
127         }
128 
129         this.threadName = nextThreadName();
130         this.executor = executor;
131     }
132 
133     /**
134      * Compute the thread ID for this class instance. As we may have different
135      * classes, we store the last ID number into a Map associating the class
136      * name to the last assigned ID.
137      * 
138      * @return a name for the current thread, based on the class name and an
139      *         incremental value, starting at 1.
140      */
141     private String nextThreadName() {
142         Class<?> cls = getClass();
143         int newThreadId;
144 
145         AtomicInteger threadId = threadIds.putIfAbsent(cls, new AtomicInteger(1));
146 
147         if (threadId == null) {
148             newThreadId = 1;
149         } else {
150             // Just increment the last ID, and get it.
151             newThreadId = threadId.incrementAndGet();
152         }
153 
154         // Now we can compute the name for this thread
155         return cls.getSimpleName() + '-' + newThreadId;
156     }
157 
    /**
     * {@inheritDoc}
     * 
     * This implementation returns the flag raised by {@link #dispose()} when a
     * disposal is requested.
     */
    @Override
    public final boolean isDisposing() {
        return disposing;
    }
165 
    /**
     * {@inheritDoc}
     * 
     * This implementation returns the flag set by {@link #dispose()} after it
     * has finished waiting for the disposal future.
     */
    @Override
    public final boolean isDisposed() {
        return disposed;
    }
173 
174     /**
175      * {@inheritDoc}
176      */
177     @Override
178     public final void dispose() {
179         if (disposed || disposing) {
180             return;
181         }
182 
183         synchronized (disposalLock) {
184             disposing = true;
185             startupProcessor();
186         }
187 
188         disposalFuture.awaitUninterruptibly();
189         disposed = true;
190     }
191 
    /**
     * Dispose the resources used by this {@link IoProcessor} for polling the
     * client connections. Each concrete subclass provides its own
     * implementation.
     * 
     * @throws Exception
     *             if some low level IO error occurs
     */
    protected abstract void doDispose() throws Exception;
201 
    /**
     * Poll the sessions for at most the given timeout.
     * 
     * @param timeout
     *            the number of milliseconds before the call times out if no
     *            event appears
     * @return The number of sessions ready for read or for write
     * @throws Exception
     *             if some low level IO error occurs
     */
    protected abstract int select(long timeout) throws Exception;
212 
    /**
     * Poll the sessions with no time limit.
     * 
     * @return The number of sessions ready for read or for write
     * @throws Exception
     *             if some low level IO error occurs
     */
    protected abstract int select() throws Exception;
221 
    /**
     * Say if the list of {@link IoSession} polled by this {@link IoProcessor}
     * is empty
     * 
     * @return <tt>true</tt> if no session is polled by this {@link IoProcessor}
     */
    protected abstract boolean isSelectorEmpty();
229 
    /**
     * Interrupt the {@link #select(long)} call. This class calls it after a
     * session has been queued for flushing or for a traffic mask update, and
     * each time the processor is (re)started.
     */
    protected abstract void wakeup();
234 
    /**
     * Get an {@link Iterator} over the {@link IoSession}s polled by this
     * {@link IoProcessor}. It is the set of sessions which is checked for
     * idleness.
     * 
     * @return {@link Iterator} of {@link IoSession}
     */
    protected abstract Iterator<S> allSessions();
242 
    /**
     * Get an {@link Iterator} over the {@link IoSession}s found selected by
     * the last call of {@link #select(long)}. The caller removes each session
     * from the iterator once it has been processed, so the iterator must
     * support {@link Iterator#remove()}.
     * 
     * @return {@link Iterator} of {@link IoSession} ready for I/O operations
     */
    protected abstract Iterator<S> selectedSessions();
250 
    /**
     * Get the state of a session (One of OPENING, OPENED, CLOSING)
     * 
     * @param session the {@link IoSession} to inspect
     * @return the state of the session
     */
    protected abstract SessionState getState(S session);
258     
259     
    /**
     * Tells if the session is ready for writing
     * 
     * @param session the queried session
     * @return <tt>true</tt> if the session is ready, <tt>false</tt> if it is not
     */
    protected abstract boolean isWritable(S session);
267 
    /**
     * Tells if the session is ready for reading
     * 
     * @param session the queried session
     * @return <tt>true</tt> if the session is ready, <tt>false</tt> if it is not
     */
    protected abstract boolean isReadable(S session);
275 
    /**
     * Register or unregister the session's interest in write events, so that
     * it is, or is no longer, informed when a write can be processed
     * 
     * @param session the session for which the interest in write events is changed
     * @param isInterested <tt>true</tt> for registering, <tt>false</tt> for removing
     * @throws Exception If there was a problem while registering the session 
     */
    protected abstract void setInterestedInWrite(S session, boolean isInterested) throws Exception;
284 
    /**
     * Register or unregister the session's interest in read events, so that
     * it is, or is no longer, informed when a read can be processed
     * 
     * @param session the session for which the interest in read events is changed
     * @param isInterested <tt>true</tt> for registering, <tt>false</tt> for removing
     * @throws Exception If there was a problem while registering the session 
     */
    protected abstract void setInterestedInRead(S session, boolean isInterested) throws Exception;
293 
    /**
     * Tells if this session is registered for reading
     * 
     * @param session the queried session
     * @return <tt>true</tt> if the session is registered for reading
     */
    protected abstract boolean isInterestedInRead(S session);
301 
    /**
     * Tells if this session is registered for writing
     * 
     * @param session the queried session
     * @return <tt>true</tt> if the session is registered for writing
     */
    protected abstract boolean isInterestedInWrite(S session);
309 
    /**
     * Initialize the polling of a session. Add it to the polling process.
     * This is the first step applied to a new session, before its filter
     * chain is built.
     * 
     * @param session the {@link IoSession} to add to the polling
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract void init(S session) throws Exception;
317 
    /**
     * Destroy the underlying client socket handle. It is called when a
     * session is removed, and when the set up of a new session has failed.
     * 
     * @param session the {@link IoSession}
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract void destroy(S session) throws Exception;
325 
    /**
     * Reads a sequence of bytes from a {@link IoSession} into the given
     * {@link IoBuffer}. It is called when the session was found ready for
     * reading. The caller treats a negative result as the end of the input.
     * 
     * @param session the session to read
     * @param buf the buffer to fill
     * @return the number of bytes read
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract int read(S session, IoBuffer buf) throws Exception;
336 
    /**
     * Write a sequence of bytes to a {@link IoSession}. It is meant to be
     * called when a session was found ready for writing.
     * 
     * @param session the session to write
     * @param buf the buffer to write
     * @param length the number of bytes to write, which may exceed the number of
     *            bytes remaining in the buffer
     * @return the number of bytes written
     * @throws IOException any exception thrown by the underlying system calls
     */
    protected abstract int write(S session, IoBuffer buf, int length) throws IOException;
349 
    /**
     * Write a part of a file to a {@link IoSession}. If the underlying API
     * does not support system calls like sendfile(), you can throw an
     * {@link UnsupportedOperationException} so that the file will be sent
     * using the usual {@link #write(AbstractIoSession, IoBuffer, int)} call.
     * 
     * @param session the session to write
     * @param region the file region to write
     * @param length the length of the portion to send
     * @return the number of written bytes
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract int transferFile(S session, FileRegion region, int length) throws Exception;
363 
364     /**
365      * {@inheritDoc}
366      */
367     @Override
368     public final void add(S session) {
369         if (disposed || disposing) {
370             throw new IllegalStateException("Already disposed.");
371         }
372 
373         // Adds the session to the newSession queue and starts the worker
374         newSessions.add(session);
375         startupProcessor();
376     }
377 
    /**
     * {@inheritDoc}
     * 
     * The session is queued for removal, then the processor is started (or
     * woken up) so that the removal can be handled.
     */
    @Override
    public final void remove(S session) {
        scheduleRemove(session);
        startupProcessor();
    }
386 
387     private void scheduleRemove(S session) {
388         if (!removingSessions.contains(session)) {
389             removingSessions.add(session);
390         }
391     }
392 
393     /**
394      * {@inheritDoc}
395      */
396     @Override
397     public void write(S session, WriteRequest writeRequest) {
398         WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
399 
400         writeRequestQueue.offer(session, writeRequest);
401 
402         if (!session.isWriteSuspended()) {
403             this.flush(session);
404         }
405     }
406 
407     /**
408      * {@inheritDoc}
409      */
410     @Override
411     public final void flush(S session) {
412         // add the session to the queue if it's not already
413         // in the queue, then wake up the select()
414         if (session.setScheduledForFlush(true)) {
415             flushingSessions.add(session);
416             wakeup();
417         }
418     }
419 
420     private void scheduleFlush(S session) {
421         // add the session to the queue if it's not already
422         // in the queue
423         if (session.setScheduledForFlush(true)) {
424             flushingSessions.add(session);
425         }
426     }
427 
    /**
     * Updates the traffic mask for a given session. The session is queued in
     * the traffic controlling queue, then the select() is woken up.
     * 
     * @param session the session to update
     */
    public final void updateTrafficMask(S session) {
        trafficControllingSessions.add(session);
        wakeup();
    }
437 
438     /**
439      * Starts the inner Processor, asking the executor to pick a thread in its
440      * pool. The Runnable will be renamed
441      */
442     private void startupProcessor() {
443         Processor processor = processorRef.get();
444 
445         if (processor == null) {
446             processor = new Processor();
447 
448             if (processorRef.compareAndSet(null, processor)) {
449                 executor.execute(new NamePreservingRunnable(processor, threadName));
450             }
451         }
452 
453         // Just stop the select() and start it again, so that the processor
454         // can be activated immediately.
455         wakeup();
456     }
457 
    /**
     * In the case we are using the java select() method, this method is used to
     * trash the buggy selector and create a new one, registering all the sockets
     * on it.
     * 
     * @throws IOException If we got an exception
     */
    protected abstract void registerNewSelector() throws IOException;
466 
    /**
     * Check whether the select() has exited immediately just because of a
     * broken connection. This is a standard case, in which we just have to
     * loop.
     * 
     * @return <tt>true</tt> if a connection has been brutally closed.
     * @throws IOException If we got an exception
     */
    protected abstract boolean isBrokenConnection() throws IOException;
476 
477     /**
478      * Loops over the new sessions blocking queue and returns the number of
479      * sessions which are effectively created
480      * 
481      * @return The number of new sessions
482      */
483     private int handleNewSessions() {
484         int addedSessions = 0;
485 
486         for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {
487             if (addNow(session)) {
488                 // A new session has been created
489                 addedSessions++;
490             }
491         }
492 
493         return addedSessions;
494     }
495 
496     /**
497      * Process a new session : - initialize it - create its chain - fire the
498      * CREATED listeners if any
499      * 
500      * @param session The session to create
501      * @return <tt>true</tt> if the session has been registered
502      */
503     private boolean addNow(S session) {
504         boolean registered = false;
505 
506         try {
507             init(session);
508             registered = true;
509 
510             // Build the filter chain of this session.
511             IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
512             chainBuilder.buildFilterChain(session.getFilterChain());
513 
514             // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
515             // in AbstractIoFilterChain.fireSessionOpened().
516             // Propagate the SESSION_CREATED event up to the chain
517             IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
518             listeners.fireSessionCreated(session);
519         } catch (Exception e) {
520             ExceptionMonitor.getInstance().exceptionCaught(e);
521 
522             try {
523                 destroy(session);
524             } catch (Exception e1) {
525                 ExceptionMonitor.getInstance().exceptionCaught(e1);
526             } finally {
527                 registered = false;
528             }
529         }
530 
531         return registered;
532     }
533 
534     private int removeSessions() {
535         int removedSessions = 0;
536 
537         for (S session = removingSessions.poll(); session != null;session = removingSessions.poll()) {
538             SessionState state = getState(session);
539 
540             // Now deal with the removal accordingly to the session's state
541             switch (state) {
542                 case OPENED:
543                     // Try to remove this session
544                     if (removeNow(session)) {
545                         removedSessions++;
546                     }
547                     
548                     break;
549     
550                 case CLOSING:
551                     // Skip if channel is already closed
552                     // In any case, remove the session from the queue
553                     removedSessions++;
554                     break;
555     
556                 case OPENING:
557                     // Remove session from the newSessions queue and
558                     // remove it
559                     newSessions.remove(session);
560     
561                     if (removeNow(session)) {
562                         removedSessions++;
563                     }
564     
565                     break;
566     
567                 default:
568                     throw new IllegalStateException(String.valueOf(state));
569             }
570         }
571 
572         return removedSessions;
573     }
574 
575     private boolean removeNow(S session) {
576         clearWriteRequestQueue(session);
577 
578         try {
579             destroy(session);
580             return true;
581         } catch (Exception e) {
582             IoFilterChain filterChain = session.getFilterChain();
583             filterChain.fireExceptionCaught(e);
584         } finally {
585             try {
586                 clearWriteRequestQueue(session);
587                 ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
588             } catch (Exception e) {
589                 // The session was either destroyed or not at this point.
590                 // We do not want any exception thrown from this "cleanup" code to change
591                 // the return value by bubbling up.
592                 IoFilterChain filterChain = session.getFilterChain();
593                 filterChain.fireExceptionCaught(e);
594             }
595         }
596         
597         return false;
598     }
599 
600     private void clearWriteRequestQueue(S session) {
601         WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
602         WriteRequest req;
603 
604         List<WriteRequest> failedRequests = new ArrayList<>();
605 
606         if ((req = writeRequestQueue.poll(session)) != null) {
607             Object message = req.getMessage();
608 
609             if (message instanceof IoBuffer) {
610                 IoBuffer buf = (IoBuffer) message;
611 
612                 // The first unwritten empty buffer must be
613                 // forwarded to the filter chain.
614                 if (buf.hasRemaining()) {
615                     buf.reset();
616                     failedRequests.add(req);
617                 } else {
618                     IoFilterChain filterChain = session.getFilterChain();
619                     filterChain.fireMessageSent(req);
620                 }
621             } else {
622                 failedRequests.add(req);
623             }
624 
625             // Discard others.
626             while ((req = writeRequestQueue.poll(session)) != null) {
627                 failedRequests.add(req);
628             }
629         }
630 
631         // Create an exception and notify.
632         if (!failedRequests.isEmpty()) {
633             WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
634 
635             for (WriteRequest r : failedRequests) {
636                 session.decreaseScheduledBytesAndMessages(r);
637                 r.getFuture().setException(cause);
638             }
639 
640             IoFilterChain filterChain = session.getFilterChain();
641             filterChain.fireExceptionCaught(cause);
642         }
643     }
644 
645     private void process() throws Exception {
646         for (Iterator<S> i = selectedSessions(); i.hasNext();) {
647             S session = i.next();
648             process(session);
649             i.remove();
650         }
651     }
652 
653     /**
654      * Deal with session ready for the read or write operations, or both.
655      */
656     private void process(S session) {
657         // Process Reads
658         if (isReadable(session) && !session.isReadSuspended()) {
659             read(session);
660         }
661 
662         // Process writes
663         if (isWritable(session) && !session.isWriteSuspended() && session.setScheduledForFlush(true)) {
664             // add the session to the queue, if it's not already there
665             flushingSessions.add(session);
666         }
667     }
668 
    /**
     * Reads the data available on a session and forwards it to the session's
     * filter chain. A negative result from the low level read is reported to
     * the chain as a closed input. Any exception is forwarded to the chain
     * too; an {@link IOException} additionally schedules the removal of the
     * session, except for a {@link PortUnreachableException} on a datagram
     * session configured not to close in that case.
     * 
     * @param session the session to read
     */
    private void read(S session) {
        IoSessionConfig config = session.getConfig();
        int bufferSize = config.getReadBufferSize();
        IoBuffer buf = IoBuffer.allocate(bufferSize);

        final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();

        try {
            int readBytes = 0;
            int ret;

            try {
                if (hasFragmentation) {

                    // Keep reading until nothing more is returned, or
                    // until the buffer is full
                    while ((ret = read(session, buf)) > 0) {
                        readBytes += ret;

                        if (!buf.hasRemaining()) {
                            break;
                        }
                    }
                } else {
                    // No fragmentation : one single read is done
                    ret = read(session, buf);

                    if (ret > 0) {
                        readBytes = ret;
                    }
                }
            } finally {
                // Whatever happened, make the buffer readable
                buf.flip();
            }

            if (readBytes > 0) {
                IoFilterChain filterChain = session.getFilterChain();
                filterChain.fireMessageReceived(buf);
                // The buffer has been handed over to the chain
                buf = null;

                if (hasFragmentation) {
                    // Adapt the size of the next read buffer : shrink it
                    // if less than half of it was used, grow it if it was
                    // completely filled
                    if (readBytes << 1 < config.getReadBufferSize()) {
                        session.decreaseReadBufferSize();
                    } else if (readBytes == config.getReadBufferSize()) {
                        session.increaseReadBufferSize();
                    }
                }
            }

            // A negative value from the last read : the input is closed
            if (ret < 0) {
                IoFilterChain filterChain = session.getFilterChain();
                filterChain.fireInputClosed();
            }
        } catch (Exception e) {
            if (e instanceof IOException) {
                // Remove the session, unless this is a port unreachable
                // error on a datagram session which asked not to be closed
                // in such a case
                if (!(e instanceof PortUnreachableException)
                        || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
                        || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
                    scheduleRemove(session);
                }
            }

            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireExceptionCaught(e);
        }
    }
732 
733     private void notifyIdleSessions(long currentTime) throws Exception {
734         // process idle sessions
735         if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
736             lastIdleCheckTime = currentTime;
737             AbstractIoSession.notifyIdleness(allSessions(), currentTime);
738         }
739     }
740 
    /**
     * Write all the pending messages. The sessions are taken one by one from
     * the flushing queue, until it is empty or until a session which is still
     * OPENING is found.
     * 
     * @param currentTime the current time, passed on to flushNow()
     * @throws IllegalStateException if a session is in an unexpected state
     */
    private void flush(long currentTime) {
        if (flushingSessions.isEmpty()) {
            return;
        }

        do {
            // Take the next session waiting to be flushed
            S session = flushingSessions.poll();

            if (session == null) {
                // Just in case ... It should not happen.
                break;
            }

            // Reset the Schedule for flush flag for this session,
            // as we are flushing it now
            session.unscheduledForFlush();

            SessionState state = getState(session);

            switch (state) {
                case OPENED:
                    try {
                        boolean flushedAll = flushNow(session, currentTime);

                        // Requests may have been queued while we were
                        // flushing : schedule the session again if nobody
                        // else did
                        if (flushedAll && !session.getWriteRequestQueue().isEmpty(session)
                                && !session.isScheduledForFlush()) {
                            scheduleFlush(session);
                        }
                    } catch (Exception e) {
                        // The flush failed : get rid of the session, and
                        // report the error to its chain
                        scheduleRemove(session);
                        session.closeNow();
                        IoFilterChain filterChain = session.getFilterChain();
                        filterChain.fireExceptionCaught(e);
                    }

                    break;

                case CLOSING:
                    // Skip if the channel is already closed.
                    break;

                case OPENING:
                    // Retry later if session is not yet fully initialized.
                    // (In case that Session.write() is called before addSession()
                    // is processed)
                    // The remaining sessions stay in the queue, as we get
                    // out of the method here.
                    scheduleFlush(session);
                    return;

                default:
                    throw new IllegalStateException(String.valueOf(state));
            }

        } while (!flushingSessions.isEmpty());
    }
799 
800     private boolean flushNow(S session, long currentTime) {
801         if (!session.isConnected()) {
802             scheduleRemove(session);
803             return false;
804         }
805 
806         final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
807 
808         final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
809 
810         // Set limitation for the number of written bytes for read-write
811         // fairness. I used maxReadBufferSize * 3 / 2, which yields best
812         // performance in my experience while not breaking fairness much.
813         final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
814                 + (session.getConfig().getMaxReadBufferSize() >>> 1);
815         int writtenBytes = 0;
816         WriteRequest req = null;
817 
818         try {
819             // Clear OP_WRITE
820             setInterestedInWrite(session, false);
821 
822             do {
823                 // Check for pending writes.
824                 req = session.getCurrentWriteRequest();
825 
826                 if (req == null) {
827                     req = writeRequestQueue.poll(session);
828 
829                     if (req == null) {
830                         break;
831                     }
832 
833                     session.setCurrentWriteRequest(req);
834                 }
835 
836                 int localWrittenBytes;
837                 Object message = req.getMessage();
838 
839                 if (message instanceof IoBuffer) {
840                     localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
841                             currentTime);
842 
843                     if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {
844                         // the buffer isn't empty, we re-interest it in writing
845                         writtenBytes += localWrittenBytes;
846                         setInterestedInWrite(session, true);
847                         return false;
848                     }
849                 } else if (message instanceof FileRegion) {
850                     localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
851                             currentTime);
852 
853                     // Fix for Java bug on Linux
854                     // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
855                     // If there's still data to be written in the FileRegion,
856                     // return 0 indicating that we need
857                     // to pause until writing may resume.
858                     if ((localWrittenBytes > 0) && (((FileRegion) message).getRemainingBytes() > 0)) {
859                         writtenBytes += localWrittenBytes;
860                         setInterestedInWrite(session, true);
861                         return false;
862                     }
863                 } else {
864                     throw new IllegalStateException("Don't know how to handle message of type '"
865                             + message.getClass().getName() + "'.  Are you missing a protocol encoder?");
866                 }
867 
868                 if (localWrittenBytes == 0) {
869 
870                     // Kernel buffer is full.
871                     if (!req.equals(AbstractIoSession.MESSAGE_SENT_REQUEST)) {
872                         setInterestedInWrite(session, true);
873                         return false;
874                     }
875                 } else {
876                     writtenBytes += localWrittenBytes;
877     
878                     if (writtenBytes >= maxWrittenBytes) {
879                         // Wrote too much
880                         scheduleFlush(session);
881                         return false;
882                     }
883                 }
884 
885                 if (message instanceof IoBuffer) {
886                     ((IoBuffer) message).free();
887                 }
888             } while (writtenBytes < maxWrittenBytes);
889         } catch (Exception e) {
890             if (req != null) {
891                 req.getFuture().setException(e);
892             }
893 
894             IoFilterChain filterChain = session.getFilterChain();
895             filterChain.fireExceptionCaught(e);
896             return false;
897         }
898 
899         return true;
900     }
901 
902     private int writeBuffer(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
903             throws Exception {
904         IoBuffer buf = (IoBuffer) req.getMessage();
905         int localWrittenBytes = 0;
906 
907         if (buf.hasRemaining()) {
908             int length;
909 
910             if (hasFragmentation) {
911                 length = Math.min(buf.remaining(), maxLength);
912             } else {
913                 length = buf.remaining();
914             }
915 
916             try {
917                 localWrittenBytes = write(session, buf, length);
918             } catch (IOException ioe) {
919                 // We have had an issue while trying to send data to the
920                 // peer : let's close the session.
921                 buf.free();
922                 session.closeNow();
923                 removeNow(session);
924 
925                 return 0;
926             }
927         }
928 
929         session.increaseWrittenBytes(localWrittenBytes, currentTime);
930         
931         // Now, forward the original message
932         if (!buf.hasRemaining() || (!hasFragmentation && (localWrittenBytes != 0))) {
933             // Buffer has been sent, clear the current request.
934             Object originalMessage = req.getOriginalRequest().getMessage();
935 
936             if (originalMessage instanceof IoBuffer) {
937                 buf = ((IoBuffer)req.getOriginalRequest().getMessage());
938 
939                 int pos = buf.position();
940                 buf.reset();
941                 fireMessageSent(session, req);
942                 // And set it back to its position
943                 buf.position(pos);
944             } else {
945                 fireMessageSent(session, req);
946             }
947         }
948 
949         return localWrittenBytes;
950     }
951 
952     private int writeFile(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
953             throws Exception {
954         int localWrittenBytes;
955         FileRegion region = (FileRegion) req.getMessage();
956 
957         if (region.getRemainingBytes() > 0) {
958             int length;
959 
960             if (hasFragmentation) {
961                 length = (int) Math.min(region.getRemainingBytes(), maxLength);
962             } else {
963                 length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
964             }
965 
966             localWrittenBytes = transferFile(session, region, length);
967             region.update(localWrittenBytes);
968         } else {
969             localWrittenBytes = 0;
970         }
971 
972         session.increaseWrittenBytes(localWrittenBytes, currentTime);
973 
974         if ((region.getRemainingBytes() <= 0) || (!hasFragmentation && (localWrittenBytes != 0))) {
975             fireMessageSent(session, req);
976         }
977 
978         return localWrittenBytes;
979     }
980 
981     private void fireMessageSent(S session, WriteRequest req) {
982         session.setCurrentWriteRequest(null);
983         IoFilterChain filterChain = session.getFilterChain();
984         filterChain.fireMessageSent(req);
985     }
986 
987     /**
988      * Update the trafficControl for all the session.
989      */
990     private void updateTrafficMask() {
991         int queueSize = trafficControllingSessions.size();
992 
993         while (queueSize > 0) {
994             S session = trafficControllingSessions.poll();
995 
996             if (session == null) {
997                 // We are done with this queue.
998                 return;
999             }
1000 
1001             SessionState state = getState(session);
1002 
1003             switch (state) {
1004             case OPENED:
1005                 updateTrafficControl(session);
1006 
1007                 break;
1008 
1009             case CLOSING:
1010                 break;
1011 
1012             case OPENING:
1013                 // Retry later if session is not yet fully initialized.
1014                 // (In case that Session.suspend??() or session.resume??() is
1015                 // called before addSession() is processed)
1016                 // We just put back the session at the end of the queue.
1017                 trafficControllingSessions.add(session);
1018                 break;
1019 
1020             default:
1021                 throw new IllegalStateException(String.valueOf(state));
1022             }
1023 
1024             // As we have handled one session, decrement the number of
1025             // remaining sessions. The OPENING session will be processed
1026             // with the next select(), as the queue size has been decreased,
1027             // even
1028             // if the session has been pushed at the end of the queue
1029             queueSize--;
1030         }
1031     }
1032 
1033     /**
1034      * {@inheritDoc}
1035      */
1036     @Override
1037     public void updateTrafficControl(S session) {
1038         //
1039         try {
1040             setInterestedInRead(session, !session.isReadSuspended());
1041         } catch (Exception e) {
1042             IoFilterChain filterChain = session.getFilterChain();
1043             filterChain.fireExceptionCaught(e);
1044         }
1045 
1046         try {
1047             setInterestedInWrite(session,
1048                     !session.getWriteRequestQueue().isEmpty(session) && !session.isWriteSuspended());
1049         } catch (Exception e) {
1050             IoFilterChain filterChain = session.getFilterChain();
1051             filterChain.fireExceptionCaught(e);
1052         }
1053     }
1054 
1055     /**
1056      * The main loop. This is the place in charge to poll the Selector, and to
1057      * process the active sessions. It's done in - handle the newly created
1058      * sessions -
1059      */
1060     private class Processor implements Runnable {
1061         public void run() {
1062             assert (processorRef.get() == this);
1063 
1064             int nSessions = 0;
1065             lastIdleCheckTime = System.currentTimeMillis();
1066             int nbTries = 10;
1067 
1068             for (;;) {
1069                 try {
1070                     // This select has a timeout so that we can manage
1071                     // idle session when we get out of the select every
1072                     // second. (note : this is a hack to avoid creating
1073                     // a dedicated thread).
1074                     long t0 = System.currentTimeMillis();
1075                     int selected = select(SELECT_TIMEOUT);
1076                     long t1 = System.currentTimeMillis();
1077                     long delta = t1 - t0;
1078 
1079                     if (!wakeupCalled.getAndSet(false) && (selected == 0) && (delta < 100)) {
1080                         // Last chance : the select() may have been
1081                         // interrupted because we have had an closed channel.
1082                         if (isBrokenConnection()) {
1083                             LOG.warn("Broken connection");
1084                         } else {
1085                             // Ok, we are hit by the nasty epoll
1086                             // spinning.
1087                             // Basically, there is a race condition
1088                             // which causes a closing file descriptor not to be
1089                             // considered as available as a selected channel,
1090                             // but
1091                             // it stopped the select. The next time we will
1092                             // call select(), it will exit immediately for the
1093                             // same
1094                             // reason, and do so forever, consuming 100%
1095                             // CPU.
1096                             // We have to destroy the selector, and
1097                             // register all the socket on a new one.
1098                             if (nbTries == 0) {
1099                                 LOG.warn("Create a new selector. Selected is 0, delta = " + delta);
1100                                 registerNewSelector();
1101                                 nbTries = 10;
1102                             } else {
1103                                 nbTries--;
1104                             }
1105                         }
1106                     } else {
1107                         nbTries = 10;
1108                     }
1109 
1110                     // Manage newly created session first
1111                     nSessions += handleNewSessions();
1112 
1113                     updateTrafficMask();
1114 
1115                     // Now, if we have had some incoming or outgoing events,
1116                     // deal with them
1117                     if (selected > 0) {
1118                         // LOG.debug("Processing ..."); // This log hurts one of
1119                         // the MDCFilter test...
1120                         process();
1121                     }
1122 
1123                     // Write the pending requests
1124                     long currentTime = System.currentTimeMillis();
1125                     flush(currentTime);
1126 
1127                     // And manage removed sessions
1128                     nSessions -= removeSessions();
1129 
1130                     // Last, not least, send Idle events to the idle sessions
1131                     notifyIdleSessions(currentTime);
1132 
1133                     // Get a chance to exit the infinite loop if there are no
1134                     // more sessions on this Processor
1135                     if (nSessions == 0) {
1136                         processorRef.set(null);
1137 
1138                         if (newSessions.isEmpty() && isSelectorEmpty()) {
1139                             // newSessions.add() precedes startupProcessor
1140                             assert (processorRef.get() != this);
1141                             break;
1142                         }
1143 
1144                         assert (processorRef.get() != this);
1145 
1146                         if (!processorRef.compareAndSet(null, this)) {
1147                             // startupProcessor won race, so must exit processor
1148                             assert (processorRef.get() != this);
1149                             break;
1150                         }
1151 
1152                         assert (processorRef.get() == this);
1153                     }
1154 
1155                     // Disconnect all sessions immediately if disposal has been
1156                     // requested so that we exit this loop eventually.
1157                     if (isDisposing()) {
1158                         boolean hasKeys = false;
1159                         
1160                         for (Iterator<S> i = allSessions(); i.hasNext();) {
1161                             IoSession session = i.next();
1162                             
1163                             if (session.isActive()) {
1164                                 scheduleRemove((S)session);
1165                                 hasKeys = true;
1166                             }
1167                         }
1168 
1169                         if (hasKeys) {
1170                             wakeup();
1171                         }
1172                     }
1173                 } catch (ClosedSelectorException cse) {
1174                     // If the selector has been closed, we can exit the loop
1175                     // But first, dump a stack trace
1176                     ExceptionMonitor.getInstance().exceptionCaught(cse);
1177                     break;
1178                 } catch (Exception e) {
1179                     ExceptionMonitor.getInstance().exceptionCaught(e);
1180 
1181                     try {
1182                         Thread.sleep(1000);
1183                     } catch (InterruptedException e1) {
1184                         ExceptionMonitor.getInstance().exceptionCaught(e1);
1185                     }
1186                 }
1187             }
1188 
1189             try {
1190                 synchronized (disposalLock) {
1191                     if (disposing) {
1192                         doDispose();
1193                     }
1194                 }
1195             } catch (Exception e) {
1196                 ExceptionMonitor.getInstance().exceptionCaught(e);
1197             } finally {
1198                 disposalFuture.setValue(true);
1199             }
1200         }
1201     }
1202 }