View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.io.IOException;
23  import java.net.PortUnreachableException;
24  import java.nio.channels.ClosedSelectorException;
25  import java.util.ArrayList;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Queue;
29  import java.util.concurrent.ConcurrentHashMap;
30  import java.util.concurrent.ConcurrentLinkedQueue;
31  import java.util.concurrent.Executor;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  import java.util.concurrent.atomic.AtomicInteger;
34  import java.util.concurrent.atomic.AtomicReference;
35  
36  import org.apache.mina.core.buffer.IoBuffer;
37  import org.apache.mina.core.file.FileRegion;
38  import org.apache.mina.core.filterchain.IoFilterChain;
39  import org.apache.mina.core.filterchain.IoFilterChainBuilder;
40  import org.apache.mina.core.future.DefaultIoFuture;
41  import org.apache.mina.core.service.AbstractIoService;
42  import org.apache.mina.core.service.IoProcessor;
43  import org.apache.mina.core.service.IoServiceListenerSupport;
44  import org.apache.mina.core.session.AbstractIoSession;
45  import org.apache.mina.core.session.IoSession;
46  import org.apache.mina.core.session.IoSessionConfig;
47  import org.apache.mina.core.session.SessionState;
48  import org.apache.mina.core.write.WriteRequest;
49  import org.apache.mina.core.write.WriteRequestQueue;
50  import org.apache.mina.core.write.WriteToClosedSessionException;
51  import org.apache.mina.transport.socket.AbstractDatagramSessionConfig;
52  import org.apache.mina.util.ExceptionMonitor;
53  import org.apache.mina.util.NamePreservingRunnable;
54  import org.slf4j.Logger;
55  import org.slf4j.LoggerFactory;
56  
57  /**
58   * An abstract implementation of {@link IoProcessor} which helps transport
59   * developers write an {@link IoProcessor} easily. This class is in charge of
60   * actively polling a set of {@link IoSession}s and triggering events when some
61   * I/O operation is possible.
62   * 
63   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
64   * 
65   * @param <S>
66   *            the type of the {@link IoSession} this processor can handle
67   */
68  public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S> {
69      /** A logger for this class. Note that it is registered under the {@link IoProcessor} name */
70      private final static Logger LOG = LoggerFactory.getLogger(IoProcessor.class);
71  
72      /**
73       * A timeout used for the select, as we need to get out to deal with idle
74       * sessions. It is also the minimum delay between two idle checks.
75       */
76      private static final long SELECT_TIMEOUT = 1000L;
77  
78      /** A map containing the last thread ID handed out for each processor class */
79      private static final ConcurrentHashMap<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<Class<?>, AtomicInteger>();
80  
81      /** This IoProcessor instance name, given to the Runnable wrapping the Processor */
82      private final String threadName;
83  
84      /** The executor to use when we need to start the inner Processor */
85      private final Executor executor;
86  
87      /** The sessions queued by add(), waiting to be handled by handleNewSessions() */
88      private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>();
89  
90      /** The sessions queued by scheduleRemove(), waiting to be handled by removeSessions() */
91      private final Queue<S> removingSessions = new ConcurrentLinkedQueue<S>();
92  
93      /** The sessions scheduled for a flush, drained by flush(long) */
94      private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>();
95  
96      /**
97       * The sessions queued by updateTrafficMask(), i.e. the ones which have a
98       * traffic control to be updated
99       */
100     private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<S>();
101 
102     /** The current Processor, if any : it is installed by startupProcessor() */
103     private final AtomicReference<Processor> processorRef = new AtomicReference<Processor>();
104     /** The time of the last idle check, as it was given to notifyIdleSessions() */
105     private long lastIdleCheckTime;
106     /** The lock held by dispose() while it switches to the disposing state */
107     private final Object disposalLock = new Object();
108     /** Set by dispose() as soon as the disposal has been requested */
109     private volatile boolean disposing;
110     /** Set by dispose() once the disposalFuture has completed */
111     private volatile boolean disposed;
112     /** Awaited by dispose(). NOTE(review): presumably completed by the Processor - confirm */
113     private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);
114     /** NOTE(review): not used in the code visible here, presumably managed by the subclasses - confirm */
115     protected AtomicBoolean wakeupCalled = new AtomicBoolean(false);
116 
117     /**
118      * Builds a processor whose inner polling task will be handed over to the
119      * given {@link Executor}. The name used for that task is computed here.
120      * 
121      * @param executor
122      *            the {@link Executor} for handling I/O events, must not be null
123      */
124     protected AbstractPollingIoProcessor(Executor executor) {
125         if (executor == null) {
126             throw new IllegalArgumentException("executor");
127         }
128 
129         this.executor = executor;
130         this.threadName = nextThreadName();
131     }
132 
133     /**
134      * Compute the thread ID for this class instance. As we may have different
135      * classes, we store the last ID number into a Map associating the class
136      * name to the last assigned ID.
137      * 
138      * @return a name for the current thread, based on the class name and an
139      *         incremental value, starting at 1.
140      */
141     private String nextThreadName() {
142         Class<?> cls = getClass();
143         int newThreadId;
144 
145         AtomicInteger threadId = threadIds.putIfAbsent(cls, new AtomicInteger(1));
146 
147         if (threadId == null) {
148             newThreadId = 1;
149         } else {
150             // Just increment the last ID, and get it.
151             newThreadId = threadId.incrementAndGet();
152         }
153 
154         // Now we can compute the name for this thread
155         return cls.getSimpleName() + '-' + newThreadId;
156     }
157 
158     /**
159      * {@inheritDoc} The flag is raised by {@link #dispose()}.
160      */
161     public final boolean isDisposing() {
162         return this.disposing;
163     }
164 
165     /**
166      * {@inheritDoc} The flag is raised by {@link #dispose()} once its wait is over.
167      */
168     public final boolean isDisposed() {
169         return this.disposed;
170     }
171 
172     /**
173      * {@inheritDoc} Does nothing when the disposal has already been requested.
174      */
175     public final void dispose() {
176         final boolean alreadyRequested = disposed || disposing;
177         if (!alreadyRequested) {
178             // Flag the processor, and make sure a Processor has been started
179             synchronized (disposalLock) {
180                 disposing = true;
181                 startupProcessor();
182             }
183             // Block the caller until the disposal future is done
184             disposalFuture.awaitUninterruptibly();
185             disposed = true;
186         }
187     }
188 
189     /**
190      * Disposes the resources used by this {@link IoProcessor} for polling the
191      * client connections. Each concrete processor provides its own
192      * implementation of this method.
193      * 
194      * @throws Exception
195      *             if some low level IO error occurs
196      */
197     protected abstract void doDispose() throws Exception;
198 
199     /**
200      * Polls the sessions, waiting at most for the given timeout
201      * 
202      * @param timeout
203      *            the delay, in milliseconds, before the call times out if no event occurs
204      * @return The number of sessions ready for read or for write
205      * @throws Exception
206      *             if some low level IO error occurs
207      */
208     protected abstract int select(long timeout) throws Exception;
209 
210     /**
211      * Polls the sessions, with no time limit
212      * 
213      * @return The number of sessions ready for read or for write
214      * @throws Exception
215      *             if some low level IO error occurs
216      */
217     protected abstract int select() throws Exception;
218 
219     /**
220      * Tells if the list of {@link IoSession}s polled by this {@link IoProcessor}
221      * is empty
222      * 
223      * @return <tt>true</tt> if no session is polled (NOTE(review): this tag used to state the opposite - confirm)
224      */
225     protected abstract boolean isSelectorEmpty();
226 
227     /**
228      * Interrupts the {@link #select(long)} call, so that the processor can react immediately.
229      */
230     protected abstract void wakeup();
231 
232     /**
233      * Gets an {@link Iterator} over all the {@link IoSession}s polled by this
234      * {@link IoProcessor}
235      * 
236      * @return an {@link Iterator} of {@link IoSession}
237      */
238     protected abstract Iterator<S> allSessions();
239 
240     /**
241      * Gets an {@link Iterator} over the {@link IoSession}s found selected
242      * by the last call of {@link #select(long)}
243      * 
244      * @return an {@link Iterator} of the {@link IoSession}s ready for I/O operations
245      */
246     protected abstract Iterator<S> selectedSessions();
247 
248     /**
249      * Gets the state of a session (one of OPENING, OPENED, CLOSING)
250      * 
251      * @param session the {@link IoSession} to inspect
252      * @return the state of the session
253      */
254     protected abstract SessionState getState(S session);
255 
256     /**
257      * Tells if the session is ready for writing
258      * 
259      * @param session the queried session
260      * @return <tt>true</tt> if it is ready, <tt>false</tt> if it is not
261      */
262     protected abstract boolean isWritable(S session);
263 
264     /**
265      * Tells if the session is ready for reading
266      * 
267      * @param session the queried session
268      * @return <tt>true</tt> if it is ready, <tt>false</tt> if it is not
269      */
270     protected abstract boolean isReadable(S session);
271 
272     /**
273      * Registers or unregisters the session for the write events
274      * 
275      * @param session the session for which we want to be interested in write events
276      * @param isInterested <tt>true</tt> for registering, <tt>false</tt> for removing
277      * @throws Exception If there was a problem while registering the session 
278      */
279     protected abstract void setInterestedInWrite(S session, boolean isInterested) throws Exception;
280 
281     /**
282      * Registers or unregisters the session for the read events
283      * 
284      * @param session the session for which we want to be interested in read events
285      * @param isInterested <tt>true</tt> for registering, <tt>false</tt> for removing
286      * @throws Exception If there was a problem while registering the session 
287      */
288     protected abstract void setInterestedInRead(S session, boolean isInterested) throws Exception;
289 
290     /**
291      * Tells if this session is registered for reading
292      * 
293      * @param session the queried session
294      * @return <tt>true</tt> if the session is registered for reading
295      */
296     protected abstract boolean isInterestedInRead(S session);
297 
298     /**
299      * Tells if this session is registered for writing
300      * 
301      * @param session the queried session
302      * @return <tt>true</tt> if the session is registered for writing
303      */
304     protected abstract boolean isInterestedInWrite(S session);
305 
306     /**
307      * Initializes the polling of a session, adding it to the polling process.
308      * 
309      * @param session the {@link IoSession} to add to the polling
310      * @throws Exception any exception thrown by the underlying system calls
311      */
312     protected abstract void init(S session) throws Exception;
313 
314     /**
315      * Destroys the underlying client socket handle of a session
316      * 
317      * @param session the {@link IoSession}
318      * @throws Exception any exception thrown by the underlying system calls
319      */
320     protected abstract void destroy(S session) throws Exception;
321 
322     /**
323      * Reads a sequence of bytes from an {@link IoSession} into the given
324      * {@link IoBuffer}. It is called when the session was found ready for reading.
325      * 
326      * @param session the session to read
327      * @param buf the buffer to fill
328      * @return the number of bytes read
329      * @throws Exception any exception thrown by the underlying system calls
330      */
331     protected abstract int read(S session, IoBuffer buf) throws Exception;
332 
333     /**
334      * Writes a sequence of bytes to an {@link IoSession}. It is meant to be called
335      * when a session was found ready for writing.
336      * 
337      * @param session the session to write
338      * @param buf the buffer to write
339      * @param length the number of bytes to write, which can be greater than the
340      *            number of bytes remaining in the buffer
341      * @return the number of bytes written
342      * @throws Exception any exception thrown by the underlying system calls
343      */
344     protected abstract int write(S session, IoBuffer buf, int length) throws Exception;
345 
346     /**
347      * Writes a part of a file to an {@link IoSession}. If the underlying API
348      * does not support system calls like sendfile(), the implementation can
349      * throw an {@link UnsupportedOperationException}, so that the file is sent
350      * using the usual {@link #write(AbstractIoSession, IoBuffer, int)} call.
351      * 
352      * @param session the session to write
353      * @param region the file region to write
354      * @param length the length of the portion to send
355      * @return the number of written bytes
356      * @throws Exception any exception thrown by the underlying system calls
357      */
358     protected abstract int transferFile(S session, FileRegion region, int length) throws Exception;
359 
360     /**
361      * {@inheritDoc} The session is only queued here : it gets initialized
362      * when the queue is drained by handleNewSessions().
363      */
364     public final void add(S session) {
365         final boolean unusable = disposed || disposing;
366         if (unusable) {
367             throw new IllegalStateException("Already disposed.");
368         }
369         newSessions.add(session);
370         startupProcessor();
371     }
372 
373     /**
374      * {@inheritDoc} The session is only queued here, removeSessions() does the job.
375      */
376     public final void remove(S session) {
377         this.scheduleRemove(session);
378         this.startupProcessor();
379     }
380 
381     private void scheduleRemove(S session) {
382         if (!removingSessions.contains(session)) {
383             removingSessions.add(session);
384         }
385     }
386 
387     /**
388      * {@inheritDoc} No flush is requested while the session's writes are suspended.
389      */
390     public void write(S session, WriteRequest writeRequest) {
391         session.getWriteRequestQueue().offer(session, writeRequest);
392 
393         if (session.isWriteSuspended()) {
394             return;
395         }
396 
397         flush(session);
398     }
399 
400     /**
401      * {@inheritDoc}
402      */
403     public final void flush(S session) {
404         final boolean newlyScheduled = session.setScheduledForFlush(true);
405         if (!newlyScheduled) {
406             return; // the session is already waiting in the flushing queue
407         }
408         flushingSessions.add(session);
409         wakeup(); // get the processor out of its select()
410     }
411 
412     private void scheduleFlush(S session) {
413         // add the session to the queue if it's not already
414         // in the queue
415         if (session.setScheduledForFlush(true)) {
416             flushingSessions.add(session);
417         }
418     }
419 
420     /**
421      * Asks for the traffic mask of a session to be updated : the session is
422      * queued, then the select() is woken up.
423      * @param session the session to update
424      */
425     public final void updateTrafficMask(S session) {
426         this.trafficControllingSessions.add(session);
427         this.wakeup();
428     }
429 
430     /**
431      * Starts the inner Processor, asking the executor to pick a thread in its
432      * pool. The Runnable will be renamed
433      */
434     private void startupProcessor() {
435         Processor processor = processorRef.get();
436 
437         if (processor == null) {
438             processor = new Processor();
439 
440             if (processorRef.compareAndSet(null, processor)) {
441                 executor.execute(new NamePreservingRunnable(processor, threadName));
442             }
443         }
444 
445         // Just stop the select() and start it again, so that the processor
446         // can be activated immediately.
447         wakeup();
448     }
449 
450     /**
451      * In the case we are using the java select() method, this method is used to
452      * trash the buggy selector and create a new one, registering all the sockets
453      * on it.
454      * 
455      * @throws IOException If we got an exception
456      */
457     abstract protected void registerNewSelector() throws IOException;
458 
459     /**
460      * Checks whether the select() has exited immediately just because of a
461      * broken connection. This is a standard case, in which we just have to
462      * loop.
463      * 
464      * @return <tt>true</tt> if a connection has been brutally closed.
465      * @throws IOException If we got an exception
466      */
467     abstract protected boolean isBrokenConnection() throws IOException;
468 
469     /**
470      * Drains the queue of new sessions, trying to register each one of them
471      * with {@link #addNow(AbstractIoSession)}.
472      * 
473      * @return The number of sessions which have actually been registered
474      */
475     private int handleNewSessions() {
476         int created = 0;
477         S candidate;
478 
479         while ((candidate = newSessions.poll()) != null) {
480             if (addNow(candidate)) {
481                 created++;
482             }
483         }
484 
485         return created;
486     }
487 
488     /**
489      * Registers a new session : the session is initialized, its filter chain
490      * is built, then the service listeners are told about the creation. If
491      * any of those steps fails, the failure is reported to the
492      * {@link ExceptionMonitor} and destroy() is called on the session.
493      * 
494      * @param session The session to create
495      * @return <tt>true</tt> if the session has been registered
496      */
497     private boolean addNow(S session) {
498         try {
499             init(session);
500 
501             // Build the filter chain of this session.
502             IoFilterChainBuilder builder = session.getService().getFilterChainBuilder();
503             builder.buildFilterChain(session.getFilterChain());
504 
505             // Propagate the SESSION_CREATED event to the service listeners
506             AbstractIoService service = (AbstractIoService) session.getService();
507             IoServiceListenerSupport support = service.getListeners();
508             support.fireSessionCreated(session);
509 
510             return true;
511         } catch (Exception e) {
512             ExceptionMonitor.getInstance().exceptionCaught(e);
513 
514             try {
515                 // Get rid of the underlying handle of the session
516                 destroy(session);
517             } catch (Exception e1) {
518                 ExceptionMonitor.getInstance().exceptionCaught(e1);
519             }
520 
521             // Whether the cleanup worked or not, the session is not registered
522             return false;
523         }
524     }
525 
526     private int removeSessions() {
527         int removedSessions = 0;
528 
529         for (S session = removingSessions.poll(); session != null;session = removingSessions.poll()) {
530             SessionState state = getState(session);
531 
532             // Now deal with the removal accordingly to the session's state
533             switch (state) {
534                 case OPENED:
535                     // Try to remove this session
536                     if (removeNow(session)) {
537                         removedSessions++;
538                     }
539                     
540                     break;
541     
542                 case CLOSING:
543                     // Skip if channel is already closed
544                     // In any case, remove the session from the queue
545                     removedSessions++;
546                     break;
547     
548                 case OPENING:
549                     // Remove session from the newSessions queue and
550                     // remove it
551                     newSessions.remove(session);
552     
553                     if (removeNow(session)) {
554                         removedSessions++;
555                     }
556     
557                     break;
558     
559                 default:
560                     throw new IllegalStateException(String.valueOf(state));
561             }
562         }
563 
564         return removedSessions;
565     }
566 
567     /** Destroys the session right now; returns <tt>true</tt> if destroy() did not fail. */
568     private boolean removeNow(S session) {
569         boolean destroyed = false;
570         clearWriteRequestQueue(session);
571 
572         try {
573             destroy(session);
574             destroyed = true;
575         } catch (Exception e) {
576             session.getFilterChain().fireExceptionCaught(e);
577         } finally {
578             // Whatever happened, fail the requests queued in the meantime
579             // and tell the listeners. A failure in this cleanup must not
580             // alter the returned value, so it is only forwarded to the chain.
581             try {
582                 clearWriteRequestQueue(session);
583                 AbstractIoService service = (AbstractIoService) session.getService();
584                 service.getListeners().fireSessionDestroyed(session);
585             } catch (Exception e) {
586                 session.getFilterChain().fireExceptionCaught(e);
587             }
588         }
589         return destroyed;
590     }
591 
592     private void clearWriteRequestQueue(S session) {
593         WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
594         WriteRequest req;
595 
596         List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
597 
598         if ((req = writeRequestQueue.poll(session)) != null) {
599             Object message = req.getMessage();
600 
601             if (message instanceof IoBuffer) {
602                 IoBuffer buf = (IoBuffer) message;
603 
604                 // The first unwritten empty buffer must be
605                 // forwarded to the filter chain.
606                 if (buf.hasRemaining()) {
607                     buf.reset();
608                     failedRequests.add(req);
609                 } else {
610                     IoFilterChain filterChain = session.getFilterChain();
611                     filterChain.fireMessageSent(req);
612                 }
613             } else {
614                 failedRequests.add(req);
615             }
616 
617             // Discard others.
618             while ((req = writeRequestQueue.poll(session)) != null) {
619                 failedRequests.add(req);
620             }
621         }
622 
623         // Create an exception and notify.
624         if (!failedRequests.isEmpty()) {
625             WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
626 
627             for (WriteRequest r : failedRequests) {
628                 session.decreaseScheduledBytesAndMessages(r);
629                 r.getFuture().setException(cause);
630             }
631 
632             IoFilterChain filterChain = session.getFilterChain();
633             filterChain.fireExceptionCaught(cause);
634         }
635     }
636 
637     private void process() throws Exception {
638         for (Iterator<S> i = selectedSessions(); i.hasNext();) {
639             S session = i.next();
640             process(session);
641             i.remove();
642         }
643     }
644 
645     /**
646      * Deal with session ready for the read or write operations, or both.
647      */
648     private void process(S session) {
649         // Process Reads
650         if (isReadable(session) && !session.isReadSuspended()) {
651             read(session);
652         }
653 
654         // Process writes
655         if (isWritable(session) && !session.isWriteSuspended()) {
656             // add the session to the queue, if it's not already there
657             if (session.setScheduledForFlush(true)) {
658                 flushingSessions.add(session);
659             }
660         }
661     }
662 
663     private void read(S session) {
664         IoSessionConfig config = session.getConfig();
665         int bufferSize = config.getReadBufferSize();
666         IoBuffer buf = IoBuffer.allocate(bufferSize);
667         // The data are read into this new buffer, then forwarded to the filter chain
668         final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
669 
670         try {
671             int readBytes = 0;
672             int ret;
673             // Whatever the read does, the buffer is flipped afterwards
674             try {
675                 if (hasFragmentation) {
676                     // Keep reading until nothing more is returned, or until the buffer is full
677                     while ((ret = read(session, buf)) > 0) {
678                         readBytes += ret;
679 
680                         if (!buf.hasRemaining()) {
681                             break;
682                         }
683                     }
684                 } else {
685                     ret = read(session, buf);
686                     // Only a positive result counts as read bytes
687                     if (ret > 0) {
688                         readBytes = ret;
689                     }
690                 }
691             } finally {
692                 buf.flip();
693             }
694             // Forward what has been read, if anything, to the filter chain
695             if (readBytes > 0) {
696                 IoFilterChain filterChain = session.getFilterChain();
697                 filterChain.fireMessageReceived(buf);
698                 buf = null;
699                 // Shrink the read buffer when less than half of it was used, grow it when it was filled
700                 if (hasFragmentation) {
701                     if (readBytes << 1 < config.getReadBufferSize()) {
702                         session.decreaseReadBufferSize();
703                     } else if (readBytes == config.getReadBufferSize()) {
704                         session.increaseReadBufferSize();
705                     }
706                 }
707             }
708             // A negative result of the last read is reported to the chain as a closed input
709             if (ret < 0) {
710                 // The session is not scheduled for removal here, only the event is fired
711                 IoFilterChain filterChain = session.getFilterChain();
712                 filterChain.fireInputClosed();
713             }
714         } catch (Exception e) {
715             if (e instanceof IOException) { // removal, unless it is a PortUnreachableException a datagram config asks to ignore
716                 if (!(e instanceof PortUnreachableException)
717                         || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
718                         || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
719                     scheduleRemove(session);
720                 }
721             }
722             // In any case, the exception is forwarded to the filter chain
723             IoFilterChain filterChain = session.getFilterChain();
724             filterChain.fireExceptionCaught(e);
725         }
726     }
727 
728     private void notifyIdleSessions(long currentTime) throws Exception {
729         // process idle sessions
730         if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
731             lastIdleCheckTime = currentTime;
732             AbstractIoSession.notifyIdleness(allSessions(), currentTime);
733         }
734     }
735 
736     /**
737      * Write all the pending messages
738      */
739     private void flush(long currentTime) {
740         if (flushingSessions.isEmpty()) {
741             return;
742         }
743 
744         do {
745             S session = flushingSessions.poll(); // the same one with
746                                                  // firstSession
747 
748             if (session == null) {
749                 // Just in case ... It should not happen.
750                 break;
751             }
752 
753             // Reset the Schedule for flush flag for this session,
754             // as we are flushing it now
755             session.unscheduledForFlush();
756 
757             SessionState state = getState(session);
758 
759             switch (state) {
760             case OPENED:
761                 try {
762                     boolean flushedAll = flushNow(session, currentTime);
763 
764                     if (flushedAll && !session.getWriteRequestQueue().isEmpty(session)
765                             && !session.isScheduledForFlush()) {
766                         scheduleFlush(session);
767                     }
768                 } catch (Exception e) {
769                     scheduleRemove(session);
770                     session.close(true);
771                     IoFilterChain filterChain = session.getFilterChain();
772                     filterChain.fireExceptionCaught(e);
773                 }
774 
775                 break;
776 
777             case CLOSING:
778                 // Skip if the channel is already closed.
779                 break;
780 
781             case OPENING:
782                 // Retry later if session is not yet fully initialized.
783                 // (In case that Session.write() is called before addSession()
784                 // is processed)
785                 scheduleFlush(session);
786                 return;
787 
788             default:
789                 throw new IllegalStateException(String.valueOf(state));
790             }
791 
792         } while (!flushingSessions.isEmpty());
793     }
794 
795     private boolean flushNow(S session, long currentTime) {
796         if (!session.isConnected()) {
797             scheduleRemove(session);
798             return false;
799         }
800 
801         final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
802 
803         final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
804 
805         // Set limitation for the number of written bytes for read-write
806         // fairness. I used maxReadBufferSize * 3 / 2, which yields best
807         // performance in my experience while not breaking fairness much.
808         final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
809                 + (session.getConfig().getMaxReadBufferSize() >>> 1);
810         int writtenBytes = 0;
811         WriteRequest req = null;
812 
813         try {
814             // Clear OP_WRITE
815             setInterestedInWrite(session, false);
816 
817             do {
818                 // Check for pending writes.
819                 req = session.getCurrentWriteRequest();
820 
821                 if (req == null) {
822                     req = writeRequestQueue.poll(session);
823 
824                     if (req == null) {
825                         break;
826                     }
827 
828                     session.setCurrentWriteRequest(req);
829                 }
830 
831                 int localWrittenBytes = 0;
832                 Object message = req.getMessage();
833 
834                 if (message instanceof IoBuffer) {
835                     localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
836                             currentTime);
837 
838                     if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {
839                         // the buffer isn't empty, we re-interest it in writing
840                         writtenBytes += localWrittenBytes;
841                         setInterestedInWrite(session, true);
842                         return false;
843                     }
844                 } else if (message instanceof FileRegion) {
845                     localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
846                             currentTime);
847 
848                     // Fix for Java bug on Linux
849                     // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
850                     // If there's still data to be written in the FileRegion,
851                     // return 0 indicating that we need
852                     // to pause until writing may resume.
853                     if ((localWrittenBytes > 0) && (((FileRegion) message).getRemainingBytes() > 0)) {
854                         writtenBytes += localWrittenBytes;
855                         setInterestedInWrite(session, true);
856                         return false;
857                     }
858                 } else {
859                     throw new IllegalStateException("Don't know how to handle message of type '"
860                             + message.getClass().getName() + "'.  Are you missing a protocol encoder?");
861                 }
862 
863                 if (localWrittenBytes == 0) {
864                     // Kernel buffer is full.
865                     setInterestedInWrite(session, true);
866                     return false;
867                 }
868 
869                 writtenBytes += localWrittenBytes;
870 
871                 if (writtenBytes >= maxWrittenBytes) {
872                     // Wrote too much
873                     scheduleFlush(session);
874                     return false;
875                 }
876 
877                 if (message instanceof IoBuffer) {
878                     ((IoBuffer) message).free();
879                 }
880             } while (writtenBytes < maxWrittenBytes);
881         } catch (Exception e) {
882             if (req != null) {
883                 req.getFuture().setException(e);
884             }
885 
886             IoFilterChain filterChain = session.getFilterChain();
887             filterChain.fireExceptionCaught(e);
888             return false;
889         }
890 
891         return true;
892     }
893 
894     private int writeBuffer(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
895             throws Exception {
896         IoBuffer buf = (IoBuffer) req.getMessage();
897         int localWrittenBytes = 0;
898 
899         if (buf.hasRemaining()) {
900             int length;
901 
902             if (hasFragmentation) {
903                 length = Math.min(buf.remaining(), maxLength);
904             } else {
905                 length = buf.remaining();
906             }
907 
908             try {
909                 localWrittenBytes = write(session, buf, length);
910             } catch (IOException ioe) {
911                 // We have had an issue while trying to send data to the
912                 // peer : let's close the session.
913                 buf.free();
914                 session.close(true);
915                 destroy(session);
916 
917                 return 0;
918             }
919 
920         }
921 
922         session.increaseWrittenBytes(localWrittenBytes, currentTime);
923 
924         if (!buf.hasRemaining() || (!hasFragmentation && (localWrittenBytes != 0))) {
925             // Buffer has been sent, clear the current request.
926             int pos = buf.position();
927             buf.reset();
928 
929             fireMessageSent(session, req);
930 
931             // And set it back to its position
932             buf.position(pos);
933         }
934 
935         return localWrittenBytes;
936     }
937 
938     private int writeFile(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
939             throws Exception {
940         int localWrittenBytes;
941         FileRegion region = (FileRegion) req.getMessage();
942 
943         if (region.getRemainingBytes() > 0) {
944             int length;
945 
946             if (hasFragmentation) {
947                 length = (int) Math.min(region.getRemainingBytes(), maxLength);
948             } else {
949                 length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
950             }
951 
952             localWrittenBytes = transferFile(session, region, length);
953             region.update(localWrittenBytes);
954         } else {
955             localWrittenBytes = 0;
956         }
957 
958         session.increaseWrittenBytes(localWrittenBytes, currentTime);
959 
960         if ((region.getRemainingBytes() <= 0) || (!hasFragmentation && (localWrittenBytes != 0))) {
961             fireMessageSent(session, req);
962         }
963 
964         return localWrittenBytes;
965     }
966 
967     private void fireMessageSent(S session, WriteRequest req) {
968         session.setCurrentWriteRequest(null);
969         IoFilterChain filterChain = session.getFilterChain();
970         filterChain.fireMessageSent(req);
971     }
972 
973     /**
974      * Update the trafficControl for all the session.
975      */
976     private void updateTrafficMask() {
977         int queueSize = trafficControllingSessions.size();
978 
979         while (queueSize > 0) {
980             S session = trafficControllingSessions.poll();
981 
982             if (session == null) {
983                 // We are done with this queue.
984                 return;
985             }
986 
987             SessionState state = getState(session);
988 
989             switch (state) {
990             case OPENED:
991                 updateTrafficControl(session);
992 
993                 break;
994 
995             case CLOSING:
996                 break;
997 
998             case OPENING:
999                 // Retry later if session is not yet fully initialized.
1000                 // (In case that Session.suspend??() or session.resume??() is
1001                 // called before addSession() is processed)
1002                 // We just put back the session at the end of the queue.
1003                 trafficControllingSessions.add(session);
1004                 break;
1005 
1006             default:
1007                 throw new IllegalStateException(String.valueOf(state));
1008             }
1009 
1010             // As we have handled one session, decrement the number of
1011             // remaining sessions. The OPENING session will be processed
1012             // with the next select(), as the queue size has been decreased,
1013             // even
1014             // if the session has been pushed at the end of the queue
1015             queueSize--;
1016         }
1017     }
1018 
1019     /**
1020      * {@inheritDoc}
1021      */
1022     public void updateTrafficControl(S session) {
1023         //
1024         try {
1025             setInterestedInRead(session, !session.isReadSuspended());
1026         } catch (Exception e) {
1027             IoFilterChain filterChain = session.getFilterChain();
1028             filterChain.fireExceptionCaught(e);
1029         }
1030 
1031         try {
1032             setInterestedInWrite(session,
1033                     !session.getWriteRequestQueue().isEmpty(session) && !session.isWriteSuspended());
1034         } catch (Exception e) {
1035             IoFilterChain filterChain = session.getFilterChain();
1036             filterChain.fireExceptionCaught(e);
1037         }
1038     }
1039 
1040     /**
1041      * The main loop. This is the place in charge to poll the Selector, and to
1042      * process the active sessions. It's done in - handle the newly created
1043      * sessions -
1044      */
1045     private class Processor implements Runnable {
1046         public void run() {
1047             assert (processorRef.get() == this);
1048 
1049             int nSessions = 0;
1050             lastIdleCheckTime = System.currentTimeMillis();
1051 
1052             for (;;) {
1053                 try {
1054                     // This select has a timeout so that we can manage
1055                     // idle session when we get out of the select every
1056                     // second. (note : this is a hack to avoid creating
1057                     // a dedicated thread).
1058                     long t0 = System.currentTimeMillis();
1059                     int selected = select(SELECT_TIMEOUT);
1060                     long t1 = System.currentTimeMillis();
1061                     long delta = (t1 - t0);
1062 
1063                     if (!wakeupCalled.getAndSet(false) && (selected == 0) && (delta < 100)) {
1064                         // Last chance : the select() may have been
1065                         // interrupted because we have had an closed channel.
1066                         if (isBrokenConnection()) {
1067                             LOG.warn("Broken connection");
1068                         } else {
1069                             LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
1070                             // Ok, we are hit by the nasty epoll
1071                             // spinning.
1072                             // Basically, there is a race condition
1073                             // which causes a closing file descriptor not to be
1074                             // considered as available as a selected channel,
1075                             // but
1076                             // it stopped the select. The next time we will
1077                             // call select(), it will exit immediately for the
1078                             // same
1079                             // reason, and do so forever, consuming 100%
1080                             // CPU.
1081                             // We have to destroy the selector, and
1082                             // register all the socket on a new one.
1083                             registerNewSelector();
1084                         }
1085                     }
1086 
1087                     // Manage newly created session first
1088                     nSessions += handleNewSessions();
1089 
1090                     updateTrafficMask();
1091 
1092                     // Now, if we have had some incoming or outgoing events,
1093                     // deal with them
1094                     if (selected > 0) {
1095                         // LOG.debug("Processing ..."); // This log hurts one of
1096                         // the MDCFilter test...
1097                         process();
1098                     }
1099 
1100                     // Write the pending requests
1101                     long currentTime = System.currentTimeMillis();
1102                     flush(currentTime);
1103 
1104                     // And manage removed sessions
1105                     nSessions -= removeSessions();
1106 
1107                     // Last, not least, send Idle events to the idle sessions
1108                     notifyIdleSessions(currentTime);
1109 
1110                     // Get a chance to exit the infinite loop if there are no
1111                     // more sessions on this Processor
1112                     if (nSessions == 0) {
1113                         processorRef.set(null);
1114 
1115                         if (newSessions.isEmpty() && isSelectorEmpty()) {
1116                             // newSessions.add() precedes startupProcessor
1117                             assert (processorRef.get() != this);
1118                             break;
1119                         }
1120 
1121                         assert (processorRef.get() != this);
1122 
1123                         if (!processorRef.compareAndSet(null, this)) {
1124                             // startupProcessor won race, so must exit processor
1125                             assert (processorRef.get() != this);
1126                             break;
1127                         }
1128 
1129                         assert (processorRef.get() == this);
1130                     }
1131 
1132                     // Disconnect all sessions immediately if disposal has been
1133                     // requested so that we exit this loop eventually.
1134                     if (isDisposing()) {
1135                         boolean hasKeys = false;
1136                         
1137                         for (Iterator<S> i = allSessions(); i.hasNext();) {
1138                             IoSession session = i.next();
1139                             
1140                             if (session.isActive()) {
1141                                 scheduleRemove((S)session);
1142                                 hasKeys = true;
1143                             }
1144                         }
1145 
1146                         if (hasKeys) {
1147                             wakeup();
1148                         }
1149                     }
1150                 } catch (ClosedSelectorException cse) {
1151                     // If the selector has been closed, we can exit the loop
1152                     // But first, dump a stack trace
1153                     ExceptionMonitor.getInstance().exceptionCaught(cse);
1154                     break;
1155                 } catch (Exception e) {
1156                     ExceptionMonitor.getInstance().exceptionCaught(e);
1157 
1158                     try {
1159                         Thread.sleep(1000);
1160                     } catch (InterruptedException e1) {
1161                         ExceptionMonitor.getInstance().exceptionCaught(e1);
1162                     }
1163                 }
1164             }
1165 
1166             try {
1167                 synchronized (disposalLock) {
1168                     if (disposing) {
1169                         doDispose();
1170                     }
1171                 }
1172             } catch (Exception e) {
1173                 ExceptionMonitor.getInstance().exceptionCaught(e);
1174             } finally {
1175                 disposalFuture.setValue(true);
1176             }
1177         }
1178     }
1179 }