View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.HashMap;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Queue;
29  import java.util.concurrent.ConcurrentLinkedQueue;
30  import java.util.concurrent.Executor;
31  import java.util.concurrent.atomic.AtomicInteger;
32  
33  import org.apache.mina.core.buffer.IoBuffer;
34  import org.apache.mina.core.file.FileRegion;
35  import org.apache.mina.core.filterchain.IoFilterChain;
36  import org.apache.mina.core.future.DefaultIoFuture;
37  import org.apache.mina.core.service.AbstractIoService;
38  import org.apache.mina.core.service.IoProcessor;
39  import org.apache.mina.core.session.AbstractIoSession;
40  import org.apache.mina.core.session.IoSession;
41  import org.apache.mina.core.session.IoSessionConfig;
42  import org.apache.mina.core.write.WriteRequest;
43  import org.apache.mina.core.write.WriteRequestQueue;
44  import org.apache.mina.core.write.WriteToClosedSessionException;
45  import org.apache.mina.util.ExceptionMonitor;
46  import org.apache.mina.util.NamePreservingRunnable;
47  
48  /**
49   * An abstract implementation of {@link IoProcessor} which helps
50   * transport developers to write an {@link IoProcessor} easily.
51   * This class is in charge of active polling a set of {@link IoSession}
52   * and trigger events when some I/O operation is possible.
53   *
54   * @author The Apache MINA Project (dev@mina.apache.org)
55   */
56  public abstract class AbstractPollingIoProcessor<T extends AbstractIoSession> implements IoProcessor<T> {
57      /**
58       * The maximum loop count for a write operation until
59       * {@link #write(AbstractIoSession, IoBuffer, int)} returns non-zero value.
60       * It is similar to what a spin lock is for in concurrency programming.
61       * It improves memory utilization and write throughput significantly.
62       */
63      private static final int WRITE_SPIN_COUNT = 256;
64      
65      /** A timeout used for the select, as we need to get out to deal with idle sessions */
66      private static final long SELECT_TIMEOUT = 1000L;
67  
68      /** A map containing the last Thread ID for each class */
69      private static final Map<Class<?>, AtomicInteger> threadIds = 
70          new HashMap<Class<?>, AtomicInteger>();
71  
72      private final Object lock = new Object();
73      private final String threadName;
74      private final Executor executor;
75  
76      /** A Session queue containing the newly created sessions */
77      private final Queue<T> newSessions = new ConcurrentLinkedQueue<T>();
78      private final Queue<T> removingSessions = new ConcurrentLinkedQueue<T>();
79      private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>();
80      private final Queue<T> trafficControllingSessions = new ConcurrentLinkedQueue<T>();
81  
82      /** The processor thread : it handles the incoming messages */
83      private Processor processor;
84      
85      private long lastIdleCheckTime;
86  
87      private final Object disposalLock = new Object();
88      private volatile boolean disposing;
89      private volatile boolean disposed;
90      private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);
91  
92      /**
93       * Create an {@link AbstractPollingIoProcessor} with the given {@link Executor}
94       * for handling I/Os events.
95       * 
96       * @param executor the {@link Executor} for handling I/O events
97       */
98      protected AbstractPollingIoProcessor(Executor executor) {
99          if (executor == null) {
100             throw new NullPointerException("executor");
101         }
102 
103         this.threadName = nextThreadName();
104         this.executor = executor;
105     }
106 
107     /**
108      * Compute the thread ID for this class instance. As we may have different
109      * classes, we store the last ID number into a Map associating the class
110      * name to the last assigned ID.
111      *   
112      * @return a name for the current thread, based on the class name and
113      * an incremental value, starting at 1. 
114      */
115     private String nextThreadName() {
116         Class<?> cls = getClass();
117         int newThreadId;
118         
119         // We synchronize this block to avoid a concurrent access to 
120         // the actomicInteger (it can be modified by another thread, while
121         // being seen as null by another thread)
122         synchronized( threadIds ) {
123             // Get the current ID associated to this class' name
124             AtomicInteger threadId = threadIds.get(cls);
125             
126             if (threadId == null) {
127                 // We never have seen this class before, just create a
128                 // new ID starting at 1 for it, and associate this ID
129                 // with the class name in the map.
130                 newThreadId = 1;
131                 threadIds.put(cls, new AtomicInteger(newThreadId));
132             } else {
133                 // Just increment the lat ID, and get it.
134                 newThreadId = threadId.incrementAndGet();
135             }
136         }
137         
138         // Now we can compute the name for this thread
139         return cls.getSimpleName() + '-' + newThreadId;
140     }
141 
142     /**
143      * {@inheritDoc}
144      */
145     public final boolean isDisposing() {
146         return disposing;
147     }
148 
149     /**
150      * {@inheritDoc}
151      */
152     public final boolean isDisposed() {
153         return disposed;
154     }
155     
156     /**
157      * {@inheritDoc}
158      */
159     public final void dispose() {
160         if (disposed) {
161             return;
162         }
163 
164         synchronized (disposalLock) {
165             if (!disposing) {
166                 disposing = true;
167                 startupProcessor();
168             }
169         }
170 
171         disposalFuture.awaitUninterruptibly();
172         disposed = true;
173     }
174 
175     /**
176      * Dispose the resources used by this {@link IoProcessor} for polling 
177      * the client connections
178      * @throws Exception if some low level IO error occurs
179      */
180     protected abstract void dispose0() throws Exception;
181 
182     /**
183      * poll those sessions for the given timeout
184      * @param timeout milliseconds before the call timeout if no event appear
185      * @return The number of session ready for read or for write
186      * @throws Exception if some low level IO error occurs
187      */
188     protected abstract int select(long timeout) throws Exception;
189     
190     /**
191      * poll those sessions forever
192      * @return The number of session ready for read or for write
193      * @throws Exception if some low level IO error occurs
194      */
195     protected abstract int select() throws Exception;
196     
197     /**
198      * Say if the list of {@link IoSession} polled by this {@link IoProcessor} 
199      * is empty
200      * @return true if at least a session is managed by this {@link IoProcessor}
201      */
202     protected abstract boolean isSelectorEmpty();
203     
204     /**
205      * Interrupt the {@link AbstractPollingIoProcessor#select(int) call.
206      */
207     protected abstract void wakeup();
208     
209     /**
210      * Get an {@link Iterator} for the list of {@link IoSession} polled by this
211      * {@link IoProcessor}   
212      * @return {@link Iterator} of {@link IoSession}
213      */
214     protected abstract Iterator<T> allSessions();
215     
216     /**
217      * Get an {@link Iterator} for the list of {@link IoSession} found selected 
218      * by the last call of {@link AbstractPollingIoProcessor#select(int)
219      * @return {@link Iterator} of {@link IoSession} read for I/Os operation
220      */
221     protected abstract Iterator<T> selectedSessions();
222     
223     /**
224      * Get the state of a session (preparing, open, closed)
225      * @param session the {@link IoSession} to inspect
226      * @return the state of the session
227      */
228     protected abstract SessionState state(T session);
229 
230     /**
231      * Is the session ready for writing
232      * @param session the session queried
233      * @return true is ready, false if not ready
234      */
235     protected abstract boolean isWritable(T session);
236 
237     /**
238      * Is the session ready for reading
239      * @param session the session queried
240      * @return true is ready, false if not ready
241      */
242     protected abstract boolean isReadable(T session);
243 
244     /**
245      * register a session for writing
246      * @param session the session registered
247      * @param interested true for registering, false for removing
248      */
249     protected abstract void setInterestedInWrite(T session, boolean interested)
250             throws Exception;
251 
252     /**
253      * register a session for reading
254      * @param session the session registered
255      * @param interested true for registering, false for removing
256      */
257     protected abstract void setInterestedInRead(T session, boolean interested)
258             throws Exception;
259 
260     /**
261      * is this session registered for reading
262      * @param session the session queried
263      * @return true is registered for reading
264      */
265     protected abstract boolean isInterestedInRead(T session);
266 
267     /**
268      * is this session registered for writing
269      * @param session the session queried
270      * @return true is registered for writing
271      */
272     protected abstract boolean isInterestedInWrite(T session);
273 
274     /**
275      * Initialize the polling of a session. Add it to the polling process. 
276      * @param session the {@link IoSession} to add to the polling
277      * @throws Exception any exception thrown by the underlying system calls
278      */
279     protected abstract void init(T session) throws Exception;
280     
281     /**
282      * Destroy the underlying client socket handle
283      * @param session the {@link IoSession}
284      * @throws Exception any exception thrown by the underlying system calls
285      */
286     protected abstract void destroy(T session) throws Exception;
287     
288     /**
289      * Reads a sequence of bytes from a {@link IoSession} into the given {@link IoBuffer}. 
290      * Is called when the session was found ready for reading.
291      * @param session the session to read
292      * @param buf the buffer to fill
293      * @return the number of bytes read
294      * @throws Exception any exception thrown by the underlying system calls
295      */
296     protected abstract int read(T session, IoBuffer buf) throws Exception;
297 
298     /**
299      * Write a sequence of bytes to a {@link IoSession}, means to be called when a session
300      * was found ready for writing.
301      * @param session the session to write
302      * @param buf the buffer to write
303      * @param length the number of bytes to write can be superior to the number of bytes remaining
304      * in the buffer
305      * @return the number of byte written
306      * @throws Exception any exception thrown by the underlying system calls
307      */
308     protected abstract int write(T session, IoBuffer buf, int length) throws Exception;
309     
310     /**
311      * Write a part of a file to a {@link IoSession}, if the underlying API isn't supporting
312      * system calls like sendfile(), you can throw a {@link UnsupportedOperationException} so 
313      * the file will be send using usual {@link #write(AbstractIoSession, IoBuffer, int)} call. 
314      * @param session the session to write
315      * @param region the file region to write
316      * @param length the length of the portion to send
317      * @return the number of written bytes
318      * @throws Exception any exception thrown by the underlying system calls
319      */
320     protected abstract int transferFile(T session, FileRegion region, int length) throws Exception;
321 
322     /**
323      * {@inheritDoc}
324      */
325     public final void add(T session) {
326         if (isDisposing()) {
327             throw new IllegalStateException("Already disposed.");
328         }
329 
330         // Adds the session to the newSession queue and starts the worker
331         newSessions.add(session);
332         startupProcessor();
333     }
334 
335     /**
336      * {@inheritDoc}
337      */
338     public final void remove(T session) {
339         scheduleRemove(session);
340         startupProcessor();
341     }
342 
343     private void scheduleRemove(T session) {
344         removingSessions.add(session);
345     }
346 
347     /**
348      * {@inheritDoc}
349      */
350     public final void flush(T session) {
351         boolean needsWakeup = flushingSessions.isEmpty();
352         if (scheduleFlush(session) && needsWakeup) {
353             wakeup();
354         }
355     }
356 
357     private boolean scheduleFlush(T session) {
358         if (session.setScheduledForFlush(true)) {
359             flushingSessions.add(session);
360             return true;
361         }
362         return false;
363     }
364 
365     /**
366      * {@inheritDoc}
367      */
368     public final void updateTrafficMask(T session) {
369         scheduleTrafficControl(session);
370         wakeup();
371     }
372 
373     private void scheduleTrafficControl(T session) {
374         trafficControllingSessions.add(session);
375     }
376 
377     /**
378      * Starts the inner Processor, asking the executor to pick a thread in its
379      * pool. The Runnable will be renamed 
380      */
381     private void startupProcessor() {
382         synchronized (lock) {
383             if (processor == null) {
384                 processor = new Processor();
385                 executor.execute(new NamePreservingRunnable(processor, threadName));
386             }
387         }
388         
389         // Just stop the select() and start it again, so that the processor
390         // can be activated immediately. 
391         wakeup();
392     }
393 
394     /**
395      * Handle newly created sessions
396      * @return The number of new sessions
397      */
398     private int handleNewSessions() {
399         int addedSessions = 0;
400         
401         // Loop on the new sessions blocking queue, to count
402         // the number of sessions who has been created
403         for (;;) {
404             T session = newSessions.poll();
405 
406             if (session == null) {
407                 // We don't have new sessions
408                 break;
409             }
410 
411 
412             if (addNow(session)) {
413                 // A new session has been created 
414                 addedSessions ++;
415             }
416         }
417 
418         return addedSessions;
419     }
420 
421     private boolean addNow(T session) {
422 
423         boolean registered = false;
424         boolean notified = false;
425         try {
426             init(session);
427             registered = true;
428 
429             // Build the filter chain of this session.
430             session.getService().getFilterChainBuilder().buildFilterChain(
431                     session.getFilterChain());
432 
433             // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
434             // in AbstractIoFilterChain.fireSessionOpened().
435             ((AbstractIoService) session.getService()).getListeners().fireSessionCreated(session);
436             notified = true;
437         } catch (Throwable e) {
438             if (notified) {
439                 // Clear the DefaultIoFilterChain.CONNECT_FUTURE attribute
440                 // and call ConnectFuture.setException().
441                 scheduleRemove(session);
442                 IoFilterChain filterChain = session.getFilterChain(); 
443                 filterChain.fireExceptionCaught(e);
444                 wakeup();
445             } else {
446                 ExceptionMonitor.getInstance().exceptionCaught(e);
447                 try {
448                     destroy(session);
449                 } catch (Exception e1) {
450                     ExceptionMonitor.getInstance().exceptionCaught(e1);
451                 } finally {
452                     registered = false;
453                 }
454             }
455         }
456         return registered;
457     }
458 
459     private int remove() {
460         int removedSessions = 0;
461         for (; ;) {
462             T session = removingSessions.poll();
463 
464             if (session == null) {
465                 break;
466             }
467 
468             SessionState state = state(session);
469             switch (state) {
470             case OPEN:
471                 if (removeNow(session)) {
472                     removedSessions ++;
473                 }
474                 break;
475             case CLOSED:
476                 // Skip if channel is already closed
477                 break;
478             case PREPARING:
479                 // Retry later if session is not yet fully initialized.
480                 // (In case that Session.close() is called before addSession() is processed)
481                 scheduleRemove(session);
482                 return removedSessions;
483             default:
484                 throw new IllegalStateException(String.valueOf(state));
485             }
486         }
487 
488         return removedSessions;
489     }
490 
491     private boolean removeNow(T session) {
492         clearWriteRequestQueue(session);
493 
494         try {
495             destroy(session);
496             return true;
497         } catch (Exception e) {
498             IoFilterChain filterChain = session.getFilterChain(); 
499             filterChain.fireExceptionCaught(e);
500         } finally {
501             clearWriteRequestQueue(session);
502             ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
503         }
504         return false;
505     }
506 
507     private void clearWriteRequestQueue(T session) {
508         WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
509         WriteRequest req;
510 
511         List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
512 
513         if ((req = writeRequestQueue.poll(session)) != null) {
514             Object m = req.getMessage();
515             if (m instanceof IoBuffer) {
516                 IoBuffer buf = (IoBuffer) req.getMessage();
517 
518                 // The first unwritten empty buffer must be
519                 // forwarded to the filter chain.
520                 if (buf.hasRemaining()) {
521                     buf.reset();
522                     failedRequests.add(req);
523                 } else {
524                     IoFilterChain filterChain = session.getFilterChain(); 
525                     filterChain.fireMessageSent(req);
526                 }
527             } else {
528                 failedRequests.add(req);
529             }
530 
531             // Discard others.
532             while ((req = writeRequestQueue.poll(session)) != null) {
533                 failedRequests.add(req);
534             }
535         }
536 
537         // Create an exception and notify.
538         if (!failedRequests.isEmpty()) {
539             WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
540             for (WriteRequest r: failedRequests) {
541                 session.decreaseScheduledBytesAndMessages(r);
542                 r.getFuture().setException(cause);
543             }
544             IoFilterChain filterChain = session.getFilterChain(); 
545             filterChain.fireExceptionCaught(cause);
546         }
547     }
548 
549     private void process() throws Exception {
550         for (Iterator<T> i = selectedSessions(); i.hasNext();) {
551             T session = i.next();
552             process(session);
553             i.remove();
554         }
555     }
556 
557     /**
558      * Deal with session ready for the read or write operations, or both.
559      */
560     private void process(T session) {
561         // Process Reads
562         if (isReadable(session) && !session.isReadSuspended()) {
563             read(session);
564         }
565 
566         // Process writes
567         if (isWritable(session) && !session.isWriteSuspended()) {
568             scheduleFlush(session);
569         }
570     }
571 
572     private void read(T session) {
573         IoSessionConfig config = session.getConfig();
574         IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());
575 
576         final boolean hasFragmentation =
577             session.getTransportMetadata().hasFragmentation();
578 
579         try {
580             int readBytes = 0;
581             int ret;
582 
583             try {
584                 if (hasFragmentation) {
585                     while ((ret = read(session, buf)) > 0) {
586                         readBytes += ret;
587                         if (!buf.hasRemaining()) {
588                             break;
589                         }
590                     }
591                 } else {
592                     ret = read(session, buf);
593                     if (ret > 0) {
594                         readBytes = ret;
595                     }
596                 }
597             } finally {
598                 buf.flip();
599             }
600 
601             if (readBytes > 0) {
602                 IoFilterChain filterChain = session.getFilterChain(); 
603                 filterChain.fireMessageReceived(buf);
604                 buf = null;
605 
606                 if (hasFragmentation) {
607                     if (readBytes << 1 < config.getReadBufferSize()) {
608                         session.decreaseReadBufferSize();
609                     } else if (readBytes == config.getReadBufferSize()) {
610                         session.increaseReadBufferSize();
611                     }
612                 }
613             }
614             if (ret < 0) {
615                 scheduleRemove(session);
616             }
617         } catch (Throwable e) {
618             if (e instanceof IOException) {
619                 scheduleRemove(session);
620             }
621             IoFilterChain filterChain = session.getFilterChain(); 
622             filterChain.fireExceptionCaught(e);
623         }
624     }
625 
626     private void notifyIdleSessions(long currentTime) throws Exception {
627         // process idle sessions
628         if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
629             lastIdleCheckTime = currentTime;
630             AbstractIoSession.notifyIdleness(allSessions(), currentTime);
631         }
632     }
633 
634     private void flush(long currentTime) {
635         final T firstSession = flushingSessions.peek();
636         if (firstSession == null) {
637             return;
638         }
639 
640         T session = flushingSessions.poll(); // the same one with firstSession
641         for (; ;) {
642             session.setScheduledForFlush(false);
643             SessionState state = state(session);
644             
645             switch (state) {
646             case OPEN:
647                 try {
648                     boolean flushedAll = flushNow(session, currentTime);
649                     if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) &&
650                         !session.isScheduledForFlush()) {
651                         scheduleFlush(session);
652                     }
653                 } catch (Exception e) {
654                     scheduleRemove(session);
655                     IoFilterChain filterChain = session.getFilterChain(); 
656                     filterChain.fireExceptionCaught(e);
657                 }
658                 break;
659             case CLOSED:
660                 // Skip if the channel is already closed.
661                 break;
662             case PREPARING:
663                 // Retry later if session is not yet fully initialized.
664                 // (In case that Session.write() is called before addSession() is processed)
665                 scheduleFlush(session);
666                 return;
667             default:
668                 throw new IllegalStateException(String.valueOf(state));
669             }
670 
671             session = flushingSessions.peek();
672             if (session == null || session == firstSession) {
673                 break;
674             }
675             session = flushingSessions.poll();
676         }
677     }
678 
679     private boolean flushNow(T session, long currentTime) {
680         if (!session.isConnected()) {
681             scheduleRemove(session);
682             return false;
683         }
684 
685         final boolean hasFragmentation =
686             session.getTransportMetadata().hasFragmentation();
687 
688         final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
689 
690         // Set limitation for the number of written bytes for read-write
691         // fairness.  I used maxReadBufferSize * 3 / 2, which yields best
692         // performance in my experience while not breaking fairness much.
693         final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize() +
694                               (session.getConfig().getMaxReadBufferSize() >>> 1);
695         int writtenBytes = 0;
696         try {
697             // Clear OP_WRITE
698             setInterestedInWrite(session, false);
699             do {
700                 // Check for pending writes.
701                 WriteRequest req = session.getCurrentWriteRequest();
702                 if (req == null) {
703                     req = writeRequestQueue.poll(session);
704                     if (req == null) {
705                         break;
706                     }
707                     session.setCurrentWriteRequest(req);
708                 }
709 
710                 int localWrittenBytes = 0;
711                 Object message = req.getMessage();
712                 if (message instanceof IoBuffer) {
713                     localWrittenBytes = writeBuffer(
714                             session, req, hasFragmentation,
715                             maxWrittenBytes - writtenBytes,
716                             currentTime);
717                     if (localWrittenBytes > 0 && ((IoBuffer)message).hasRemaining() ) {
718                         // the buffer isn't empty, we re-interest it in writing 
719                         writtenBytes += localWrittenBytes;    
720                         setInterestedInWrite(session, true);
721                         return false;
722                     }
723                 } else if (message instanceof FileRegion) {
724                     localWrittenBytes = writeFile(
725                             session, req, hasFragmentation,
726                             maxWrittenBytes - writtenBytes,
727                             currentTime);
728 
729                     // Fix for Java bug on Linux http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
730                     // If there's still data to be written in the FileRegion, return 0 indicating that we need
731                     // to pause until writing may resume.
732                     if (localWrittenBytes > 0 && ((FileRegion) message).getRemainingBytes() > 0) {
733                         writtenBytes += localWrittenBytes;
734                         setInterestedInWrite(session, true);
735                         return false;
736                     }
737                 } else {
738                     throw new IllegalStateException("Don't know how to handle message of type '" + message.getClass().getName() + "'.  Are you missing a protocol encoder?");
739                 }
740 
741                 if (localWrittenBytes == 0) {
742                     // Kernel buffer is full.
743                     setInterestedInWrite(session, true);
744                     return false;
745                 }
746 
747                 writtenBytes += localWrittenBytes;
748 
749                 if (writtenBytes >= maxWrittenBytes) {
750                     // Wrote too much
751                     scheduleFlush(session);
752                     return false;
753                 }
754             } while (writtenBytes < maxWrittenBytes);
755         } catch (Exception e) {
756             IoFilterChain filterChain = session.getFilterChain(); 
757             filterChain.fireExceptionCaught(e);
758             return false;
759         }
760 
761         return true;
762     }
763 
764     private int writeBuffer(T session, WriteRequest req,
765             boolean hasFragmentation, int maxLength, long currentTime) throws Exception {
766         IoBuffer buf = (IoBuffer) req.getMessage();
767         int localWrittenBytes = 0;
768         if (buf.hasRemaining()) {
769             int length;
770             if (hasFragmentation) {
771                 length = Math.min(buf.remaining(), maxLength);
772             } else {
773                 length = buf.remaining();
774             }
775             for (int i = WRITE_SPIN_COUNT; i > 0; i --) {
776                 localWrittenBytes = write(session, buf, length);
777                 if (localWrittenBytes != 0) {
778                     break;
779                 }
780             }
781         }
782 
783         session.increaseWrittenBytes(localWrittenBytes, currentTime);
784 
785         if (!buf.hasRemaining() ||
786                 !hasFragmentation && localWrittenBytes != 0) {
787             // Buffer has been sent, clear the current request.
788             buf.reset();
789             fireMessageSent(session, req);
790         }
791         return localWrittenBytes;
792     }
793 
794     private int writeFile(T session, WriteRequest req,
795             boolean hasFragmentation, int maxLength, long currentTime) throws Exception {
796         int localWrittenBytes;
797         FileRegion region = (FileRegion) req.getMessage();
798         if (region.getRemainingBytes() > 0) {
799             int length;
800             if (hasFragmentation) {
801                 length = (int) Math.min(region.getRemainingBytes(), maxLength);
802             } else {
803                 length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
804             }
805             localWrittenBytes = transferFile(session, region, length);
806             region.update(localWrittenBytes);
807         } else {
808             localWrittenBytes = 0;
809         }
810 
811         session.increaseWrittenBytes(localWrittenBytes, currentTime);
812 
813         if (region.getRemainingBytes() <= 0 ||
814                     !hasFragmentation && localWrittenBytes != 0) {
815             fireMessageSent(session, req);
816         }
817 
818         return localWrittenBytes;
819     }
820 
821     private void fireMessageSent(T session, WriteRequest req) {
822         session.setCurrentWriteRequest(null);
823         IoFilterChain filterChain = session.getFilterChain(); 
824         filterChain.fireMessageSent(req);
825     }
826 
827     private void updateTrafficMask() {
828         for (; ;) {
829             T session = trafficControllingSessions.poll();
830 
831             if (session == null) {
832                 break;
833             }
834 
835             SessionState state = state(session);
836             switch (state) {
837             case OPEN:
838                 updateTrafficControl(session);
839                 break;
840             case CLOSED:
841                 break;
842             case PREPARING:
843                 // Retry later if session is not yet fully initialized.
844                 // (In case that Session.suspend??() or session.resume??() is
845                 // called before addSession() is processed)
846                 scheduleTrafficControl(session);
847                 return;
848             default:
849                 throw new IllegalStateException(String.valueOf(state));
850             }
851         }
852     }
853 
854     public void updateTrafficControl(T session) {
855         try {
856             setInterestedInRead(session, !session.isReadSuspended());
857         } catch (Exception e) {
858             IoFilterChain filterChain = session.getFilterChain(); 
859             filterChain.fireExceptionCaught(e);
860         }
861         try {
862             setInterestedInWrite(
863                     session,
864                     !session.getWriteRequestQueue().isEmpty(session) &&
865                             !session.isWriteSuspended());
866         } catch (Exception e) {
867             IoFilterChain filterChain = session.getFilterChain(); 
868             filterChain.fireExceptionCaught(e);
869         }
870     }
871         
872     private class Processor implements Runnable {
873         public void run() {
874             int nSessions = 0;
875             lastIdleCheckTime = System.currentTimeMillis();
876 
877             for (;;) {
878                 try {
879                     // This select has a timeout so that we can manage
880                     // dile session when we get out of the select every
881                     // second. (note : this is a hack to avoid creating
882                     // a dedicated thread).
883                     int selected = select(SELECT_TIMEOUT);
884 
885                     nSessions += handleNewSessions();
886                     updateTrafficMask();
887 
888                     // Now, if we have had some incoming or outgoing events,
889                     // deal with them
890                     if (selected > 0) {
891                         process();
892                     }
893 
894                     long currentTime = System.currentTimeMillis();
895                     flush(currentTime);
896                     nSessions -= remove();
897                     notifyIdleSessions(currentTime);
898 
899                     if (nSessions == 0) {
900                         synchronized (lock) {
901                             if (newSessions.isEmpty() && isSelectorEmpty()) {
902                                 processor = null;
903                                 break;
904                             }
905                         }
906                     }
907 
908                     // Disconnect all sessions immediately if disposal has been
909                     // requested so that we exit this loop eventually.
910                     if (isDisposing()) {
911                         for (Iterator<T> i = allSessions(); i.hasNext(); ) {
912                             scheduleRemove(i.next());
913                         }
914                         wakeup();
915                     }
916                 } catch (Throwable t) {
917                     ExceptionMonitor.getInstance().exceptionCaught(t);
918 
919                     try {
920                         Thread.sleep(1000);
921                     } catch (InterruptedException e1) {
922                         ExceptionMonitor.getInstance().exceptionCaught(e1);
923                     }
924                 }
925             }
926 
927             try {
928                 synchronized (disposalLock) {
929                     if (isDisposing()) {
930                         dispose0();
931                     }
932                 }
933             } catch (Throwable t) {
934                 ExceptionMonitor.getInstance().exceptionCaught(t);
935             } finally {
936                 disposalFuture.setValue(true);
937             }
938         }
939     }
940 
941     protected static enum SessionState {
942         OPEN,
943         CLOSED,
944         PREPARING,
945     }
946 }