001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.core.polling;
021
022import java.io.IOException;
023import java.net.PortUnreachableException;
024import java.nio.channels.ClosedSelectorException;
025import java.util.ArrayList;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Queue;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.ConcurrentLinkedQueue;
031import java.util.concurrent.Executor;
032import java.util.concurrent.atomic.AtomicBoolean;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.concurrent.atomic.AtomicReference;
035
036import org.apache.mina.core.buffer.IoBuffer;
037import org.apache.mina.core.file.FileRegion;
038import org.apache.mina.core.filterchain.IoFilterChain;
039import org.apache.mina.core.filterchain.IoFilterChainBuilder;
040import org.apache.mina.core.future.DefaultIoFuture;
041import org.apache.mina.core.service.AbstractIoService;
042import org.apache.mina.core.service.IoProcessor;
043import org.apache.mina.core.service.IoServiceListenerSupport;
044import org.apache.mina.core.session.AbstractIoSession;
045import org.apache.mina.core.session.IoSession;
046import org.apache.mina.core.session.IoSessionConfig;
047import org.apache.mina.core.session.SessionState;
048import org.apache.mina.core.write.WriteRequest;
049import org.apache.mina.core.write.WriteRequestQueue;
050import org.apache.mina.core.write.WriteToClosedSessionException;
051import org.apache.mina.transport.socket.AbstractDatagramSessionConfig;
052import org.apache.mina.util.ExceptionMonitor;
053import org.apache.mina.util.NamePreservingRunnable;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057/**
058 * An abstract implementation of {@link IoProcessor} which helps transport
059 * developers to write an {@link IoProcessor} easily. This class is in charge of
060 * actively polling a set of {@link IoSession}s and triggering events when some
061 * I/O operation is possible.
062 * 
063 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
064 * 
065 * @param <S>
066 *            the type of the {@link IoSession} this processor can handle
067 */
068public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S> {
069    /** A logger for this class */
070    private final static Logger LOG = LoggerFactory.getLogger(IoProcessor.class);
071
072    /**
073     * A timeout used for the select, as we need to get out to deal with idle
074     * sessions
075     */
076    private static final long SELECT_TIMEOUT = 1000L;
077
078    /** A map containing the last Thread ID for each class */
079    private static final ConcurrentHashMap<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<Class<?>, AtomicInteger>();
080
081    /** This IoProcessor instance name */
082    private final String threadName;
083
084    /** The executor to use when we need to start the inner Processor */
085    private final Executor executor;
086
087    /** A Session queue containing the newly created sessions */
088    private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>();
089
090    /** A queue used to store the sessions to be removed */
091    private final Queue<S> removingSessions = new ConcurrentLinkedQueue<S>();
092
093    /** A queue used to store the sessions to be flushed */
094    private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>();
095
096    /**
097     * A queue used to store the sessions which have a trafficControl to be
098     * updated
099     */
100    private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<S>();
101
102    /** The processor thread : it handles the incoming messages */
103    private final AtomicReference<Processor> processorRef = new AtomicReference<Processor>();
104
105    private long lastIdleCheckTime;
106
107    private final Object disposalLock = new Object();
108
109    private volatile boolean disposing;
110
111    private volatile boolean disposed;
112
113    private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);
114
115    protected AtomicBoolean wakeupCalled = new AtomicBoolean(false);
116
/**
 * Creates a processor which will use the given {@link Executor} to run its
 * inner polling task. The thread name of this instance is computed here.
 * 
 * @param executor
 *            the {@link Executor} for handling I/O events, must not be
 *            <tt>null</tt>
 * @throws IllegalArgumentException
 *             if the executor is <tt>null</tt>
 */
protected AbstractPollingIoProcessor(Executor executor) {
    // Reject a missing executor before touching any state
    if (executor == null) {
        throw new IllegalArgumentException("executor");
    }

    this.executor = executor;
    this.threadName = nextThreadName();
}
132
133    /**
134     * Compute the thread ID for this class instance. As we may have different
135     * classes, we store the last ID number into a Map associating the class
136     * name to the last assigned ID.
137     * 
138     * @return a name for the current thread, based on the class name and an
139     *         incremental value, starting at 1.
140     */
141    private String nextThreadName() {
142        Class<?> cls = getClass();
143        int newThreadId;
144
145        AtomicInteger threadId = threadIds.putIfAbsent(cls, new AtomicInteger(1));
146
147        if (threadId == null) {
148            newThreadId = 1;
149        } else {
150            // Just increment the last ID, and get it.
151            newThreadId = threadId.incrementAndGet();
152        }
153
154        // Now we can compute the name for this thread
155        return cls.getSimpleName() + '-' + newThreadId;
156    }
157
158    /**
159     * {@inheritDoc}
160     */
161    public final boolean isDisposing() {
162        return disposing;
163    }
164
165    /**
166     * {@inheritDoc}
167     */
168    public final boolean isDisposed() {
169        return disposed;
170    }
171
172    /**
173     * {@inheritDoc}
174     */
175    public final void dispose() {
176        if (disposed || disposing) {
177            return;
178        }
179
180        synchronized (disposalLock) {
181            disposing = true;
182            startupProcessor();
183        }
184
185        disposalFuture.awaitUninterruptibly();
186        disposed = true;
187    }
188
189    /**
190     * Dispose the resources used by this {@link IoProcessor} for polling the
191     * client connections. The implementing class doDispose method will be
192     * called.
193     * 
194     * @throws Exception
195     *             if some low level IO error occurs
196     */
197    protected abstract void doDispose() throws Exception;
198
199    /**
200     * Polls those sessions for at most the given timeout
201     * 
202     * @param timeout
203     *            milliseconds before the call times out if no event appears
204     * @return The number of session ready for read or for write
205     * @throws Exception
206     *             if some low level IO error occurs
207     */
208    protected abstract int select(long timeout) throws Exception;
209
210    /**
211     * poll those sessions forever
212     * 
213     * @return The number of session ready for read or for write
214     * @throws Exception
215     *             if some low level IO error occurs
216     */
217    protected abstract int select() throws Exception;
218
219    /**
220     * Say if the list of {@link IoSession} polled by this {@link IoProcessor}
221     * is empty
222     * 
223     * @return <tt>true</tt> if no session is managed by this {@link IoProcessor}
224     */
225    protected abstract boolean isSelectorEmpty();
226
227    /**
228     * Interrupt the {@link #select(long)} call.
229     */
230    protected abstract void wakeup();
231
232    /**
233     * Get an {@link Iterator} for the list of {@link IoSession} polled by this
234     * {@link IoProcessor}
235     * 
236     * @return {@link Iterator} of {@link IoSession}
237     */
238    protected abstract Iterator<S> allSessions();
239
240    /**
241     * Get an {@link Iterator} for the list of {@link IoSession} found selected
242     * by the last call of {@link #select(long)}
243     * 
244     * @return {@link Iterator} of {@link IoSession} ready for I/O operations
245     */
246    protected abstract Iterator<S> selectedSessions();
247
248    /**
249     * Get the state of a session (one of OPENING, OPENED, CLOSING)
250     * 
251     * @param session the {@link IoSession} to inspect
252     * @return the state of the session
253     */
254    protected abstract SessionState getState(S session);
255
256    /**
257     * Tells if the session is ready for writing
258     * 
259     * @param session the queried session
260     * @return <tt>true</tt> if ready, <tt>false</tt> if not ready
261     */
262    protected abstract boolean isWritable(S session);
263
264    /**
265     * Tells if the session is ready for reading
266     * 
267     * @param session the queried session
268     * @return <tt>true</tt> if ready, <tt>false</tt> if not ready
269     */
270    protected abstract boolean isReadable(S session);
271
272    /**
273     * Set the session to be informed when a write event should be processed
274     * 
275     * @param session the session for which we want to be interested in write events
276     * @param isInterested <tt>true</tt> for registering, <tt>false</tt> for removing
277     * @throws Exception If there was a problem while registering the session 
278     */
279    protected abstract void setInterestedInWrite(S session, boolean isInterested) throws Exception;
280
281    /**
282     * Set the session to be informed when a read event should be processed
283     * 
284     * @param session the session for which we want to be interested in read events
285     * @param isInterested <tt>true</tt> for registering, <tt>false</tt> for removing
286     * @throws Exception If there was a problem while registering the session 
287     */
288    protected abstract void setInterestedInRead(S session, boolean isInterested) throws Exception;
289
290    /**
291     * Tells if this session is registered for reading
292     * 
293     * @param session the queried session
294     * @return <tt>true</tt> if registered for reading
295     */
296    protected abstract boolean isInterestedInRead(S session);
297
298    /**
299     * Tells if this session is registered for writing
300     * 
301     * @param session the queried session
302     * @return <tt>true</tt> if registered for writing
303     */
304    protected abstract boolean isInterestedInWrite(S session);
305
306    /**
307     * Initialize the polling of a session. Add it to the polling process.
308     * 
309     * @param session the {@link IoSession} to add to the polling
310     * @throws Exception any exception thrown by the underlying system calls
311     */
312    protected abstract void init(S session) throws Exception;
313
314    /**
315     * Destroy the underlying client socket handle
316     * 
317     * @param session the {@link IoSession}
318     * @throws Exception any exception thrown by the underlying system calls
319     */
320    protected abstract void destroy(S session) throws Exception;
321
322    /**
323     * Reads a sequence of bytes from a {@link IoSession} into the given
324     * {@link IoBuffer}. Is called when the session was found ready for reading.
325     * 
326     * @param session the session to read
327     * @param buf the buffer to fill
328     * @return the number of bytes read
329     * @throws Exception any exception thrown by the underlying system calls
330     */
331    protected abstract int read(S session, IoBuffer buf) throws Exception;
332
333    /**
334     * Write a sequence of bytes to a {@link IoSession}, meant to be called when
335     * a session was found ready for writing.
336     * 
337     * @param session the session to write
338     * @param buf the buffer to write
339     * @param length the number of bytes to write; it can be greater than the number of
340     *            bytes remaining in the buffer
341     * @return the number of byte written
342     * @throws Exception any exception thrown by the underlying system calls
343     */
344    protected abstract int write(S session, IoBuffer buf, int length) throws Exception;
345
346    /**
347     * Write a part of a file to a {@link IoSession}. If the underlying API
348     * doesn't support system calls like sendfile(), you can throw an
349     * {@link UnsupportedOperationException} so the file will be sent using
350     * the usual {@link #write(AbstractIoSession, IoBuffer, int)} call.
351     * 
352     * @param session the session to write
353     * @param region the file region to write
354     * @param length the length of the portion to send
355     * @return the number of written bytes
356     * @throws Exception any exception thrown by the underlying system calls
357     */
358    protected abstract int transferFile(S session, FileRegion region, int length) throws Exception;
359
360    /**
361     * {@inheritDoc}
362     */
363    public final void add(S session) {
364        if (disposed || disposing) {
365            throw new IllegalStateException("Already disposed.");
366        }
367
368        // Adds the session to the newSession queue and starts the worker
369        newSessions.add(session);
370        startupProcessor();
371    }
372
373    /**
374     * {@inheritDoc}
375     */
376    public final void remove(S session) {
377        scheduleRemove(session);
378        startupProcessor();
379    }
380
381    private void scheduleRemove(S session) {
382        if (!removingSessions.contains(session)) {
383            removingSessions.add(session);
384        }
385    }
386
387    /**
388     * {@inheritDoc}
389     */
390    public void write(S session, WriteRequest writeRequest) {
391        WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
392
393        writeRequestQueue.offer(session, writeRequest);
394
395        if (!session.isWriteSuspended()) {
396            this.flush(session);
397        }
398    }
399
400    /**
401     * {@inheritDoc}
402     */
403    public final void flush(S session) {
404        // add the session to the queue if it's not already
405        // in the queue, then wake up the select()
406        if (session.setScheduledForFlush(true)) {
407            flushingSessions.add(session);
408            wakeup();
409        }
410    }
411
412    private void scheduleFlush(S session) {
413        // add the session to the queue if it's not already
414        // in the queue
415        if (session.setScheduledForFlush(true)) {
416            flushingSessions.add(session);
417        }
418    }
419
420    /**
421     * Updates the traffic mask for a given session
422     * 
423     * @param session the session to update
424     */
425    public final void updateTrafficMask(S session) {
426        trafficControllingSessions.add(session);
427        wakeup();
428    }
429
430    /**
431     * Starts the inner Processor, asking the executor to pick a thread in its
432     * pool. The Runnable will be renamed
433     */
434    private void startupProcessor() {
435        Processor processor = processorRef.get();
436
437        if (processor == null) {
438            processor = new Processor();
439
440            if (processorRef.compareAndSet(null, processor)) {
441                executor.execute(new NamePreservingRunnable(processor, threadName));
442            }
443        }
444
445        // Just stop the select() and start it again, so that the processor
446        // can be activated immediately.
447        wakeup();
448    }
449
450    /**
451     * In the case we are using the java select() method, this method is used to
452     * trash the buggy selector and create a new one, registering all the sockets
453     * on it.
454     * 
455     * @throws IOException If we got an exception
456     */
457    abstract protected void registerNewSelector() throws IOException;
458
459    /**
460     * Check that the select() has not exited immediately just because of a
461     * broken connection. In this case, this is a standard case, and we just
462     * have to loop.
463     * 
464     * @return <tt>true</tt> if a connection has been brutally closed.
465     * @throws IOException If we got an exception
466     */
467    abstract protected boolean isBrokenConnection() throws IOException;
468
469    /**
470     * Loops over the new sessions blocking queue and returns the number of
471     * sessions which are effectively created
472     * 
473     * @return The number of new sessions
474     */
475    private int handleNewSessions() {
476        int addedSessions = 0;
477
478        for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {
479            if (addNow(session)) {
480                // A new session has been created
481                addedSessions++;
482            }
483        }
484
485        return addedSessions;
486    }
487
488    /**
489     * Process a new session : - initialize it - create its chain - fire the
490     * CREATED listeners if any
491     * 
492     * @param session The session to create
493     * @return <tt>true</tt> if the session has been registered
494     */
495    private boolean addNow(S session) {
496        boolean registered = false;
497
498        try {
499            init(session);
500            registered = true;
501
502            // Build the filter chain of this session.
503            IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
504            chainBuilder.buildFilterChain(session.getFilterChain());
505
506            // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
507            // in AbstractIoFilterChain.fireSessionOpened().
508            // Propagate the SESSION_CREATED event up to the chain
509            IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
510            listeners.fireSessionCreated(session);
511        } catch (Exception e) {
512            ExceptionMonitor.getInstance().exceptionCaught(e);
513
514            try {
515                destroy(session);
516            } catch (Exception e1) {
517                ExceptionMonitor.getInstance().exceptionCaught(e1);
518            } finally {
519                registered = false;
520            }
521        }
522
523        return registered;
524    }
525
526    private int removeSessions() {
527        int removedSessions = 0;
528
529        for (S session = removingSessions.poll(); session != null;session = removingSessions.poll()) {
530            SessionState state = getState(session);
531
532            // Now deal with the removal accordingly to the session's state
533            switch (state) {
534                case OPENED:
535                    // Try to remove this session
536                    if (removeNow(session)) {
537                        removedSessions++;
538                    }
539                    
540                    break;
541    
542                case CLOSING:
543                    // Skip if channel is already closed
544                    // In any case, remove the session from the queue
545                    removedSessions++;
546                    break;
547    
548                case OPENING:
549                    // Remove session from the newSessions queue and
550                    // remove it
551                    newSessions.remove(session);
552    
553                    if (removeNow(session)) {
554                        removedSessions++;
555                    }
556    
557                    break;
558    
559                default:
560                    throw new IllegalStateException(String.valueOf(state));
561            }
562        }
563
564        return removedSessions;
565    }
566
567    private boolean removeNow(S session) {
568        clearWriteRequestQueue(session);
569
570        try {
571            destroy(session);
572            return true;
573        } catch (Exception e) {
574            IoFilterChain filterChain = session.getFilterChain();
575            filterChain.fireExceptionCaught(e);
576        } finally {
577            try {
578                clearWriteRequestQueue(session);
579                ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
580            } catch (Exception e) {
581                // The session was either destroyed or not at this point.
582                // We do not want any exception thrown from this "cleanup" code to change
583                // the return value by bubbling up.
584                IoFilterChain filterChain = session.getFilterChain();
585                filterChain.fireExceptionCaught(e);
586            }
587        }
588        
589        return false;
590    }
591
592    private void clearWriteRequestQueue(S session) {
593        WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
594        WriteRequest req;
595
596        List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
597
598        if ((req = writeRequestQueue.poll(session)) != null) {
599            Object message = req.getMessage();
600
601            if (message instanceof IoBuffer) {
602                IoBuffer buf = (IoBuffer) message;
603
604                // The first unwritten empty buffer must be
605                // forwarded to the filter chain.
606                if (buf.hasRemaining()) {
607                    buf.reset();
608                    failedRequests.add(req);
609                } else {
610                    IoFilterChain filterChain = session.getFilterChain();
611                    filterChain.fireMessageSent(req);
612                }
613            } else {
614                failedRequests.add(req);
615            }
616
617            // Discard others.
618            while ((req = writeRequestQueue.poll(session)) != null) {
619                failedRequests.add(req);
620            }
621        }
622
623        // Create an exception and notify.
624        if (!failedRequests.isEmpty()) {
625            WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
626
627            for (WriteRequest r : failedRequests) {
628                session.decreaseScheduledBytesAndMessages(r);
629                r.getFuture().setException(cause);
630            }
631
632            IoFilterChain filterChain = session.getFilterChain();
633            filterChain.fireExceptionCaught(cause);
634        }
635    }
636
637    private void process() throws Exception {
638        for (Iterator<S> i = selectedSessions(); i.hasNext();) {
639            S session = i.next();
640            process(session);
641            i.remove();
642        }
643    }
644
645    /**
646     * Deal with session ready for the read or write operations, or both.
647     */
648    private void process(S session) {
649        // Process Reads
650        if (isReadable(session) && !session.isReadSuspended()) {
651            read(session);
652        }
653
654        // Process writes
655        if (isWritable(session) && !session.isWriteSuspended()) {
656            // add the session to the queue, if it's not already there
657            if (session.setScheduledForFlush(true)) {
658                flushingSessions.add(session);
659            }
660        }
661    }
662
663    private void read(S session) {
664        IoSessionConfig config = session.getConfig();
665        int bufferSize = config.getReadBufferSize();
666        IoBuffer buf = IoBuffer.allocate(bufferSize);
667
668        final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
669
670        try {
671            int readBytes = 0;
672            int ret;
673
674            try {
675                if (hasFragmentation) {
676
677                    while ((ret = read(session, buf)) > 0) {
678                        readBytes += ret;
679
680                        if (!buf.hasRemaining()) {
681                            break;
682                        }
683                    }
684                } else {
685                    ret = read(session, buf);
686
687                    if (ret > 0) {
688                        readBytes = ret;
689                    }
690                }
691            } finally {
692                buf.flip();
693            }
694
695            if (readBytes > 0) {
696                IoFilterChain filterChain = session.getFilterChain();
697                filterChain.fireMessageReceived(buf);
698                buf = null;
699
700                if (hasFragmentation) {
701                    if (readBytes << 1 < config.getReadBufferSize()) {
702                        session.decreaseReadBufferSize();
703                    } else if (readBytes == config.getReadBufferSize()) {
704                        session.increaseReadBufferSize();
705                    }
706                }
707            }
708
709            if (ret < 0) {
710                // scheduleRemove(session);
711                IoFilterChain filterChain = session.getFilterChain();
712                filterChain.fireInputClosed();
713            }
714        } catch (Exception e) {
715            if (e instanceof IOException) {
716                if (!(e instanceof PortUnreachableException)
717                        || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
718                        || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
719                    scheduleRemove(session);
720                }
721            }
722
723            IoFilterChain filterChain = session.getFilterChain();
724            filterChain.fireExceptionCaught(e);
725        }
726    }
727
728    private void notifyIdleSessions(long currentTime) throws Exception {
729        // process idle sessions
730        if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
731            lastIdleCheckTime = currentTime;
732            AbstractIoSession.notifyIdleness(allSessions(), currentTime);
733        }
734    }
735
736    /**
737     * Write all the pending messages
738     */
739    private void flush(long currentTime) {
740        if (flushingSessions.isEmpty()) {
741            return;
742        }
743
744        do {
745            S session = flushingSessions.poll(); // the same one with
746                                                 // firstSession
747
748            if (session == null) {
749                // Just in case ... It should not happen.
750                break;
751            }
752
753            // Reset the Schedule for flush flag for this session,
754            // as we are flushing it now
755            session.unscheduledForFlush();
756
757            SessionState state = getState(session);
758
759            switch (state) {
760            case OPENED:
761                try {
762                    boolean flushedAll = flushNow(session, currentTime);
763
764                    if (flushedAll && !session.getWriteRequestQueue().isEmpty(session)
765                            && !session.isScheduledForFlush()) {
766                        scheduleFlush(session);
767                    }
768                } catch (Exception e) {
769                    scheduleRemove(session);
770                    session.close(true);
771                    IoFilterChain filterChain = session.getFilterChain();
772                    filterChain.fireExceptionCaught(e);
773                }
774
775                break;
776
777            case CLOSING:
778                // Skip if the channel is already closed.
779                break;
780
781            case OPENING:
782                // Retry later if session is not yet fully initialized.
783                // (In case that Session.write() is called before addSession()
784                // is processed)
785                scheduleFlush(session);
786                return;
787
788            default:
789                throw new IllegalStateException(String.valueOf(state));
790            }
791
792        } while (!flushingSessions.isEmpty());
793    }
794
795    private boolean flushNow(S session, long currentTime) {
796        if (!session.isConnected()) {
797            scheduleRemove(session);
798            return false;
799        }
800
801        final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
802
803        final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
804
805        // Set limitation for the number of written bytes for read-write
806        // fairness. I used maxReadBufferSize * 3 / 2, which yields best
807        // performance in my experience while not breaking fairness much.
808        final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
809                + (session.getConfig().getMaxReadBufferSize() >>> 1);
810        int writtenBytes = 0;
811        WriteRequest req = null;
812
813        try {
814            // Clear OP_WRITE
815            setInterestedInWrite(session, false);
816
817            do {
818                // Check for pending writes.
819                req = session.getCurrentWriteRequest();
820
821                if (req == null) {
822                    req = writeRequestQueue.poll(session);
823
824                    if (req == null) {
825                        break;
826                    }
827
828                    session.setCurrentWriteRequest(req);
829                }
830
831                int localWrittenBytes = 0;
832                Object message = req.getMessage();
833
834                if (message instanceof IoBuffer) {
835                    localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
836                            currentTime);
837
838                    if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {
839                        // the buffer isn't empty, we re-interest it in writing
840                        writtenBytes += localWrittenBytes;
841                        setInterestedInWrite(session, true);
842                        return false;
843                    }
844                } else if (message instanceof FileRegion) {
845                    localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
846                            currentTime);
847
848                    // Fix for Java bug on Linux
849                    // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
850                    // If there's still data to be written in the FileRegion,
851                    // return 0 indicating that we need
852                    // to pause until writing may resume.
853                    if ((localWrittenBytes > 0) && (((FileRegion) message).getRemainingBytes() > 0)) {
854                        writtenBytes += localWrittenBytes;
855                        setInterestedInWrite(session, true);
856                        return false;
857                    }
858                } else {
859                    throw new IllegalStateException("Don't know how to handle message of type '"
860                            + message.getClass().getName() + "'.  Are you missing a protocol encoder?");
861                }
862
863                if (localWrittenBytes == 0) {
864                    // Kernel buffer is full.
865                    setInterestedInWrite(session, true);
866                    return false;
867                }
868
869                writtenBytes += localWrittenBytes;
870
871                if (writtenBytes >= maxWrittenBytes) {
872                    // Wrote too much
873                    scheduleFlush(session);
874                    return false;
875                }
876
877                if (message instanceof IoBuffer) {
878                    ((IoBuffer) message).free();
879                }
880            } while (writtenBytes < maxWrittenBytes);
881        } catch (Exception e) {
882            if (req != null) {
883                req.getFuture().setException(e);
884            }
885
886            IoFilterChain filterChain = session.getFilterChain();
887            filterChain.fireExceptionCaught(e);
888            return false;
889        }
890
891        return true;
892    }
893
894    private int writeBuffer(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
895            throws Exception {
896        IoBuffer buf = (IoBuffer) req.getMessage();
897        int localWrittenBytes = 0;
898
899        if (buf.hasRemaining()) {
900            int length;
901
902            if (hasFragmentation) {
903                length = Math.min(buf.remaining(), maxLength);
904            } else {
905                length = buf.remaining();
906            }
907
908            try {
909                localWrittenBytes = write(session, buf, length);
910            } catch (IOException ioe) {
911                // We have had an issue while trying to send data to the
912                // peer : let's close the session.
913                buf.free();
914                session.close(true);
915                destroy(session);
916
917                return 0;
918            }
919
920        }
921
922        session.increaseWrittenBytes(localWrittenBytes, currentTime);
923
924        if (!buf.hasRemaining() || (!hasFragmentation && (localWrittenBytes != 0))) {
925            // Buffer has been sent, clear the current request.
926            int pos = buf.position();
927            buf.reset();
928
929            fireMessageSent(session, req);
930
931            // And set it back to its position
932            buf.position(pos);
933        }
934
935        return localWrittenBytes;
936    }
937
938    private int writeFile(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
939            throws Exception {
940        int localWrittenBytes;
941        FileRegion region = (FileRegion) req.getMessage();
942
943        if (region.getRemainingBytes() > 0) {
944            int length;
945
946            if (hasFragmentation) {
947                length = (int) Math.min(region.getRemainingBytes(), maxLength);
948            } else {
949                length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
950            }
951
952            localWrittenBytes = transferFile(session, region, length);
953            region.update(localWrittenBytes);
954        } else {
955            localWrittenBytes = 0;
956        }
957
958        session.increaseWrittenBytes(localWrittenBytes, currentTime);
959
960        if ((region.getRemainingBytes() <= 0) || (!hasFragmentation && (localWrittenBytes != 0))) {
961            fireMessageSent(session, req);
962        }
963
964        return localWrittenBytes;
965    }
966
967    private void fireMessageSent(S session, WriteRequest req) {
968        session.setCurrentWriteRequest(null);
969        IoFilterChain filterChain = session.getFilterChain();
970        filterChain.fireMessageSent(req);
971    }
972
973    /**
974     * Update the trafficControl for all the session.
975     */
976    private void updateTrafficMask() {
977        int queueSize = trafficControllingSessions.size();
978
979        while (queueSize > 0) {
980            S session = trafficControllingSessions.poll();
981
982            if (session == null) {
983                // We are done with this queue.
984                return;
985            }
986
987            SessionState state = getState(session);
988
989            switch (state) {
990            case OPENED:
991                updateTrafficControl(session);
992
993                break;
994
995            case CLOSING:
996                break;
997
998            case OPENING:
999                // Retry later if session is not yet fully initialized.
1000                // (In case that Session.suspend??() or session.resume??() is
1001                // called before addSession() is processed)
1002                // We just put back the session at the end of the queue.
1003                trafficControllingSessions.add(session);
1004                break;
1005
1006            default:
1007                throw new IllegalStateException(String.valueOf(state));
1008            }
1009
1010            // As we have handled one session, decrement the number of
1011            // remaining sessions. The OPENING session will be processed
1012            // with the next select(), as the queue size has been decreased,
1013            // even
1014            // if the session has been pushed at the end of the queue
1015            queueSize--;
1016        }
1017    }
1018
1019    /**
1020     * {@inheritDoc}
1021     */
1022    public void updateTrafficControl(S session) {
1023        //
1024        try {
1025            setInterestedInRead(session, !session.isReadSuspended());
1026        } catch (Exception e) {
1027            IoFilterChain filterChain = session.getFilterChain();
1028            filterChain.fireExceptionCaught(e);
1029        }
1030
1031        try {
1032            setInterestedInWrite(session,
1033                    !session.getWriteRequestQueue().isEmpty(session) && !session.isWriteSuspended());
1034        } catch (Exception e) {
1035            IoFilterChain filterChain = session.getFilterChain();
1036            filterChain.fireExceptionCaught(e);
1037        }
1038    }
1039
1040    /**
1041     * The main loop. This is the place in charge to poll the Selector, and to
1042     * process the active sessions. It's done in - handle the newly created
1043     * sessions -
1044     */
1045    private class Processor implements Runnable {
1046        public void run() {
1047            assert (processorRef.get() == this);
1048
1049            int nSessions = 0;
1050            lastIdleCheckTime = System.currentTimeMillis();
1051
1052            for (;;) {
1053                try {
1054                    // This select has a timeout so that we can manage
1055                    // idle session when we get out of the select every
1056                    // second. (note : this is a hack to avoid creating
1057                    // a dedicated thread).
1058                    long t0 = System.currentTimeMillis();
1059                    int selected = select(SELECT_TIMEOUT);
1060                    long t1 = System.currentTimeMillis();
1061                    long delta = (t1 - t0);
1062
1063                    if (!wakeupCalled.getAndSet(false) && (selected == 0) && (delta < 100)) {
1064                        // Last chance : the select() may have been
1065                        // interrupted because we have had an closed channel.
1066                        if (isBrokenConnection()) {
1067                            LOG.warn("Broken connection");
1068                        } else {
1069                            LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
1070                            // Ok, we are hit by the nasty epoll
1071                            // spinning.
1072                            // Basically, there is a race condition
1073                            // which causes a closing file descriptor not to be
1074                            // considered as available as a selected channel,
1075                            // but
1076                            // it stopped the select. The next time we will
1077                            // call select(), it will exit immediately for the
1078                            // same
1079                            // reason, and do so forever, consuming 100%
1080                            // CPU.
1081                            // We have to destroy the selector, and
1082                            // register all the socket on a new one.
1083                            registerNewSelector();
1084                        }
1085                    }
1086
1087                    // Manage newly created session first
1088                    nSessions += handleNewSessions();
1089
1090                    updateTrafficMask();
1091
1092                    // Now, if we have had some incoming or outgoing events,
1093                    // deal with them
1094                    if (selected > 0) {
1095                        // LOG.debug("Processing ..."); // This log hurts one of
1096                        // the MDCFilter test...
1097                        process();
1098                    }
1099
1100                    // Write the pending requests
1101                    long currentTime = System.currentTimeMillis();
1102                    flush(currentTime);
1103
1104                    // And manage removed sessions
1105                    nSessions -= removeSessions();
1106
1107                    // Last, not least, send Idle events to the idle sessions
1108                    notifyIdleSessions(currentTime);
1109
1110                    // Get a chance to exit the infinite loop if there are no
1111                    // more sessions on this Processor
1112                    if (nSessions == 0) {
1113                        processorRef.set(null);
1114
1115                        if (newSessions.isEmpty() && isSelectorEmpty()) {
1116                            // newSessions.add() precedes startupProcessor
1117                            assert (processorRef.get() != this);
1118                            break;
1119                        }
1120
1121                        assert (processorRef.get() != this);
1122
1123                        if (!processorRef.compareAndSet(null, this)) {
1124                            // startupProcessor won race, so must exit processor
1125                            assert (processorRef.get() != this);
1126                            break;
1127                        }
1128
1129                        assert (processorRef.get() == this);
1130                    }
1131
1132                    // Disconnect all sessions immediately if disposal has been
1133                    // requested so that we exit this loop eventually.
1134                    if (isDisposing()) {
1135                        boolean hasKeys = false;
1136                        
1137                        for (Iterator<S> i = allSessions(); i.hasNext();) {
1138                            IoSession session = i.next();
1139                            
1140                            if (session.isActive()) {
1141                                scheduleRemove((S)session);
1142                                hasKeys = true;
1143                            }
1144                        }
1145
1146                        if (hasKeys) {
1147                            wakeup();
1148                        }
1149                    }
1150                } catch (ClosedSelectorException cse) {
1151                    // If the selector has been closed, we can exit the loop
1152                    // But first, dump a stack trace
1153                    ExceptionMonitor.getInstance().exceptionCaught(cse);
1154                    break;
1155                } catch (Exception e) {
1156                    ExceptionMonitor.getInstance().exceptionCaught(e);
1157
1158                    try {
1159                        Thread.sleep(1000);
1160                    } catch (InterruptedException e1) {
1161                        ExceptionMonitor.getInstance().exceptionCaught(e1);
1162                    }
1163                }
1164            }
1165
1166            try {
1167                synchronized (disposalLock) {
1168                    if (disposing) {
1169                        doDispose();
1170                    }
1171                }
1172            } catch (Exception e) {
1173                ExceptionMonitor.getInstance().exceptionCaught(e);
1174            } finally {
1175                disposalFuture.setValue(true);
1176            }
1177        }
1178    }
1179}