View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.io.IOException;
23  import java.net.PortUnreachableException;
24  import java.nio.channels.ClosedSelectorException;
25  import java.util.ArrayList;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Queue;
29  import java.util.concurrent.ConcurrentHashMap;
30  import java.util.concurrent.ConcurrentLinkedQueue;
31  import java.util.concurrent.Executor;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  import java.util.concurrent.atomic.AtomicInteger;
34  import java.util.concurrent.atomic.AtomicReference;
35  
36  import org.apache.mina.core.buffer.IoBuffer;
37  import org.apache.mina.core.file.FileRegion;
38  import org.apache.mina.core.filterchain.IoFilterChain;
39  import org.apache.mina.core.filterchain.IoFilterChainBuilder;
40  import org.apache.mina.core.future.DefaultIoFuture;
41  import org.apache.mina.core.service.AbstractIoService;
42  import org.apache.mina.core.service.IoProcessor;
43  import org.apache.mina.core.service.IoServiceListenerSupport;
44  import org.apache.mina.core.session.AbstractIoSession;
45  import org.apache.mina.core.session.IoSession;
46  import org.apache.mina.core.session.IoSessionConfig;
47  import org.apache.mina.core.session.SessionState;
48  import org.apache.mina.core.write.WriteRequest;
49  import org.apache.mina.core.write.WriteRequestQueue;
50  import org.apache.mina.core.write.WriteToClosedSessionException;
51  import org.apache.mina.transport.socket.AbstractDatagramSessionConfig;
52  import org.apache.mina.util.ExceptionMonitor;
53  import org.apache.mina.util.NamePreservingRunnable;
54  import org.slf4j.Logger;
55  import org.slf4j.LoggerFactory;
56  
57  /**
58   * An abstract implementation of {@link IoProcessor} which helps transport
59   * developers to write an {@link IoProcessor} easily. This class is in charge of
60   * active polling a set of {@link IoSession} and trigger events when some I/O
61   * operation is possible.
62   * 
63   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
64   * 
65   * @param <S>
66   *            the type of the {@link IoSession} this processor can handle
67   */
68  public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S> {
69      /** A logger for this class */
70      private static final Logger LOG = LoggerFactory.getLogger(IoProcessor.class);
71  
72      /**
73       * A timeout used for the select, as we need to get out to deal with idle
74       * sessions
75       */
76      private static final long SELECT_TIMEOUT = 1000L;
77  
78      /** A map containing the last Thread ID for each class */
79      private static final ConcurrentHashMap<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<>();
80  
81      /** This IoProcessor instance name */
82      private final String threadName;
83  
84      /** The executor to use when we need to start the inner Processor */
85      private final Executor executor;
86  
87      /** A Session queue containing the newly created sessions */
88      private final Queue<S> newSessions = new ConcurrentLinkedQueue<>();
89  
90      /** A queue used to store the sessions to be removed */
91      private final Queue<S> removingSessions = new ConcurrentLinkedQueue<>();
92  
93      /** A queue used to store the sessions to be flushed */
94      private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<>();
95  
96      /**
97       * A queue used to store the sessions which have a trafficControl to be
98       * updated
99       */
100     private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<>();
101 
102     /** The processor thread : it handles the incoming messages */
103     private final AtomicReference<Processor> processorRef = new AtomicReference<>();
104 
105     private long lastIdleCheckTime;
106 
107     private final Object disposalLock = new Object();
108 
109     private volatile boolean disposing;
110 
111     private volatile boolean disposed;
112 
113     private final DefaultIoFuturee.html#DefaultIoFuture">DefaultIoFuture disposalFuture = new DefaultIoFuture(null);
114 
115     protected AtomicBoolean wakeupCalled = new AtomicBoolean(false);
116 
117     /**
118      * Create an {@link AbstractPollingIoProcessor} with the given
119      * {@link Executor} for handling I/Os events.
120      * 
121      * @param executor
122      *            the {@link Executor} for handling I/O events
123      */
124     protected AbstractPollingIoProcessor(Executor executor) {
125         if (executor == null) {
126             throw new IllegalArgumentException("executor");
127         }
128 
129         this.threadName = nextThreadName();
130         this.executor = executor;
131     }
132 
133     /**
134      * Compute the thread ID for this class instance. As we may have different
135      * classes, we store the last ID number into a Map associating the class
136      * name to the last assigned ID.
137      * 
138      * @return a name for the current thread, based on the class name and an
139      *         incremental value, starting at 1.
140      */
141     private String nextThreadName() {
142         Class<?> cls = getClass();
143         int newThreadId;
144 
145         AtomicInteger threadId = threadIds.putIfAbsent(cls, new AtomicInteger(1));
146 
147         if (threadId == null) {
148             newThreadId = 1;
149         } else {
150             // Just increment the last ID, and get it.
151             newThreadId = threadId.incrementAndGet();
152         }
153 
154         // Now we can compute the name for this thread
155         return cls.getSimpleName() + '-' + newThreadId;
156     }
157 
158     /**
159      * {@inheritDoc}
160      */
161     @Override
162     public final boolean isDisposing() {
163         return disposing;
164     }
165 
166     /**
167      * {@inheritDoc}
168      */
169     @Override
170     public final boolean isDisposed() {
171         return disposed;
172     }
173 
174     /**
175      * {@inheritDoc}
176      */
177     @Override
178     public final void dispose() {
179         if (disposed || disposing) {
180             return;
181         }
182 
183         synchronized (disposalLock) {
184             disposing = true;
185             startupProcessor();
186         }
187 
188         disposalFuture.awaitUninterruptibly();
189         disposed = true;
190     }
191 
192     /**
193      * Dispose the resources used by this {@link IoProcessor} for polling the
194      * client connections. The implementing class doDispose method will be
195      * called.
196      * 
197      * @throws Exception
198      *             if some low level IO error occurs
199      */
200     protected abstract void doDispose() throws Exception;
201 
202     /**
203      * poll those sessions for the given timeout
204      * 
205      * @param timeout
206      *            milliseconds before the call timeout if no event appear
207      * @return The number of session ready for read or for write
208      * @throws Exception
209      *             if some low level IO error occurs
210      */
211     protected abstract int select(long timeout) throws Exception;
212 
213     /**
214      * poll those sessions forever
215      * 
216      * @return The number of session ready for read or for write
217      * @throws Exception
218      *             if some low level IO error occurs
219      */
220     protected abstract int select() throws Exception;
221 
222     /**
223      * Say if the list of {@link IoSession} polled by this {@link IoProcessor}
224      * is empty
225      * 
226      * @return <tt>true</tt> if at least a session is managed by this
227      *         {@link IoProcessor}
228      */
229     protected abstract boolean isSelectorEmpty();
230 
231     /**
232      * Interrupt the {@link #select(long)} call.
233      */
234     protected abstract void wakeup();
235 
236     /**
237      * Get an {@link Iterator} for the list of {@link IoSession} polled by this
238      * {@link IoProcessor}
239      * 
240      * @return {@link Iterator} of {@link IoSession}
241      */
242     protected abstract Iterator<S> allSessions();
243     
244     /**
245      * Get the number of {@link IoSession} polled by this {@link IoProcessor}
246      *
247      * @return the number of sessions attached to this {@link IoProcessor}
248      */
249     protected abstract int allSessionsCount();
250 
251     /**
252      * Get an {@link Iterator} for the list of {@link IoSession} found selected
253      * by the last call of {@link #select(long)}
254      * 
255      * @return {@link Iterator} of {@link IoSession} read for I/Os operation
256      */
257     protected abstract Iterator<S> selectedSessions();
258 
259     /**
260      * Get the state of a session (One of OPENING, OPEN, CLOSING)
261      * 
262      * @param session
263      *            the {@link IoSession} to inspect
264      * @return the state of the session
265      */
266     protected abstract SessionState getState(S session);
267 
268     /**
269      * Tells if the session ready for writing
270      * 
271      * @param session
272      *            the queried session
273      * @return <tt>true</tt> is ready, <tt>false</tt> if not ready
274      */
275     protected abstract boolean isWritable(S session);
276 
277     /**
278      * Tells if the session ready for reading
279      * 
280      * @param session
281      *            the queried session
282      * @return <tt>true</tt> is ready, <tt>false</tt> if not ready
283      */
284     protected abstract boolean isReadable(S session);
285 
286     /**
287      * Set the session to be informed when a write event should be processed
288      * 
289      * @param session
290      *            the session for which we want to be interested in write events
291      * @param isInterested
292      *            <tt>true</tt> for registering, <tt>false</tt> for removing
293      * @throws Exception
294      *             If there was a problem while registering the session
295      */
296     protected abstract void setInterestedInWrite(S session, boolean isInterested) throws Exception;
297 
298     /**
299      * Set the session to be informed when a read event should be processed
300      * 
301      * @param session
302      *            the session for which we want to be interested in read events
303      * @param isInterested
304      *            <tt>true</tt> for registering, <tt>false</tt> for removing
305      * @throws Exception
306      *             If there was a problem while registering the session
307      */
308     protected abstract void setInterestedInRead(S session, boolean isInterested) throws Exception;
309 
310     /**
311      * Tells if this session is registered for reading
312      * 
313      * @param session
314      *            the queried session
315      * @return <tt>true</tt> is registered for reading
316      */
317     protected abstract boolean isInterestedInRead(S session);
318 
319     /**
320      * Tells if this session is registered for writing
321      * 
322      * @param session
323      *            the queried session
324      * @return <tt>true</tt> is registered for writing
325      */
326     protected abstract boolean isInterestedInWrite(S session);
327 
328     /**
329      * Initialize the polling of a session. Add it to the polling process.
330      * 
331      * @param session
332      *            the {@link IoSession} to add to the polling
333      * @throws Exception
334      *             any exception thrown by the underlying system calls
335      */
336     protected abstract void init(S session) throws Exception;
337 
338     /**
339      * Destroy the underlying client socket handle
340      * 
341      * @param session
342      *            the {@link IoSession}
343      * @throws Exception
344      *             any exception thrown by the underlying system calls
345      */
346     protected abstract void destroy(S session) throws Exception;
347 
348     /**
349      * Reads a sequence of bytes from a {@link IoSession} into the given
350      * {@link IoBuffer}. Is called when the session was found ready for reading.
351      * 
352      * @param session
353      *            the session to read
354      * @param buf
355      *            the buffer to fill
356      * @return the number of bytes read
357      * @throws Exception
358      *             any exception thrown by the underlying system calls
359      */
360     protected abstract int read(S session, IoBuffer buf) throws Exception;
361 
362     /**
363      * Write a sequence of bytes to a {@link IoSession}, means to be called when
364      * a session was found ready for writing.
365      * 
366      * @param session
367      *            the session to write
368      * @param buf
369      *            the buffer to write
370      * @param length
371      *            the number of bytes to write can be superior to the number of
372      *            bytes remaining in the buffer
373      * @return the number of byte written
374      * @throws IOException
375      *             any exception thrown by the underlying system calls
376      */
377     protected abstract int write(S session, IoBuffer buf, int length) throws IOException;
378 
379     /**
380      * Write a part of a file to a {@link IoSession}, if the underlying API
381      * isn't supporting system calls like sendfile(), you can throw a
382      * {@link UnsupportedOperationException} so the file will be send using
383      * usual {@link #write(AbstractIoSession, IoBuffer, int)} call.
384      * 
385      * @param session
386      *            the session to write
387      * @param region
388      *            the file region to write
389      * @param length
390      *            the length of the portion to send
391      * @return the number of written bytes
392      * @throws Exception
393      *             any exception thrown by the underlying system calls
394      */
395     protected abstract int transferFile(S session, FileRegion region, int length) throws Exception;
396 
397     /**
398      * {@inheritDoc}
399      */
400     @Override
401     public final void add(S session) {
402         if (disposed || disposing) {
403             throw new IllegalStateException("Already disposed.");
404         }
405 
406         // Adds the session to the newSession queue and starts the worker
407         newSessions.add(session);
408         startupProcessor();
409     }
410 
411     /**
412      * {@inheritDoc}
413      */
414     @Override
415     public final void remove(S session) {
416         scheduleRemove(session);
417         startupProcessor();
418     }
419 
420     private void scheduleRemove(S session) {
421         if (!removingSessions.contains(session)) {
422             removingSessions.add(session);
423         }
424     }
425 
426     /**
427      * {@inheritDoc}
428      */
429     @Override
430     public void write(S session, WriteRequest writeRequest) {
431         WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
432 
433         writeRequestQueue.offer(session, writeRequest);
434 
435         if (!session.isWriteSuspended()) {
436             this.flush(session);
437         }
438     }
439 
440     /**
441      * {@inheritDoc}
442      */
443     @Override
444     public final void flush(S session) {
445         // add the session to the queue if it's not already
446         // in the queue, then wake up the select()
447         if (session.setScheduledForFlush(true)) {
448             flushingSessions.add(session);
449             wakeup();
450         }
451     }
452 
453     /**
454      * Updates the traffic mask for a given session
455      * 
456      * @param session
457      *            the session to update
458      */
459     public final void updateTrafficMask(S session) {
460         trafficControllingSessions.add(session);
461         wakeup();
462     }
463 
464     /**
465      * Starts the inner Processor, asking the executor to pick a thread in its
466      * pool. The Runnable will be renamed
467      */
468     private void startupProcessor() {
469         Processor processor = processorRef.get();
470 
471         if (processor == null) {
472             processor = new Processor();
473 
474             if (processorRef.compareAndSet(null, processor)) {
475                 executor.execute(new NamePreservingRunnable(processor, threadName));
476             }
477         }
478 
479         // Just stop the select() and start it again, so that the processor
480         // can be activated immediately.
481         wakeup();
482     }
483 
484     /**
485      * In the case we are using the java select() method, this method is used to
486      * trash the buggy selector and create a new one, registring all the sockets
487      * on it.
488      * 
489      * @throws IOException
490      *             If we got an exception
491      */
492     protected abstract void registerNewSelector() throws IOException;
493 
494     /**
495      * Check that the select() has not exited immediately just because of a
496      * broken connection. In this case, this is a standard case, and we just
497      * have to loop.
498      * 
499      * @return <tt>true</tt> if a connection has been brutally closed.
500      * @throws IOException
501      *             If we got an exception
502      */
503     protected abstract boolean isBrokenConnection() throws IOException;
504 
505     private void read(S session) {
506         IoSessionConfig config = session.getConfig();
507         int bufferSize = config.getReadBufferSize();
508         IoBuffer buf = IoBuffer.allocate(bufferSize);
509 
510         final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
511 
512         try {
513             int readBytes = 0;
514             int ret;
515 
516             try {
517                 if (hasFragmentation) {
518 
519                     while ((ret = read(session, buf)) > 0) {
520                         readBytes += ret;
521 
522                         if (!buf.hasRemaining()) {
523                             break;
524                         }
525                     }
526                 } else {
527                     ret = read(session, buf);
528 
529                     if (ret > 0) {
530                         readBytes = ret;
531                     }
532                 }
533             } finally {
534                 buf.flip();
535             }
536 
537             if (readBytes > 0) {
538                 IoFilterChain filterChain = session.getFilterChain();
539                 filterChain.fireMessageReceived(buf);
540                 buf = null;
541 
542                 if (hasFragmentation) {
543                     if (readBytes << 1 < config.getReadBufferSize()) {
544                         session.decreaseReadBufferSize();
545                     } else if (readBytes == config.getReadBufferSize()) {
546                         session.increaseReadBufferSize();
547                     }
548                 }
549             } else {
550                 // release temporary buffer when read nothing
551                 buf.free(); 
552             }
553 
554             if (ret < 0) {
555                 IoFilterChain filterChain = session.getFilterChain();
556                 filterChain.fireInputClosed();
557             }
558         } catch (Exception e) {
559             if ((e instanceof IOException) &&
560                 (!(e instanceof PortUnreachableException)
561                         || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
562                         || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable())) {
563                 scheduleRemove(session);
564             }
565 
566             IoFilterChain filterChain = session.getFilterChain();
567             filterChain.fireExceptionCaught(e);
568         }
569     }
570 
571     /**
572      * {@inheritDoc}
573      */
574     @Override
575     public void updateTrafficControl(S session) {
576         //
577         try {
578             setInterestedInRead(session, !session.isReadSuspended());
579         } catch (Exception e) {
580             IoFilterChain filterChain = session.getFilterChain();
581             filterChain.fireExceptionCaught(e);
582         }
583 
584         try {
585             setInterestedInWrite(session,
586                     !session.getWriteRequestQueue().isEmpty(session) && !session.isWriteSuspended());
587         } catch (Exception e) {
588             IoFilterChain filterChain = session.getFilterChain();
589             filterChain.fireExceptionCaught(e);
590         }
591     }
592 
593     /**
594      * The main loop. This is the place in charge to poll the Selector, and to
595      * process the active sessions. It's done in - handle the newly created
596      * sessions -
597      */
598     private class Processor implements Runnable {
599         /**
600          * {@inheritDoc}
601          */
602         @Override
603         public void run() {
604             assert processorRef.get() == this;
605 
606             lastIdleCheckTime = System.currentTimeMillis();
607             int nbTries = 10;
608 
609             for (;;) {
610                 try {
611                     // This select has a timeout so that we can manage
612                     // idle session when we get out of the select every
613                     // second. (note : this is a hack to avoid creating
614                     // a dedicated thread).
615                     long t0 = System.currentTimeMillis();
616                     int selected = select(SELECT_TIMEOUT);
617                     long t1 = System.currentTimeMillis();
618                     long delta = t1 - t0;
619 
620                     if (!wakeupCalled.getAndSet(false) && (selected == 0) && (delta < 100)) {
621                         // Last chance : the select() may have been
622                         // interrupted because we have had an closed channel.
623                         if (isBrokenConnection()) {
624                             LOG.warn("Broken connection");
625                         } else {
626                             // Ok, we are hit by the nasty epoll
627                             // spinning.
628                             // Basically, there is a race condition
629                             // which causes a closing file descriptor not to be
630                             // considered as available as a selected channel,
631                             // but
632                             // it stopped the select. The next time we will
633                             // call select(), it will exit immediately for the
634                             // same
635                             // reason, and do so forever, consuming 100%
636                             // CPU.
637                             // We have to destroy the selector, and
638                             // register all the socket on a new one.
639                             if (nbTries == 0) {
640                                 LOG.warn("Create a new selector. Selected is 0, delta = " + delta);
641                                 registerNewSelector();
642                                 nbTries = 10;
643                             } else {
644                                 nbTries--;
645                             }
646                         }
647                     } else {
648                         nbTries = 10;
649                     }
650                     
651                     // Manage newly created session first
652                     if(handleNewSessions() == 0) {
653                         // Get a chance to exit the infinite loop if there are no
654                         // more sessions on this Processor
655                         if (allSessionsCount() == 0) {
656                             processorRef.set(null);
657 
658                             if (newSessions.isEmpty() && isSelectorEmpty()) {
659                                 // newSessions.add() precedes startupProcessor
660                                 assert processorRef.get() != this;
661                                 break;
662                             }
663 
664                             assert processorRef.get() != this;
665 
666                             if (!processorRef.compareAndSet(null, this)) {
667                                 // startupProcessor won race, so must exit processor
668                                 assert processorRef.get() != this;
669                                 break;
670                             }
671 
672                             assert processorRef.get() == this;
673                         }
674                     }
675 
676                     updateTrafficMask();
677 
678                     // Now, if we have had some incoming or outgoing events,
679                     // deal with them
680                     if (selected > 0) {
681                         // LOG.debug("Processing ..."); // This log hurts one of
682                         // the MDCFilter test...
683                         process();
684                     }
685                     
686                     // Write the pending requests
687                     long currentTime = System.currentTimeMillis();
688                     flush(currentTime);
689                     
690                     // Last, not least, send Idle events to the idle sessions
691                     notifyIdleSessions(currentTime);
692                     
693                     // And manage removed sessions
694                     removeSessions();
695                     
696                     // Disconnect all sessions immediately if disposal has been
697                     // requested so that we exit this loop eventually.
698                     if (isDisposing()) {
699                         boolean hasKeys = false;
700 
701                         for (Iterator<S> i = allSessions(); i.hasNext();) {
702                             IoSession session = i.next();
703 
704                             scheduleRemove((S) session);
705 
706                             if (session.isActive()) {
707                                 hasKeys = true;
708                             }
709                         }
710 
711                         wakeup();
712                     }
713                 } catch (ClosedSelectorException cse) {
714                     // If the selector has been closed, we can exit the loop
715                     // But first, dump a stack trace
716                     ExceptionMonitor.getInstance().exceptionCaught(cse);
717                     break;
718                 } catch (Exception e) {
719                     ExceptionMonitor.getInstance().exceptionCaught(e);
720 
721                     try {
722                         Thread.sleep(1000);
723                     } catch (InterruptedException e1) {
724                         ExceptionMonitor.getInstance().exceptionCaught(e1);
725                     }
726                 }
727             }
728 
729             try {
730                 synchronized (disposalLock) {
731                     if (disposing) {
732                         doDispose();
733                     }
734                 }
735             } catch (Exception e) {
736                 ExceptionMonitor.getInstance().exceptionCaught(e);
737             } finally {
738                 disposalFuture.setValue(true);
739             }
740         }
741 
742         /**
743          * Loops over the new sessions blocking queue and returns the number of
744          * sessions which are effectively created
745          * 
746          * @return The number of new sessions
747          */
748         private int handleNewSessions() {
749             int addedSessions = 0;
750 
751             for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {
752                 if (addNow(session)) {
753                     // A new session has been created
754                     addedSessions++;
755                 }
756             }
757 
758             return addedSessions;
759         }
760 
761         private void notifyIdleSessions(long currentTime) throws Exception {
762             // process idle sessions
763             if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
764                 lastIdleCheckTime = currentTime;
765                 AbstractIoSession.notifyIdleness(allSessions(), currentTime);
766             }
767         }
768 
769         /**
770          * Update the trafficControl for all the session.
771          */
772         private void updateTrafficMask() {
773             int queueSize = trafficControllingSessions.size();
774 
775             while (queueSize > 0) {
776                 S session = trafficControllingSessions.poll();
777 
778                 if (session == null) {
779                     // We are done with this queue.
780                     return;
781                 }
782 
783                 SessionState state = getState(session);
784 
785                 switch (state) {
786                 case OPENED:
787                     updateTrafficControl(session);
788 
789                     break;
790 
791                 case CLOSING:
792                     break;
793 
794                 case OPENING:
795                     // Retry later if session is not yet fully initialized.
796                     // (In case that Session.suspend??() or session.resume??() is
797                     // called before addSession() is processed)
798                     // We just put back the session at the end of the queue.
799                     trafficControllingSessions.add(session);
800                     break;
801 
802                 default:
803                     throw new IllegalStateException(String.valueOf(state));
804                 }
805 
806                 // As we have handled one session, decrement the number of
807                 // remaining sessions. The OPENING session will be processed
808                 // with the next select(), as the queue size has been decreased,
809                 // even
810                 // if the session has been pushed at the end of the queue
811                 queueSize--;
812             }
813         }
814 
815         /**
816          * Process a new session : - initialize it - create its chain - fire the
817          * CREATED listeners if any
818          * 
819          * @param session
820          *            The session to create
821          * @return <tt>true</tt> if the session has been registered
822          */
823         private boolean addNow(S session) {
824             boolean registered = false;
825 
826             try {
827                 init(session);
828                 registered = true;
829 
830                 // Build the filter chain of this session.
831                 IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
832                 chainBuilder.buildFilterChain(session.getFilterChain());
833 
834                 // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
835                 // in AbstractIoFilterChain.fireSessionOpened().
836                 // Propagate the SESSION_CREATED event up to the chain
837                 IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
838                 listeners.fireSessionCreated(session);
839             } catch (Exception e) {
840                 ExceptionMonitor.getInstance().exceptionCaught(e);
841 
842                 try {
843                     destroy(session);
844                 } catch (Exception e1) {
845                     ExceptionMonitor.getInstance().exceptionCaught(e1);
846                 } finally {
847                     registered = false;
848                 }
849             }
850 
851             return registered;
852         }
853 
854         private int removeSessions() {
855             int removedSessions = 0;
856 
857             for (S session = removingSessions.poll(); session != null; session = removingSessions.poll()) {
858                 SessionState state = getState(session);
859 
860                 // Now deal with the removal accordingly to the session's state
861                 switch (state) {
862                 case OPENED:
863                     // Try to remove this session
864                     if (removeNow(session)) {
865                         removedSessions++;
866                     }
867 
868                     break;
869 
870                 case CLOSING:
871                     // Skip if channel is already closed
872                     // In any case, remove the session from the queue
873                     removedSessions++;
874                     break;
875 
876                 case OPENING:
877                     // Remove session from the newSessions queue and
878                     // remove it
879                     newSessions.remove(session);
880 
881                     if (removeNow(session)) {
882                         removedSessions++;
883                     }
884 
885                     break;
886 
887                 default:
888                     throw new IllegalStateException(String.valueOf(state));
889                 }
890             }
891 
892             return removedSessions;
893         }
894 
895         /**
896          * Write all the pending messages
897          */
898         private void flush(long currentTime) {
899             if (flushingSessions.isEmpty()) {
900                 return;
901             }
902 
903             do {
904                 S session = flushingSessions.poll(); // the same one with
905                                                      // firstSession
906 
907                 if (session == null) {
908                     // Just in case ... It should not happen.
909                     break;
910                 }
911 
912                 // Reset the Schedule for flush flag for this session,
913                 // as we are flushing it now.  This allows another thread
914                 // to enqueue data to be written without corrupting the
915                 // selector interest state.
916                 session.unscheduledForFlush();
917 
918                 SessionState state = getState(session);
919 
920                 switch (state) {
921                 case OPENED:
922                     try {
923                         boolean flushedAll = flushNow(session, currentTime);
924 
925                         if (flushedAll && !session.getWriteRequestQueue().isEmpty(session)
926                                 && !session.isScheduledForFlush()) {
927                             scheduleFlush(session);
928                         }
929                     } catch (Exception e) {
930                         scheduleRemove(session);
931                         session.closeNow();
932                         IoFilterChain filterChain = session.getFilterChain();
933                         filterChain.fireExceptionCaught(e);
934                     }
935 
936                     break;
937 
938                 case CLOSING:
939                     // Skip if the channel is already closed.
940                     break;
941 
942                 case OPENING:
943                     // Retry later if session is not yet fully initialized.
944                     // (In case that Session.write() is called before addSession()
945                     // is processed)
946                     scheduleFlush(session);
947                     return;
948 
949                 default:
950                     throw new IllegalStateException(String.valueOf(state));
951                 }
952 
953             } while (!flushingSessions.isEmpty());
954         }
955 
956         private boolean flushNow(S session, long currentTime) {
957             if (!session.isConnected()) {
958                 scheduleRemove(session);
959                 return false;
960             }
961 
962             final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
963 
964             final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
965 
966             // Set limitation for the number of written bytes for read-write
967             // fairness. I used maxReadBufferSize * 3 / 2, which yields best
968             // performance in my experience while not breaking fairness much.
969             final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
970                     + (session.getConfig().getMaxReadBufferSize() >>> 1);
971             int writtenBytes = 0;
972             WriteRequest req = null;
973 
974             try {
975                 // Clear OP_WRITE
976                 setInterestedInWrite(session, false);
977 
978                 do {
979                     // Check for pending writes.
980                     req = session.getCurrentWriteRequest();
981 
982                     if (req == null) {
983                         req = writeRequestQueue.poll(session);
984 
985                         if (req == null) {
986                             break;
987                         }
988 
989                         session.setCurrentWriteRequest(req);
990                     }
991 
992                     int localWrittenBytes;
993                     Object message = req.getMessage();
994 
995                     if (message instanceof IoBuffer) {
996                         localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
997                                 currentTime);
998 
999                         if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {
1000                             // the buffer isn't empty, we re-interest it in writing
1001                             setInterestedInWrite(session, true);
1002 
1003                             return false;
1004                         }
1005                     } else if (message instanceof FileRegion) {
1006                         localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
1007                                 currentTime);
1008 
1009                         // Fix for Java bug on Linux
1010                         // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
1011                         // If there's still data to be written in the FileRegion,
1012                         // return 0 indicating that we need
1013                         // to pause until writing may resume.
1014                         if ((localWrittenBytes > 0) && (((FileRegion) message).getRemainingBytes() > 0)) {
1015                             setInterestedInWrite(session, true);
1016 
1017                             return false;
1018                         }
1019                     } else {
1020                         throw new IllegalStateException("Don't know how to handle message of type '"
1021                                 + message.getClass().getName() + "'.  Are you missing a protocol encoder?");
1022                     }
1023 
1024                     if (localWrittenBytes == 0) {
1025 
1026                         // Kernel buffer is full.
1027                         if (!req.equals(AbstractIoSession.MESSAGE_SENT_REQUEST)) {
1028                             setInterestedInWrite(session, true);
1029                             return false;
1030                         }
1031                     } else {
1032                         writtenBytes += localWrittenBytes;
1033 
1034                         if (writtenBytes >= maxWrittenBytes) {
1035                             // Wrote too much
1036                             scheduleFlush(session);
1037                             return false;
1038                         }
1039                     }
1040 
1041                     if (message instanceof IoBuffer) {
1042                         ((IoBuffer) message).free();
1043                     }
1044                 } while (writtenBytes < maxWrittenBytes);
1045             } catch (Exception e) {
1046                 if (req != null) {
1047                     req.getFuture().setException(e);
1048                 }
1049 
1050                 IoFilterChain filterChain = session.getFilterChain();
1051                 filterChain.fireExceptionCaught(e);
1052                 return false;
1053             }
1054 
1055             return true;
1056         }
1057 
1058         private void scheduleFlush(S session) {
1059             // add the session to the queue if it's not already
1060             // in the queue
1061             if (session.setScheduledForFlush(true)) {
1062                 flushingSessions.add(session);
1063             }
1064         }
1065 
1066         private int writeFile(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
1067                 throws Exception {
1068             int localWrittenBytes;
1069             FileRegion./../../../org/apache/mina/core/file/FileRegion.html#FileRegion">FileRegion region = (FileRegion) req.getMessage();
1070 
1071             if (region.getRemainingBytes() > 0) {
1072                 int length;
1073 
1074                 if (hasFragmentation) {
1075                     length = (int) Math.min(region.getRemainingBytes(), maxLength);
1076                 } else {
1077                     length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
1078                 }
1079 
1080                 localWrittenBytes = transferFile(session, region, length);
1081                 region.update(localWrittenBytes);
1082             } else {
1083                 localWrittenBytes = 0;
1084             }
1085 
1086             session.increaseWrittenBytes(localWrittenBytes, currentTime);
1087 
1088             if ((region.getRemainingBytes() <= 0) || (!hasFragmentation && (localWrittenBytes != 0))) {
1089                 fireMessageSent(session, req);
1090             }
1091 
1092             return localWrittenBytes;
1093         }
1094 
1095         private int writeBuffer(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime) throws Exception {
1096             IoBuffer"../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer buf = (IoBuffer) req.getMessage();
1097             int localWrittenBytes = 0;
1098 
1099             if (buf.hasRemaining()) {
1100                 int length;
1101 
1102                 if (hasFragmentation) {
1103                     length = Math.min(buf.remaining(), maxLength);
1104                 } else {
1105                     length = buf.remaining();
1106                 }
1107 
1108                 try {
1109                     localWrittenBytes = write(session, buf, length);
1110                 } catch (IOException ioe) {
1111                     // We have had an issue while trying to send data to the
1112                     // peer : let's close the session.
1113                     buf.free();
1114                     session.closeNow();
1115                     this.removeNow(session);
1116 
1117                     return 0;
1118                 }
1119 
1120                 session.increaseWrittenBytes(localWrittenBytes, currentTime);
1121 
1122                 // Now, forward the original message if it has been fully sent
1123                 if (!buf.hasRemaining() || (!hasFragmentation && (localWrittenBytes != 0))) {
1124                     this.fireMessageSent(session, req);
1125                 }
1126             } else {
1127                 this.fireMessageSent(session, req);
1128             }
1129 
1130             return localWrittenBytes;
1131         }
1132 
1133         private boolean removeNow(S session) {
1134             clearWriteRequestQueue(session);
1135 
1136             try {
1137                 destroy(session);
1138                 return true;
1139             } catch (Exception e) {
1140                 IoFilterChain filterChain = session.getFilterChain();
1141                 filterChain.fireExceptionCaught(e);
1142             } finally {
1143                 try {
1144                     ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
1145                 } catch (Exception e) {
1146                     // The session was either destroyed or not at this point.
1147                     // We do not want any exception thrown from this "cleanup" code
1148                     // to change
1149                     // the return value by bubbling up.
1150                     IoFilterChain filterChain = session.getFilterChain();
1151                     filterChain.fireExceptionCaught(e);
1152                 } finally {
1153                     clearWriteRequestQueue(session);
1154                 }
1155             }
1156 
1157             return false;
1158         }
1159 
1160         private void clearWriteRequestQueue(S session) {
1161             WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
1162             WriteRequest req;
1163 
1164             List<WriteRequest> failedRequests = new ArrayList<>();
1165 
1166             if ((req = writeRequestQueue.poll(session)) != null) {
1167                 Object message = req.getMessage();
1168 
1169                 if (message instanceof IoBuffer) {
1170                     IoBuffer"../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer buf = (IoBuffer) message;
1171 
1172                     // The first unwritten empty buffer must be
1173                     // forwarded to the filter chain.
1174                     if (buf.hasRemaining()) {
1175                         failedRequests.add(req);
1176                     } else {
1177                         IoFilterChain filterChain = session.getFilterChain();
1178                         filterChain.fireMessageSent(req);
1179                     }
1180                 } else {
1181                     failedRequests.add(req);
1182                 }
1183 
1184                 // Discard others.
1185                 while ((req = writeRequestQueue.poll(session)) != null) {
1186                     failedRequests.add(req);
1187                 }
1188             }
1189 
1190             // Create an exception and notify.
1191             if (!failedRequests.isEmpty()) {
1192                 WriteToClosedSessionExceptionException.html#WriteToClosedSessionException">WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
1193 
1194                 for (WriteRequest r : failedRequests) {
1195                     session.decreaseScheduledBytesAndMessages(r);
1196                     r.getFuture().setException(cause);
1197                 }
1198 
1199                 IoFilterChain filterChain = session.getFilterChain();
1200                 filterChain.fireExceptionCaught(cause);
1201             }
1202         }
1203 
1204         private void fireMessageSent(S session, WriteRequest req) {
1205             session.setCurrentWriteRequest(null);
1206             IoFilterChain filterChain = session.getFilterChain();
1207             filterChain.fireMessageSent(req);
1208         }
1209         
1210         private void process() throws Exception {
1211             for (Iterator<S> i = selectedSessions(); i.hasNext();) {
1212                 S session = i.next();
1213                 process(session);
1214                 i.remove();
1215             }
1216         }
1217 
1218         /**
1219          * Deal with session ready for the read or write operations, or both.
1220          */
1221         private void process(S session) {
1222             // Process Reads
1223             if (isReadable(session) && !session.isReadSuspended()) {
1224                 read(session);
1225             }
1226 
1227             // Process writes
1228             if (isWritable(session) && !session.isWriteSuspended() && session.setScheduledForFlush(true)) {
1229                 // add the session to the queue, if it's not already there
1230                 flushingSessions.add(session);
1231             }
1232         }
1233     }
1234 }