View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.session;
21  
22  import java.io.File;
23  import java.io.FileInputStream;
24  import java.io.IOException;
25  import java.net.SocketAddress;
26  import java.nio.channels.FileChannel;
27  import java.util.Iterator;
28  import java.util.Queue;
29  import java.util.Set;
30  import java.util.concurrent.atomic.AtomicBoolean;
31  import java.util.concurrent.atomic.AtomicInteger;
32  import java.util.concurrent.atomic.AtomicLong;
33  
34  import org.apache.mina.core.buffer.IoBuffer;
35  import org.apache.mina.core.file.DefaultFileRegion;
36  import org.apache.mina.core.filterchain.IoFilterChain;
37  import org.apache.mina.core.future.CloseFuture;
38  import org.apache.mina.core.future.DefaultCloseFuture;
39  import org.apache.mina.core.future.DefaultReadFuture;
40  import org.apache.mina.core.future.DefaultWriteFuture;
41  import org.apache.mina.core.future.IoFutureListener;
42  import org.apache.mina.core.future.ReadFuture;
43  import org.apache.mina.core.future.WriteFuture;
44  import org.apache.mina.core.service.AbstractIoService;
45  import org.apache.mina.core.service.IoAcceptor;
46  import org.apache.mina.core.service.IoProcessor;
47  import org.apache.mina.core.service.IoService;
48  import org.apache.mina.core.service.TransportMetadata;
49  import org.apache.mina.core.write.DefaultWriteRequest;
50  import org.apache.mina.core.write.WriteException;
51  import org.apache.mina.core.write.WriteRequest;
52  import org.apache.mina.core.write.WriteRequestQueue;
53  import org.apache.mina.core.write.WriteTimeoutException;
54  import org.apache.mina.core.write.WriteToClosedSessionException;
55  import org.apache.mina.util.CircularQueue;
56  import org.apache.mina.util.ExceptionMonitor;
57  
58  
59  /**
60   * Base implementation of {@link IoSession}.
61   *
62   * @author The Apache MINA Project (dev@mina.apache.org)
63   */
64  public abstract class AbstractIoSession implements IoSession {
65  
66      private static final AttributeKey READY_READ_FUTURES_KEY =
67          new AttributeKey(AbstractIoSession.class, "readyReadFutures");
68      
69      private static final AttributeKey WAITING_READ_FUTURES_KEY =
70          new AttributeKey(AbstractIoSession.class, "waitingReadFutures");
71  
72      private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER =
73          new IoFutureListener<CloseFuture>() {
74              public void operationComplete(CloseFuture future) {
75                  AbstractIoSession session = (AbstractIoSession) future.getSession();
76                  session.scheduledWriteBytes.set(0);
77                  session.scheduledWriteMessages.set(0);
78                  session.readBytesThroughput = 0;
79                  session.readMessagesThroughput = 0;
80                  session.writtenBytesThroughput = 0;
81                  session.writtenMessagesThroughput = 0;
82              }
83      };
84  
85      /**
86       * An internal write request object that triggers session close.
87       * @see #writeRequestQueue
88       */
89      private static final WriteRequest CLOSE_REQUEST =
90          new DefaultWriteRequest(new Object());
91  
92      private final Object lock = new Object();
93  
94      private IoSessionAttributeMap attributes;
95      private WriteRequestQueue writeRequestQueue;
96      private WriteRequest currentWriteRequest;
97      
98      // The Session creation's time */
99      private final long creationTime;
100 
101     /** An id generator guaranteed to generate unique IDs for the session */
102     private static AtomicLong idGenerator = new AtomicLong(0);
103     
104     /** The session ID */
105     private long sessionId;
106     
107     /**
108      * A future that will be set 'closed' when the connection is closed.
109      */
110     private final CloseFuture closeFuture = new DefaultCloseFuture(this);
111 
112     private volatile boolean closing;
113     
114     // traffic control
115     private boolean readSuspended=false;
116     private boolean writeSuspended=false;
117 
118     // Status variables
119     private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
120     private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
121     private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
122 
123     private long readBytes;
124     private long writtenBytes;
125     private long readMessages;
126     private long writtenMessages;
127     private long lastReadTime;
128     private long lastWriteTime;
129 
130     private long lastThroughputCalculationTime;
131     private long lastReadBytes;
132     private long lastWrittenBytes;
133     private long lastReadMessages;
134     private long lastWrittenMessages;
135     private double readBytesThroughput;
136     private double writtenBytesThroughput;
137     private double readMessagesThroughput;
138     private double writtenMessagesThroughput;
139 
140     private int idleCountForBoth;
141     private int idleCountForRead;
142     private int idleCountForWrite;
143 
144     private long lastIdleTimeForBoth;
145     private long lastIdleTimeForRead;
146     private long lastIdleTimeForWrite;
147 
148     private boolean deferDecreaseReadBuffer = true;
149 
150     /**
151      * TODO Add method documentation
152      */
153     protected AbstractIoSession() {
154         // Initialize all the Session counters to the current time 
155         long currentTime = System.currentTimeMillis();
156         creationTime = currentTime;
157         lastThroughputCalculationTime = currentTime;
158         lastReadTime = currentTime;
159         lastWriteTime = currentTime;
160         lastIdleTimeForBoth = currentTime;
161         lastIdleTimeForRead = currentTime;
162         lastIdleTimeForWrite = currentTime;
163         
164         // TODO add documentation
165         closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
166         
167         // Set a new ID for this session
168         sessionId = idGenerator.incrementAndGet();
169     }
170 
171     /**
172      * {@inheritDoc}
173      * 
174      * We use an AtomicLong to guarantee that the session ID are
175      * unique.
176      */
177     public final long getId() {
178         return sessionId;
179     }
180 
    /**
     * Returns the {@link IoProcessor} associated with this session. Within
     * this class it is asked to flush queued writes and to apply traffic
     * control changes (suspend / resume of reads and writes).
     *
     * @return the processor handling this session
     */
    public abstract IoProcessor getProcessor();
185 
186     /**
187      * {@inheritDoc}
188      */
189     public final boolean isConnected() {
190         return !closeFuture.isClosed();
191     }
192 
193     /**
194      * {@inheritDoc}
195      */
196     public final boolean isClosing() {
197         return closing || closeFuture.isClosed();
198     }
199 
200     /**
201      * {@inheritDoc}
202      */
203     public final CloseFuture getCloseFuture() {
204         return closeFuture;
205     }
206 
207     /**
208      * TODO Add method documentation
209      */
210     public final boolean isScheduledForFlush() {
211         return scheduledForFlush.get();
212     }
213 
214     /**
215      * TODO Add method documentation
216      */
217     public final boolean setScheduledForFlush(boolean flag) {
218         if (flag) {
219             return scheduledForFlush.compareAndSet(false, true);
220         }
221         
222         scheduledForFlush.set(false);
223         return true;
224     }
225 
226     /**
227      * {@inheritDoc}
228      */
229     public final CloseFuture close(boolean rightNow) {
230         if (rightNow) {
231             return close();
232         }
233         
234         return closeOnFlush();
235     }
236 
237     /**
238      * {@inheritDoc}
239      */
240     public final CloseFuture close() {
241         synchronized (lock) {
242             if (isClosing()) {
243                 return closeFuture;
244             }
245             
246             closing = true;
247         }
248 
249         getFilterChain().fireFilterClose();
250         return closeFuture;
251     }
252 
253     private final CloseFuture closeOnFlush() {
254         getWriteRequestQueue().offer(this, CLOSE_REQUEST);
255         getProcessor().flush(this);
256         return closeFuture;
257     }
258 
259     /**
260      * {@inheritDoc}
261      */
262     public final ReadFuture read() {
263         if (!getConfig().isUseReadOperation()) {
264             throw new IllegalStateException("useReadOperation is not enabled.");
265         }
266 
267         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
268         ReadFuture future;
269         synchronized (readyReadFutures) {
270             future = readyReadFutures.poll();
271             if (future != null) {
272                 if (future.isClosed()) {
273                     // Let other readers get notified.
274                     readyReadFutures.offer(future);
275                 }
276             } else {
277                 future = new DefaultReadFuture(this);
278                 getWaitingReadFutures().offer(future);
279             }
280         }
281 
282         return future;
283     }
284 
285     /**
286      * TODO Add method documentation
287      */
288     public final void offerReadFuture(Object message) {
289         newReadFuture().setRead(message);
290     }
291 
292     /**
293      * TODO Add method documentation
294      */
295     public final void offerFailedReadFuture(Throwable exception) {
296         newReadFuture().setException(exception);
297     }
298 
299     /**
300      * TODO Add method documentation
301      */
302     public final void offerClosedReadFuture() {
303         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
304         synchronized (readyReadFutures) {
305             newReadFuture().setClosed();
306         }
307     }
308 
309     /**
310      * TODO Add method documentation
311      */
312     private ReadFuture newReadFuture() {
313         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
314         Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures();
315         ReadFuture future;
316         synchronized (readyReadFutures) {
317             future = waitingReadFutures.poll();
318             if (future == null) {
319                 future = new DefaultReadFuture(this);
320                 readyReadFutures.offer(future);
321             }
322         }
323         return future;
324     }
325 
326     /**
327      * TODO Add method documentation
328      */
329     private Queue<ReadFuture> getReadyReadFutures() {
330         Queue<ReadFuture> readyReadFutures =
331             (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
332         if (readyReadFutures == null) {
333             readyReadFutures = new CircularQueue<ReadFuture>();
334 
335             Queue<ReadFuture> oldReadyReadFutures =
336                 (Queue<ReadFuture>) setAttributeIfAbsent(
337                         READY_READ_FUTURES_KEY, readyReadFutures);
338             if (oldReadyReadFutures != null) {
339                 readyReadFutures = oldReadyReadFutures;
340             }
341         }
342         return readyReadFutures;
343     }
344 
345     /**
346      * TODO Add method documentation
347      */
348     private Queue<ReadFuture> getWaitingReadFutures() {
349         Queue<ReadFuture> waitingReadyReadFutures =
350             (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES_KEY);
351         if (waitingReadyReadFutures == null) {
352             waitingReadyReadFutures = new CircularQueue<ReadFuture>();
353 
354             Queue<ReadFuture> oldWaitingReadyReadFutures =
355                 (Queue<ReadFuture>) setAttributeIfAbsent(
356                         WAITING_READ_FUTURES_KEY, waitingReadyReadFutures);
357             if (oldWaitingReadyReadFutures != null) {
358                 waitingReadyReadFutures = oldWaitingReadyReadFutures;
359             }
360         }
361         return waitingReadyReadFutures;
362     }
363 
364     /**
365      * {@inheritDoc}
366      */
367     public WriteFuture write(Object message) {
368         return write(message, null);
369     }
370 
371     /**
372      * {@inheritDoc}
373      */
374     public WriteFuture write(Object message, SocketAddress remoteAddress) {
375         if (message == null) {
376             throw new NullPointerException("message");
377         }
378 
379         // We can't send a message to a connected session if we don't have 
380         // the remote address
381         if (!getTransportMetadata().isConnectionless() &&
382                 remoteAddress != null) {
383             throw new UnsupportedOperationException();
384         }
385 
386         
387         // If the session has been closed or is closing, we can't either
388         // send a message to the remote side. We generate a future
389         // containing an exception.
390         if (isClosing() || !isConnected()) {
391             WriteFuture future = new DefaultWriteFuture(this);
392             WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
393             WriteException writeException = new WriteToClosedSessionException(request);
394             future.setException(writeException);
395             return future;
396         }
397 
398         FileChannel openedFileChannel = null;
399         
400         // TODO: remove this code as soon as we use InputStream
401         // instead of Object for the message.
402         try {
403             if (message instanceof IoBuffer
404                     && !((IoBuffer) message).hasRemaining()) {
405                 // Nothing to write : probably an error in the user code
406                 throw new IllegalArgumentException(
407                 "message is empty. Forgot to call flip()?");
408             } else if (message instanceof FileChannel) {
409                 FileChannel fileChannel = (FileChannel) message;
410                 message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
411             } else if (message instanceof File) {
412                 File file = (File) message;
413                 openedFileChannel = new FileInputStream(file).getChannel();
414                 message = new DefaultFileRegion(openedFileChannel, 0, openedFileChannel.size());
415             }
416         } catch (IOException e) {
417             ExceptionMonitor.getInstance().exceptionCaught(e);
418             return DefaultWriteFuture.newNotWrittenFuture(this, e);
419         }
420 
421         // Now, we can write the message. First, create a future
422         WriteFuture writeFuture = new DefaultWriteFuture(this);
423         WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
424         
425         // Then, get the chain and inject the WriteRequest into it
426         IoFilterChain filterChain = getFilterChain();
427         filterChain.fireFilterWrite(writeRequest);
428 
429         // TODO : This is not our business ! The caller has created a FileChannel,
430         // he has to close it !
431         if (openedFileChannel != null) {
432             // If we opened a FileChannel, it needs to be closed when the write has completed
433             final FileChannel finalChannel = openedFileChannel;
434             writeFuture.addListener(new IoFutureListener<WriteFuture>() {
435                 public void operationComplete(WriteFuture future) {
436                     try {
437                         finalChannel.close();
438                     } catch (IOException e) {
439                         ExceptionMonitor.getInstance().exceptionCaught(e);
440                     }
441                 }
442             });
443         }
444 
445         // Return the WriteFuture.
446         return writeFuture;
447     }
448 
449     /**
450      * {@inheritDoc}
451      */
452     public final Object getAttachment() {
453         return getAttribute("");
454     }
455 
456     /**
457      * {@inheritDoc}
458      */
459     public final Object setAttachment(Object attachment) {
460         return setAttribute("", attachment);
461     }
462 
463     /**
464      * {@inheritDoc}
465      */
466     public final Object getAttribute(Object key) {
467         return getAttribute(key, null);
468     }
469 
470     /**
471      * {@inheritDoc}
472      */
473     public final Object getAttribute(Object key, Object defaultValue) {
474         return attributes.getAttribute(this, key, defaultValue);
475     }
476 
477     /**
478      * {@inheritDoc}
479      */
480     public final Object setAttribute(Object key, Object value) {
481         return attributes.setAttribute(this, key, value);
482     }
483 
484     /**
485      * {@inheritDoc}
486      */
487     public final Object setAttribute(Object key) {
488         return setAttribute(key, Boolean.TRUE);
489     }
490 
491     /**
492      * {@inheritDoc}
493      */
494     public final Object setAttributeIfAbsent(Object key, Object value) {
495         return attributes.setAttributeIfAbsent(this, key, value);
496     }
497 
498     /**
499      * {@inheritDoc}
500      */
501     public final Object setAttributeIfAbsent(Object key) {
502         return setAttributeIfAbsent(key, Boolean.TRUE);
503     }
504 
505     /**
506      * {@inheritDoc}
507      */
508     public final Object removeAttribute(Object key) {
509         return attributes.removeAttribute(this, key);
510     }
511 
512     /**
513      * {@inheritDoc}
514      */
515     public final boolean removeAttribute(Object key, Object value) {
516         return attributes.removeAttribute(this, key, value);
517     }
518 
519     /**
520      * {@inheritDoc}
521      */
522     public final boolean replaceAttribute(Object key, Object oldValue, Object newValue) {
523         return attributes.replaceAttribute(this, key, oldValue, newValue);
524     }
525 
526     /**
527      * {@inheritDoc}
528      */
529     public final boolean containsAttribute(Object key) {
530         return attributes.containsAttribute(this, key);
531     }
532 
533     /**
534      * {@inheritDoc}
535      */
536     public final Set<Object> getAttributeKeys() {
537         return attributes.getAttributeKeys(this);
538     }
539 
540     /**
541      * TODO Add method documentation
542      */
543     public final IoSessionAttributeMap getAttributeMap() {
544         return attributes;
545     }
546 
547     /**
548      * TODO Add method documentation
549      */
550     public final void setAttributeMap(IoSessionAttributeMap attributes) {
551         this.attributes = attributes;
552     }
553 
554     /**
555      * Create a new close aware write queue, based on the given write queue.
556      * 
557      * @param writeRequestQueue The write request queue
558      */
559     public final void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) {
560         this.writeRequestQueue =
561             new CloseAwareWriteQueue(writeRequestQueue);
562     }
563 
564 
565     /**
566      * {@inheritDoc}
567      */
568     public final void suspendRead() {
569         readSuspended = true;
570         if (isClosing() || !isConnected()) {
571             return;
572         }
573         getProcessor().updateTrafficControl(this);
574     }
575 
576     /**
577      * {@inheritDoc}
578      */
579     public final void suspendWrite() {
580         writeSuspended = true;
581         if (isClosing() || !isConnected()) {
582             return;
583         }
584         getProcessor().updateTrafficControl(this);
585     }
586 
587     /**
588      * {@inheritDoc}
589      */
590     @SuppressWarnings("unchecked")
591     public final void resumeRead() {
592         readSuspended = false;
593         if (isClosing() || !isConnected()) {
594             return;
595         }
596         getProcessor().updateTrafficControl(this);
597     }
598 
599     /**
600      * {@inheritDoc}
601      */
602     @SuppressWarnings("unchecked")
603     public final void resumeWrite() {
604         writeSuspended = false;
605         if (isClosing() || !isConnected()) {
606             return;
607         }
608         getProcessor().updateTrafficControl(this);
609     }
610 
611     /**
612      * {@inheritDoc}
613      */
614     public boolean isReadSuspended() {
615         return readSuspended;
616     }
617 
618     /**
619      * {@inheritDoc}
620      */
621     public boolean isWriteSuspended() {
622         return writeSuspended; 
623     }
624     
625     /**
626      * {@inheritDoc}
627      */
628     public final long getReadBytes() {
629         return readBytes;
630     }
631 
632     /**
633      * {@inheritDoc}
634      */
635     public final long getWrittenBytes() {
636         return writtenBytes;
637     }
638 
639     /**
640      * {@inheritDoc}
641      */
642     public final long getReadMessages() {
643         return readMessages;
644     }
645 
646     /**
647      * {@inheritDoc}
648      */
649     public final long getWrittenMessages() {
650         return writtenMessages;
651     }
652 
653     /**
654      * {@inheritDoc}
655      */
656     public final double getReadBytesThroughput() {
657         return readBytesThroughput;
658     }
659 
660     /**
661      * {@inheritDoc}
662      */
663     public final double getWrittenBytesThroughput() {
664         return writtenBytesThroughput;
665     }
666 
667     /**
668      * {@inheritDoc}
669      */
670     public final double getReadMessagesThroughput() {
671         return readMessagesThroughput;
672     }
673 
674     /**
675      * {@inheritDoc}
676      */
677     public final double getWrittenMessagesThroughput() {
678         return writtenMessagesThroughput;
679     }
680 
681     /**
682      * {@inheritDoc}
683      */
684     public final void updateThroughput(long currentTime, boolean force) {
685         int interval = (int) (currentTime - lastThroughputCalculationTime);
686 
687         long minInterval = getConfig().getThroughputCalculationIntervalInMillis();
688         if (minInterval == 0 || interval < minInterval) {
689             if (!force) {
690                 return;
691             }
692         }
693 
694         readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval;
695         writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval;
696         readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval;
697         writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval;
698 
699         lastReadBytes = readBytes;
700         lastWrittenBytes = writtenBytes;
701         lastReadMessages = readMessages;
702         lastWrittenMessages = writtenMessages;
703 
704         lastThroughputCalculationTime = currentTime;
705     }
706 
707     /**
708      * {@inheritDoc}
709      */
710     public final long getScheduledWriteBytes() {
711         return scheduledWriteBytes.get();
712     }
713 
714     /**
715      * {@inheritDoc}
716      */
717     public final int getScheduledWriteMessages() {
718         return scheduledWriteMessages.get();
719     }
720 
721     /**
722      * TODO Add method documentation
723      */
724     protected void setScheduledWriteBytes(int byteCount){
725         scheduledWriteBytes.set(byteCount);
726     }
727 
728     /**
729      * TODO Add method documentation
730      */
731     protected void setScheduledWriteMessages(int messages) {
732         scheduledWriteMessages.set(messages);
733     }
734 
735     /**
736      * TODO Add method documentation
737      */
738     public final void increaseReadBytes(long increment, long currentTime) {
739         if (increment <= 0) {
740             return;
741         }
742 
743         readBytes += increment;
744         lastReadTime = currentTime;
745         idleCountForBoth = 0;
746         idleCountForRead = 0;
747 
748         if (getService() instanceof AbstractIoService) {
749             ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime);
750         }
751     }
752 
753     /**
754      * TODO Add method documentation
755      */
756     public final void increaseReadMessages(long currentTime) {
757         readMessages++;
758         lastReadTime = currentTime;
759         idleCountForBoth = 0;
760         idleCountForRead = 0;
761 
762         if (getService() instanceof AbstractIoService) {
763             ((AbstractIoService) getService()).getStatistics().increaseReadMessages(currentTime);
764         }
765     }
766 
767     /**
768      * TODO Add method documentation
769      */
770     public final void increaseWrittenBytes(int increment, long currentTime) {
771         if (increment <= 0) {
772             return;
773         }
774 
775         writtenBytes += increment;
776         lastWriteTime = currentTime;
777         idleCountForBoth = 0;
778         idleCountForWrite = 0;
779 
780         if (getService() instanceof AbstractIoService) {
781             ((AbstractIoService) getService()).getStatistics().increaseWrittenBytes(increment, currentTime);
782         }
783 
784         increaseScheduledWriteBytes(-increment);
785     }
786 
787     /**
788      * TODO Add method documentation
789      */
790     public final void increaseWrittenMessages(
791             WriteRequest request, long currentTime) {
792         Object message = request.getMessage();
793         if (message instanceof IoBuffer) {
794             IoBuffer b = (IoBuffer) message;
795             if (b.hasRemaining()) {
796                 return;
797             }
798         }
799 
800         writtenMessages++;
801         lastWriteTime = currentTime;
802         if (getService() instanceof AbstractIoService) {
803             ((AbstractIoService) getService()).getStatistics().increaseWrittenMessages(currentTime);
804         }
805 
806         decreaseScheduledWriteMessages();
807     }
808 
809     /**
810      * TODO Add method documentation
811      */
812     public final void increaseScheduledWriteBytes(int increment) {
813         scheduledWriteBytes.addAndGet(increment);
814         if (getService() instanceof AbstractIoService) {
815             ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteBytes(increment);
816         }
817     }
818 
819     /**
820      * TODO Add method documentation
821      */
822     public final void increaseScheduledWriteMessages() {
823         scheduledWriteMessages.incrementAndGet();
824         if (getService() instanceof AbstractIoService) {
825             ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteMessages();
826         }
827     }
828 
829     /**
830      * TODO Add method documentation
831      */
832     private void decreaseScheduledWriteMessages() {
833         scheduledWriteMessages.decrementAndGet();
834         if (getService() instanceof AbstractIoService) {
835             ((AbstractIoService) getService()).getStatistics().decreaseScheduledWriteMessages();
836         }
837     }
838 
839     /**
840      * TODO Add method documentation
841      */
842     public final void decreaseScheduledBytesAndMessages(WriteRequest request) {
843         Object message = request.getMessage();
844         if (message instanceof IoBuffer) {
845             IoBuffer b = (IoBuffer) message;
846             if (b.hasRemaining()) {
847                 increaseScheduledWriteBytes(-((IoBuffer) message).remaining());
848             } else {
849                 decreaseScheduledWriteMessages();
850             }
851         } else {
852             decreaseScheduledWriteMessages();
853         }
854     }
855 
856     /**
857      * {@inheritDoc}
858      */
859     public final WriteRequestQueue getWriteRequestQueue() {
860         if (writeRequestQueue == null) {
861             throw new IllegalStateException();
862         }
863         return writeRequestQueue;
864     }
865 
866     /**
867      * {@inheritDoc}
868      */
869     public final WriteRequest getCurrentWriteRequest() {
870         return currentWriteRequest;
871     }
872 
873     /**
874      * {@inheritDoc}
875      */
876     public final Object getCurrentWriteMessage() {
877         WriteRequest req = getCurrentWriteRequest();
878         if (req == null) {
879             return null;
880         }
881         return req.getMessage();
882     }
883 
884     /**
885      * {@inheritDoc}
886      */
887     public final void setCurrentWriteRequest(WriteRequest currentWriteRequest) {
888         this.currentWriteRequest = currentWriteRequest;
889     }
890 
891     /**
892      * TODO Add method documentation
893      */
894     public final void increaseReadBufferSize() {
895         int newReadBufferSize = getConfig().getReadBufferSize() << 1;
896         if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) {
897             getConfig().setReadBufferSize(newReadBufferSize);
898         } else {
899             getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize());
900         }
901 
902         deferDecreaseReadBuffer = true;
903     }
904 
905     /**
906      * TODO Add method documentation
907      */
908     public final void decreaseReadBufferSize() {
909         if (deferDecreaseReadBuffer) {
910             deferDecreaseReadBuffer = false;
911             return;
912         }
913 
914         if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) {
915             getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1);
916         }
917 
918         deferDecreaseReadBuffer = true;
919     }
920 
921     /**
922      * {@inheritDoc}
923      */
924     public final long getCreationTime() {
925         return creationTime;
926     }
927 
928     /**
929      * {@inheritDoc}
930      */
931     public final long getLastIoTime() {
932         return Math.max(lastReadTime, lastWriteTime);
933     }
934 
935     /**
936      * {@inheritDoc}
937      */
938     public final long getLastReadTime() {
939         return lastReadTime;
940     }
941 
942     /**
943      * {@inheritDoc}
944      */
945     public final long getLastWriteTime() {
946         return lastWriteTime;
947     }
948 
949     /**
950      * {@inheritDoc}
951      */
952     public final boolean isIdle(IdleStatus status) {
953         if (status == IdleStatus.BOTH_IDLE) {
954             return idleCountForBoth > 0;
955         }
956 
957         if (status == IdleStatus.READER_IDLE) {
958             return idleCountForRead > 0;
959         }
960 
961         if (status == IdleStatus.WRITER_IDLE) {
962             return idleCountForWrite > 0;
963         }
964 
965         throw new IllegalArgumentException("Unknown idle status: " + status);
966     }
967 
968     /**
969      * {@inheritDoc}
970      */
971     public final boolean isBothIdle() {
972         return isIdle(IdleStatus.BOTH_IDLE);
973     }
974 
975     /**
976      * {@inheritDoc}
977      */
978     public final boolean isReaderIdle() {
979         return isIdle(IdleStatus.READER_IDLE);
980     }
981 
982     /**
983      * {@inheritDoc}
984      */
985     public final boolean isWriterIdle() {
986         return isIdle(IdleStatus.WRITER_IDLE);
987     }
988 
989     /**
990      * {@inheritDoc}
991      */
992     public final int getIdleCount(IdleStatus status) {
993         if (getConfig().getIdleTime(status) == 0) {
994             if (status == IdleStatus.BOTH_IDLE) {
995                 idleCountForBoth = 0;
996             }
997 
998             if (status == IdleStatus.READER_IDLE) {
999                 idleCountForRead = 0;
1000             }
1001 
1002             if (status == IdleStatus.WRITER_IDLE) {
1003                 idleCountForWrite = 0;
1004             }
1005         }
1006 
1007         if (status == IdleStatus.BOTH_IDLE) {
1008             return idleCountForBoth;
1009         }
1010 
1011         if (status == IdleStatus.READER_IDLE) {
1012             return idleCountForRead;
1013         }
1014 
1015         if (status == IdleStatus.WRITER_IDLE) {
1016             return idleCountForWrite;
1017         }
1018 
1019         throw new IllegalArgumentException("Unknown idle status: " + status);
1020     }
1021 
1022     /**
1023      * {@inheritDoc}
1024      */
1025     public final long getLastIdleTime(IdleStatus status) {
1026         if (status == IdleStatus.BOTH_IDLE) {
1027             return lastIdleTimeForBoth;
1028         }
1029 
1030         if (status == IdleStatus.READER_IDLE) {
1031             return lastIdleTimeForRead;
1032         }
1033 
1034         if (status == IdleStatus.WRITER_IDLE) {
1035             return lastIdleTimeForWrite;
1036         }
1037 
1038         throw new IllegalArgumentException("Unknown idle status: " + status);
1039     }
1040 
1041     /**
1042      * TODO Add method documentation
1043      */
1044     public final void increaseIdleCount(IdleStatus status, long currentTime) {
1045         if (status == IdleStatus.BOTH_IDLE) {
1046             idleCountForBoth++;
1047             lastIdleTimeForBoth = currentTime;
1048         } else if (status == IdleStatus.READER_IDLE) {
1049             idleCountForRead++;
1050             lastIdleTimeForRead = currentTime;
1051         } else if (status == IdleStatus.WRITER_IDLE) {
1052             idleCountForWrite++;
1053             lastIdleTimeForWrite = currentTime;
1054         } else {
1055             throw new IllegalArgumentException("Unknown idle status: " + status);
1056         }
1057     }
1058 
1059     /**
1060      * {@inheritDoc}
1061      */
1062     public final int getBothIdleCount() {
1063         return getIdleCount(IdleStatus.BOTH_IDLE);
1064     }
1065 
1066     /**
1067      * {@inheritDoc}
1068      */
1069     public final long getLastBothIdleTime() {
1070         return getLastIdleTime(IdleStatus.BOTH_IDLE);
1071     }
1072 
1073     /**
1074      * {@inheritDoc}
1075      */
1076     public final long getLastReaderIdleTime() {
1077         return getLastIdleTime(IdleStatus.READER_IDLE);
1078     }
1079 
1080     /**
1081      * {@inheritDoc}
1082      */
1083     public final long getLastWriterIdleTime() {
1084         return getLastIdleTime(IdleStatus.WRITER_IDLE);
1085     }
1086 
1087     /**
1088      * {@inheritDoc}
1089      */
1090     public final int getReaderIdleCount() {
1091         return getIdleCount(IdleStatus.READER_IDLE);
1092     }
1093 
1094     /**
1095      * {@inheritDoc}
1096      */
1097     public final int getWriterIdleCount() {
1098         return getIdleCount(IdleStatus.WRITER_IDLE);
1099     }
1100 
1101     /**
1102      * {@inheritDoc}
1103      */
1104     public SocketAddress getServiceAddress() {
1105         IoService service = getService();
1106         if (service instanceof IoAcceptor) {
1107             return ((IoAcceptor) service).getLocalAddress();
1108         }
1109         
1110         return getRemoteAddress();
1111     }
1112 
1113     /**
1114      * {@inheritDoc}
1115      */
1116     @Override
1117     public final int hashCode() {
1118         return super.hashCode();
1119     }
1120 
1121     /**
1122      * {@inheritDoc}
1123      * TODO This is a ridiculous implementation. Need to be replaced.
1124      */
1125     @Override
1126     public final boolean equals(Object o) {
1127         return super.equals(o);
1128     }
1129 
1130     /**
1131      * {@inheritDoc}
1132      */
1133     @Override
1134     public String toString() {
1135         if (isConnected()||isClosing()) {
1136             if (getService() instanceof IoAcceptor) {
1137                 return "(" + getIdAsString() + ": " + getServiceName() + ", server, " +
1138                         getRemoteAddress() + " => " + getLocalAddress() + ')';
1139             }
1140 
1141             return "(" + getIdAsString() + ": " + getServiceName() + ", client, " +
1142                         getLocalAddress() + " => " + getRemoteAddress() + ')';
1143         }
1144         
1145         return "Session disconnected ...";
1146     }
1147 
1148     /**
1149      * TODO Add method documentation
1150      */
1151     private String getIdAsString() {
1152         String id = Long.toHexString(getId()).toUpperCase();
1153 
1154         // Somewhat inefficient, but it won't happen that often
1155         // because an ID is often a big integer.
1156         while (id.length() < 8) {
1157             id = '0' + id; // padding
1158         }
1159         id = "0x" + id;
1160 
1161         return id;
1162     }
1163 
1164     /**
1165      * TODO Add method documentation
1166      */
1167     private String getServiceName() {
1168         TransportMetadata tm = getTransportMetadata();
1169         if (tm == null) {
1170             return "null";
1171         }
1172         
1173         return tm.getProviderName() + ' ' + tm.getName();
1174     }
1175 
1176     /**
1177      * Fires a {@link IoEventType#SESSION_IDLE} event to any applicable
1178      * sessions in the specified collection.
1179      *
1180      * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
1181      */
1182     public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) {
1183         IoSession s = null;
1184         while (sessions.hasNext()) {
1185             s = sessions.next();
1186             notifyIdleSession(s, currentTime);
1187         }
1188     }
1189 
1190     /**
1191      * Fires a {@link IoEventType#SESSION_IDLE} event if applicable for the
1192      * specified {@code session}.
1193      *
1194      * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
1195      */
1196     public static void notifyIdleSession(IoSession session, long currentTime) {
1197         notifyIdleSession0(
1198                 session, currentTime,
1199                 session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
1200                 IdleStatus.BOTH_IDLE, Math.max(
1201                     session.getLastIoTime(),
1202                     session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
1203 
1204         notifyIdleSession0(
1205                 session, currentTime,
1206                 session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
1207                 IdleStatus.READER_IDLE, Math.max(
1208                     session.getLastReadTime(),
1209                     session.getLastIdleTime(IdleStatus.READER_IDLE)));
1210 
1211         notifyIdleSession0(
1212                 session, currentTime,
1213                 session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
1214                 IdleStatus.WRITER_IDLE, Math.max(
1215                     session.getLastWriteTime(),
1216                     session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
1217 
1218         notifyWriteTimeout(session, currentTime);
1219     }
1220 
1221     private static void notifyIdleSession0(
1222             IoSession session, long currentTime,
1223             long idleTime, IdleStatus status, long lastIoTime) {
1224         if (idleTime > 0 && lastIoTime != 0
1225                 && currentTime - lastIoTime >= idleTime) {
1226             session.getFilterChain().fireSessionIdle(status);
1227         }
1228     }
1229 
1230     private static void notifyWriteTimeout(
1231             IoSession session, long currentTime) {
1232 
1233         long writeTimeout = session.getConfig().getWriteTimeoutInMillis();
1234         if (writeTimeout > 0 &&
1235                 currentTime - session.getLastWriteTime() >= writeTimeout &&
1236                 !session.getWriteRequestQueue().isEmpty(session)) {
1237             WriteRequest request = session.getCurrentWriteRequest();
1238             if (request != null) {
1239                 session.setCurrentWriteRequest(null);
1240                 WriteTimeoutException cause = new WriteTimeoutException(request);
1241                 request.getFuture().setException(cause);
1242                 session.getFilterChain().fireExceptionCaught(cause);
1243                 // WriteException is an IOException, so we close the session.
1244                 session.close(true);
1245             }
1246         }
1247     }
1248 
1249     
1250     
1251     /**
1252      * A queue which handles the CLOSE request.
1253      * 
1254      * TODO : Check that when closing a session, all the pending
1255      * requests are correctly sent.
1256      */
1257     private class CloseAwareWriteQueue implements WriteRequestQueue {
1258 
1259         private final WriteRequestQueue q;
1260 
1261         /**
1262          * {@inheritDoc}
1263          */
1264         public CloseAwareWriteQueue(WriteRequestQueue q) {
1265             this.q = q;
1266         }
1267 
1268         /**
1269          * {@inheritDoc}
1270          */
1271         public synchronized WriteRequest poll(IoSession session) {
1272             WriteRequest answer = q.poll(session);
1273             
1274             if (answer == CLOSE_REQUEST) {
1275                 AbstractIoSession.this.close();
1276                 dispose(session);
1277                 answer = null;
1278             }
1279             
1280             return answer;
1281         }
1282 
1283         /**
1284          * {@inheritDoc}
1285          */
1286         public void offer(IoSession session, WriteRequest e) {
1287             q.offer(session, e);
1288         }
1289 
1290         /**
1291          * {@inheritDoc}
1292          */
1293         public boolean isEmpty(IoSession session) {
1294             return q.isEmpty(session);
1295         }
1296 
1297         /**
1298          * {@inheritDoc}
1299          */
1300         public void clear(IoSession session) {
1301             q.clear(session);
1302         }
1303 
1304         /**
1305          * {@inheritDoc}
1306          */
1307         public void dispose(IoSession session) {
1308             q.dispose(session);
1309         }
1310     }
1311 }