001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.core.session;
021
022import java.io.File;
023import java.io.FileInputStream;
024import java.io.IOException;
025import java.net.SocketAddress;
026import java.nio.channels.FileChannel;
027import java.util.Iterator;
028import java.util.Queue;
029import java.util.Set;
030import java.util.concurrent.ConcurrentLinkedQueue;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.atomic.AtomicInteger;
033import java.util.concurrent.atomic.AtomicLong;
034
035import org.apache.mina.core.buffer.IoBuffer;
036import org.apache.mina.core.file.DefaultFileRegion;
037import org.apache.mina.core.file.FilenameFileRegion;
038import org.apache.mina.core.filterchain.IoFilterChain;
039import org.apache.mina.core.future.CloseFuture;
040import org.apache.mina.core.future.DefaultCloseFuture;
041import org.apache.mina.core.future.DefaultReadFuture;
042import org.apache.mina.core.future.DefaultWriteFuture;
043import org.apache.mina.core.future.IoFutureListener;
044import org.apache.mina.core.future.ReadFuture;
045import org.apache.mina.core.future.WriteFuture;
046import org.apache.mina.core.service.AbstractIoService;
047import org.apache.mina.core.service.IoAcceptor;
048import org.apache.mina.core.service.IoHandler;
049import org.apache.mina.core.service.IoProcessor;
050import org.apache.mina.core.service.IoService;
051import org.apache.mina.core.service.TransportMetadata;
052import org.apache.mina.core.write.DefaultWriteRequest;
053import org.apache.mina.core.write.WriteException;
054import org.apache.mina.core.write.WriteRequest;
055import org.apache.mina.core.write.WriteRequestQueue;
056import org.apache.mina.core.write.WriteTimeoutException;
057import org.apache.mina.core.write.WriteToClosedSessionException;
058import org.apache.mina.util.ExceptionMonitor;
059
060/**
061 * Base implementation of {@link IoSession}.
062 * 
063 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
064 */
065public abstract class AbstractIoSession implements IoSession {
066    /** The associated handler */
067    private final IoHandler handler;
068
069    /** The session config */
070    protected IoSessionConfig config;
071
072    /** The service which will manage this session */
073    private final IoService service;
074
075    private static final AttributeKey READY_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class,
076            "readyReadFutures");
077
078    private static final AttributeKey WAITING_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class,
079            "waitingReadFutures");
080
081    private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER = new IoFutureListener<CloseFuture>() {
082        public void operationComplete(CloseFuture future) {
083            AbstractIoSession session = (AbstractIoSession) future.getSession();
084            session.scheduledWriteBytes.set(0);
085            session.scheduledWriteMessages.set(0);
086            session.readBytesThroughput = 0;
087            session.readMessagesThroughput = 0;
088            session.writtenBytesThroughput = 0;
089            session.writtenMessagesThroughput = 0;
090        }
091    };
092
093    /**
094     * An internal write request object that triggers session close.
095     * 
096     * @see #writeRequestQueue
097     */
098    public static final WriteRequest CLOSE_REQUEST = new DefaultWriteRequest(new Object());
099
100    /**
101     * An internal write request object that triggers message sent events.
102     * 
103     * @see #writeRequestQueue
104     */
105    public static final WriteRequest MESSAGE_SENT_REQUEST = new DefaultWriteRequest(DefaultWriteRequest.EMPTY_MESSAGE);
106
107    private final Object lock = new Object();
108
109    private IoSessionAttributeMap attributes;
110
111    private WriteRequestQueue writeRequestQueue;
112
113    private WriteRequest currentWriteRequest;
114
115    /** The Session creation's time */
116    private final long creationTime;
117
118    /** An id generator guaranteed to generate unique IDs for the session */
119    private static AtomicLong idGenerator = new AtomicLong(0);
120
121    /** The session ID */
122    private long sessionId;
123
124    /**
125     * A future that will be set 'closed' when the connection is closed.
126     */
127    private final CloseFuture closeFuture = new DefaultCloseFuture(this);
128
129    private volatile boolean closing;
130
131    // traffic control
132    private boolean readSuspended = false;
133
134    private boolean writeSuspended = false;
135
136    // Status variables
137    private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
138
139    private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
140
141    private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
142
143    private long readBytes;
144
145    private long writtenBytes;
146
147    private long readMessages;
148
149    private long writtenMessages;
150
151    private long lastReadTime;
152
153    private long lastWriteTime;
154
155    private long lastThroughputCalculationTime;
156
157    private long lastReadBytes;
158
159    private long lastWrittenBytes;
160
161    private long lastReadMessages;
162
163    private long lastWrittenMessages;
164
165    private double readBytesThroughput;
166
167    private double writtenBytesThroughput;
168
169    private double readMessagesThroughput;
170
171    private double writtenMessagesThroughput;
172
173    private AtomicInteger idleCountForBoth = new AtomicInteger();
174
175    private AtomicInteger idleCountForRead = new AtomicInteger();
176
177    private AtomicInteger idleCountForWrite = new AtomicInteger();
178
179    private long lastIdleTimeForBoth;
180
181    private long lastIdleTimeForRead;
182
183    private long lastIdleTimeForWrite;
184
185    private boolean deferDecreaseReadBuffer = true;
186
187    /**
188     * Create a Session for a service
189     * 
190     * @param service the Service for this session
191     */
192    protected AbstractIoSession(IoService service) {
193        this.service = service;
194        this.handler = service.getHandler();
195
196        // Initialize all the Session counters to the current time
197        long currentTime = System.currentTimeMillis();
198        creationTime = currentTime;
199        lastThroughputCalculationTime = currentTime;
200        lastReadTime = currentTime;
201        lastWriteTime = currentTime;
202        lastIdleTimeForBoth = currentTime;
203        lastIdleTimeForRead = currentTime;
204        lastIdleTimeForWrite = currentTime;
205
206        // TODO add documentation
207        closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
208
209        // Set a new ID for this session
210        sessionId = idGenerator.incrementAndGet();
211    }
212
213    /**
214     * {@inheritDoc}
215     * 
216     * We use an AtomicLong to guarantee that the session ID are unique.
217     */
218    public final long getId() {
219        return sessionId;
220    }
221
222    /**
223     * @return The associated IoProcessor for this session
224     */
225    public abstract IoProcessor getProcessor();
226
227    /**
228     * {@inheritDoc}
229     */
230    public final boolean isConnected() {
231        return !closeFuture.isClosed();
232    }
233
234    /**
235     * {@inheritDoc}
236     */
237    public boolean isActive() {
238        // Return true by default
239        return true;
240    }
241
242    /**
243     * {@inheritDoc}
244     */
245    public final boolean isClosing() {
246        return closing || closeFuture.isClosed();
247    }
248
249    /**
250     * {@inheritDoc}
251     */
252    public boolean isSecured() {
253        // Always false...
254        return false;
255    }
256
257    /**
258     * {@inheritDoc}
259     */
260    public final CloseFuture getCloseFuture() {
261        return closeFuture;
262    }
263
264    /**
265     * Tells if the session is scheduled for flushed
266     * 
267     * @return true if the session is scheduled for flush
268     */
269    public final boolean isScheduledForFlush() {
270        return scheduledForFlush.get();
271    }
272
273    /**
274     * Schedule the session for flushed
275     */
276    public final void scheduledForFlush() {
277        scheduledForFlush.set(true);
278    }
279
280    /**
281     * Change the session's status : it's not anymore scheduled for flush
282     */
283    public final void unscheduledForFlush() {
284        scheduledForFlush.set(false);
285    }
286
287    /**
288     * Set the scheduledForFLush flag. As we may have concurrent access to this
289     * flag, we compare and set it in one call.
290     * 
291     * @param schedule
292     *            the new value to set if not already set.
293     * @return true if the session flag has been set, and if it wasn't set
294     *         already.
295     */
296    public final boolean setScheduledForFlush(boolean schedule) {
297        if (schedule) {
298            // If the current tag is set to false, switch it to true,
299            // otherwise, we do nothing but return false : the session
300            // is already scheduled for flush
301            return scheduledForFlush.compareAndSet(false, schedule);
302        }
303
304        scheduledForFlush.set(schedule);
305        return true;
306    }
307
308    /**
309     * {@inheritDoc}
310     */
311    public final CloseFuture close(boolean rightNow) {
312        if (rightNow) {
313            return closeNow();
314        } else {
315            return closeOnFlush();
316        }
317    }
318
319    /**
320     * {@inheritDoc}
321     */
322    public final CloseFuture close() {
323        return closeNow();
324    }
325
326    /**
327     * {@inheritDoc}
328     */
329    public final CloseFuture closeOnFlush() {
330        if (!isClosing()) {
331            getWriteRequestQueue().offer(this, CLOSE_REQUEST);
332            getProcessor().flush(this);
333        }
334        
335        return closeFuture;
336    }
337
338    /**
339     * {@inheritDoc}
340     */
341    public final CloseFuture closeNow() {
342        synchronized (lock) {
343            if (isClosing()) {
344                return closeFuture;
345            }
346
347            closing = true;
348            
349            try {
350                destroy();
351            } catch (Exception e) {
352                IoFilterChain filterChain = getFilterChain();
353                filterChain.fireExceptionCaught(e);
354            }
355        }
356
357        getFilterChain().fireFilterClose();
358
359        return closeFuture;
360    }
361    
362    /**
363     * Destroy the session
364     */
365    protected void destroy() {
366        if (writeRequestQueue != null) {
367            while (!writeRequestQueue.isEmpty(this)) {
368                WriteRequest writeRequest = writeRequestQueue.poll(this);
369                writeRequest.getFuture().setWritten();
370            }
371        }
372    }
373
374    /**
375     * {@inheritDoc}
376     */
377    public IoHandler getHandler() {
378        return handler;
379    }
380
381    /**
382     * {@inheritDoc}
383     */
384    public IoSessionConfig getConfig() {
385        return config;
386    }
387
388    /**
389     * {@inheritDoc}
390     */
391    public final ReadFuture read() {
392        if (!getConfig().isUseReadOperation()) {
393            throw new IllegalStateException("useReadOperation is not enabled.");
394        }
395
396        Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
397        ReadFuture future;
398        synchronized (readyReadFutures) {
399            future = readyReadFutures.poll();
400            if (future != null) {
401                if (future.isClosed()) {
402                    // Let other readers get notified.
403                    readyReadFutures.offer(future);
404                }
405            } else {
406                future = new DefaultReadFuture(this);
407                getWaitingReadFutures().offer(future);
408            }
409        }
410
411        return future;
412    }
413
414    /**
415     * Associates a message to a ReadFuture
416     * 
417     * @param message the message to associate to the ReadFuture
418     * 
419     */
420    public final void offerReadFuture(Object message) {
421        newReadFuture().setRead(message);
422    }
423
424    /**
425     * Associates a failure to a ReadFuture
426     * 
427     * @param exception the exception to associate to the ReadFuture
428     */
429    public final void offerFailedReadFuture(Throwable exception) {
430        newReadFuture().setException(exception);
431    }
432
433    /**
434     * Inform the ReadFuture that the session has been closed
435     */
436    public final void offerClosedReadFuture() {
437        Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
438        
439        synchronized (readyReadFutures) {
440            newReadFuture().setClosed();
441        }
442    }
443
444    /**
445     * @return a readFuture get from the waiting ReadFuture
446     */
447    private ReadFuture newReadFuture() {
448        Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
449        Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures();
450        ReadFuture future;
451        
452        synchronized (readyReadFutures) {
453            future = waitingReadFutures.poll();
454            
455            if (future == null) {
456                future = new DefaultReadFuture(this);
457                readyReadFutures.offer(future);
458            }
459        }
460        
461        return future;
462    }
463
464    /**
465     * @return a queue of ReadFuture
466     */
467    private Queue<ReadFuture> getReadyReadFutures() {
468        Queue<ReadFuture> readyReadFutures = (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
469        
470        if (readyReadFutures == null) {
471            readyReadFutures = new ConcurrentLinkedQueue<ReadFuture>();
472
473            Queue<ReadFuture> oldReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(READY_READ_FUTURES_KEY,
474                    readyReadFutures);
475            
476            if (oldReadyReadFutures != null) {
477                readyReadFutures = oldReadyReadFutures;
478            }
479        }
480        
481        return readyReadFutures;
482    }
483
484    /**
485     * @return the queue of waiting ReadFuture
486     */
487    private Queue<ReadFuture> getWaitingReadFutures() {
488        Queue<ReadFuture> waitingReadyReadFutures = (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES_KEY);
489        
490        if (waitingReadyReadFutures == null) {
491            waitingReadyReadFutures = new ConcurrentLinkedQueue<ReadFuture>();
492
493            Queue<ReadFuture> oldWaitingReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(
494                    WAITING_READ_FUTURES_KEY, waitingReadyReadFutures);
495            
496            if (oldWaitingReadyReadFutures != null) {
497                waitingReadyReadFutures = oldWaitingReadyReadFutures;
498            }
499        }
500        
501        return waitingReadyReadFutures;
502    }
503
504    /**
505     * {@inheritDoc}
506     */
507    public WriteFuture write(Object message) {
508        return write(message, null);
509    }
510
511    /**
512     * {@inheritDoc}
513     */
514    public WriteFuture write(Object message, SocketAddress remoteAddress) {
515        if (message == null) {
516            throw new IllegalArgumentException("Trying to write a null message : not allowed");
517        }
518
519        // We can't send a message to a connected session if we don't have
520        // the remote address
521        if (!getTransportMetadata().isConnectionless() && (remoteAddress != null)) {
522            throw new UnsupportedOperationException();
523        }
524
525        // If the session has been closed or is closing, we can't either
526        // send a message to the remote side. We generate a future
527        // containing an exception.
528        if (isClosing() || !isConnected()) {
529            WriteFuture future = new DefaultWriteFuture(this);
530            WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
531            WriteException writeException = new WriteToClosedSessionException(request);
532            future.setException(writeException);
533            return future;
534        }
535
536        FileChannel openedFileChannel = null;
537
538        // TODO: remove this code as soon as we use InputStream
539        // instead of Object for the message.
540        try {
541            if ((message instanceof IoBuffer) && !((IoBuffer) message).hasRemaining()) {
542                // Nothing to write : probably an error in the user code
543                throw new IllegalArgumentException("message is empty. Forgot to call flip()?");
544            } else if (message instanceof FileChannel) {
545                FileChannel fileChannel = (FileChannel) message;
546                message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
547            } else if (message instanceof File) {
548                File file = (File) message;
549                openedFileChannel = new FileInputStream(file).getChannel();
550                message = new FilenameFileRegion(file, openedFileChannel, 0, openedFileChannel.size());
551            }
552        } catch (IOException e) {
553            ExceptionMonitor.getInstance().exceptionCaught(e);
554            return DefaultWriteFuture.newNotWrittenFuture(this, e);
555        }
556
557        // Now, we can write the message. First, create a future
558        WriteFuture writeFuture = new DefaultWriteFuture(this);
559        WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
560
561        // Then, get the chain and inject the WriteRequest into it
562        IoFilterChain filterChain = getFilterChain();
563        filterChain.fireFilterWrite(writeRequest);
564
565        // TODO : This is not our business ! The caller has created a
566        // FileChannel,
567        // he has to close it !
568        if (openedFileChannel != null) {
569            // If we opened a FileChannel, it needs to be closed when the write
570            // has completed
571            final FileChannel finalChannel = openedFileChannel;
572            writeFuture.addListener(new IoFutureListener<WriteFuture>() {
573                public void operationComplete(WriteFuture future) {
574                    try {
575                        finalChannel.close();
576                    } catch (IOException e) {
577                        ExceptionMonitor.getInstance().exceptionCaught(e);
578                    }
579                }
580            });
581        }
582
583        // Return the WriteFuture.
584        return writeFuture;
585    }
586
587    /**
588     * {@inheritDoc}
589     */
590    public final Object getAttachment() {
591        return getAttribute("");
592    }
593
594    /**
595     * {@inheritDoc}
596     */
597    public final Object setAttachment(Object attachment) {
598        return setAttribute("", attachment);
599    }
600
601    /**
602     * {@inheritDoc}
603     */
604    public final Object getAttribute(Object key) {
605        return getAttribute(key, null);
606    }
607
608    /**
609     * {@inheritDoc}
610     */
611    public final Object getAttribute(Object key, Object defaultValue) {
612        return attributes.getAttribute(this, key, defaultValue);
613    }
614
615    /**
616     * {@inheritDoc}
617     */
618    public final Object setAttribute(Object key, Object value) {
619        return attributes.setAttribute(this, key, value);
620    }
621
622    /**
623     * {@inheritDoc}
624     */
625    public final Object setAttribute(Object key) {
626        return setAttribute(key, Boolean.TRUE);
627    }
628
629    /**
630     * {@inheritDoc}
631     */
632    public final Object setAttributeIfAbsent(Object key, Object value) {
633        return attributes.setAttributeIfAbsent(this, key, value);
634    }
635
636    /**
637     * {@inheritDoc}
638     */
639    public final Object setAttributeIfAbsent(Object key) {
640        return setAttributeIfAbsent(key, Boolean.TRUE);
641    }
642
643    /**
644     * {@inheritDoc}
645     */
646    public final Object removeAttribute(Object key) {
647        return attributes.removeAttribute(this, key);
648    }
649
650    /**
651     * {@inheritDoc}
652     */
653    public final boolean removeAttribute(Object key, Object value) {
654        return attributes.removeAttribute(this, key, value);
655    }
656
657    /**
658     * {@inheritDoc}
659     */
660    public final boolean replaceAttribute(Object key, Object oldValue, Object newValue) {
661        return attributes.replaceAttribute(this, key, oldValue, newValue);
662    }
663
664    /**
665     * {@inheritDoc}
666     */
667    public final boolean containsAttribute(Object key) {
668        return attributes.containsAttribute(this, key);
669    }
670
671    /**
672     * {@inheritDoc}
673     */
674    public final Set<Object> getAttributeKeys() {
675        return attributes.getAttributeKeys(this);
676    }
677
678    /**
679     * @return The map of attributes associated with the session
680     */
681    public final IoSessionAttributeMap getAttributeMap() {
682        return attributes;
683    }
684
685    /**
686     * Set the map of attributes associated with the session
687     * 
688     * @param attributes The Map of attributes
689     */
690    public final void setAttributeMap(IoSessionAttributeMap attributes) {
691        this.attributes = attributes;
692    }
693
694    /**
695     * Create a new close aware write queue, based on the given write queue.
696     * 
697     * @param writeRequestQueue The write request queue
698     */
699    public final void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) {
700        this.writeRequestQueue = writeRequestQueue;
701    }
702
703    /**
704     * {@inheritDoc}
705     */
706    public final void suspendRead() {
707        readSuspended = true;
708        if (isClosing() || !isConnected()) {
709            return;
710        }
711        getProcessor().updateTrafficControl(this);
712    }
713
714    /**
715     * {@inheritDoc}
716     */
717    public final void suspendWrite() {
718        writeSuspended = true;
719        if (isClosing() || !isConnected()) {
720            return;
721        }
722        getProcessor().updateTrafficControl(this);
723    }
724
725    /**
726     * {@inheritDoc}
727     */
728    @SuppressWarnings("unchecked")
729    public final void resumeRead() {
730        readSuspended = false;
731        if (isClosing() || !isConnected()) {
732            return;
733        }
734        getProcessor().updateTrafficControl(this);
735    }
736
737    /**
738     * {@inheritDoc}
739     */
740    @SuppressWarnings("unchecked")
741    public final void resumeWrite() {
742        writeSuspended = false;
743        if (isClosing() || !isConnected()) {
744            return;
745        }
746        getProcessor().updateTrafficControl(this);
747    }
748
749    /**
750     * {@inheritDoc}
751     */
752    public boolean isReadSuspended() {
753        return readSuspended;
754    }
755
756    /**
757     * {@inheritDoc}
758     */
759    public boolean isWriteSuspended() {
760        return writeSuspended;
761    }
762
763    /**
764     * {@inheritDoc}
765     */
766    public final long getReadBytes() {
767        return readBytes;
768    }
769
770    /**
771     * {@inheritDoc}
772     */
773    public final long getWrittenBytes() {
774        return writtenBytes;
775    }
776
777    /**
778     * {@inheritDoc}
779     */
780    public final long getReadMessages() {
781        return readMessages;
782    }
783
784    /**
785     * {@inheritDoc}
786     */
787    public final long getWrittenMessages() {
788        return writtenMessages;
789    }
790
791    /**
792     * {@inheritDoc}
793     */
794    public final double getReadBytesThroughput() {
795        return readBytesThroughput;
796    }
797
798    /**
799     * {@inheritDoc}
800     */
801    public final double getWrittenBytesThroughput() {
802        return writtenBytesThroughput;
803    }
804
805    /**
806     * {@inheritDoc}
807     */
808    public final double getReadMessagesThroughput() {
809        return readMessagesThroughput;
810    }
811
812    /**
813     * {@inheritDoc}
814     */
815    public final double getWrittenMessagesThroughput() {
816        return writtenMessagesThroughput;
817    }
818
819    /**
820     * {@inheritDoc}
821     */
822    public final void updateThroughput(long currentTime, boolean force) {
823        int interval = (int) (currentTime - lastThroughputCalculationTime);
824
825        long minInterval = getConfig().getThroughputCalculationIntervalInMillis();
826
827        if (((minInterval == 0) || (interval < minInterval)) && !force) {
828            return;
829        }
830
831        readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval;
832        writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval;
833        readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval;
834        writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval;
835
836        lastReadBytes = readBytes;
837        lastWrittenBytes = writtenBytes;
838        lastReadMessages = readMessages;
839        lastWrittenMessages = writtenMessages;
840
841        lastThroughputCalculationTime = currentTime;
842    }
843
844    /**
845     * {@inheritDoc}
846     */
847    public final long getScheduledWriteBytes() {
848        return scheduledWriteBytes.get();
849    }
850
851    /**
852     * {@inheritDoc}
853     */
854    public final int getScheduledWriteMessages() {
855        return scheduledWriteMessages.get();
856    }
857
858    /**
859     * Set the number of scheduled write bytes
860     * 
861     * @param byteCount The number of scheduled bytes for write
862     */
863    protected void setScheduledWriteBytes(int byteCount) {
864        scheduledWriteBytes.set(byteCount);
865    }
866
867    /**
868     * Set the number of scheduled write messages
869     * 
870     * @param messages The number of scheduled messages for write
871     */
872    protected void setScheduledWriteMessages(int messages) {
873        scheduledWriteMessages.set(messages);
874    }
875
876    /**
877     * Increase the number of read bytes
878     * 
879     * @param increment The number of read bytes
880     * @param currentTime The current time
881     */
882    public final void increaseReadBytes(long increment, long currentTime) {
883        if (increment <= 0) {
884            return;
885        }
886
887        readBytes += increment;
888        lastReadTime = currentTime;
889        idleCountForBoth.set(0);
890        idleCountForRead.set(0);
891
892        if (getService() instanceof AbstractIoService) {
893            ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime);
894        }
895    }
896
897    /**
898     * Increase the number of read messages
899     * 
900     * @param currentTime The current time
901     */
902    public final void increaseReadMessages(long currentTime) {
903        readMessages++;
904        lastReadTime = currentTime;
905        idleCountForBoth.set(0);
906        idleCountForRead.set(0);
907
908        if (getService() instanceof AbstractIoService) {
909            ((AbstractIoService) getService()).getStatistics().increaseReadMessages(currentTime);
910        }
911    }
912
913    /**
914     * Increase the number of written bytes
915     * 
916     * @param increment The number of written bytes
917     * @param currentTime The current time
918     */
919    public final void increaseWrittenBytes(int increment, long currentTime) {
920        if (increment <= 0) {
921            return;
922        }
923
924        writtenBytes += increment;
925        lastWriteTime = currentTime;
926        idleCountForBoth.set(0);
927        idleCountForWrite.set(0);
928
929        if (getService() instanceof AbstractIoService) {
930            ((AbstractIoService) getService()).getStatistics().increaseWrittenBytes(increment, currentTime);
931        }
932
933        increaseScheduledWriteBytes(-increment);
934    }
935
936    /**
937     * Increase the number of written messages
938     * 
939     * @param request The written message
940     * @param currentTime The current tile
941     */
942    public final void increaseWrittenMessages(WriteRequest request, long currentTime) {
943        Object message = request.getMessage();
944
945        if (message instanceof IoBuffer) {
946            IoBuffer b = (IoBuffer) message;
947
948            if (b.hasRemaining()) {
949                return;
950            }
951        }
952
953        writtenMessages++;
954        lastWriteTime = currentTime;
955
956        if (getService() instanceof AbstractIoService) {
957            ((AbstractIoService) getService()).getStatistics().increaseWrittenMessages(currentTime);
958        }
959
960        decreaseScheduledWriteMessages();
961    }
962
963    /**
964     * Increase the number of scheduled write bytes for the session
965     * 
966     * @param increment The number of newly added bytes to write
967     */
968    public final void increaseScheduledWriteBytes(int increment) {
969        scheduledWriteBytes.addAndGet(increment);
970        if (getService() instanceof AbstractIoService) {
971            ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteBytes(increment);
972        }
973    }
974
975    /**
976     * Increase the number of scheduled message to write
977     */
978    public final void increaseScheduledWriteMessages() {
979        scheduledWriteMessages.incrementAndGet();
980        
981        if (getService() instanceof AbstractIoService) {
982            ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteMessages();
983        }
984    }
985
986    /**
987     * Decrease the number of scheduled message written
988     */
989    private void decreaseScheduledWriteMessages() {
990        scheduledWriteMessages.decrementAndGet();
991        if (getService() instanceof AbstractIoService) {
992            ((AbstractIoService) getService()).getStatistics().decreaseScheduledWriteMessages();
993        }
994    }
995
996    /**
997     * Decrease the counters of written messages and written bytes when a message has been written
998     * 
999     * @param request The written message
1000     */
1001    public final void decreaseScheduledBytesAndMessages(WriteRequest request) {
1002        Object message = request.getMessage();
1003        
1004        if (message instanceof IoBuffer) {
1005            IoBuffer b = (IoBuffer) message;
1006            
1007            if (b.hasRemaining()) {
1008                increaseScheduledWriteBytes(-((IoBuffer) message).remaining());
1009            } else {
1010                decreaseScheduledWriteMessages();
1011            }
1012        } else {
1013            decreaseScheduledWriteMessages();
1014        }
1015    }
1016
1017    /**
1018     * {@inheritDoc}
1019     */
1020    public final WriteRequestQueue getWriteRequestQueue() {
1021        if (writeRequestQueue == null) {
1022            throw new IllegalStateException();
1023        }
1024        
1025        return writeRequestQueue;
1026    }
1027
1028    /**
1029     * {@inheritDoc}
1030     */
1031    public final WriteRequest getCurrentWriteRequest() {
1032        return currentWriteRequest;
1033    }
1034
1035    /**
1036     * {@inheritDoc}
1037     */
1038    public final Object getCurrentWriteMessage() {
1039        WriteRequest req = getCurrentWriteRequest();
1040        
1041        if (req == null) {
1042            return null;
1043        }
1044        return req.getMessage();
1045    }
1046
1047    /**
1048     * {@inheritDoc}
1049     */
1050    public final void setCurrentWriteRequest(WriteRequest currentWriteRequest) {
1051        this.currentWriteRequest = currentWriteRequest;
1052    }
1053
1054    /**
1055     * Increase the ReadBuffer size (it will double)
1056     */
1057    public final void increaseReadBufferSize() {
1058        int newReadBufferSize = getConfig().getReadBufferSize() << 1;
1059        if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) {
1060            getConfig().setReadBufferSize(newReadBufferSize);
1061        } else {
1062            getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize());
1063        }
1064
1065        deferDecreaseReadBuffer = true;
1066    }
1067
1068    /**
1069     * Decrease the ReadBuffer size (it will be divided by a factor 2)
1070     */
1071    public final void decreaseReadBufferSize() {
1072        if (deferDecreaseReadBuffer) {
1073            deferDecreaseReadBuffer = false;
1074            return;
1075        }
1076
1077        if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) {
1078            getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1);
1079        }
1080
1081        deferDecreaseReadBuffer = true;
1082    }
1083
1084    /**
1085     * {@inheritDoc}
1086     */
1087    public final long getCreationTime() {
1088        return creationTime;
1089    }
1090
1091    /**
1092     * {@inheritDoc}
1093     */
1094    public final long getLastIoTime() {
1095        return Math.max(lastReadTime, lastWriteTime);
1096    }
1097
1098    /**
1099     * {@inheritDoc}
1100     */
1101    public final long getLastReadTime() {
1102        return lastReadTime;
1103    }
1104
1105    /**
1106     * {@inheritDoc}
1107     */
1108    public final long getLastWriteTime() {
1109        return lastWriteTime;
1110    }
1111
1112    /**
1113     * {@inheritDoc}
1114     */
1115    public final boolean isIdle(IdleStatus status) {
1116        if (status == IdleStatus.BOTH_IDLE) {
1117            return idleCountForBoth.get() > 0;
1118        }
1119
1120        if (status == IdleStatus.READER_IDLE) {
1121            return idleCountForRead.get() > 0;
1122        }
1123
1124        if (status == IdleStatus.WRITER_IDLE) {
1125            return idleCountForWrite.get() > 0;
1126        }
1127
1128        throw new IllegalArgumentException("Unknown idle status: " + status);
1129    }
1130
1131    /**
1132     * {@inheritDoc}
1133     */
1134    public final boolean isBothIdle() {
1135        return isIdle(IdleStatus.BOTH_IDLE);
1136    }
1137
1138    /**
1139     * {@inheritDoc}
1140     */
1141    public final boolean isReaderIdle() {
1142        return isIdle(IdleStatus.READER_IDLE);
1143    }
1144
1145    /**
1146     * {@inheritDoc}
1147     */
1148    public final boolean isWriterIdle() {
1149        return isIdle(IdleStatus.WRITER_IDLE);
1150    }
1151
1152    /**
1153     * {@inheritDoc}
1154     */
1155    public final int getIdleCount(IdleStatus status) {
1156        if (getConfig().getIdleTime(status) == 0) {
1157            if (status == IdleStatus.BOTH_IDLE) {
1158                idleCountForBoth.set(0);
1159            }
1160
1161            if (status == IdleStatus.READER_IDLE) {
1162                idleCountForRead.set(0);
1163            }
1164
1165            if (status == IdleStatus.WRITER_IDLE) {
1166                idleCountForWrite.set(0);
1167            }
1168        }
1169
1170        if (status == IdleStatus.BOTH_IDLE) {
1171            return idleCountForBoth.get();
1172        }
1173
1174        if (status == IdleStatus.READER_IDLE) {
1175            return idleCountForRead.get();
1176        }
1177
1178        if (status == IdleStatus.WRITER_IDLE) {
1179            return idleCountForWrite.get();
1180        }
1181
1182        throw new IllegalArgumentException("Unknown idle status: " + status);
1183    }
1184
1185    /**
1186     * {@inheritDoc}
1187     */
1188    public final long getLastIdleTime(IdleStatus status) {
1189        if (status == IdleStatus.BOTH_IDLE) {
1190            return lastIdleTimeForBoth;
1191        }
1192
1193        if (status == IdleStatus.READER_IDLE) {
1194            return lastIdleTimeForRead;
1195        }
1196
1197        if (status == IdleStatus.WRITER_IDLE) {
1198            return lastIdleTimeForWrite;
1199        }
1200
1201        throw new IllegalArgumentException("Unknown idle status: " + status);
1202    }
1203
1204    /**
1205     * Increase the count of the various Idle counter
1206     * 
1207     * @param status The current status
1208     * @param currentTime The current time
1209     */
1210    public final void increaseIdleCount(IdleStatus status, long currentTime) {
1211        if (status == IdleStatus.BOTH_IDLE) {
1212            idleCountForBoth.incrementAndGet();
1213            lastIdleTimeForBoth = currentTime;
1214        } else if (status == IdleStatus.READER_IDLE) {
1215            idleCountForRead.incrementAndGet();
1216            lastIdleTimeForRead = currentTime;
1217        } else if (status == IdleStatus.WRITER_IDLE) {
1218            idleCountForWrite.incrementAndGet();
1219            lastIdleTimeForWrite = currentTime;
1220        } else {
1221            throw new IllegalArgumentException("Unknown idle status: " + status);
1222        }
1223    }
1224
1225    /**
1226     * {@inheritDoc}
1227     */
1228    public final int getBothIdleCount() {
1229        return getIdleCount(IdleStatus.BOTH_IDLE);
1230    }
1231
1232    /**
1233     * {@inheritDoc}
1234     */
1235    public final long getLastBothIdleTime() {
1236        return getLastIdleTime(IdleStatus.BOTH_IDLE);
1237    }
1238
1239    /**
1240     * {@inheritDoc}
1241     */
1242    public final long getLastReaderIdleTime() {
1243        return getLastIdleTime(IdleStatus.READER_IDLE);
1244    }
1245
1246    /**
1247     * {@inheritDoc}
1248     */
1249    public final long getLastWriterIdleTime() {
1250        return getLastIdleTime(IdleStatus.WRITER_IDLE);
1251    }
1252
1253    /**
1254     * {@inheritDoc}
1255     */
1256    public final int getReaderIdleCount() {
1257        return getIdleCount(IdleStatus.READER_IDLE);
1258    }
1259
1260    /**
1261     * {@inheritDoc}
1262     */
1263    public final int getWriterIdleCount() {
1264        return getIdleCount(IdleStatus.WRITER_IDLE);
1265    }
1266
1267    /**
1268     * {@inheritDoc}
1269     */
1270    public SocketAddress getServiceAddress() {
1271        IoService service = getService();
1272        if (service instanceof IoAcceptor) {
1273            return ((IoAcceptor) service).getLocalAddress();
1274        }
1275
1276        return getRemoteAddress();
1277    }
1278
1279    /**
1280     * {@inheritDoc}
1281     */
1282    @Override
1283    public final int hashCode() {
1284        return super.hashCode();
1285    }
1286
1287    /**
1288     * {@inheritDoc} TODO This is a ridiculous implementation. Need to be
1289     * replaced.
1290     */
1291    @Override
1292    public final boolean equals(Object o) {
1293        return super.equals(o);
1294    }
1295
1296    /**
1297     * {@inheritDoc}
1298     */
1299    @Override
1300    public String toString() {
1301        if (isConnected() || isClosing()) {
1302            String remote = null;
1303            String local = null;
1304
1305            try {
1306                remote = String.valueOf(getRemoteAddress());
1307            } catch (Exception e) {
1308                remote = "Cannot get the remote address informations: " + e.getMessage();
1309            }
1310
1311            try {
1312                local = String.valueOf(getLocalAddress());
1313            } catch (Exception e) {
1314            }
1315
1316            if (getService() instanceof IoAcceptor) {
1317                return "(" + getIdAsString() + ": " + getServiceName() + ", server, " + remote + " => " + local + ')';
1318            }
1319
1320            return "(" + getIdAsString() + ": " + getServiceName() + ", client, " + local + " => " + remote + ')';
1321        }
1322
1323        return "(" + getIdAsString() + ") Session disconnected ...";
1324    }
1325
1326    /**
1327     * Get the Id as a String
1328     */
1329    private String getIdAsString() {
1330        String id = Long.toHexString(getId()).toUpperCase();
1331        
1332        if (id.length() <= 8) {
1333            return "0x00000000".substring(0, 10 - id.length()) + id;
1334        } else {
1335            return "0x" + id;
1336        }
1337    }
1338
1339    /**
1340     * TGet the Service name
1341     */
1342    private String getServiceName() {
1343        TransportMetadata tm = getTransportMetadata();
1344        if (tm == null) {
1345            return "null";
1346        }
1347
1348        return tm.getProviderName() + ' ' + tm.getName();
1349    }
1350
1351    /**
1352     * {@inheritDoc}
1353     */
1354    public IoService getService() {
1355        return service;
1356    }
1357
1358    /**
1359     * Fires a {@link IoEventType#SESSION_IDLE} event to any applicable sessions
1360     * in the specified collection.
1361     * 
1362     * @param sessions The sessions that are notified
1363     * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
1364     */
1365    public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) {
1366        while (sessions.hasNext()) {
1367            IoSession session = sessions.next();
1368            
1369            if (!session.getCloseFuture().isClosed()) {
1370                notifyIdleSession(session, currentTime);
1371            }
1372        }
1373    }
1374
1375    /**
1376     * Fires a {@link IoEventType#SESSION_IDLE} event if applicable for the
1377     * specified {@code session}.
1378     * 
1379     * @param session The session that is notified
1380     * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
1381     */
1382    public static void notifyIdleSession(IoSession session, long currentTime) {
1383        notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
1384                IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
1385
1386        notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
1387                IdleStatus.READER_IDLE,
1388                Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));
1389
1390        notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
1391                IdleStatus.WRITER_IDLE,
1392                Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
1393
1394        notifyWriteTimeout(session, currentTime);
1395    }
1396
1397    private static void notifyIdleSession0(IoSession session, long currentTime, long idleTime, IdleStatus status,
1398            long lastIoTime) {
1399        if ((idleTime > 0) && (lastIoTime != 0) && (currentTime - lastIoTime >= idleTime)) {
1400            session.getFilterChain().fireSessionIdle(status);
1401        }
1402    }
1403
1404    private static void notifyWriteTimeout(IoSession session, long currentTime) {
1405
1406        long writeTimeout = session.getConfig().getWriteTimeoutInMillis();
1407        if ((writeTimeout > 0) && (currentTime - session.getLastWriteTime() >= writeTimeout)
1408                && !session.getWriteRequestQueue().isEmpty(session)) {
1409            WriteRequest request = session.getCurrentWriteRequest();
1410            if (request != null) {
1411                session.setCurrentWriteRequest(null);
1412                WriteTimeoutException cause = new WriteTimeoutException(request);
1413                request.getFuture().setException(cause);
1414                session.getFilterChain().fireExceptionCaught(cause);
1415                // WriteException is an IOException, so we close the session.
1416                session.closeNow();
1417            }
1418        }
1419    }
1420}