View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.io.IOException;
23  import java.nio.channels.SelectionKey;
24  import java.util.ArrayList;
25  import java.util.HashMap;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Queue;
30  import java.util.concurrent.ConcurrentLinkedQueue;
31  import java.util.concurrent.Executor;
32  import java.util.concurrent.atomic.AtomicInteger;
33  
34  import org.apache.mina.core.buffer.IoBuffer;
35  import org.apache.mina.core.file.FileRegion;
36  import org.apache.mina.core.future.DefaultIoFuture;
37  import org.apache.mina.core.service.AbstractIoService;
38  import org.apache.mina.core.service.IoProcessor;
39  import org.apache.mina.core.session.AbstractIoSession;
40  import org.apache.mina.core.session.IdleStatusChecker;
41  import org.apache.mina.core.session.IoSession;
42  import org.apache.mina.core.session.IoSessionConfig;
43  import org.apache.mina.core.write.WriteRequest;
44  import org.apache.mina.core.write.WriteRequestQueue;
45  import org.apache.mina.core.write.WriteToClosedSessionException;
46  import org.apache.mina.util.ExceptionMonitor;
47  import org.apache.mina.util.NamePreservingRunnable;
48  
49  /**
50   * An abstract implementation of {@link IoProcessor} which helps
51   * transport developers to write an {@link IoProcessor} easily.
52   * This class is in charge of active polling a set of {@link IoSession}
53   * and trigger events when some I/O operation is possible.
54   *
55   * @author The Apache MINA Project (dev@mina.apache.org)
56   * @version $Rev: 680286 $, $Date: 2008-07-28 10:22:53 +0200 (lun, 28 jui 2008) $
57   */
58  public abstract class AbstractPollingIoProcessor<T extends AbstractIoSession> implements IoProcessor<T> {
    /**
     * The maximum loop count for a write operation until
     * {@link #write(AbstractIoSession, IoBuffer, int)} returns non-zero value.
     * It is similar to what a spin lock is for in concurrency programming.
     * It improves memory utilization and write throughput significantly.
     */
    private static final int WRITE_SPIN_COUNT = 256;

    /**
     * Last thread ID handed out for each concrete processor class.
     * Every access goes through a {@code synchronized (threadIds)} block
     * in {@link #nextThreadName()}.
     */
    private static final Map<Class<?>, AtomicInteger> threadIds = 
        new HashMap<Class<?>, AtomicInteger>();

    /** Monitor guarding the {@link #worker} field (creation and reset to null). */
    private final Object lock = new Object();
    /** Name given to the worker runnable; computed once in the constructor. */
    private final String threadName;
    /** Executor on which the {@link Worker} is launched. */
    private final Executor executor;

    /** Sessions handed to {@link #add(AbstractIoSession)} and not yet initialized by the worker. */
    private final Queue<T> newSessions = new ConcurrentLinkedQueue<T>();
    /** Sessions waiting to be destroyed by the worker. */
    private final Queue<T> removingSessions = new ConcurrentLinkedQueue<T>();
    /** Sessions that have been scheduled for a flush of their write queue. */
    private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>();
    /** Sessions whose read/write interest must be re-evaluated from their traffic mask. */
    private final Queue<T> trafficControllingSessions = new ConcurrentLinkedQueue<T>();

    /** The currently running worker, or null when none is running; guarded by {@link #lock}. */
    private Worker worker;
    /** Timestamp (milliseconds) of the last idle check performed by the worker. */
    private long lastIdleCheckTime;

    /** Monitor serializing the disposal request and the call to {@link #dispose0()}. */
    private final Object disposalLock = new Object();
    /** Set to true once {@link #dispose()} has been requested. */
    private volatile boolean disposing;
    /** Set to true once {@link #dispose()} has seen the disposal future complete. */
    private volatile boolean disposed;
    /** Completed by the worker when it terminates; {@link #dispose()} waits on it. */
    private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);
87  
88      /**
89       * Create an {@link AbstractPollingIoProcessor} with the given {@link Executor}
90       * for handling I/Os events.
91       * 
92       * @param executor the {@link Executor} for handling I/O events
93       */
94      protected AbstractPollingIoProcessor(Executor executor) {
95          if (executor == null) {
96              throw new NullPointerException("executor");
97          }
98  
99          this.threadName = nextThreadName();
100         this.executor = executor;
101     }
102 
103     /**
104      * Compute the thread ID for this class instance. As we may have different
105      * classes, we store the last ID number into a Map associating the class
106      * name to the last assigned ID.
107      *   
108      * @return a name for the current thread, based on the class name and
109      * an incremental value, starting at 1. 
110      */
111     private String nextThreadName() {
112         Class<?> cls = getClass();
113         int newThreadId;
114         
115         // We synchronize this block to avoid a concurrent access to 
116         // the actomicInteger (it can be modified by another thread, while
117         // being seen as null by another thread)
118         synchronized( threadIds ) {
119             // Get the current ID associated to this class' name
120             AtomicInteger threadId = threadIds.get(cls);
121             
122             if (threadId == null) {
123                 // We never have seen this class before, just create a
124                 // new ID starting at 1 for it, and associate this ID
125                 // with the class name in the map.
126                 newThreadId = 1;
127                 threadIds.put(cls, new AtomicInteger(newThreadId));
128             } else {
129                 // Just increment the lat ID, and get it.
130                 newThreadId = threadId.incrementAndGet();
131             }
132         }
133         
134         // Now we can compute the name for this thread
135         return cls.getSimpleName() + '-' + newThreadId;
136     }
137 
    /**
     * {@inheritDoc}
     * <p>
     * Returns the flag raised by {@link #dispose()} as soon as disposal
     * has been requested.
     */
    public final boolean isDisposing() {
        return disposing;
    }
144 
    /**
     * {@inheritDoc}
     * <p>
     * Returns the flag set by {@link #dispose()} after it has finished
     * waiting for the disposal future.
     */
    public final boolean isDisposed() {
        return disposed;
    }
151     
152     /**
153      * {@inheritDoc}
154      */
155     public final void dispose() {
156         if (disposed) {
157             return;
158         }
159 
160         synchronized (disposalLock) {
161             if (!disposing) {
162                 disposing = true;
163                 startupWorker();
164             }
165         }
166 
167         disposalFuture.awaitUninterruptibly();
168         disposed = true;
169     }
170 
    /**
     * Dispose the resources used by this {@link IoProcessor} for polling
     * the client connections. Called by the worker, when it terminates,
     * if disposal has been requested.
     *
     * @throws Exception if some low level IO error occurs
     */
    protected abstract void dispose0() throws Exception;

    /**
     * Poll the sessions for the given timeout.
     *
     * @param timeout milliseconds before the call times out if no event appears
     * @return true if at least one session is ready for read or for write
     * @throws Exception if some low level IO error occurs
     */
    protected abstract boolean select(int timeout) throws Exception;
    
    /**
     * Say if the list of {@link IoSession} polled by this {@link IoProcessor}
     * is empty.
     *
     * @return presumably true when no session is managed by this
     * {@link IoProcessor} (the worker treats a true result as one of its
     * conditions to stop) -- NOTE(review): confirm against implementations
     */
    protected abstract boolean isSelectorEmpty();
    
    /**
     * Interrupt the {@link #select(int)} call.
     */
    protected abstract void wakeup();
    
    /**
     * Get an {@link Iterator} for the list of {@link IoSession} polled by this
     * {@link IoProcessor}.
     *
     * @return {@link Iterator} of {@link IoSession}
     */
    protected abstract Iterator<T> allSessions();
    
    /**
     * Get an {@link Iterator} for the list of {@link IoSession} found selected
     * by the last call of {@link #select(int)}. The worker calls
     * {@link Iterator#remove()} on it after handling each session.
     *
     * @return {@link Iterator} of {@link IoSession} ready for I/O operations
     */
    protected abstract Iterator<T> selectedSessions();
    
    /**
     * Get the state of a session (preparing, open, closed).
     *
     * @param session the {@link IoSession} to inspect
     * @return the state of the session
     */
    protected abstract SessionState state(T session);

    /**
     * Is the session ready for writing?
     *
     * @param session the session queried
     * @return true if ready, false if not ready
     */
    protected abstract boolean isWritable(T session);

    /**
     * Is the session ready for reading?
     *
     * @param session the session queried
     * @return true if ready, false if not ready
     */
    protected abstract boolean isReadable(T session);

    /**
     * Register or unregister a session for writing.
     *
     * @param session the session registered
     * @param interested true for registering, false for removing
     * @throws Exception any exception thrown by the implementation
     */
    protected abstract void setInterestedInWrite(T session, boolean interested)
            throws Exception;

    /**
     * Register or unregister a session for reading.
     *
     * @param session the session registered
     * @param interested true for registering, false for removing
     * @throws Exception any exception thrown by the implementation
     */
    protected abstract void setInterestedInRead(T session, boolean interested)
            throws Exception;

    /**
     * Is this session registered for reading?
     *
     * @param session the session queried
     * @return true if registered for reading
     */
    protected abstract boolean isInterestedInRead(T session);

    /**
     * Is this session registered for writing?
     *
     * @param session the session queried
     * @return true if registered for writing
     */
    protected abstract boolean isInterestedInWrite(T session);

    /**
     * Initialize the polling of a session. Add it to the polling process.
     *
     * @param session the {@link IoSession} to add to the polling
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract void init(T session) throws Exception;
    
    /**
     * Destroy the underlying client socket handle.
     *
     * @param session the {@link IoSession}
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract void destroy(T session) throws Exception;
    
    /**
     * Reads a sequence of bytes from a {@link IoSession} into the given {@link IoBuffer}.
     * Is called when the session was found ready for reading.
     *
     * @param session the session to read
     * @param buf the buffer to fill
     * @return the number of bytes read; the caller schedules the session for
     * removal when the value is negative
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract int read(T session, IoBuffer buf) throws Exception;

    /**
     * Write a sequence of bytes to a {@link IoSession}, meant to be called when a session
     * was found ready for writing.
     *
     * @param session the session to write
     * @param buf the buffer to write
     * @param length the number of bytes to write; it can be greater than the number of
     * bytes remaining in the buffer
     * @return the number of bytes written
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract int write(T session, IoBuffer buf, int length) throws Exception;
    
    /**
     * Write a part of a file to a {@link IoSession}. If the underlying API doesn't support
     * system calls like sendfile(), you can throw a {@link UnsupportedOperationException} so
     * the file will be sent using the usual {@link #write(AbstractIoSession, IoBuffer, int)} call.
     *
     * @param session the session to write
     * @param region the file region to write
     * @param length the length of the portion to send
     * @return the number of written bytes
     * @throws Exception any exception thrown by the underlying system calls
     */
    protected abstract int transferFile(T session, FileRegion region, int length) throws Exception;
310 
311     /**
312      * {@inheritDoc}
313      */
314     public final void add(T session) {
315         if (isDisposing()) {
316             throw new IllegalStateException("Already disposed.");
317         }
318 
319         newSessions.add(session);
320         startupWorker();
321     }
322 
323     /**
324      * {@inheritDoc}
325      */
326     public final void remove(T session) {
327         scheduleRemove(session);
328         startupWorker();
329     }
330 
    /**
     * Queue a session so that the worker destroys it on one of its next
     * iterations. Does not wake the selector up by itself.
     *
     * @param session the session to remove
     */
    private void scheduleRemove(T session) {
        removingSessions.add(session);
    }
334 
335     /**
336      * {@inheritDoc}
337      */
338     public final void flush(T session) {
339         // The following optimization has been disabled because it can cause StackOverflowError.
340         //if (Thread.currentThread() == workerThread) {
341         //    // Bypass the queue if called from the worker thread itself
342         //    // (i.e. single thread model).
343         //    flushNow(session, System.currentTimeMillis());
344         //    return;
345         //}
346 
347         boolean needsWakeup = flushingSessions.isEmpty();
348         if (scheduleFlush(session) && needsWakeup) {
349             wakeup();
350         }
351     }
352 
353     private boolean scheduleFlush(T session) {
354         if (session.setScheduledForFlush(true)) {
355             flushingSessions.add(session);
356             return true;
357         }
358         return false;
359     }
360 
361     /**
362      * {@inheritDoc}
363      */
364     public final void updateTrafficMask(T session) {
365         scheduleTrafficControl(session);
366         wakeup();
367     }
368 
    /**
     * Queue a session whose read/write interest has to be recomputed by
     * the worker. Does not wake the selector up by itself.
     *
     * @param session the session to update
     */
    private void scheduleTrafficControl(T session) {
        trafficControllingSessions.add(session);
    }
372 
373     private void startupWorker() {
374         synchronized (lock) {
375             if (worker == null) {
376                 worker = new Worker();
377                 executor.execute(new NamePreservingRunnable(worker, threadName));
378             }
379         }
380         wakeup();
381     }
382 
383     private int add() {
384         int addedSessions = 0;
385         
386         // Loop on the new sessions blocking queue, to count
387         // the number of sessions who has been created
388         for (;;) {
389             T session = newSessions.poll();
390 
391             if (session == null) {
392                 // We don't have anymore new sessions
393                 break;
394             }
395 
396 
397             if (addNow(session)) {
398                 // The new session has been added to the 
399                 addedSessions ++;
400             }
401         }
402 
403         return addedSessions;
404     }
405 
406     private boolean addNow(T session) {
407 
408         boolean registered = false;
409         boolean notified = false;
410         try {
411             init(session);
412             registered = true;
413 
414             // Build the filter chain of this session.
415             session.getService().getFilterChainBuilder().buildFilterChain(
416                     session.getFilterChain());
417 
418             // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
419             // in AbstractIoFilterChain.fireSessionOpened().
420             ((AbstractIoService) session.getService()).getListeners().fireSessionCreated(session);
421             notified = true;
422         } catch (Throwable e) {
423             if (notified) {
424                 // Clear the DefaultIoFilterChain.CONNECT_FUTURE attribute
425                 // and call ConnectFuture.setException().
426                 scheduleRemove(session);
427                 session.getFilterChain().fireExceptionCaught(e);
428                 wakeup();
429             } else {
430                 ExceptionMonitor.getInstance().exceptionCaught(e);
431                 try {
432                     destroy(session);
433                 } catch (Exception e1) {
434                     ExceptionMonitor.getInstance().exceptionCaught(e1);
435                 } finally {
436                     registered = false;
437                 }
438             }
439         }
440         return registered;
441     }
442 
443     private int remove() {
444         int removedSessions = 0;
445         for (; ;) {
446             T session = removingSessions.poll();
447 
448             if (session == null) {
449                 break;
450             }
451 
452             SessionState state = state(session);
453             switch (state) {
454             case OPEN:
455                 if (removeNow(session)) {
456                     removedSessions ++;
457                 }
458                 break;
459             case CLOSED:
460                 // Skip if channel is already closed
461                 break;
462             case PREPARING:
463                 // Retry later if session is not yet fully initialized.
464                 // (In case that Session.close() is called before addSession() is processed)
465                 scheduleRemove(session);
466                 return removedSessions;
467             default:
468                 throw new IllegalStateException(String.valueOf(state));
469             }
470         }
471 
472         return removedSessions;
473     }
474 
475     private boolean removeNow(T session) {
476         clearWriteRequestQueue(session);
477 
478         try {
479             destroy(session);
480             return true;
481         } catch (Exception e) {
482             session.getFilterChain().fireExceptionCaught(e);
483         } finally {
484             clearWriteRequestQueue(session);
485             ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
486         }
487         return false;
488     }
489 
490     private void clearWriteRequestQueue(T session) {
491         WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
492         WriteRequest req;
493 
494         List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
495 
496         if ((req = writeRequestQueue.poll(session)) != null) {
497             Object m = req.getMessage();
498             if (m instanceof IoBuffer) {
499                 IoBuffer buf = (IoBuffer) req.getMessage();
500 
501                 // The first unwritten empty buffer must be
502                 // forwarded to the filter chain.
503                 if (buf.hasRemaining()) {
504                     buf.reset();
505                     failedRequests.add(req);
506                 } else {
507                     session.getFilterChain().fireMessageSent(req);
508                 }
509             } else {
510                 failedRequests.add(req);
511             }
512 
513             // Discard others.
514             while ((req = writeRequestQueue.poll(session)) != null) {
515                 failedRequests.add(req);
516             }
517         }
518 
519         // Create an exception and notify.
520         if (!failedRequests.isEmpty()) {
521             WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
522             for (WriteRequest r: failedRequests) {
523                 session.decreaseScheduledBytesAndMessages(r);
524                 r.getFuture().setException(cause);
525             }
526             session.getFilterChain().fireExceptionCaught(cause);
527         }
528     }
529 
530     private void process() throws Exception {
531         for (Iterator<T> i = selectedSessions(); i.hasNext();) {
532             process(i.next());
533             i.remove();
534         }
535     }
536 
537     private void process(T session) {
538 
539         if (isReadable(session) && session.getTrafficMask().isReadable()) {
540             read(session);
541         }
542 
543         if (isWritable(session) && session.getTrafficMask().isWritable()) {
544             scheduleFlush(session);
545         }
546     }
547 
548     private void read(T session) {
549         IoSessionConfig config = session.getConfig();
550         IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());
551 
552         final boolean hasFragmentation =
553             session.getTransportMetadata().hasFragmentation();
554 
555         try {
556             int readBytes = 0;
557             int ret;
558 
559             try {
560                 if (hasFragmentation) {
561                     while ((ret = read(session, buf)) > 0) {
562                         readBytes += ret;
563                         if (!buf.hasRemaining()) {
564                             break;
565                         }
566                     }
567                 } else {
568                     ret = read(session, buf);
569                     if (ret > 0) {
570                         readBytes = ret;
571                     }
572                 }
573             } finally {
574                 buf.flip();
575             }
576 
577             if (readBytes > 0) {
578                 session.getFilterChain().fireMessageReceived(buf);
579                 buf = null;
580 
581                 if (hasFragmentation) {
582                     if (readBytes << 1 < config.getReadBufferSize()) {
583                         session.decreaseReadBufferSize();
584                     } else if (readBytes == config.getReadBufferSize()) {
585                         session.increaseReadBufferSize();
586                     }
587                 }
588             }
589             if (ret < 0) {
590                 scheduleRemove(session);
591             }
592         } catch (Throwable e) {
593             if (e instanceof IOException) {
594                 scheduleRemove(session);
595             }
596             session.getFilterChain().fireExceptionCaught(e);
597         }
598     }
599 
600     private void notifyIdleSessions(long currentTime) throws Exception {
601         // process idle sessions
602         if (currentTime - lastIdleCheckTime >= 1000) {
603             lastIdleCheckTime = currentTime;
604             IdleStatusChecker.notifyIdleness(allSessions(), currentTime);
605         }
606     }
607 
    /**
     * Process the queue of sessions scheduled for flushing.
     * <p>
     * The session found at the head of the queue when the method starts is
     * remembered. The pass ends when the queue is empty, when that same
     * session shows up again at the head of the queue, or when a session
     * still in the PREPARING state is met.
     *
     * @param currentTime the current time, in milliseconds, forwarded to
     * the write methods
     * @throws IllegalStateException if a session reports an unknown state
     */
    private void flush(long currentTime) {
        final T firstSession = flushingSessions.peek();
        if (firstSession == null) {
            // Nothing has been scheduled.
            return;
        }

        T session = flushingSessions.poll(); // the same one with firstSession
        for (; ;) {
            // Clear the flag first so the session can be scheduled again
            // while it is being flushed.
            session.setScheduledForFlush(false);
            SessionState state = state(session);
            switch (state) {
            case OPEN:
                try {
                    boolean flushedAll = flushNow(session, currentTime);
                    // Requests may have been queued during the flush
                    // without the session being rescheduled: do it here.
                    if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) &&
                        !session.isScheduledForFlush()) {
                        scheduleFlush(session);
                    }
                } catch (Exception e) {
                    scheduleRemove(session);
                    session.getFilterChain().fireExceptionCaught(e);
                }
                break;
            case CLOSED:
                // Skip if the channel is already closed.
                break;
            case PREPARING:
                // Retry later if session is not yet fully initialized.
                // (In case that Session.write() is called before addSession() is processed)
                scheduleFlush(session);
                return;
            default:
                throw new IllegalStateException(String.valueOf(state));
            }

            // Look at the next session without removing it: meeting the
            // first session again means it has been rescheduled during
            // this pass, so it is left in the queue for the next call.
            session = flushingSessions.peek();
            if (session == null || session == firstSession) {
                break;
            }
            session = flushingSessions.poll();
        }
    }
650 
    /**
     * Write as many pending requests of a session as possible, within a
     * per-call byte limit derived from the maximum read buffer size.
     * <p>
     * Write interest is cleared at the start and set again whenever a
     * request could only be partially written or nothing could be written
     * at all. Exceptions are forwarded to the filter chain and are not
     * rethrown.
     *
     * @param session the session to flush
     * @param currentTime the current time, in milliseconds
     * @return true if the loop ended because no request was left to write;
     * false if the session is not connected, a write was incomplete or
     * wrote nothing, the byte limit was reached, or an exception occurred
     */
    private boolean flushNow(T session, long currentTime) {
        if (!session.isConnected()) {
            scheduleRemove(session);
            return false;
        }

        final boolean hasFragmentation =
            session.getTransportMetadata().hasFragmentation();

        final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();

        // Set limitation for the number of written bytes for read-write
        // fairness.  I used maxReadBufferSize * 3 / 2, which yields best
        // performance in my experience while not breaking fairness much.
        final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize() +
                              (session.getConfig().getMaxReadBufferSize() >>> 1);
        int writtenBytes = 0;
        try {
            // Clear OP_WRITE
            setInterestedInWrite(session, false);
            do {
                // Check for pending writes.
                // A request left unfinished by a previous call is resumed
                // before anything is taken from the queue.
                WriteRequest req = session.getCurrentWriteRequest();
                if (req == null) {
                    req = writeRequestQueue.poll(session);
                    if (req == null) {
                        break;
                    }
                    session.setCurrentWriteRequest(req);
                }

                int localWrittenBytes = 0;
                Object message = req.getMessage();
                if (message instanceof IoBuffer) {
                    localWrittenBytes = writeBuffer(
                            session, req, hasFragmentation,
                            maxWrittenBytes - writtenBytes,
                            currentTime);
                    if (localWrittenBytes > 0 && ((IoBuffer)message).hasRemaining() ) {
                        // the buffer isn't empty, we re-interest it in writing 
                        writtenBytes += localWrittenBytes;
                        setInterestedInWrite(session, true);
                        return false;
                    }
                } else if (message instanceof FileRegion) {
                    localWrittenBytes = writeFile(
                            session, req, hasFragmentation,
                            maxWrittenBytes - writtenBytes,
                            currentTime);

                    // Fix for Java bug on Linux http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
                    // If there's still data to be written in the FileRegion, return 0 indicating that we need
                    // to pause until writing may resume.
                    if (localWrittenBytes > 0 && ((FileRegion) message).getRemainingBytes() > 0) {
                        writtenBytes += localWrittenBytes;
                        setInterestedInWrite(session, true);
                        return false;
                    }
                } else {
                    throw new IllegalStateException("Don't know how to handle message of type '" + message.getClass().getName() + "'.  Are you missing a protocol encoder?");
                }

                if (localWrittenBytes == 0) {
                    // Kernel buffer is full.
                    setInterestedInWrite(session, true);
                    return false;
                }

                writtenBytes += localWrittenBytes;

                if (writtenBytes >= maxWrittenBytes) {
                    // Wrote too much
                    // The session is queued again so the remaining
                    // requests are written by a later flush.
                    scheduleFlush(session);
                    return false;
                }
            } while (writtenBytes < maxWrittenBytes);
        } catch (Exception e) {
            session.getFilterChain().fireExceptionCaught(e);
            return false;
        }

        return true;
    }
734 
735     private int writeBuffer(T session, WriteRequest req,
736             boolean hasFragmentation, int maxLength, long currentTime) throws Exception {
737         IoBuffer buf = (IoBuffer) req.getMessage();
738         int localWrittenBytes = 0;
739         if (buf.hasRemaining()) {
740             int length;
741             if (hasFragmentation) {
742                 length = Math.min(buf.remaining(), maxLength);
743             } else {
744                 length = buf.remaining();
745             }
746             for (int i = WRITE_SPIN_COUNT; i > 0; i --) {
747                 localWrittenBytes = write(session, buf, length);
748                 if (localWrittenBytes != 0) {
749                     break;
750                 }
751             }
752         }
753 
754         session.increaseWrittenBytes(localWrittenBytes, currentTime);
755 
756         if (!buf.hasRemaining() ||
757                 !hasFragmentation && localWrittenBytes != 0) {
758             // Buffer has been sent, clear the current request.
759             buf.reset();
760             fireMessageSent(session, req);
761         }
762         return localWrittenBytes;
763     }
764 
765     private int writeFile(T session, WriteRequest req,
766             boolean hasFragmentation, int maxLength, long currentTime) throws Exception {
767         int localWrittenBytes;
768         FileRegion region = (FileRegion) req.getMessage();
769         if (region.getRemainingBytes() > 0) {
770             int length;
771             if (hasFragmentation) {
772                 length = (int) Math.min(region.getRemainingBytes(), maxLength);
773             } else {
774                 length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
775             }
776             localWrittenBytes = transferFile(session, region, length);
777             region.update(localWrittenBytes);
778         } else {
779             localWrittenBytes = 0;
780         }
781 
782         session.increaseWrittenBytes(localWrittenBytes, currentTime);
783 
784         if (region.getRemainingBytes() <= 0 ||
785                     !hasFragmentation && localWrittenBytes != 0) {
786             fireMessageSent(session, req);
787         }
788 
789         return localWrittenBytes;
790     }
791 
    /**
     * Mark a write request as completed: the session no longer has a
     * current write request, and the filter chain is notified that the
     * message has been sent.
     *
     * @param session the session that wrote the request
     * @param req the completed request
     */
    private void fireMessageSent(T session, WriteRequest req) {
        session.setCurrentWriteRequest(null);
        session.getFilterChain().fireMessageSent(req);
    }
796 
797     private void updateTrafficMask() {
798         for (; ;) {
799             T session = trafficControllingSessions.poll();
800 
801             if (session == null) {
802                 break;
803             }
804 
805             SessionState state = state(session);
806             switch (state) {
807             case OPEN:
808                 updateTrafficMaskNow(session);
809                 break;
810             case CLOSED:
811                 break;
812             case PREPARING:
813                 // Retry later if session is not yet fully initialized.
814                 // (In case that Session.suspend??() or session.resume??() is
815                 // called before addSession() is processed)
816                 scheduleTrafficControl(session);
817                 return;
818             default:
819                 throw new IllegalStateException(String.valueOf(state));
820             }
821         }
822     }
823 
824     private void updateTrafficMaskNow(T session) {
825         // The normal is OP_READ and, if there are write requests in the
826         // session's write queue, set OP_WRITE to trigger flushing.
827         int mask = session.getTrafficMask().getInterestOps();
828         try {
829             setInterestedInRead(session, (mask & SelectionKey.OP_READ) != 0);
830         } catch (Exception e) {
831             session.getFilterChain().fireExceptionCaught(e);
832         }
833         try {
834             setInterestedInWrite(
835                     session,
836                     !session.getWriteRequestQueue().isEmpty(session) &&
837                             (mask & SelectionKey.OP_WRITE) != 0);
838         } catch (Exception e) {
839             session.getFilterChain().fireExceptionCaught(e);
840         }
841     }
842 
843     
    /**
     * The polling loop of this processor. It repeatedly polls the
     * sessions, then handles the pending additions, traffic-mask updates,
     * selected sessions, flushes, removals and idle notifications. It
     * stops when it manages no session any more and, if disposal has been
     * requested, releases the processor resources before completing the
     * disposal future.
     */
    private class Worker implements Runnable {
        public void run() {
            // Sessions added minus sessions removed by this worker.
            int nSessions = 0;
            lastIdleCheckTime = System.currentTimeMillis();

            for (;;) {
                try {
                    // Wait up to one second for I/O events (or a wakeup).
                    boolean selected = select(1000);

                    nSessions += add();
                    updateTrafficMask();

                    if (selected) {
                        process();
                    }

                    long currentTime = System.currentTimeMillis();
                    flush(currentTime);
                    nSessions -= remove();
                    notifyIdleSessions(currentTime);

                    if (nSessions == 0) {
                        // Checked under the same lock as startupWorker()
                        // so that a session added concurrently either is
                        // seen here or gets a new worker created.
                        synchronized (lock) {
                            if (newSessions.isEmpty() && isSelectorEmpty()) {
                                worker = null;
                                break;
                            }
                        }
                    }

                    // Disconnect all sessions immediately if disposal has been
                    // requested so that we exit this loop eventually.
                    if (isDisposing()) {
                        for (Iterator<T> i = allSessions(); i.hasNext(); ) {
                            scheduleRemove(i.next());
                        }
                        wakeup();
                    }
                } catch (Throwable t) {
                    ExceptionMonitor.getInstance().exceptionCaught(t);

                    // Pause before the next iteration after a failure.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }

            try {
                synchronized (disposalLock) {
                    if (isDisposing()) {
                        dispose0();
                    }
                }
            } catch (Throwable t) {
                ExceptionMonitor.getInstance().exceptionCaught(t);
            } finally {
                // Set whenever the worker ends, disposal requested or not;
                // this is what releases the callers of dispose().
                disposalFuture.setValue(true);
            }
        }
    }
906 
    /**
     * The states a session can be reported in by
     * {@link AbstractPollingIoProcessor#state(AbstractIoSession)}.
     */
    protected static enum SessionState {
        /** The session is open: queued operations are applied to it. */
        OPEN,
        /** The session is already closed: queued operations are skipped. */
        CLOSED,
        /** The session is not fully initialized yet: queued operations are retried later. */
        PREPARING,
    }
912 }