001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.core.session;
021
022import java.io.File;
023import java.io.FileInputStream;
024import java.io.IOException;
025import java.net.SocketAddress;
026import java.nio.channels.FileChannel;
027import java.util.Iterator;
028import java.util.Queue;
029import java.util.Set;
030import java.util.concurrent.ConcurrentLinkedQueue;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.atomic.AtomicInteger;
033import java.util.concurrent.atomic.AtomicLong;
034
035import org.apache.mina.core.buffer.IoBuffer;
036import org.apache.mina.core.file.DefaultFileRegion;
037import org.apache.mina.core.file.FilenameFileRegion;
038import org.apache.mina.core.filterchain.IoFilterChain;
039import org.apache.mina.core.future.CloseFuture;
040import org.apache.mina.core.future.DefaultCloseFuture;
041import org.apache.mina.core.future.DefaultReadFuture;
042import org.apache.mina.core.future.DefaultWriteFuture;
043import org.apache.mina.core.future.IoFutureListener;
044import org.apache.mina.core.future.ReadFuture;
045import org.apache.mina.core.future.WriteFuture;
046import org.apache.mina.core.service.AbstractIoService;
047import org.apache.mina.core.service.IoAcceptor;
048import org.apache.mina.core.service.IoHandler;
049import org.apache.mina.core.service.IoProcessor;
050import org.apache.mina.core.service.IoService;
051import org.apache.mina.core.service.TransportMetadata;
052import org.apache.mina.core.write.DefaultWriteRequest;
053import org.apache.mina.core.write.WriteException;
054import org.apache.mina.core.write.WriteRequest;
055import org.apache.mina.core.write.WriteRequestQueue;
056import org.apache.mina.core.write.WriteTimeoutException;
057import org.apache.mina.core.write.WriteToClosedSessionException;
058import org.apache.mina.util.ExceptionMonitor;
059
060/**
061 * Base implementation of {@link IoSession}.
062 * 
063 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
064 */
065public abstract class AbstractIoSession implements IoSession {
066    /** The associated handler */
067    private final IoHandler handler;
068
069    /** The session config */
070    protected IoSessionConfig config;
071
072    /** The service which will manage this session */
073    private final IoService service;
074
075    private static final AttributeKey READY_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class,
076            "readyReadFutures");
077
078    private static final AttributeKey WAITING_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class,
079            "waitingReadFutures");
080
081    private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER = new IoFutureListener<CloseFuture>() {
082        public void operationComplete(CloseFuture future) {
083            AbstractIoSession session = (AbstractIoSession) future.getSession();
084            session.scheduledWriteBytes.set(0);
085            session.scheduledWriteMessages.set(0);
086            session.readBytesThroughput = 0;
087            session.readMessagesThroughput = 0;
088            session.writtenBytesThroughput = 0;
089            session.writtenMessagesThroughput = 0;
090        }
091    };
092
093    /**
094     * An internal write request object that triggers session close.
095     * 
096     * @see #writeRequestQueue
097     */
098    private static final WriteRequest CLOSE_REQUEST = new DefaultWriteRequest(new Object());
099
100    private final Object lock = new Object();
101
102    private IoSessionAttributeMap attributes;
103
104    private WriteRequestQueue writeRequestQueue;
105
106    private WriteRequest currentWriteRequest;
107
108    /** The Session creation's time */
109    private final long creationTime;
110
111    /** An id generator guaranteed to generate unique IDs for the session */
112    private static AtomicLong idGenerator = new AtomicLong(0);
113
114    /** The session ID */
115    private long sessionId;
116
117    /**
118     * A future that will be set 'closed' when the connection is closed.
119     */
120    private final CloseFuture closeFuture = new DefaultCloseFuture(this);
121
122    private volatile boolean closing;
123
124    // traffic control
125    private boolean readSuspended = false;
126
127    private boolean writeSuspended = false;
128
129    // Status variables
130    private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
131
132    private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
133
134    private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
135
136    private long readBytes;
137
138    private long writtenBytes;
139
140    private long readMessages;
141
142    private long writtenMessages;
143
144    private long lastReadTime;
145
146    private long lastWriteTime;
147
148    private long lastThroughputCalculationTime;
149
150    private long lastReadBytes;
151
152    private long lastWrittenBytes;
153
154    private long lastReadMessages;
155
156    private long lastWrittenMessages;
157
158    private double readBytesThroughput;
159
160    private double writtenBytesThroughput;
161
162    private double readMessagesThroughput;
163
164    private double writtenMessagesThroughput;
165
166    private AtomicInteger idleCountForBoth = new AtomicInteger();
167
168    private AtomicInteger idleCountForRead = new AtomicInteger();
169
170    private AtomicInteger idleCountForWrite = new AtomicInteger();
171
172    private long lastIdleTimeForBoth;
173
174    private long lastIdleTimeForRead;
175
176    private long lastIdleTimeForWrite;
177
178    private boolean deferDecreaseReadBuffer = true;
179
180    /**
181     * Create a Session for a service
182     * 
183     * @param service the Service for this session
184     */
185    protected AbstractIoSession(IoService service) {
186        this.service = service;
187        this.handler = service.getHandler();
188
189        // Initialize all the Session counters to the current time
190        long currentTime = System.currentTimeMillis();
191        creationTime = currentTime;
192        lastThroughputCalculationTime = currentTime;
193        lastReadTime = currentTime;
194        lastWriteTime = currentTime;
195        lastIdleTimeForBoth = currentTime;
196        lastIdleTimeForRead = currentTime;
197        lastIdleTimeForWrite = currentTime;
198
199        // TODO add documentation
200        closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
201
202        // Set a new ID for this session
203        sessionId = idGenerator.incrementAndGet();
204    }
205
206    /**
207     * {@inheritDoc}
208     * 
209     * We use an AtomicLong to guarantee that the session ID are unique.
210     */
211    public final long getId() {
212        return sessionId;
213    }
214
215    /**
216     * @return The associated IoProcessor for this session
217     */
218    public abstract IoProcessor getProcessor();
219
220    /**
221     * {@inheritDoc}
222     */
223    public final boolean isConnected() {
224        return !closeFuture.isClosed();
225    }
226
227    /**
228     * {@inheritDoc}
229     */
230    public boolean isActive() {
231        // Return true by default
232        return true;
233    }
234
235    /**
236     * {@inheritDoc}
237     */
238    public final boolean isClosing() {
239        return closing || closeFuture.isClosed();
240    }
241
242    /**
243     * {@inheritDoc}
244     */
245    public boolean isSecured() {
246        // Always false...
247        return false;
248    }
249
250    /**
251     * {@inheritDoc}
252     */
253    public final CloseFuture getCloseFuture() {
254        return closeFuture;
255    }
256
257    /**
258     * Tells if the session is scheduled for flushed
259     * 
260     * @return true if the session is scheduled for flush
261     */
262    public final boolean isScheduledForFlush() {
263        return scheduledForFlush.get();
264    }
265
266    /**
267     * Schedule the session for flushed
268     */
269    public final void scheduledForFlush() {
270        scheduledForFlush.set(true);
271    }
272
273    /**
274     * Change the session's status : it's not anymore scheduled for flush
275     */
276    public final void unscheduledForFlush() {
277        scheduledForFlush.set(false);
278    }
279
280    /**
281     * Set the scheduledForFLush flag. As we may have concurrent access to this
282     * flag, we compare and set it in one call.
283     * 
284     * @param schedule
285     *            the new value to set if not already set.
286     * @return true if the session flag has been set, and if it wasn't set
287     *         already.
288     */
289    public final boolean setScheduledForFlush(boolean schedule) {
290        if (schedule) {
291            // If the current tag is set to false, switch it to true,
292            // otherwise, we do nothing but return false : the session
293            // is already scheduled for flush
294            return scheduledForFlush.compareAndSet(false, schedule);
295        }
296
297        scheduledForFlush.set(schedule);
298        return true;
299    }
300
301    /**
302     * {@inheritDoc}
303     */
304    public final CloseFuture close(boolean rightNow) {
305        if (rightNow) {
306            return closeNow();
307        } else {
308            return closeOnFlush();
309        }
310    }
311
312    /**
313     * {@inheritDoc}
314     */
315    public final CloseFuture close() {
316        try {
317            closeNow();
318        } finally {
319            return closeFuture;
320        }
321    }
322
323    /**
324     * {@inheritDoc}
325     */
326    public final CloseFuture closeOnFlush() {
327        if (!isClosing()) {
328            getWriteRequestQueue().offer(this, CLOSE_REQUEST);
329            getProcessor().flush(this);
330            return closeFuture;
331        } else {
332            return closeFuture;
333        }
334    }
335
336    /**
337     * {@inheritDoc}
338     */
339    public final CloseFuture closeNow() {
340        synchronized (lock) {
341            if (isClosing()) {
342                return closeFuture;
343            }
344
345            closing = true;
346        }
347
348        getFilterChain().fireFilterClose();
349
350        return closeFuture;
351    }
352
353    /**
354     * {@inheritDoc}
355     */
356    public IoHandler getHandler() {
357        return handler;
358    }
359
360    /**
361     * {@inheritDoc}
362     */
363    public IoSessionConfig getConfig() {
364        return config;
365    }
366
367    /**
368     * {@inheritDoc}
369     */
370    public final ReadFuture read() {
371        if (!getConfig().isUseReadOperation()) {
372            throw new IllegalStateException("useReadOperation is not enabled.");
373        }
374
375        Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
376        ReadFuture future;
377        synchronized (readyReadFutures) {
378            future = readyReadFutures.poll();
379            if (future != null) {
380                if (future.isClosed()) {
381                    // Let other readers get notified.
382                    readyReadFutures.offer(future);
383                }
384            } else {
385                future = new DefaultReadFuture(this);
386                getWaitingReadFutures().offer(future);
387            }
388        }
389
390        return future;
391    }
392
393    /**
394     * Associates a message to a ReadFuture
395     * 
396     * @param message the message to associate to the ReadFuture
397     * 
398     */
399    public final void offerReadFuture(Object message) {
400        newReadFuture().setRead(message);
401    }
402
403    /**
404     * Associates a failure to a ReadFuture
405     * 
406     * @param exception the exception to associate to the ReadFuture
407     */
408    public final void offerFailedReadFuture(Throwable exception) {
409        newReadFuture().setException(exception);
410    }
411
412    /**
413     * Inform the ReadFuture that the session has been closed
414     */
415    public final void offerClosedReadFuture() {
416        Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
417        
418        synchronized (readyReadFutures) {
419            newReadFuture().setClosed();
420        }
421    }
422
423    /**
424     * @return a readFuture get from the waiting ReadFuture
425     */
426    private ReadFuture newReadFuture() {
427        Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
428        Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures();
429        ReadFuture future;
430        
431        synchronized (readyReadFutures) {
432            future = waitingReadFutures.poll();
433            
434            if (future == null) {
435                future = new DefaultReadFuture(this);
436                readyReadFutures.offer(future);
437            }
438        }
439        
440        return future;
441    }
442
443    /**
444     * @return a queue of ReadFuture
445     */
446    private Queue<ReadFuture> getReadyReadFutures() {
447        Queue<ReadFuture> readyReadFutures = (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
448        
449        if (readyReadFutures == null) {
450            readyReadFutures = new ConcurrentLinkedQueue<ReadFuture>();
451
452            Queue<ReadFuture> oldReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(READY_READ_FUTURES_KEY,
453                    readyReadFutures);
454            
455            if (oldReadyReadFutures != null) {
456                readyReadFutures = oldReadyReadFutures;
457            }
458        }
459        
460        return readyReadFutures;
461    }
462
463    /**
464     * @return the queue of waiting ReadFuture
465     */
466    private Queue<ReadFuture> getWaitingReadFutures() {
467        Queue<ReadFuture> waitingReadyReadFutures = (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES_KEY);
468        
469        if (waitingReadyReadFutures == null) {
470            waitingReadyReadFutures = new ConcurrentLinkedQueue<ReadFuture>();
471
472            Queue<ReadFuture> oldWaitingReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(
473                    WAITING_READ_FUTURES_KEY, waitingReadyReadFutures);
474            
475            if (oldWaitingReadyReadFutures != null) {
476                waitingReadyReadFutures = oldWaitingReadyReadFutures;
477            }
478        }
479        
480        return waitingReadyReadFutures;
481    }
482
483    /**
484     * {@inheritDoc}
485     */
486    public WriteFuture write(Object message) {
487        return write(message, null);
488    }
489
490    /**
491     * {@inheritDoc}
492     */
493    public WriteFuture write(Object message, SocketAddress remoteAddress) {
494        if (message == null) {
495            throw new IllegalArgumentException("Trying to write a null message : not allowed");
496        }
497
498        // We can't send a message to a connected session if we don't have
499        // the remote address
500        if (!getTransportMetadata().isConnectionless() && (remoteAddress != null)) {
501            throw new UnsupportedOperationException();
502        }
503
504        // If the session has been closed or is closing, we can't either
505        // send a message to the remote side. We generate a future
506        // containing an exception.
507        if (isClosing() || !isConnected()) {
508            WriteFuture future = new DefaultWriteFuture(this);
509            WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
510            WriteException writeException = new WriteToClosedSessionException(request);
511            future.setException(writeException);
512            return future;
513        }
514
515        FileChannel openedFileChannel = null;
516
517        // TODO: remove this code as soon as we use InputStream
518        // instead of Object for the message.
519        try {
520            if ((message instanceof IoBuffer) && !((IoBuffer) message).hasRemaining()) {
521                // Nothing to write : probably an error in the user code
522                throw new IllegalArgumentException("message is empty. Forgot to call flip()?");
523            } else if (message instanceof FileChannel) {
524                FileChannel fileChannel = (FileChannel) message;
525                message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
526            } else if (message instanceof File) {
527                File file = (File) message;
528                openedFileChannel = new FileInputStream(file).getChannel();
529                message = new FilenameFileRegion(file, openedFileChannel, 0, openedFileChannel.size());
530            }
531        } catch (IOException e) {
532            ExceptionMonitor.getInstance().exceptionCaught(e);
533            return DefaultWriteFuture.newNotWrittenFuture(this, e);
534        }
535
536        // Now, we can write the message. First, create a future
537        WriteFuture writeFuture = new DefaultWriteFuture(this);
538        WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
539
540        // Then, get the chain and inject the WriteRequest into it
541        IoFilterChain filterChain = getFilterChain();
542        filterChain.fireFilterWrite(writeRequest);
543
544        // TODO : This is not our business ! The caller has created a
545        // FileChannel,
546        // he has to close it !
547        if (openedFileChannel != null) {
548            // If we opened a FileChannel, it needs to be closed when the write
549            // has completed
550            final FileChannel finalChannel = openedFileChannel;
551            writeFuture.addListener(new IoFutureListener<WriteFuture>() {
552                public void operationComplete(WriteFuture future) {
553                    try {
554                        finalChannel.close();
555                    } catch (IOException e) {
556                        ExceptionMonitor.getInstance().exceptionCaught(e);
557                    }
558                }
559            });
560        }
561
562        // Return the WriteFuture.
563        return writeFuture;
564    }
565
566    /**
567     * {@inheritDoc}
568     */
569    public final Object getAttachment() {
570        return getAttribute("");
571    }
572
573    /**
574     * {@inheritDoc}
575     */
576    public final Object setAttachment(Object attachment) {
577        return setAttribute("", attachment);
578    }
579
580    /**
581     * {@inheritDoc}
582     */
583    public final Object getAttribute(Object key) {
584        return getAttribute(key, null);
585    }
586
587    /**
588     * {@inheritDoc}
589     */
590    public final Object getAttribute(Object key, Object defaultValue) {
591        return attributes.getAttribute(this, key, defaultValue);
592    }
593
594    /**
595     * {@inheritDoc}
596     */
597    public final Object setAttribute(Object key, Object value) {
598        return attributes.setAttribute(this, key, value);
599    }
600
601    /**
602     * {@inheritDoc}
603     */
604    public final Object setAttribute(Object key) {
605        return setAttribute(key, Boolean.TRUE);
606    }
607
608    /**
609     * {@inheritDoc}
610     */
611    public final Object setAttributeIfAbsent(Object key, Object value) {
612        return attributes.setAttributeIfAbsent(this, key, value);
613    }
614
615    /**
616     * {@inheritDoc}
617     */
618    public final Object setAttributeIfAbsent(Object key) {
619        return setAttributeIfAbsent(key, Boolean.TRUE);
620    }
621
622    /**
623     * {@inheritDoc}
624     */
625    public final Object removeAttribute(Object key) {
626        return attributes.removeAttribute(this, key);
627    }
628
629    /**
630     * {@inheritDoc}
631     */
632    public final boolean removeAttribute(Object key, Object value) {
633        return attributes.removeAttribute(this, key, value);
634    }
635
636    /**
637     * {@inheritDoc}
638     */
639    public final boolean replaceAttribute(Object key, Object oldValue, Object newValue) {
640        return attributes.replaceAttribute(this, key, oldValue, newValue);
641    }
642
643    /**
644     * {@inheritDoc}
645     */
646    public final boolean containsAttribute(Object key) {
647        return attributes.containsAttribute(this, key);
648    }
649
650    /**
651     * {@inheritDoc}
652     */
653    public final Set<Object> getAttributeKeys() {
654        return attributes.getAttributeKeys(this);
655    }
656
657    /**
658     * @return The map of attributes associated with the session
659     */
660    public final IoSessionAttributeMap getAttributeMap() {
661        return attributes;
662    }
663
664    /**
665     * Set the map of attributes associated with the session
666     * 
667     * @param attributes The Map of attributes
668     */
669    public final void setAttributeMap(IoSessionAttributeMap attributes) {
670        this.attributes = attributes;
671    }
672
673    /**
674     * Create a new close aware write queue, based on the given write queue.
675     * 
676     * @param writeRequestQueue The write request queue
677     */
678    public final void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) {
679        this.writeRequestQueue = new CloseAwareWriteQueue(writeRequestQueue);
680    }
681
682    /**
683     * {@inheritDoc}
684     */
685    public final void suspendRead() {
686        readSuspended = true;
687        if (isClosing() || !isConnected()) {
688            return;
689        }
690        getProcessor().updateTrafficControl(this);
691    }
692
693    /**
694     * {@inheritDoc}
695     */
696    public final void suspendWrite() {
697        writeSuspended = true;
698        if (isClosing() || !isConnected()) {
699            return;
700        }
701        getProcessor().updateTrafficControl(this);
702    }
703
704    /**
705     * {@inheritDoc}
706     */
707    @SuppressWarnings("unchecked")
708    public final void resumeRead() {
709        readSuspended = false;
710        if (isClosing() || !isConnected()) {
711            return;
712        }
713        getProcessor().updateTrafficControl(this);
714    }
715
716    /**
717     * {@inheritDoc}
718     */
719    @SuppressWarnings("unchecked")
720    public final void resumeWrite() {
721        writeSuspended = false;
722        if (isClosing() || !isConnected()) {
723            return;
724        }
725        getProcessor().updateTrafficControl(this);
726    }
727
728    /**
729     * {@inheritDoc}
730     */
731    public boolean isReadSuspended() {
732        return readSuspended;
733    }
734
735    /**
736     * {@inheritDoc}
737     */
738    public boolean isWriteSuspended() {
739        return writeSuspended;
740    }
741
742    /**
743     * {@inheritDoc}
744     */
745    public final long getReadBytes() {
746        return readBytes;
747    }
748
749    /**
750     * {@inheritDoc}
751     */
752    public final long getWrittenBytes() {
753        return writtenBytes;
754    }
755
756    /**
757     * {@inheritDoc}
758     */
759    public final long getReadMessages() {
760        return readMessages;
761    }
762
763    /**
764     * {@inheritDoc}
765     */
766    public final long getWrittenMessages() {
767        return writtenMessages;
768    }
769
770    /**
771     * {@inheritDoc}
772     */
773    public final double getReadBytesThroughput() {
774        return readBytesThroughput;
775    }
776
777    /**
778     * {@inheritDoc}
779     */
780    public final double getWrittenBytesThroughput() {
781        return writtenBytesThroughput;
782    }
783
784    /**
785     * {@inheritDoc}
786     */
787    public final double getReadMessagesThroughput() {
788        return readMessagesThroughput;
789    }
790
791    /**
792     * {@inheritDoc}
793     */
794    public final double getWrittenMessagesThroughput() {
795        return writtenMessagesThroughput;
796    }
797
798    /**
799     * {@inheritDoc}
800     */
801    public final void updateThroughput(long currentTime, boolean force) {
802        int interval = (int) (currentTime - lastThroughputCalculationTime);
803
804        long minInterval = getConfig().getThroughputCalculationIntervalInMillis();
805
806        if (((minInterval == 0) || (interval < minInterval)) && !force) {
807            return;
808        }
809
810        readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval;
811        writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval;
812        readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval;
813        writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval;
814
815        lastReadBytes = readBytes;
816        lastWrittenBytes = writtenBytes;
817        lastReadMessages = readMessages;
818        lastWrittenMessages = writtenMessages;
819
820        lastThroughputCalculationTime = currentTime;
821    }
822
823    /**
824     * {@inheritDoc}
825     */
826    public final long getScheduledWriteBytes() {
827        return scheduledWriteBytes.get();
828    }
829
830    /**
831     * {@inheritDoc}
832     */
833    public final int getScheduledWriteMessages() {
834        return scheduledWriteMessages.get();
835    }
836
837    /**
838     * Set the number of scheduled write bytes
839     * 
840     * @param byteCount The number of scheduled bytes for write
841     */
842    protected void setScheduledWriteBytes(int byteCount) {
843        scheduledWriteBytes.set(byteCount);
844    }
845
846    /**
847     * Set the number of scheduled write messages
848     * 
849     * @param messages The number of scheduled messages for write
850     */
851    protected void setScheduledWriteMessages(int messages) {
852        scheduledWriteMessages.set(messages);
853    }
854
855    /**
856     * Increase the number of read bytes
857     * 
858     * @param increment The number of read bytes
859     * @param currentTime The current time
860     */
861    public final void increaseReadBytes(long increment, long currentTime) {
862        if (increment <= 0) {
863            return;
864        }
865
866        readBytes += increment;
867        lastReadTime = currentTime;
868        idleCountForBoth.set(0);
869        idleCountForRead.set(0);
870
871        if (getService() instanceof AbstractIoService) {
872            ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime);
873        }
874    }
875
876    /**
877     * Increase the number of read messages
878     * 
879     * @param currentTime The current time
880     */
881    public final void increaseReadMessages(long currentTime) {
882        readMessages++;
883        lastReadTime = currentTime;
884        idleCountForBoth.set(0);
885        idleCountForRead.set(0);
886
887        if (getService() instanceof AbstractIoService) {
888            ((AbstractIoService) getService()).getStatistics().increaseReadMessages(currentTime);
889        }
890    }
891
892    /**
893     * Increase the number of written bytes
894     * 
895     * @param increment The number of written bytes
896     * @param currentTime The current time
897     */
898    public final void increaseWrittenBytes(int increment, long currentTime) {
899        if (increment <= 0) {
900            return;
901        }
902
903        writtenBytes += increment;
904        lastWriteTime = currentTime;
905        idleCountForBoth.set(0);
906        idleCountForWrite.set(0);
907
908        if (getService() instanceof AbstractIoService) {
909            ((AbstractIoService) getService()).getStatistics().increaseWrittenBytes(increment, currentTime);
910        }
911
912        increaseScheduledWriteBytes(-increment);
913    }
914
915    /**
916     * Increase the number of written messages
917     * 
918     * @param request The written message
919     * @param currentTime The current tile
920     */
921    public final void increaseWrittenMessages(WriteRequest request, long currentTime) {
922        Object message = request.getMessage();
923
924        if (message instanceof IoBuffer) {
925            IoBuffer b = (IoBuffer) message;
926
927            if (b.hasRemaining()) {
928                return;
929            }
930        }
931
932        writtenMessages++;
933        lastWriteTime = currentTime;
934
935        if (getService() instanceof AbstractIoService) {
936            ((AbstractIoService) getService()).getStatistics().increaseWrittenMessages(currentTime);
937        }
938
939        decreaseScheduledWriteMessages();
940    }
941
942    /**
943     * Increase the number of scheduled write bytes for the session
944     * 
945     * @param increment The number of newly added bytes to write
946     */
947    public final void increaseScheduledWriteBytes(int increment) {
948        scheduledWriteBytes.addAndGet(increment);
949        if (getService() instanceof AbstractIoService) {
950            ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteBytes(increment);
951        }
952    }
953
954    /**
955     * Increase the number of scheduled message to write
956     */
957    public final void increaseScheduledWriteMessages() {
958        scheduledWriteMessages.incrementAndGet();
959        
960        if (getService() instanceof AbstractIoService) {
961            ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteMessages();
962        }
963    }
964
965    /**
966     * Decrease the number of scheduled message written
967     */
968    private void decreaseScheduledWriteMessages() {
969        scheduledWriteMessages.decrementAndGet();
970        if (getService() instanceof AbstractIoService) {
971            ((AbstractIoService) getService()).getStatistics().decreaseScheduledWriteMessages();
972        }
973    }
974
975    /**
976     * Decrease the counters of written messages and written bytes when a message has been written
977     * 
978     * @param request The written message
979     */
980    public final void decreaseScheduledBytesAndMessages(WriteRequest request) {
981        Object message = request.getMessage();
982        
983        if (message instanceof IoBuffer) {
984            IoBuffer b = (IoBuffer) message;
985            
986            if (b.hasRemaining()) {
987                increaseScheduledWriteBytes(-((IoBuffer) message).remaining());
988            } else {
989                decreaseScheduledWriteMessages();
990            }
991        } else {
992            decreaseScheduledWriteMessages();
993        }
994    }
995
996    /**
997     * {@inheritDoc}
998     */
999    public final WriteRequestQueue getWriteRequestQueue() {
1000        if (writeRequestQueue == null) {
1001            throw new IllegalStateException();
1002        }
1003        
1004        return writeRequestQueue;
1005    }
1006
1007    /**
1008     * {@inheritDoc}
1009     */
1010    public final WriteRequest getCurrentWriteRequest() {
1011        return currentWriteRequest;
1012    }
1013
1014    /**
1015     * {@inheritDoc}
1016     */
1017    public final Object getCurrentWriteMessage() {
1018        WriteRequest req = getCurrentWriteRequest();
1019        
1020        if (req == null) {
1021            return null;
1022        }
1023        return req.getMessage();
1024    }
1025
1026    /**
1027     * {@inheritDoc}
1028     */
1029    public final void setCurrentWriteRequest(WriteRequest currentWriteRequest) {
1030        this.currentWriteRequest = currentWriteRequest;
1031    }
1032
1033    /**
1034     * Increase the ReadBuffer size (it will double)
1035     */
1036    public final void increaseReadBufferSize() {
1037        int newReadBufferSize = getConfig().getReadBufferSize() << 1;
1038        if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) {
1039            getConfig().setReadBufferSize(newReadBufferSize);
1040        } else {
1041            getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize());
1042        }
1043
1044        deferDecreaseReadBuffer = true;
1045    }
1046
1047    /**
1048     * Decrease the ReadBuffer size (it will be divided by a factor 2)
1049     */
1050    public final void decreaseReadBufferSize() {
1051        if (deferDecreaseReadBuffer) {
1052            deferDecreaseReadBuffer = false;
1053            return;
1054        }
1055
1056        if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) {
1057            getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1);
1058        }
1059
1060        deferDecreaseReadBuffer = true;
1061    }
1062
1063    /**
1064     * {@inheritDoc}
1065     */
1066    public final long getCreationTime() {
1067        return creationTime;
1068    }
1069
1070    /**
1071     * {@inheritDoc}
1072     */
1073    public final long getLastIoTime() {
1074        return Math.max(lastReadTime, lastWriteTime);
1075    }
1076
1077    /**
1078     * {@inheritDoc}
1079     */
1080    public final long getLastReadTime() {
1081        return lastReadTime;
1082    }
1083
1084    /**
1085     * {@inheritDoc}
1086     */
1087    public final long getLastWriteTime() {
1088        return lastWriteTime;
1089    }
1090
1091    /**
1092     * {@inheritDoc}
1093     */
1094    public final boolean isIdle(IdleStatus status) {
1095        if (status == IdleStatus.BOTH_IDLE) {
1096            return idleCountForBoth.get() > 0;
1097        }
1098
1099        if (status == IdleStatus.READER_IDLE) {
1100            return idleCountForRead.get() > 0;
1101        }
1102
1103        if (status == IdleStatus.WRITER_IDLE) {
1104            return idleCountForWrite.get() > 0;
1105        }
1106
1107        throw new IllegalArgumentException("Unknown idle status: " + status);
1108    }
1109
1110    /**
1111     * {@inheritDoc}
1112     */
1113    public final boolean isBothIdle() {
1114        return isIdle(IdleStatus.BOTH_IDLE);
1115    }
1116
1117    /**
1118     * {@inheritDoc}
1119     */
1120    public final boolean isReaderIdle() {
1121        return isIdle(IdleStatus.READER_IDLE);
1122    }
1123
1124    /**
1125     * {@inheritDoc}
1126     */
1127    public final boolean isWriterIdle() {
1128        return isIdle(IdleStatus.WRITER_IDLE);
1129    }
1130
1131    /**
1132     * {@inheritDoc}
1133     */
1134    public final int getIdleCount(IdleStatus status) {
1135        if (getConfig().getIdleTime(status) == 0) {
1136            if (status == IdleStatus.BOTH_IDLE) {
1137                idleCountForBoth.set(0);
1138            }
1139
1140            if (status == IdleStatus.READER_IDLE) {
1141                idleCountForRead.set(0);
1142            }
1143
1144            if (status == IdleStatus.WRITER_IDLE) {
1145                idleCountForWrite.set(0);
1146            }
1147        }
1148
1149        if (status == IdleStatus.BOTH_IDLE) {
1150            return idleCountForBoth.get();
1151        }
1152
1153        if (status == IdleStatus.READER_IDLE) {
1154            return idleCountForRead.get();
1155        }
1156
1157        if (status == IdleStatus.WRITER_IDLE) {
1158            return idleCountForWrite.get();
1159        }
1160
1161        throw new IllegalArgumentException("Unknown idle status: " + status);
1162    }
1163
1164    /**
1165     * {@inheritDoc}
1166     */
1167    public final long getLastIdleTime(IdleStatus status) {
1168        if (status == IdleStatus.BOTH_IDLE) {
1169            return lastIdleTimeForBoth;
1170        }
1171
1172        if (status == IdleStatus.READER_IDLE) {
1173            return lastIdleTimeForRead;
1174        }
1175
1176        if (status == IdleStatus.WRITER_IDLE) {
1177            return lastIdleTimeForWrite;
1178        }
1179
1180        throw new IllegalArgumentException("Unknown idle status: " + status);
1181    }
1182
1183    /**
1184     * Increase the count of the various Idle counter
1185     * 
1186     * @param status The current status
1187     * @param currentTime The current time
1188     */
1189    public final void increaseIdleCount(IdleStatus status, long currentTime) {
1190        if (status == IdleStatus.BOTH_IDLE) {
1191            idleCountForBoth.incrementAndGet();
1192            lastIdleTimeForBoth = currentTime;
1193        } else if (status == IdleStatus.READER_IDLE) {
1194            idleCountForRead.incrementAndGet();
1195            lastIdleTimeForRead = currentTime;
1196        } else if (status == IdleStatus.WRITER_IDLE) {
1197            idleCountForWrite.incrementAndGet();
1198            lastIdleTimeForWrite = currentTime;
1199        } else {
1200            throw new IllegalArgumentException("Unknown idle status: " + status);
1201        }
1202    }
1203
1204    /**
1205     * {@inheritDoc}
1206     */
1207    public final int getBothIdleCount() {
1208        return getIdleCount(IdleStatus.BOTH_IDLE);
1209    }
1210
1211    /**
1212     * {@inheritDoc}
1213     */
1214    public final long getLastBothIdleTime() {
1215        return getLastIdleTime(IdleStatus.BOTH_IDLE);
1216    }
1217
1218    /**
1219     * {@inheritDoc}
1220     */
1221    public final long getLastReaderIdleTime() {
1222        return getLastIdleTime(IdleStatus.READER_IDLE);
1223    }
1224
1225    /**
1226     * {@inheritDoc}
1227     */
1228    public final long getLastWriterIdleTime() {
1229        return getLastIdleTime(IdleStatus.WRITER_IDLE);
1230    }
1231
1232    /**
1233     * {@inheritDoc}
1234     */
1235    public final int getReaderIdleCount() {
1236        return getIdleCount(IdleStatus.READER_IDLE);
1237    }
1238
1239    /**
1240     * {@inheritDoc}
1241     */
1242    public final int getWriterIdleCount() {
1243        return getIdleCount(IdleStatus.WRITER_IDLE);
1244    }
1245
1246    /**
1247     * {@inheritDoc}
1248     */
1249    public SocketAddress getServiceAddress() {
1250        IoService service = getService();
1251        if (service instanceof IoAcceptor) {
1252            return ((IoAcceptor) service).getLocalAddress();
1253        }
1254
1255        return getRemoteAddress();
1256    }
1257
1258    /**
1259     * {@inheritDoc}
1260     */
1261    @Override
1262    public final int hashCode() {
1263        return super.hashCode();
1264    }
1265
1266    /**
1267     * {@inheritDoc} TODO This is a ridiculous implementation. Need to be
1268     * replaced.
1269     */
1270    @Override
1271    public final boolean equals(Object o) {
1272        return super.equals(o);
1273    }
1274
1275    /**
1276     * {@inheritDoc}
1277     */
1278    @Override
1279    public String toString() {
1280        if (isConnected() || isClosing()) {
1281            String remote = null;
1282            String local = null;
1283
1284            try {
1285                remote = String.valueOf(getRemoteAddress());
1286            } catch (Exception e) {
1287                remote = "Cannot get the remote address informations: " + e.getMessage();
1288            }
1289
1290            try {
1291                local = String.valueOf(getLocalAddress());
1292            } catch (Exception e) {
1293            }
1294
1295            if (getService() instanceof IoAcceptor) {
1296                return "(" + getIdAsString() + ": " + getServiceName() + ", server, " + remote + " => " + local + ')';
1297            }
1298
1299            return "(" + getIdAsString() + ": " + getServiceName() + ", client, " + local + " => " + remote + ')';
1300        }
1301
1302        return "(" + getIdAsString() + ") Session disconnected ...";
1303    }
1304
1305    /**
1306     * Get the Id as a String
1307     */
1308    private String getIdAsString() {
1309        String id = Long.toHexString(getId()).toUpperCase();
1310        
1311        if (id.length() <= 8) {
1312            return "0x00000000".substring(0, 10 - id.length()) + id;
1313        } else {
1314            return "0x" + id;
1315        }
1316    }
1317
1318    /**
1319     * TGet the Service name
1320     */
1321    private String getServiceName() {
1322        TransportMetadata tm = getTransportMetadata();
1323        if (tm == null) {
1324            return "null";
1325        }
1326
1327        return tm.getProviderName() + ' ' + tm.getName();
1328    }
1329
1330    /**
1331     * {@inheritDoc}
1332     */
1333    public IoService getService() {
1334        return service;
1335    }
1336
1337    /**
1338     * Fires a {@link IoEventType#SESSION_IDLE} event to any applicable sessions
1339     * in the specified collection.
1340     * 
1341     * @param sessions The sessions that are notified
1342     * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
1343     */
1344    public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) {
1345        while (sessions.hasNext()) {
1346            IoSession session = sessions.next();
1347            
1348            if (!session.getCloseFuture().isClosed()) {
1349                notifyIdleSession(session, currentTime);
1350            }
1351        }
1352    }
1353
1354    /**
1355     * Fires a {@link IoEventType#SESSION_IDLE} event if applicable for the
1356     * specified {@code session}.
1357     * 
1358     * @param session The session that is notified
1359     * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
1360     */
1361    public static void notifyIdleSession(IoSession session, long currentTime) {
1362        notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
1363                IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
1364
1365        notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
1366                IdleStatus.READER_IDLE,
1367                Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));
1368
1369        notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
1370                IdleStatus.WRITER_IDLE,
1371                Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
1372
1373        notifyWriteTimeout(session, currentTime);
1374    }
1375
1376    private static void notifyIdleSession0(IoSession session, long currentTime, long idleTime, IdleStatus status,
1377            long lastIoTime) {
1378        if ((idleTime > 0) && (lastIoTime != 0) && (currentTime - lastIoTime >= idleTime)) {
1379            session.getFilterChain().fireSessionIdle(status);
1380        }
1381    }
1382
1383    private static void notifyWriteTimeout(IoSession session, long currentTime) {
1384
1385        long writeTimeout = session.getConfig().getWriteTimeoutInMillis();
1386        if ((writeTimeout > 0) && (currentTime - session.getLastWriteTime() >= writeTimeout)
1387                && !session.getWriteRequestQueue().isEmpty(session)) {
1388            WriteRequest request = session.getCurrentWriteRequest();
1389            if (request != null) {
1390                session.setCurrentWriteRequest(null);
1391                WriteTimeoutException cause = new WriteTimeoutException(request);
1392                request.getFuture().setException(cause);
1393                session.getFilterChain().fireExceptionCaught(cause);
1394                // WriteException is an IOException, so we close the session.
1395                session.close(true);
1396            }
1397        }
1398    }
1399
1400    /**
1401     * A queue which handles the CLOSE request.
1402     * 
1403     * TODO : Check that when closing a session, all the pending requests are
1404     * correctly sent.
1405     */
1406    private class CloseAwareWriteQueue implements WriteRequestQueue {
1407
1408        private final WriteRequestQueue queue;
1409
1410        /**
1411         * {@inheritDoc}
1412         */
1413        public CloseAwareWriteQueue(WriteRequestQueue queue) {
1414            this.queue = queue;
1415        }
1416
1417        /**
1418         * {@inheritDoc}
1419         */
1420        public synchronized WriteRequest poll(IoSession session) {
1421            WriteRequest answer = queue.poll(session);
1422
1423            if (answer == CLOSE_REQUEST) {
1424                AbstractIoSession.this.close( true );
1425                dispose(session);
1426                answer = null;
1427            }
1428
1429            return answer;
1430        }
1431
1432        /**
1433         * {@inheritDoc}
1434         */
1435        public void offer(IoSession session, WriteRequest e) {
1436            queue.offer(session, e);
1437        }
1438
1439        /**
1440         * {@inheritDoc}
1441         */
1442        public boolean isEmpty(IoSession session) {
1443            return queue.isEmpty(session);
1444        }
1445
1446        /**
1447         * {@inheritDoc}
1448         */
1449        public void clear(IoSession session) {
1450            queue.clear(session);
1451        }
1452
1453        /**
1454         * {@inheritDoc}
1455         */
1456        public void dispose(IoSession session) {
1457            queue.dispose(session);
1458        }
1459
1460        /**
1461         * {@inheritDoc}
1462         */
1463        public int size() {
1464            return queue.size();
1465        }
1466    }
1467}