View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.session;
21  
22  import java.io.File;
23  import java.io.FileInputStream;
24  import java.io.IOException;
25  import java.net.SocketAddress;
26  import java.nio.channels.FileChannel;
27  import java.util.Iterator;
28  import java.util.Queue;
29  import java.util.Set;
30  import java.util.concurrent.atomic.AtomicBoolean;
31  import java.util.concurrent.atomic.AtomicInteger;
32  import java.util.concurrent.atomic.AtomicLong;
33  
34  import org.apache.mina.core.buffer.IoBuffer;
35  import org.apache.mina.core.file.DefaultFileRegion;
36  import org.apache.mina.core.filterchain.IoFilterChain;
37  import org.apache.mina.core.future.CloseFuture;
38  import org.apache.mina.core.future.DefaultCloseFuture;
39  import org.apache.mina.core.future.DefaultReadFuture;
40  import org.apache.mina.core.future.DefaultWriteFuture;
41  import org.apache.mina.core.future.IoFutureListener;
42  import org.apache.mina.core.future.ReadFuture;
43  import org.apache.mina.core.future.WriteFuture;
44  import org.apache.mina.core.service.AbstractIoService;
45  import org.apache.mina.core.service.IoAcceptor;
46  import org.apache.mina.core.service.IoProcessor;
47  import org.apache.mina.core.service.IoService;
48  import org.apache.mina.core.service.TransportMetadata;
49  import org.apache.mina.core.write.DefaultWriteRequest;
50  import org.apache.mina.core.write.WriteException;
51  import org.apache.mina.core.write.WriteRequest;
52  import org.apache.mina.core.write.WriteRequestQueue;
53  import org.apache.mina.core.write.WriteTimeoutException;
54  import org.apache.mina.core.write.WriteToClosedSessionException;
55  import org.apache.mina.util.CircularQueue;
56  import org.apache.mina.util.ExceptionMonitor;
57  
58  
59  /**
60   * Base implementation of {@link IoSession}.
61   *
62   * @author The Apache MINA Project (dev@mina.apache.org)
63   * @version $Rev: 756270 $, $Date: 2009-03-20 00:49:57 +0100 (Fri, 20 Mar 2009) $
64   */
65  public abstract class AbstractIoSession implements IoSession {
66  
67      private static final AttributeKey READY_READ_FUTURES_KEY =
68          new AttributeKey(AbstractIoSession.class, "readyReadFutures");
69      
70      private static final AttributeKey WAITING_READ_FUTURES_KEY =
71          new AttributeKey(AbstractIoSession.class, "waitingReadFutures");
72  
73      private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER =
74          new IoFutureListener<CloseFuture>() {
75              public void operationComplete(CloseFuture future) {
76                  AbstractIoSession s = (AbstractIoSession) future.getSession();
77                  s.scheduledWriteBytes.set(0);
78                  s.scheduledWriteMessages.set(0);
79                  s.readBytesThroughput = 0;
80                  s.readMessagesThroughput = 0;
81                  s.writtenBytesThroughput = 0;
82                  s.writtenMessagesThroughput = 0;
83              }
84      };
85  
86      /**
87       * An internal write request object that triggers session close.
88       * @see #writeRequestQueue
89       */
90      private static final WriteRequest CLOSE_REQUEST =
91          new DefaultWriteRequest(new Object());
92  
93      private final Object lock = new Object();
94  
95      private IoSessionAttributeMap attributes;
96      private WriteRequestQueue writeRequestQueue;
97      private WriteRequest currentWriteRequest;
98      
99      // The Session creation's time */
100     private final long creationTime;
101 
102     /** An id generator guaranteed to generate unique IDs for the session */
103     private static AtomicLong idGenerator = new AtomicLong(0);
104     
105     /** The session ID */
106     private long sessionId;
107     
108     /**
109      * A future that will be set 'closed' when the connection is closed.
110      */
111     private final CloseFuture closeFuture = new DefaultCloseFuture(this);
112 
113     private volatile boolean closing;
114     
115     // traffic control
116     private boolean readSuspended=false;
117     private boolean writeSuspended=false;
118 
119     // Status variables
120     private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
121     private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
122     private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
123 
124     private long readBytes;
125     private long writtenBytes;
126     private long readMessages;
127     private long writtenMessages;
128     private long lastReadTime;
129     private long lastWriteTime;
130 
131     private long lastThroughputCalculationTime;
132     private long lastReadBytes;
133     private long lastWrittenBytes;
134     private long lastReadMessages;
135     private long lastWrittenMessages;
136     private double readBytesThroughput;
137     private double writtenBytesThroughput;
138     private double readMessagesThroughput;
139     private double writtenMessagesThroughput;
140 
141     private int idleCountForBoth;
142     private int idleCountForRead;
143     private int idleCountForWrite;
144 
145     private long lastIdleTimeForBoth;
146     private long lastIdleTimeForRead;
147     private long lastIdleTimeForWrite;
148 
149     private boolean deferDecreaseReadBuffer = true;
150 
151     /**
152      * TODO Add method documentation
153      */
154     protected AbstractIoSession() {
155         // Initialize all the Session counters to the current time 
156         long currentTime = System.currentTimeMillis();
157         creationTime = currentTime;
158         lastThroughputCalculationTime = currentTime;
159         lastReadTime = currentTime;
160         lastWriteTime = currentTime;
161         lastIdleTimeForBoth = currentTime;
162         lastIdleTimeForRead = currentTime;
163         lastIdleTimeForWrite = currentTime;
164         
165         // TODO add documentation
166         closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
167         
168         // Set a new ID for this session
169         sessionId = idGenerator.incrementAndGet();
170     }
171 
172     /**
173      * {@inheritDoc}
174      * 
175      * We use an AtomicLong to guarantee that the session ID are
176      * unique.
177      */
178     public final long getId() {
179         return sessionId;
180     }
181 
182     /**
183      * TODO Add method documentation
184      */
185     public abstract IoProcessor getProcessor();
186 
187     /**
188      * {@inheritDoc}
189      */
190     public final boolean isConnected() {
191         return !closeFuture.isClosed();
192     }
193 
194     /**
195      * {@inheritDoc}
196      */
197     public final boolean isClosing() {
198         return closing || closeFuture.isClosed();
199     }
200 
201     /**
202      * {@inheritDoc}
203      */
204     public final CloseFuture getCloseFuture() {
205         return closeFuture;
206     }
207 
208     /**
209      * TODO Add method documentation
210      */
211     public final boolean isScheduledForFlush() {
212         return scheduledForFlush.get();
213     }
214 
215     /**
216      * TODO Add method documentation
217      */
218     public final boolean setScheduledForFlush(boolean flag) {
219         if (flag) {
220             return scheduledForFlush.compareAndSet(false, true);
221         } else {
222             scheduledForFlush.set(false);
223             return true;
224         }
225     }
226 
227     /**
228      * {@inheritDoc}
229      */
230     public final CloseFuture close(boolean rightNow) {
231         if (rightNow) {
232             return close();
233         } else {
234             return closeOnFlush();
235         }
236     }
237 
238     /**
239      * {@inheritDoc}
240      */
241     public final CloseFuture close() {
242         synchronized (lock) {
243             if (isClosing()) {
244                 return closeFuture;
245             } else {
246                 closing = true;
247             }
248         }
249 
250         getFilterChain().fireFilterClose();
251         return closeFuture;
252     }
253 
254     private final CloseFuture closeOnFlush() {
255         getWriteRequestQueue().offer(this, CLOSE_REQUEST);
256         getProcessor().flush(this);
257         return closeFuture;
258     }
259 
260     /**
261      * {@inheritDoc}
262      */
263     public final ReadFuture read() {
264         if (!getConfig().isUseReadOperation()) {
265             throw new IllegalStateException("useReadOperation is not enabled.");
266         }
267 
268         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
269         ReadFuture future;
270         synchronized (readyReadFutures) {
271             future = readyReadFutures.poll();
272             if (future != null) {
273                 if (future.isClosed()) {
274                     // Let other readers get notified.
275                     readyReadFutures.offer(future);
276                 }
277             } else {
278                 future = new DefaultReadFuture(this);
279                 getWaitingReadFutures().offer(future);
280             }
281         }
282 
283         return future;
284     }
285 
286     /**
287      * TODO Add method documentation
288      */
289     public final void offerReadFuture(Object message) {
290         newReadFuture().setRead(message);
291     }
292 
293     /**
294      * TODO Add method documentation
295      */
296     public final void offerFailedReadFuture(Throwable exception) {
297         newReadFuture().setException(exception);
298     }
299 
300     /**
301      * TODO Add method documentation
302      */
303     public final void offerClosedReadFuture() {
304         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
305         synchronized (readyReadFutures) {
306             newReadFuture().setClosed();
307         }
308     }
309 
310     /**
311      * TODO Add method documentation
312      */
313     private ReadFuture newReadFuture() {
314         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
315         Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures();
316         ReadFuture future;
317         synchronized (readyReadFutures) {
318             future = waitingReadFutures.poll();
319             if (future == null) {
320                 future = new DefaultReadFuture(this);
321                 readyReadFutures.offer(future);
322             }
323         }
324         return future;
325     }
326 
327     /**
328      * TODO Add method documentation
329      */
330     private Queue<ReadFuture> getReadyReadFutures() {
331         Queue<ReadFuture> readyReadFutures =
332             (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
333         if (readyReadFutures == null) {
334             readyReadFutures = new CircularQueue<ReadFuture>();
335 
336             Queue<ReadFuture> oldReadyReadFutures =
337                 (Queue<ReadFuture>) setAttributeIfAbsent(
338                         READY_READ_FUTURES_KEY, readyReadFutures);
339             if (oldReadyReadFutures != null) {
340                 readyReadFutures = oldReadyReadFutures;
341             }
342         }
343         return readyReadFutures;
344     }
345 
346     /**
347      * TODO Add method documentation
348      */
349     private Queue<ReadFuture> getWaitingReadFutures() {
350         Queue<ReadFuture> waitingReadyReadFutures =
351             (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES_KEY);
352         if (waitingReadyReadFutures == null) {
353             waitingReadyReadFutures = new CircularQueue<ReadFuture>();
354 
355             Queue<ReadFuture> oldWaitingReadyReadFutures =
356                 (Queue<ReadFuture>) setAttributeIfAbsent(
357                         WAITING_READ_FUTURES_KEY, waitingReadyReadFutures);
358             if (oldWaitingReadyReadFutures != null) {
359                 waitingReadyReadFutures = oldWaitingReadyReadFutures;
360             }
361         }
362         return waitingReadyReadFutures;
363     }
364 
365     /**
366      * {@inheritDoc}
367      */
368     public WriteFuture write(Object message) {
369         return write(message, null);
370     }
371 
372     /**
373      * {@inheritDoc}
374      */
375     public WriteFuture write(Object message, SocketAddress remoteAddress) {
376         if (message == null) {
377             throw new NullPointerException("message");
378         }
379 
380         // We can't send a message to a connected session if we don't have 
381         // the remote address
382         if (!getTransportMetadata().isConnectionless() &&
383                 remoteAddress != null) {
384             throw new UnsupportedOperationException();
385         }
386 
387         
388         // If the session has been closed or is closing, we can't either
389         // send a message to the remote side. We generate a future
390         // containing an exception.
391         if (isClosing() || !isConnected()) {
392             WriteFuture future = new DefaultWriteFuture(this);
393             WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
394             WriteException writeException = new WriteToClosedSessionException(request);
395             future.setException(writeException);
396             return future;
397         }
398 
399         FileChannel openedFileChannel = null;
400         
401         // TODO: remove this code as soon as we use InputStream
402         // instead of Object for the message.
403         try {
404             if (message instanceof IoBuffer
405                     && !((IoBuffer) message).hasRemaining()) {
406                 // Nothing to write : probably an error in the user code
407                 throw new IllegalArgumentException(
408                 "message is empty. Forgot to call flip()?");
409             } else if (message instanceof FileChannel) {
410                 FileChannel fileChannel = (FileChannel) message;
411                 message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
412             } else if (message instanceof File) {
413                 File file = (File) message;
414                 openedFileChannel = new FileInputStream(file).getChannel();
415                 message = new DefaultFileRegion(openedFileChannel, 0, openedFileChannel.size());
416             }
417         } catch (IOException e) {
418             ExceptionMonitor.getInstance().exceptionCaught(e);
419             return DefaultWriteFuture.newNotWrittenFuture(this, e);
420         }
421 
422         // Now, we can write the message. First, create a future
423         WriteFuture writeFuture = new DefaultWriteFuture(this);
424         WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
425         
426         // Then, get the chain and inject the WriteRequest into it
427         IoFilterChain filterChain = getFilterChain();
428         filterChain.fireFilterWrite(writeRequest);
429 
430         // TODO : This is not our business ! The caller has created a FileChannel,
431         // he has to close it !
432         if (openedFileChannel != null) {
433             // If we opened a FileChannel, it needs to be closed when the write has completed
434             final FileChannel finalChannel = openedFileChannel;
435             writeFuture.addListener(new IoFutureListener<WriteFuture>() {
436                 public void operationComplete(WriteFuture future) {
437                     try {
438                         finalChannel.close();
439                     } catch (IOException e) {
440                         ExceptionMonitor.getInstance().exceptionCaught(e);
441                     }
442                 }
443             });
444         }
445 
446         // Return the WriteFuture.
447         return writeFuture;
448     }
449 
450     /**
451      * {@inheritDoc}
452      */
453     public final Object getAttachment() {
454         return getAttribute("");
455     }
456 
457     /**
458      * {@inheritDoc}
459      */
460     public final Object setAttachment(Object attachment) {
461         return setAttribute("", attachment);
462     }
463 
464     /**
465      * {@inheritDoc}
466      */
467     public final Object getAttribute(Object key) {
468         return getAttribute(key, null);
469     }
470 
471     /**
472      * {@inheritDoc}
473      */
474     public final Object getAttribute(Object key, Object defaultValue) {
475         return attributes.getAttribute(this, key, defaultValue);
476     }
477 
478     /**
479      * {@inheritDoc}
480      */
481     public final Object setAttribute(Object key, Object value) {
482         return attributes.setAttribute(this, key, value);
483     }
484 
485     /**
486      * {@inheritDoc}
487      */
488     public final Object setAttribute(Object key) {
489         return setAttribute(key, Boolean.TRUE);
490     }
491 
492     /**
493      * {@inheritDoc}
494      */
495     public final Object setAttributeIfAbsent(Object key, Object value) {
496         return attributes.setAttributeIfAbsent(this, key, value);
497     }
498 
499     /**
500      * {@inheritDoc}
501      */
502     public final Object setAttributeIfAbsent(Object key) {
503         return setAttributeIfAbsent(key, Boolean.TRUE);
504     }
505 
506     /**
507      * {@inheritDoc}
508      */
509     public final Object removeAttribute(Object key) {
510         return attributes.removeAttribute(this, key);
511     }
512 
513     /**
514      * {@inheritDoc}
515      */
516     public final boolean removeAttribute(Object key, Object value) {
517         return attributes.removeAttribute(this, key, value);
518     }
519 
520     /**
521      * {@inheritDoc}
522      */
523     public final boolean replaceAttribute(Object key, Object oldValue, Object newValue) {
524         return attributes.replaceAttribute(this, key, oldValue, newValue);
525     }
526 
527     /**
528      * {@inheritDoc}
529      */
530     public final boolean containsAttribute(Object key) {
531         return attributes.containsAttribute(this, key);
532     }
533 
534     /**
535      * {@inheritDoc}
536      */
537     public final Set<Object> getAttributeKeys() {
538         return attributes.getAttributeKeys(this);
539     }
540 
541     /**
542      * TODO Add method documentation
543      */
544     public final IoSessionAttributeMap getAttributeMap() {
545         return attributes;
546     }
547 
548     /**
549      * TODO Add method documentation
550      */
551     public final void setAttributeMap(IoSessionAttributeMap attributes) {
552         this.attributes = attributes;
553     }
554 
555     /**
556      * Create a new close aware write queue, based on the given write queue.
557      * 
558      * @param writeRequestQueue The write request queue
559      */
560     public final void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) {
561         this.writeRequestQueue =
562             new CloseAwareWriteQueue(writeRequestQueue);
563     }
564 
565 
566     /**
567      * {@inheritDoc}
568      */
569     public final void suspendRead() {
570         readSuspended = true;
571         if (isClosing() || !isConnected()) {
572             return;
573         }
574         getProcessor().updateTrafficControl(this);
575     }
576 
577     /**
578      * {@inheritDoc}
579      */
580     public final void suspendWrite() {
581         writeSuspended = true;
582         if (isClosing() || !isConnected()) {
583             return;
584         }
585         getProcessor().updateTrafficControl(this);
586     }
587 
588     /**
589      * {@inheritDoc}
590      */
591     @SuppressWarnings("unchecked")
592 	public final void resumeRead() {
593         readSuspended = false;
594         if (isClosing() || !isConnected()) {
595             return;
596         }
597         getProcessor().updateTrafficControl(this);
598     }
599 
600     /**
601      * {@inheritDoc}
602      */
603     @SuppressWarnings("unchecked")
604 	public final void resumeWrite() {
605         writeSuspended = false;
606         if (isClosing() || !isConnected()) {
607             return;
608         }
609         getProcessor().updateTrafficControl(this);
610     }
611 
612     /**
613      * {@inheritDoc}
614      */
615     public boolean isReadSuspended() {
616     	return readSuspended;
617     }
618 
619     /**
620      * {@inheritDoc}
621      */
622     public boolean isWriteSuspended() {
623     	return writeSuspended; 
624     }
625     
626     /**
627      * {@inheritDoc}
628      */
629     public final long getReadBytes() {
630         return readBytes;
631     }
632 
633     /**
634      * {@inheritDoc}
635      */
636     public final long getWrittenBytes() {
637         return writtenBytes;
638     }
639 
640     /**
641      * {@inheritDoc}
642      */
643     public final long getReadMessages() {
644         return readMessages;
645     }
646 
647     /**
648      * {@inheritDoc}
649      */
650     public final long getWrittenMessages() {
651         return writtenMessages;
652     }
653 
654     /**
655      * {@inheritDoc}
656      */
657     public final double getReadBytesThroughput() {
658         return readBytesThroughput;
659     }
660 
661     /**
662      * {@inheritDoc}
663      */
664     public final double getWrittenBytesThroughput() {
665         return writtenBytesThroughput;
666     }
667 
668     /**
669      * {@inheritDoc}
670      */
671     public final double getReadMessagesThroughput() {
672         return readMessagesThroughput;
673     }
674 
675     /**
676      * {@inheritDoc}
677      */
678     public final double getWrittenMessagesThroughput() {
679         return writtenMessagesThroughput;
680     }
681 
682     /**
683      * {@inheritDoc}
684      */
685     public final void updateThroughput(long currentTime, boolean force) {
686         int interval = (int) (currentTime - lastThroughputCalculationTime);
687 
688         long minInterval = getConfig().getThroughputCalculationIntervalInMillis();
689         if (minInterval == 0 || interval < minInterval) {
690             if (!force) {
691                 return;
692             }
693         }
694 
695         readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval;
696         writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval;
697         readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval;
698         writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval;
699 
700         lastReadBytes = readBytes;
701         lastWrittenBytes = writtenBytes;
702         lastReadMessages = readMessages;
703         lastWrittenMessages = writtenMessages;
704 
705         lastThroughputCalculationTime = currentTime;
706     }
707 
708     /**
709      * {@inheritDoc}
710      */
711     public final long getScheduledWriteBytes() {
712         return scheduledWriteBytes.get();
713     }
714 
715     /**
716      * {@inheritDoc}
717      */
718     public final int getScheduledWriteMessages() {
719         return scheduledWriteMessages.get();
720     }
721 
722     /**
723      * TODO Add method documentation
724      */
725     protected void setScheduledWriteBytes(int byteCount){
726         scheduledWriteBytes.set(byteCount);
727     }
728 
729     /**
730      * TODO Add method documentation
731      */
732     protected void setScheduledWriteMessages(int messages) {
733         scheduledWriteMessages.set(messages);
734     }
735 
736     /**
737      * TODO Add method documentation
738      */
739     public final void increaseReadBytes(long increment, long currentTime) {
740         if (increment <= 0) {
741             return;
742         }
743 
744         readBytes += increment;
745         lastReadTime = currentTime;
746         idleCountForBoth = 0;
747         idleCountForRead = 0;
748 
749         if (getService() instanceof AbstractIoService) {
750             ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime);
751         }
752     }
753 
754     /**
755      * TODO Add method documentation
756      */
757     public final void increaseReadMessages(long currentTime) {
758         readMessages++;
759         lastReadTime = currentTime;
760         idleCountForBoth = 0;
761         idleCountForRead = 0;
762 
763         if (getService() instanceof AbstractIoService) {
764             ((AbstractIoService) getService()).getStatistics().increaseReadMessages(currentTime);
765         }
766     }
767 
768     /**
769      * TODO Add method documentation
770      */
771     public final void increaseWrittenBytes(int increment, long currentTime) {
772         if (increment <= 0) {
773             return;
774         }
775 
776         writtenBytes += increment;
777         lastWriteTime = currentTime;
778         idleCountForBoth = 0;
779         idleCountForWrite = 0;
780 
781         if (getService() instanceof AbstractIoService) {
782             ((AbstractIoService) getService()).getStatistics().increaseWrittenBytes(increment, currentTime);
783         }
784 
785         increaseScheduledWriteBytes(-increment);
786     }
787 
788     /**
789      * TODO Add method documentation
790      */
791     public final void increaseWrittenMessages(
792             WriteRequest request, long currentTime) {
793         Object message = request.getMessage();
794         if (message instanceof IoBuffer) {
795             IoBuffer b = (IoBuffer) message;
796             if (b.hasRemaining()) {
797                 return;
798             }
799         }
800 
801         writtenMessages++;
802         lastWriteTime = currentTime;
803         if (getService() instanceof AbstractIoService) {
804             ((AbstractIoService) getService()).getStatistics().increaseWrittenMessages(currentTime);
805         }
806 
807         decreaseScheduledWriteMessages();
808     }
809 
810     /**
811      * TODO Add method documentation
812      */
813     public final void increaseScheduledWriteBytes(int increment) {
814         scheduledWriteBytes.addAndGet(increment);
815         if (getService() instanceof AbstractIoService) {
816             ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteBytes(increment);
817         }
818     }
819 
820     /**
821      * TODO Add method documentation
822      */
823     public final void increaseScheduledWriteMessages() {
824         scheduledWriteMessages.incrementAndGet();
825         if (getService() instanceof AbstractIoService) {
826             ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteMessages();
827         }
828     }
829 
830     /**
831      * TODO Add method documentation
832      */
833     private void decreaseScheduledWriteMessages() {
834         scheduledWriteMessages.decrementAndGet();
835         if (getService() instanceof AbstractIoService) {
836             ((AbstractIoService) getService()).getStatistics().decreaseScheduledWriteMessages();
837         }
838     }
839 
840     /**
841      * TODO Add method documentation
842      */
843     public final void decreaseScheduledBytesAndMessages(WriteRequest request) {
844         Object message = request.getMessage();
845         if (message instanceof IoBuffer) {
846             IoBuffer b = (IoBuffer) message;
847             if (b.hasRemaining()) {
848                 increaseScheduledWriteBytes(-((IoBuffer) message).remaining());
849             } else {
850                 decreaseScheduledWriteMessages();
851             }
852         } else {
853             decreaseScheduledWriteMessages();
854         }
855     }
856 
857     /**
858      * {@inheritDoc}
859      */
860     public final WriteRequestQueue getWriteRequestQueue() {
861         if (writeRequestQueue == null) {
862             throw new IllegalStateException();
863         }
864         return writeRequestQueue;
865     }
866 
867     /**
868      * {@inheritDoc}
869      */
870     public final WriteRequest getCurrentWriteRequest() {
871         return currentWriteRequest;
872     }
873 
874     /**
875      * {@inheritDoc}
876      */
877     public final Object getCurrentWriteMessage() {
878         WriteRequest req = getCurrentWriteRequest();
879         if (req == null) {
880             return null;
881         }
882         return req.getMessage();
883     }
884 
885     /**
886      * {@inheritDoc}
887      */
888     public final void setCurrentWriteRequest(WriteRequest currentWriteRequest) {
889         this.currentWriteRequest = currentWriteRequest;
890     }
891 
892     /**
893      * TODO Add method documentation
894      */
895     public final void increaseReadBufferSize() {
896         int newReadBufferSize = getConfig().getReadBufferSize() << 1;
897         if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) {
898             getConfig().setReadBufferSize(newReadBufferSize);
899         } else {
900             getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize());
901         }
902 
903         deferDecreaseReadBuffer = true;
904     }
905 
906     /**
907      * TODO Add method documentation
908      */
909     public final void decreaseReadBufferSize() {
910         if (deferDecreaseReadBuffer) {
911             deferDecreaseReadBuffer = false;
912             return;
913         }
914 
915         if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) {
916             getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1);
917         }
918 
919         deferDecreaseReadBuffer = true;
920     }
921 
922     /**
923      * {@inheritDoc}
924      */
925     public final long getCreationTime() {
926         return creationTime;
927     }
928 
929     /**
930      * {@inheritDoc}
931      */
932     public final long getLastIoTime() {
933         return Math.max(lastReadTime, lastWriteTime);
934     }
935 
936     /**
937      * {@inheritDoc}
938      */
939     public final long getLastReadTime() {
940         return lastReadTime;
941     }
942 
943     /**
944      * {@inheritDoc}
945      */
946     public final long getLastWriteTime() {
947         return lastWriteTime;
948     }
949 
950     /**
951      * {@inheritDoc}
952      */
953     public final boolean isIdle(IdleStatus status) {
954         if (status == IdleStatus.BOTH_IDLE) {
955             return idleCountForBoth > 0;
956         }
957 
958         if (status == IdleStatus.READER_IDLE) {
959             return idleCountForRead > 0;
960         }
961 
962         if (status == IdleStatus.WRITER_IDLE) {
963             return idleCountForWrite > 0;
964         }
965 
966         throw new IllegalArgumentException("Unknown idle status: " + status);
967     }
968 
969     /**
970      * {@inheritDoc}
971      */
972     public final boolean isBothIdle() {
973         return isIdle(IdleStatus.BOTH_IDLE);
974     }
975 
976     /**
977      * {@inheritDoc}
978      */
979     public final boolean isReaderIdle() {
980         return isIdle(IdleStatus.READER_IDLE);
981     }
982 
983     /**
984      * {@inheritDoc}
985      */
986     public final boolean isWriterIdle() {
987         return isIdle(IdleStatus.WRITER_IDLE);
988     }
989 
990     /**
991      * {@inheritDoc}
992      */
993     public final int getIdleCount(IdleStatus status) {
994         if (getConfig().getIdleTime(status) == 0) {
995             if (status == IdleStatus.BOTH_IDLE) {
996                 idleCountForBoth = 0;
997             }
998 
999             if (status == IdleStatus.READER_IDLE) {
1000                 idleCountForRead = 0;
1001             }
1002 
1003             if (status == IdleStatus.WRITER_IDLE) {
1004                 idleCountForWrite = 0;
1005             }
1006         }
1007 
1008         if (status == IdleStatus.BOTH_IDLE) {
1009             return idleCountForBoth;
1010         }
1011 
1012         if (status == IdleStatus.READER_IDLE) {
1013             return idleCountForRead;
1014         }
1015 
1016         if (status == IdleStatus.WRITER_IDLE) {
1017             return idleCountForWrite;
1018         }
1019 
1020         throw new IllegalArgumentException("Unknown idle status: " + status);
1021     }
1022 
1023     /**
1024      * {@inheritDoc}
1025      */
1026     public final long getLastIdleTime(IdleStatus status) {
1027         if (status == IdleStatus.BOTH_IDLE) {
1028             return lastIdleTimeForBoth;
1029         }
1030 
1031         if (status == IdleStatus.READER_IDLE) {
1032             return lastIdleTimeForRead;
1033         }
1034 
1035         if (status == IdleStatus.WRITER_IDLE) {
1036             return lastIdleTimeForWrite;
1037         }
1038 
1039         throw new IllegalArgumentException("Unknown idle status: " + status);
1040     }
1041 
1042     /**
1043      * TODO Add method documentation
1044      */
1045     public final void increaseIdleCount(IdleStatus status, long currentTime) {
1046         if (status == IdleStatus.BOTH_IDLE) {
1047             idleCountForBoth++;
1048             lastIdleTimeForBoth = currentTime;
1049         } else if (status == IdleStatus.READER_IDLE) {
1050             idleCountForRead++;
1051             lastIdleTimeForRead = currentTime;
1052         } else if (status == IdleStatus.WRITER_IDLE) {
1053             idleCountForWrite++;
1054             lastIdleTimeForWrite = currentTime;
1055         } else {
1056             throw new IllegalArgumentException("Unknown idle status: " + status);
1057         }
1058     }
1059 
1060     /**
1061      * {@inheritDoc}
1062      */
1063     public final int getBothIdleCount() {
1064         return getIdleCount(IdleStatus.BOTH_IDLE);
1065     }
1066 
1067     /**
1068      * {@inheritDoc}
1069      */
1070     public final long getLastBothIdleTime() {
1071         return getLastIdleTime(IdleStatus.BOTH_IDLE);
1072     }
1073 
1074     /**
1075      * {@inheritDoc}
1076      */
1077     public final long getLastReaderIdleTime() {
1078         return getLastIdleTime(IdleStatus.READER_IDLE);
1079     }
1080 
1081     /**
1082      * {@inheritDoc}
1083      */
1084     public final long getLastWriterIdleTime() {
1085         return getLastIdleTime(IdleStatus.WRITER_IDLE);
1086     }
1087 
1088     /**
1089      * {@inheritDoc}
1090      */
1091     public final int getReaderIdleCount() {
1092         return getIdleCount(IdleStatus.READER_IDLE);
1093     }
1094 
1095     /**
1096      * {@inheritDoc}
1097      */
1098     public final int getWriterIdleCount() {
1099         return getIdleCount(IdleStatus.WRITER_IDLE);
1100     }
1101 
1102     /**
1103      * {@inheritDoc}
1104      */
1105     public SocketAddress getServiceAddress() {
1106         IoService service = getService();
1107         if (service instanceof IoAcceptor) {
1108             return ((IoAcceptor) service).getLocalAddress();
1109         } else {
1110             return getRemoteAddress();
1111         }
1112     }
1113 
1114     /**
1115      * {@inheritDoc}
1116      */
1117     @Override
1118     public final int hashCode() {
1119         return super.hashCode();
1120     }
1121 
1122     /**
1123      * {@inheritDoc}
1124      * TODO This is a ridiculous implementation. Need to be replaced.
1125      */
1126     @Override
1127     public final boolean equals(Object o) {
1128         return super.equals(o);
1129     }
1130 
1131     /**
1132      * {@inheritDoc}
1133      */
1134     @Override
1135     public String toString() {
1136         if (isConnected()||isClosing()) {
1137             if (getService() instanceof IoAcceptor) {
1138                 return "(" + getIdAsString() + ": " + getServiceName() + ", server, " +
1139                         getRemoteAddress() + " => " + getLocalAddress() + ')';
1140             } else {
1141                 return "(" + getIdAsString() + ": " + getServiceName() + ", client, " +
1142                         getLocalAddress() + " => " + getRemoteAddress() + ')';
1143             }
1144         } else {
1145             return "Session disconnected ...";
1146         }
1147         
1148     }
1149 
1150     /**
1151      * TODO Add method documentation
1152      */
1153     private String getIdAsString() {
1154         String id = Long.toHexString(getId()).toUpperCase();
1155 
1156         // Somewhat inefficient, but it won't happen that often
1157         // because an ID is often a big integer.
1158         while (id.length() < 8) {
1159             id = '0' + id; // padding
1160         }
1161         id = "0x" + id;
1162 
1163         return id;
1164     }
1165 
1166     /**
1167      * TODO Add method documentation
1168      */
1169     private String getServiceName() {
1170         TransportMetadata tm = getTransportMetadata();
1171         if (tm == null) {
1172             return "null";
1173         } else {
1174             return tm.getProviderName() + ' ' + tm.getName();
1175         }
1176     }
1177 
1178     /**
1179      * Fires a {@link IoEventType#SESSION_IDLE} event to any applicable
1180      * sessions in the specified collection.
1181      *
1182      * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
1183      */
1184     public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) {
1185         IoSession s = null;
1186         while (sessions.hasNext()) {
1187             s = sessions.next();
1188             notifyIdleSession(s, currentTime);
1189         }
1190     }
1191 
1192     /**
1193      * Fires a {@link IoEventType#SESSION_IDLE} event if applicable for the
1194      * specified {@code session}.
1195      *
1196      * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
1197      */
1198     public static void notifyIdleSession(IoSession session, long currentTime) {
1199         notifyIdleSession0(
1200                 session, currentTime,
1201                 session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
1202                 IdleStatus.BOTH_IDLE, Math.max(
1203                     session.getLastIoTime(),
1204                     session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
1205 
1206         notifyIdleSession0(
1207                 session, currentTime,
1208                 session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
1209                 IdleStatus.READER_IDLE, Math.max(
1210                     session.getLastReadTime(),
1211                     session.getLastIdleTime(IdleStatus.READER_IDLE)));
1212 
1213         notifyIdleSession0(
1214                 session, currentTime,
1215                 session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
1216                 IdleStatus.WRITER_IDLE, Math.max(
1217                     session.getLastWriteTime(),
1218                     session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
1219 
1220         notifyWriteTimeout(session, currentTime);
1221     }
1222 
1223     private static void notifyIdleSession0(
1224             IoSession session, long currentTime,
1225             long idleTime, IdleStatus status, long lastIoTime) {
1226         if (idleTime > 0 && lastIoTime != 0
1227                 && currentTime - lastIoTime >= idleTime) {
1228             session.getFilterChain().fireSessionIdle(status);
1229         }
1230     }
1231 
1232     private static void notifyWriteTimeout(
1233             IoSession session, long currentTime) {
1234 
1235         long writeTimeout = session.getConfig().getWriteTimeoutInMillis();
1236         if (writeTimeout > 0 &&
1237                 currentTime - session.getLastWriteTime() >= writeTimeout &&
1238                 !session.getWriteRequestQueue().isEmpty(session)) {
1239             WriteRequest request = session.getCurrentWriteRequest();
1240             if (request != null) {
1241                 session.setCurrentWriteRequest(null);
1242                 WriteTimeoutException cause = new WriteTimeoutException(request);
1243                 request.getFuture().setException(cause);
1244                 session.getFilterChain().fireExceptionCaught(cause);
1245                 // WriteException is an IOException, so we close the session.
1246                 session.close(true);
1247             }
1248         }
1249     }
1250 
1251     
1252     
1253     /**
1254      * A queue which handles the CLOSE request.
1255      * 
1256      * TODO : Check that when closing a session, all the pending
1257      * requests are correctly sent.
1258      */
1259     private class CloseAwareWriteQueue implements WriteRequestQueue {
1260 
1261         private final WriteRequestQueue q;
1262 
1263         /**
1264          * {@inheritDoc}
1265          */
1266         public CloseAwareWriteQueue(WriteRequestQueue q) {
1267             this.q = q;
1268         }
1269 
1270         /**
1271          * {@inheritDoc}
1272          */
1273         public synchronized WriteRequest poll(IoSession session) {
1274             WriteRequest answer = q.poll(session);
1275             
1276             if (answer == CLOSE_REQUEST) {
1277                 AbstractIoSession.this.close();
1278                 dispose(session);
1279                 answer = null;
1280             }
1281             
1282             return answer;
1283         }
1284 
1285         /**
1286          * {@inheritDoc}
1287          */
1288         public void offer(IoSession session, WriteRequest e) {
1289             q.offer(session, e);
1290         }
1291 
1292         /**
1293          * {@inheritDoc}
1294          */
1295         public boolean isEmpty(IoSession session) {
1296             return q.isEmpty(session);
1297         }
1298 
1299         /**
1300          * {@inheritDoc}
1301          */
1302         public void clear(IoSession session) {
1303             q.clear(session);
1304         }
1305 
1306         /**
1307          * {@inheritDoc}
1308          */
1309         public void dispose(IoSession session) {
1310             q.dispose(session);
1311         }
1312     }
1313 }