View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.io.IOException;
23  import java.net.PortUnreachableException;
24  import java.nio.channels.ClosedSelectorException;
25  import java.util.ArrayList;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Queue;
29  import java.util.concurrent.ConcurrentHashMap;
30  import java.util.concurrent.ConcurrentLinkedQueue;
31  import java.util.concurrent.Executor;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  import java.util.concurrent.atomic.AtomicInteger;
34  import java.util.concurrent.atomic.AtomicReference;
35  
36  import org.apache.mina.core.buffer.IoBuffer;
37  import org.apache.mina.core.file.FileRegion;
38  import org.apache.mina.core.filterchain.IoFilterChain;
39  import org.apache.mina.core.filterchain.IoFilterChainBuilder;
40  import org.apache.mina.core.future.DefaultIoFuture;
41  import org.apache.mina.core.service.AbstractIoService;
42  import org.apache.mina.core.service.IoProcessor;
43  import org.apache.mina.core.service.IoServiceListenerSupport;
44  import org.apache.mina.core.session.AbstractIoSession;
45  import org.apache.mina.core.session.IoSession;
46  import org.apache.mina.core.session.IoSessionConfig;
47  import org.apache.mina.core.session.SessionState;
48  import org.apache.mina.core.write.WriteRequest;
49  import org.apache.mina.core.write.WriteRequestQueue;
50  import org.apache.mina.core.write.WriteToClosedSessionException;
51  import org.apache.mina.transport.socket.AbstractDatagramSessionConfig;
52  import org.apache.mina.util.ExceptionMonitor;
53  import org.apache.mina.util.NamePreservingRunnable;
54  import org.slf4j.Logger;
55  import org.slf4j.LoggerFactory;
56  
57  /**
58   * An abstract implementation of {@link IoProcessor} which helps transport
59   * developers to write an {@link IoProcessor} easily. This class is in charge of
60   * active polling a set of {@link IoSession} and trigger events when some I/O
61   * operation is possible.
62   *
63   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
64   *
65   * @param <S> the type of the {@link IoSession} this processor can handle
66   */
67  public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S> {
    /**
     * A logger for this class. Note that it is registered under
     * {@link IoProcessor} rather than under this concrete class.
     */
    private final static Logger LOG = LoggerFactory.getLogger(IoProcessor.class);

    /**
     * The maximum loop count for a write operation until
     * {@link #write(AbstractIoSession, IoBuffer, int)} returns non-zero value.
     * It is similar to what a spin lock is for in concurrency programming. It
     * improves memory utilization and write throughput significantly.
     */
    private static final int WRITE_SPIN_COUNT = 256;

    /**
     * A timeout (in milliseconds) used for the select, as we need to get out
     * to deal with idle sessions. It is also the minimum delay between two
     * idle checks.
     */
    private static final long SELECT_TIMEOUT = 1000L;

    /** A map containing the last thread ID assigned for each processor class */
    private static final ConcurrentHashMap<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<Class<?>, AtomicInteger>();

    /** This IoProcessor instance name, used to name the thread running the inner Processor */
    private final String threadName;

    /** The executor to use when we need to start the inner Processor */
    private final Executor executor;

    /** A Session queue containing the newly created sessions */
    private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>();

    /** A queue used to store the sessions to be removed */
    private final Queue<S> removingSessions = new ConcurrentLinkedQueue<S>();

    /** A queue used to store the sessions to be flushed */
    private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>();

    /**
     * A queue used to store the sessions which have a trafficControl to be
     * updated
     */
    private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<S>();

    /** The processor thread : it handles the incoming messages */
    private final AtomicReference<Processor> processorRef = new AtomicReference<Processor>();

    /** The time at which the idle sessions were last checked */
    private long lastIdleCheckTime;

    /** The lock held while a disposal is being requested */
    private final Object disposalLock = new Object();

    /** Set to true as soon as a disposal has been requested */
    private volatile boolean disposing;

    /** Set to true once the disposal future has completed */
    private volatile boolean disposed;

    /**
     * The future {@link #dispose()} waits on. NOTE(review): presumably
     * completed by the inner Processor once it has shut down - confirm.
     */
    private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);

    /**
     * NOTE(review): presumably tells whether a wakeup has been requested by
     * the concrete processor - confirm against the subclasses.
     */
    protected AtomicBoolean wakeupCalled = new AtomicBoolean(false);
123 
124     /**
125      * Create an {@link AbstractPollingIoProcessor} with the given
126      * {@link Executor} for handling I/Os events.
127      *
128      * @param executor
129      *            the {@link Executor} for handling I/O events
130      */
131     protected AbstractPollingIoProcessor(Executor executor) {
132         if (executor == null) {
133             throw new IllegalArgumentException("executor");
134         }
135 
136         this.threadName = nextThreadName();
137         this.executor = executor;
138     }
139 
140     /**
141      * Compute the thread ID for this class instance. As we may have different
142      * classes, we store the last ID number into a Map associating the class
143      * name to the last assigned ID.
144      *
145      * @return a name for the current thread, based on the class name and an
146      *         incremental value, starting at 1.
147      */
148     private String nextThreadName() {
149         Class<?> cls = getClass();
150         int newThreadId;
151 
152         AtomicInteger threadId = threadIds.putIfAbsent(cls, new AtomicInteger(1));
153 
154         if (threadId == null) {
155             newThreadId = 1;
156         } else {
157             // Just increment the last ID, and get it.
158             newThreadId = threadId.incrementAndGet();
159         }
160 
161         // Now we can compute the name for this thread
162         return cls.getSimpleName() + '-' + newThreadId;
163     }
164 
    /**
     * {@inheritDoc}
     * <p>
     * The flag is raised by {@link #dispose()} as soon as a disposal is
     * requested.
     */
    public final boolean isDisposing() {
        return disposing;
    }
171 
    /**
     * {@inheritDoc}
     * <p>
     * The flag is raised by {@link #dispose()} once it has finished waiting
     * for the disposal future.
     */
    public final boolean isDisposed() {
        return disposed;
    }
178 
    /**
     * {@inheritDoc}
     * <p>
     * Flags this processor as disposing, makes sure the inner Processor is
     * started and woken up, then blocks until the disposal future completes.
     * The call returns immediately if a disposal is already requested or done.
     */
    public final void dispose() {
        // Nothing to do if a disposal has already been requested or completed
        if (disposed || disposing) {
            return;
        }

        synchronized (disposalLock) {
            disposing = true;
            // Start (or wake up) the inner Processor.
            // NOTE(review): presumably it notices the disposing flag and
            // does the actual cleanup - confirm in the Processor class
            startupProcessor();
        }

        // Block the caller until the disposal future has completed
        disposalFuture.awaitUninterruptibly();
        disposed = true;
    }
195 
    /**
     * Dispose the resources used by this {@link IoProcessor} for polling the
     * client connections. This hook has to be provided by the implementing
     * class.
     *
     * @throws Exception if some low level IO error occurs
     */
    protected abstract void doDispose() throws Exception;
203 
    /**
     * Poll the sessions, waiting at most the given timeout.
     *
     * @param timeout
     *            milliseconds before the call times out if no event appears
     * @return The number of sessions ready for read or for write
     * @throws Exception
     *             if some low level IO error occurs
     */
    protected abstract int select(long timeout) throws Exception;
214 
    /**
     * Poll the sessions forever, i.e. without any timeout.
     *
     * @return The number of sessions ready for read or for write
     * @throws Exception
     *             if some low level IO error occurs
     */
    protected abstract int select() throws Exception;
223 
    /**
     * Say if the list of {@link IoSession} polled by this {@link IoProcessor}
     * is empty
     *
     * @return true if no session is managed by this {@link IoProcessor}
     */
    protected abstract boolean isSelectorEmpty();
231 
    /**
     * Interrupt the {@link #select(long)} or {@link #select()} call.
     */
    protected abstract void wakeup();
236 
    /**
     * Get an {@link Iterator} over all the {@link IoSession}s polled by this
     * {@link IoProcessor}
     *
     * @return {@link Iterator} of {@link IoSession}
     */
    protected abstract Iterator<S> allSessions();
244 
    /**
     * Get an {@link Iterator} over the {@link IoSession}s found selected by
     * the last call of {@link #select(long)} or {@link #select()}. The
     * iterator is expected to support {@link Iterator#remove()}, as each
     * session is removed from it once it has been processed.
     *
     * @return {@link Iterator} of {@link IoSession} ready for I/O operations
     */
    protected abstract Iterator<S> selectedSessions();
251 
    /**
     * Get the state of a session ({@link SessionState#OPENING},
     * {@link SessionState#OPENED} or {@link SessionState#CLOSING})
     *
     * @param session
     *            the {@link IoSession} to inspect
     * @return the state of the session
     */
    protected abstract SessionState getState(S session);
260 
    /**
     * Tell if the session is ready for writing
     *
     * @param session
     *            the session queried
     * @return true if ready, false if not ready
     */
    protected abstract boolean isWritable(S session);
269 
    /**
     * Tell if the session is ready for reading
     *
     * @param session
     *            the session queried
     * @return true if ready, false if not ready
     */
    protected abstract boolean isReadable(S session);
278 
    /**
     * Register or unregister a session for writing
     *
     * @param session
     *            the session registered
     * @param isInterested
     *            true for registering, false for removing
     * @throws Exception
     *             if the interest of the session could not be updated
     */
    protected abstract void setInterestedInWrite(S session, boolean isInterested) throws Exception;
288 
    /**
     * Register or unregister a session for reading
     *
     * @param session
     *            the session registered
     * @param isInterested
     *            true for registering, false for removing
     * @throws Exception
     *             if the interest of the session could not be updated
     */
    protected abstract void setInterestedInRead(S session, boolean isInterested) throws Exception;
298 
    /**
     * Tell if this session is registered for reading
     *
     * @param session
     *            the session queried
     * @return true if the session is registered for reading
     */
    protected abstract boolean isInterestedInRead(S session);
307 
    /**
     * Tell if this session is registered for writing
     *
     * @param session
     *            the session queried
     * @return true if the session is registered for writing
     */
    protected abstract boolean isInterestedInWrite(S session);
316 
    /**
     * Initialize the polling of a session. Add it to the polling process.
     * It is called for each new session before its filter chain is built.
     *
     * @param session the {@link IoSession} to add to the polling
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract void init(S session) throws Exception;
324 
    /**
     * Destroy the underlying client socket handle. It is called when a
     * session is removed, and when the creation of a session has failed.
     *
     * @param session
     *            the {@link IoSession}
     * @throws Exception
     *             any exception thrown by the underlying system calls
     */
    protected abstract void destroy(S session) throws Exception;
334 
    /**
     * Reads a sequence of bytes from a {@link IoSession} into the given
     * {@link IoBuffer}. Is called when the session was found ready for reading.
     *
     * @param session
     *            the session to read
     * @param buf
     *            the buffer to fill
     * @return the number of bytes read. A negative value makes the caller
     *         schedule the session for removal.
     * @throws Exception
     *             any exception thrown by the underlying system calls
     */
    protected abstract int read(S session, IoBuffer buf) throws Exception;
348 
    /**
     * Write a sequence of bytes to a {@link IoSession}. It is meant to be
     * called when a session was found ready for writing.
     *
     * @param session
     *            the session to write
     * @param buf
     *            the buffer to write
     * @param length
     *            the maximum number of bytes to write. It can be greater than
     *            the number of bytes remaining in the buffer
     * @return the number of bytes written
     * @throws Exception
     *             any exception thrown by the underlying system calls
     */
    protected abstract int write(S session, IoBuffer buf, int length) throws Exception;
365 
    /**
     * Write a part of a file to a {@link IoSession}. If the underlying API
     * doesn't support system calls like sendfile(), you can throw an
     * {@link UnsupportedOperationException} so the file will be sent using
     * the usual {@link #write(AbstractIoSession, IoBuffer, int)} call.
     *
     * @param session
     *            the session to write
     * @param region
     *            the file region to write
     * @param length
     *            the length of the portion to send
     * @return the number of written bytes
     * @throws Exception
     *             any exception thrown by the underlying system calls
     */
    protected abstract int transferFile(S session, FileRegion region, int length) throws Exception;
383 
384     /**
385      * {@inheritDoc}
386      */
387     public final void add(S session) {
388         if (disposed || disposing) {
389             throw new IllegalStateException("Already disposed.");
390         }
391 
392         // Adds the session to the newSession queue and starts the worker
393         newSessions.add(session);
394         startupProcessor();
395     }
396 
397     /**
398      * {@inheritDoc}
399      */
400     public final void remove(S session) {
401         scheduleRemove(session);
402         startupProcessor();
403     }
404 
    /**
     * Add the session to the queue of sessions to be removed, without
     * starting nor waking up the inner Processor.
     *
     * @param session
     *            the session to remove
     */
    private void scheduleRemove(S session) {
        removingSessions.add(session);
    }
408 
409     /**
410      * {@inheritDoc}
411      */
412     public void write(S session, WriteRequest writeRequest) {
413         WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
414 
415         writeRequestQueue.offer(session, writeRequest);
416 
417         if (!session.isWriteSuspended()) {
418             this.flush(session);
419         }
420     }
421 
422     /**
423      * {@inheritDoc}
424      */
425     public final void flush(S session) {
426         // add the session to the queue if it's not already
427         // in the queue, then wake up the select()
428         if (session.setScheduledForFlush(true)) {
429             flushingSessions.add(session);
430             wakeup();
431         }
432     }
433 
434     private void scheduleFlush(S session) {
435         // add the session to the queue if it's not already
436         // in the queue
437         if (session.setScheduledForFlush(true)) {
438             flushingSessions.add(session);
439         }
440     }
441 
    /**
     * {@inheritDoc}
     * <p>
     * The session is queued in the traffic controlling queue, and the
     * select() is woken up.
     */
    public final void updateTrafficMask(S session) {
        trafficControllingSessions.add(session);
        wakeup();
    }
449 
450     /**
451      * Starts the inner Processor, asking the executor to pick a thread in its
452      * pool. The Runnable will be renamed
453      */
454     private void startupProcessor() {
455         Processor processor = processorRef.get();
456 
457         if (processor == null) {
458             processor = new Processor();
459 
460             if (processorRef.compareAndSet(null, processor)) {
461                 executor.execute(new NamePreservingRunnable(processor, threadName));
462             }
463         }
464 
465         // Just stop the select() and start it again, so that the processor
466         // can be activated immediately.
467         wakeup();
468     }
469 
    /**
     * In the case we are using the java select() method, this method is used to
     * trash the buggy selector and create a new one, registering all the
     * sockets on it.
     *
     * @throws IOException
     *             If we got an exception
     */
    abstract protected void registerNewSelector() throws IOException;
479 
    /**
     * Check that the select() has not exited immediately just because of a
     * broken connection. This is a standard case, for which we just have to
     * loop.
     *
     * @return true if a connection has been brutally closed.
     * @throws IOException
     *             If we got an exception
     */
    abstract protected boolean isBrokenConnection() throws IOException;
490 
491     /**
492      * Loops over the new sessions blocking queue and returns the number of
493      * sessions which are effectively created
494      *
495      * @return The number of new sessions
496      */
497     private int handleNewSessions() {
498         int addedSessions = 0;
499 
500         for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {
501             if (addNow(session)) {
502                 // A new session has been created
503                 addedSessions++;
504             }
505         }
506 
507         return addedSessions;
508     }
509 
510     /**
511      * Process a new session :
512      * - initialize it
513      * - create its chain
514      * - fire the CREATED listeners if any
515      *
516      * @param session The session to create
517      * @return true if the session has been registered
518      */
519     private boolean addNow(S session) {
520         boolean registered = false;
521 
522         try {
523             init(session);
524             registered = true;
525 
526             // Build the filter chain of this session.
527             IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
528             chainBuilder.buildFilterChain(session.getFilterChain());
529 
530             // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
531             // in AbstractIoFilterChain.fireSessionOpened().
532             // Propagate the SESSION_CREATED event up to the chain
533             IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
534             listeners.fireSessionCreated(session);
535         } catch (Throwable e) {
536             ExceptionMonitor.getInstance().exceptionCaught(e);
537 
538             try {
539                 destroy(session);
540             } catch (Exception e1) {
541                 ExceptionMonitor.getInstance().exceptionCaught(e1);
542             } finally {
543                 registered = false;
544             }
545         }
546 
547         return registered;
548     }
549 
550     private int removeSessions() {
551         int removedSessions = 0;
552 
553         for (S session = removingSessions.poll(); session != null; session = removingSessions.poll()) {
554             SessionState state = getState(session);
555 
556             // Now deal with the removal accordingly to the session's state
557             switch (state) {
558             case OPENED:
559                 // Try to remove this session
560                 if (removeNow(session)) {
561                     removedSessions++;
562                 }
563 
564                 break;
565 
566             case CLOSING:
567                 // Skip if channel is already closed
568                 break;
569 
570             case OPENING:
571                 // Remove session from the newSessions queue and
572                 // remove it
573                 newSessions.remove(session);
574 
575                 if (removeNow(session)) {
576                     removedSessions++;
577                 }
578 
579                 break;
580 
581             default:
582                 throw new IllegalStateException(String.valueOf(state));
583             }
584         }
585 
586         return removedSessions;
587     }
588 
589     private boolean removeNow(S session) {
590         clearWriteRequestQueue(session);
591 
592         try {
593             destroy(session);
594             return true;
595         } catch (Exception e) {
596             IoFilterChain filterChain = session.getFilterChain();
597             filterChain.fireExceptionCaught(e);
598         } finally {
599             clearWriteRequestQueue(session);
600             ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
601         }
602         return false;
603     }
604 
605     private void clearWriteRequestQueue(S session) {
606         WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
607         WriteRequest req;
608 
609         List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
610 
611         if ((req = writeRequestQueue.poll(session)) != null) {
612             Object message = req.getMessage();
613 
614             if (message instanceof IoBuffer) {
615                 IoBuffer buf = (IoBuffer) message;
616 
617                 // The first unwritten empty buffer must be
618                 // forwarded to the filter chain.
619                 if (buf.hasRemaining()) {
620                     buf.reset();
621                     failedRequests.add(req);
622                 } else {
623                     IoFilterChain filterChain = session.getFilterChain();
624                     filterChain.fireMessageSent(req);
625                 }
626             } else {
627                 failedRequests.add(req);
628             }
629 
630             // Discard others.
631             while ((req = writeRequestQueue.poll(session)) != null) {
632                 failedRequests.add(req);
633             }
634         }
635 
636         // Create an exception and notify.
637         if (!failedRequests.isEmpty()) {
638             WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
639 
640             for (WriteRequest r : failedRequests) {
641                 session.decreaseScheduledBytesAndMessages(r);
642                 r.getFuture().setException(cause);
643             }
644 
645             IoFilterChain filterChain = session.getFilterChain();
646             filterChain.fireExceptionCaught(cause);
647         }
648     }
649 
650     private void process() throws Exception {
651         for (Iterator<S> i = selectedSessions(); i.hasNext();) {
652             S session = i.next();
653             process(session);
654             i.remove();
655         }
656     }
657 
658     /**
659      * Deal with session ready for the read or write operations, or both.
660      */
661     private void process(S session) {
662         // Process Reads
663         if (isReadable(session) && !session.isReadSuspended()) {
664             read(session);
665         }
666 
667         // Process writes
668         if (isWritable(session) && !session.isWriteSuspended()) {
669             // add the session to the queue, if it's not already there
670             if (session.setScheduledForFlush(true)) {
671                 flushingSessions.add(session);
672             }
673         }
674     }
675 
    /**
     * Reads the data available on a session into a freshly allocated buffer
     * and forwards it to the filter chain. The session is scheduled for
     * removal when the read returns a negative value, or on most I/O errors.
     *
     * @param session
     *            the session to read from
     */
    private void read(S session) {
        IoSessionConfig config = session.getConfig();
        int bufferSize = config.getReadBufferSize();
        IoBuffer buf = IoBuffer.allocate(bufferSize);

        final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();

        try {
            int readBytes = 0;
            int ret;

            try {
                if (hasFragmentation) {

                    // Keep reading until nothing more is returned, or until
                    // the buffer is full
                    while ((ret = read(session, buf)) > 0) {
                        readBytes += ret;

                        if (!buf.hasRemaining()) {
                            break;
                        }
                    }
                } else {
                    // No fragmentation : a single read is done
                    ret = read(session, buf);

                    if (ret > 0) {
                        readBytes = ret;
                    }
                }
            } finally {
                // Always switch the buffer to reading mode, even on failure
                buf.flip();
            }

            if (readBytes > 0) {
                IoFilterChain filterChain = session.getFilterChain();
                filterChain.fireMessageReceived(buf);
                // The buffer has been handed over to the chain
                buf = null;

                if (hasFragmentation) {
                    // Adapt the read buffer size : shrink it when less than
                    // half of it was used, grow it when it was filled
                    if (readBytes << 1 < config.getReadBufferSize()) {
                        session.decreaseReadBufferSize();
                    } else if (readBytes == config.getReadBufferSize()) {
                        session.increaseReadBufferSize();
                    }
                }
            }

            // A negative result of the last read gets the session removed
            if (ret < 0) {
                scheduleRemove(session);
            }
        } catch (Throwable e) {
            if (e instanceof IOException) {
                // Remove the session on any I/O error, except for a
                // PortUnreachableException on a datagram session configured
                // not to close in this case
                if (!(e instanceof PortUnreachableException)
                        || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
                        || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
                    scheduleRemove(session);
                }
            }

            // In any case, let the filter chain know about the failure
            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireExceptionCaught(e);
        }
    }
738 
739     private static String byteArrayToHex(byte[] barray) {
740         char[] c = new char[barray.length * 2];
741         int pos = 0;
742 
743         for (byte b : barray) {
744             int bb = (b & 0x00FF) >> 4;
745             c[pos++] = (char) (bb > 9 ? bb + 0x37 : bb + 0x30);
746             bb = b & 0x0F;
747             c[pos++] = (char) (bb > 9 ? bb + 0x37 : bb + 0x30);
748             if (pos > 60) {
749                 break;
750             }
751         }
752 
753         return new String(c);
754     }
755 
756     private void notifyIdleSessions(long currentTime) throws Exception {
757         // process idle sessions
758         if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
759             lastIdleCheckTime = currentTime;
760             AbstractIoSession.notifyIdleness(allSessions(), currentTime);
761         }
762     }
763 
    /**
     * Write all the pending messages : the flushing queue is drained, and
     * each session is handled accordingly to its state.
     *
     * @param currentTime
     *            the current time, passed along to the write operations
     * @throws IllegalStateException
     *             if a session is in an unexpected state
     */
    private void flush(long currentTime) {
        if (flushingSessions.isEmpty()) {
            return;
        }

        do {
            // Take the next session waiting for a flush
            S session = flushingSessions.poll();

            if (session == null) {
                // Just in case ... It should not happen.
                break;
            }

            // Reset the Schedule for flush flag for this session,
            // as we are flushing it now
            session.unscheduledForFlush();

            SessionState state = getState(session);

            switch (state) {
            case OPENED:
                try {
                    boolean flushedAll = flushNow(session, currentTime);

                    // Some requests may have been queued while we were
                    // flushing : schedule the session again in this case
                    if (flushedAll && !session.getWriteRequestQueue().isEmpty(session)
                            && !session.isScheduledForFlush()) {
                        scheduleFlush(session);
                    }
                } catch (Exception e) {
                    scheduleRemove(session);
                    IoFilterChain filterChain = session.getFilterChain();
                    filterChain.fireExceptionCaught(e);
                }

                break;

            case CLOSING:
                // Skip if the channel is already closed.
                break;

            case OPENING:
                // Retry later if session is not yet fully initialized.
                // (In case that Session.write() is called before addSession()
                // is processed)
                scheduleFlush(session);
                // Get out now : the session is back in the queue, and
                // looping would poll it again immediately
                return;

            default:
                throw new IllegalStateException(String.valueOf(state));
            }

        } while (!flushingSessions.isEmpty());
    }
820 
    /**
     * Writes as many pending requests of a session as possible, within the
     * limit of one and a half time the max read buffer size.
     *
     * @param session
     *            the session to flush
     * @param currentTime
     *            the current time, passed along to the write operations
     * @return true if the loop ended without being interrupted (queue
     *         exhausted), false if the session is not connected, if the write
     *         has to be resumed later, or if an exception occurred
     */
    private boolean flushNow(S session, long currentTime) {
        if (!session.isConnected()) {
            // Nothing can be written anymore : get rid of the session
            scheduleRemove(session);
            return false;
        }

        final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();

        final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();

        // Set limitation for the number of written bytes for read-write
        // fairness. I used maxReadBufferSize * 3 / 2, which yields best
        // performance in my experience while not breaking fairness much.
        final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
                + (session.getConfig().getMaxReadBufferSize() >>> 1);
        int writtenBytes = 0;
        WriteRequest req = null;

        try {
            // Clear OP_WRITE
            setInterestedInWrite(session, false);

            do {
                // Check for pending writes : resume the current request if
                // there is one, otherwise take the next one from the queue
                req = session.getCurrentWriteRequest();

                if (req == null) {
                    req = writeRequestQueue.poll(session);

                    if (req == null) {
                        // The queue is exhausted
                        break;
                    }

                    session.setCurrentWriteRequest(req);
                }

                int localWrittenBytes = 0;
                Object message = req.getMessage();

                if (message instanceof IoBuffer) {
                    localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
                            currentTime);

                    if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {
                        // the buffer isn't empty, we re-interest it in writing
                        writtenBytes += localWrittenBytes;
                        setInterestedInWrite(session, true);
                        return false;
                    }
                } else if (message instanceof FileRegion) {
                    localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
                            currentTime);

                    // Fix for Java bug on Linux
                    // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
                    // If there's still data to be written in the FileRegion,
                    // register for writing again and return false, indicating
                    // that we need to pause until writing may resume.
                    if ((localWrittenBytes > 0) && (((FileRegion) message).getRemainingBytes() > 0)) {
                        writtenBytes += localWrittenBytes;
                        setInterestedInWrite(session, true);
                        return false;
                    }
                } else {
                    throw new IllegalStateException("Don't know how to handle message of type '"
                            + message.getClass().getName() + "'.  Are you missing a protocol encoder?");
                }

                if (localWrittenBytes == 0) {
                    // Kernel buffer is full.
                    setInterestedInWrite(session, true);
                    return false;
                }

                writtenBytes += localWrittenBytes;

                if (writtenBytes >= maxWrittenBytes) {
                    // Wrote too much : give the other sessions a chance, and
                    // come back to this one later
                    scheduleFlush(session);
                    return false;
                }
            } while (writtenBytes < maxWrittenBytes);
        } catch (Exception e) {
            // Fail the request being written, if any, and let the filter
            // chain know
            if (req != null) {
                req.getFuture().setException(e);
            }

            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireExceptionCaught(e);
            return false;
        }

        return true;
    }
915 
916     private int writeBuffer(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
917             throws Exception {
918         IoBuffer buf = (IoBuffer) req.getMessage();
919         int localWrittenBytes = 0;
920 
921         if (buf.hasRemaining()) {
922             int length;
923 
924             if (hasFragmentation) {
925                 length = Math.min(buf.remaining(), maxLength);
926             } else {
927                 length = buf.remaining();
928             }
929 
930             try {
931                 localWrittenBytes = write(session, buf, length);
932             } catch (IOException ioe) {
933                 // We have had an issue while trying to send data to the 
934                 // peer : let's close the session.
935                 session.close(true);
936             }
937 
938         }
939 
940         session.increaseWrittenBytes(localWrittenBytes, currentTime);
941 
942         if (!buf.hasRemaining() || (!hasFragmentation && (localWrittenBytes != 0))) {
943             // Buffer has been sent, clear the current request.
944             int pos = buf.position();
945             buf.reset();
946 
947             fireMessageSent(session, req);
948 
949             // And set it back to its position
950             buf.position(pos);
951         }
952 
953         return localWrittenBytes;
954     }
955 
956     private int writeFile(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
957             throws Exception {
958         int localWrittenBytes;
959         FileRegion region = (FileRegion) req.getMessage();
960 
961         if (region.getRemainingBytes() > 0) {
962             int length;
963 
964             if (hasFragmentation) {
965                 length = (int) Math.min(region.getRemainingBytes(), maxLength);
966             } else {
967                 length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
968             }
969 
970             localWrittenBytes = transferFile(session, region, length);
971             region.update(localWrittenBytes);
972         } else {
973             localWrittenBytes = 0;
974         }
975 
976         session.increaseWrittenBytes(localWrittenBytes, currentTime);
977 
978         if ((region.getRemainingBytes() <= 0) || (!hasFragmentation && (localWrittenBytes != 0))) {
979             fireMessageSent(session, req);
980         }
981 
982         return localWrittenBytes;
983     }
984 
985     private void fireMessageSent(S session, WriteRequest req) {
986         session.setCurrentWriteRequest(null);
987         IoFilterChain filterChain = session.getFilterChain();
988         filterChain.fireMessageSent(req);
989     }
990 
991     /**
992      * Update the trafficControl for all the session.
993      */
994     private void updateTrafficMask() {
995         int queueSize = trafficControllingSessions.size();
996 
997         while (queueSize > 0) {
998             S session = trafficControllingSessions.poll();
999 
1000             if (session == null) {
1001                 // We are done with this queue.
1002                 return;
1003             }
1004 
1005             SessionState state = getState(session);
1006 
1007             switch (state) {
1008             case OPENED:
1009                 updateTrafficControl(session);
1010 
1011                 break;
1012 
1013             case CLOSING:
1014                 break;
1015 
1016             case OPENING:
1017                 // Retry later if session is not yet fully initialized.
1018                 // (In case that Session.suspend??() or session.resume??() is
1019                 // called before addSession() is processed)
1020                 // We just put back the session at the end of the queue.
1021                 trafficControllingSessions.add(session);
1022                 break;
1023 
1024             default:
1025                 throw new IllegalStateException(String.valueOf(state));
1026             }
1027 
1028             // As we have handled one session, decrement the number of
1029             // remaining sessions. The OPENING session will be processed
1030             // with the next select(), as the queue size has been decreased, even
1031             // if the session has been pushed at the end of the queue
1032             queueSize--;
1033         }
1034     }
1035 
1036     /**
1037      * {@inheritDoc}
1038      */
1039     public void updateTrafficControl(S session) {
1040         //
1041         try {
1042             setInterestedInRead(session, !session.isReadSuspended());
1043         } catch (Exception e) {
1044             IoFilterChain filterChain = session.getFilterChain();
1045             filterChain.fireExceptionCaught(e);
1046         }
1047 
1048         try {
1049             setInterestedInWrite(session,
1050                     !session.getWriteRequestQueue().isEmpty(session) && !session.isWriteSuspended());
1051         } catch (Exception e) {
1052             IoFilterChain filterChain = session.getFilterChain();
1053             filterChain.fireExceptionCaught(e);
1054         }
1055     }
1056 
1057     /**
1058      * The main loop. This is the place in charge to poll the Selector, and to
1059      * process the active sessions. It's done in
1060      * - handle the newly created sessions
1061      * -
1062      */
1063     private class Processor implements Runnable {
1064         public void run() {
1065             assert (processorRef.get() == this);
1066 
1067             int nSessions = 0;
1068             lastIdleCheckTime = System.currentTimeMillis();
1069 
1070             for (;;) {
1071                 try {
1072                     // This select has a timeout so that we can manage
1073                     // idle session when we get out of the select every
1074                     // second. (note : this is a hack to avoid creating
1075                     // a dedicated thread).
1076                     long t0 = System.currentTimeMillis();
1077                     int selected = select(SELECT_TIMEOUT);
1078                     long t1 = System.currentTimeMillis();
1079                     long delta = (t1 - t0);
1080 
1081                     if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {
1082                         // Last chance : the select() may have been
1083                         // interrupted because we have had an closed channel.
1084                         if (isBrokenConnection()) {
1085                             LOG.warn("Broken connection");
1086 
1087                             // we can reselect immediately
1088                             // set back the flag to false
1089                             wakeupCalled.getAndSet(false);
1090 
1091                             continue;
1092                         } else {
1093                             LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
1094                             // Ok, we are hit by the nasty epoll
1095                             // spinning.
1096                             // Basically, there is a race condition
1097                             // which causes a closing file descriptor not to be
1098                             // considered as available as a selected channel, but
1099                             // it stopped the select. The next time we will
1100                             // call select(), it will exit immediately for the same
1101                             // reason, and do so forever, consuming 100%
1102                             // CPU.
1103                             // We have to destroy the selector, and
1104                             // register all the socket on a new one.
1105                             registerNewSelector();
1106                         }
1107 
1108                         // Set back the flag to false
1109                         wakeupCalled.getAndSet(false);
1110 
1111                         // and continue the loop
1112                         continue;
1113                     }
1114 
1115                     // Manage newly created session first
1116                     nSessions += handleNewSessions();
1117 
1118                     updateTrafficMask();
1119 
1120                     // Now, if we have had some incoming or outgoing events,
1121                     // deal with them
1122                     if (selected > 0) {
1123                         //LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...
1124                         process();
1125                     }
1126 
1127                     // Write the pending requests
1128                     long currentTime = System.currentTimeMillis();
1129                     flush(currentTime);
1130 
1131                     // And manage removed sessions
1132                     nSessions -= removeSessions();
1133 
1134                     // Last, not least, send Idle events to the idle sessions
1135                     notifyIdleSessions(currentTime);
1136 
1137                     // Get a chance to exit the infinite loop if there are no
1138                     // more sessions on this Processor
1139                     if (nSessions == 0) {
1140                         processorRef.set(null);
1141 
1142                         if (newSessions.isEmpty() && isSelectorEmpty()) {
1143                             // newSessions.add() precedes startupProcessor
1144                             assert (processorRef.get() != this);
1145                             break;
1146                         }
1147 
1148                         assert (processorRef.get() != this);
1149 
1150                         if (!processorRef.compareAndSet(null, this)) {
1151                             // startupProcessor won race, so must exit processor
1152                             assert (processorRef.get() != this);
1153                             break;
1154                         }
1155 
1156                         assert (processorRef.get() == this);
1157                     }
1158 
1159                     // Disconnect all sessions immediately if disposal has been
1160                     // requested so that we exit this loop eventually.
1161                     if (isDisposing()) {
1162                         for (Iterator<S> i = allSessions(); i.hasNext();) {
1163                             scheduleRemove(i.next());
1164                         }
1165 
1166                         wakeup();
1167                     }
1168                 } catch (ClosedSelectorException cse) {
1169                     // If the selector has been closed, we can exit the loop
1170                     break;
1171                 } catch (Throwable t) {
1172                     ExceptionMonitor.getInstance().exceptionCaught(t);
1173 
1174                     try {
1175                         Thread.sleep(1000);
1176                     } catch (InterruptedException e1) {
1177                         ExceptionMonitor.getInstance().exceptionCaught(e1);
1178                     }
1179                 }
1180             }
1181 
1182             try {
1183                 synchronized (disposalLock) {
1184                     if (disposing) {
1185                         doDispose();
1186                     }
1187                 }
1188             } catch (Throwable t) {
1189                 ExceptionMonitor.getInstance().exceptionCaught(t);
1190             } finally {
1191                 disposalFuture.setValue(true);
1192             }
1193         }
1194     }
1195 }