View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.session;
21  
22  import java.io.File;
23  import java.io.FileInputStream;
24  import java.io.IOException;
25  import java.net.SocketAddress;
26  import java.nio.channels.FileChannel;
27  import java.util.Iterator;
28  import java.util.Queue;
29  import java.util.Set;
30  import java.util.concurrent.ConcurrentLinkedQueue;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  import java.util.concurrent.atomic.AtomicInteger;
33  import java.util.concurrent.atomic.AtomicLong;
34  
35  import org.apache.mina.core.buffer.IoBuffer;
36  import org.apache.mina.core.file.DefaultFileRegion;
37  import org.apache.mina.core.file.FilenameFileRegion;
38  import org.apache.mina.core.filterchain.IoFilterChain;
39  import org.apache.mina.core.future.CloseFuture;
40  import org.apache.mina.core.future.DefaultCloseFuture;
41  import org.apache.mina.core.future.DefaultReadFuture;
42  import org.apache.mina.core.future.DefaultWriteFuture;
43  import org.apache.mina.core.future.IoFutureListener;
44  import org.apache.mina.core.future.ReadFuture;
45  import org.apache.mina.core.future.WriteFuture;
46  import org.apache.mina.core.service.AbstractIoService;
47  import org.apache.mina.core.service.IoAcceptor;
48  import org.apache.mina.core.service.IoProcessor;
49  import org.apache.mina.core.service.IoService;
50  import org.apache.mina.core.service.TransportMetadata;
51  import org.apache.mina.core.write.DefaultWriteRequest;
52  import org.apache.mina.core.write.WriteException;
53  import org.apache.mina.core.write.WriteRequest;
54  import org.apache.mina.core.write.WriteRequestQueue;
55  import org.apache.mina.core.write.WriteTimeoutException;
56  import org.apache.mina.core.write.WriteToClosedSessionException;
57  import org.apache.mina.util.ExceptionMonitor;
58  
59  
60  /**
61   * Base implementation of {@link IoSession}.
62   *
63   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
64   */
65  public abstract class AbstractIoSession implements IoSession {
66  
67      private static final AttributeKey READY_READ_FUTURES_KEY =
68          new AttributeKey(AbstractIoSession.class, "readyReadFutures");
69      
70      private static final AttributeKey WAITING_READ_FUTURES_KEY =
71          new AttributeKey(AbstractIoSession.class, "waitingReadFutures");
72  
73      private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER =
74          new IoFutureListener<CloseFuture>() {
75              public void operationComplete(CloseFuture future) {
76                  AbstractIoSession session = (AbstractIoSession) future.getSession();
77                  session.scheduledWriteBytes.set(0);
78                  session.scheduledWriteMessages.set(0);
79                  session.readBytesThroughput = 0;
80                  session.readMessagesThroughput = 0;
81                  session.writtenBytesThroughput = 0;
82                  session.writtenMessagesThroughput = 0;
83              }
84      };
85  
86      /**
87       * An internal write request object that triggers session close.
88       * @see #writeRequestQueue
89       */
90      private static final WriteRequest CLOSE_REQUEST =
91          new DefaultWriteRequest(new Object());
92  
93      private final Object lock = new Object();
94  
95      private IoSessionAttributeMap attributes;
96      private WriteRequestQueue writeRequestQueue;
97      private WriteRequest currentWriteRequest;
98      
99      // The Session creation's time */
100     private final long creationTime;
101 
102     /** An id generator guaranteed to generate unique IDs for the session */
103     private static AtomicLong idGenerator = new AtomicLong(0);
104     
105     /** The session ID */
106     private long sessionId;
107     
108     /**
109      * A future that will be set 'closed' when the connection is closed.
110      */
111     private final CloseFuture closeFuture = new DefaultCloseFuture(this);
112 
113     private volatile boolean closing;
114     
115     // traffic control
116     private boolean readSuspended=false;
117     private boolean writeSuspended=false;
118 
119     // Status variables
120     private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
121     private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
122     private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
123 
124     private long readBytes;
125     private long writtenBytes;
126     private long readMessages;
127     private long writtenMessages;
128     private long lastReadTime;
129     private long lastWriteTime;
130 
131     private long lastThroughputCalculationTime;
132     private long lastReadBytes;
133     private long lastWrittenBytes;
134     private long lastReadMessages;
135     private long lastWrittenMessages;
136     private double readBytesThroughput;
137     private double writtenBytesThroughput;
138     private double readMessagesThroughput;
139     private double writtenMessagesThroughput;
140 
141     private AtomicInteger idleCountForBoth = new AtomicInteger();
142     private AtomicInteger idleCountForRead = new AtomicInteger();
143     private AtomicInteger idleCountForWrite = new AtomicInteger();
144 
145     private long lastIdleTimeForBoth;
146     private long lastIdleTimeForRead;
147     private long lastIdleTimeForWrite;
148 
149     private boolean deferDecreaseReadBuffer = true;
150 
151     /**
152      * TODO Add method documentation
153      */
154     protected AbstractIoSession() {
155         // Initialize all the Session counters to the current time 
156         long currentTime = System.currentTimeMillis();
157         creationTime = currentTime;
158         lastThroughputCalculationTime = currentTime;
159         lastReadTime = currentTime;
160         lastWriteTime = currentTime;
161         lastIdleTimeForBoth = currentTime;
162         lastIdleTimeForRead = currentTime;
163         lastIdleTimeForWrite = currentTime;
164         
165         // TODO add documentation
166         closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
167         
168         // Set a new ID for this session
169         sessionId = idGenerator.incrementAndGet();
170     }
171 
172     /**
173      * {@inheritDoc}
174      * 
175      * We use an AtomicLong to guarantee that the session ID are
176      * unique.
177      */
178     public final long getId() {
179         return sessionId;
180     }
181 
182     /**
183      * @return The associated IoProcessor for this session
184      */
185     public abstract IoProcessor getProcessor();
186 
187     /**
188      * {@inheritDoc}
189      */
190     public final boolean isConnected() {
191         return !closeFuture.isClosed();
192     }
193 
194     /**
195      * {@inheritDoc}
196      */
197     public final boolean isClosing() {
198         return closing || closeFuture.isClosed();
199     }
200 
201     /**
202      * {@inheritDoc}
203      */
204     public final CloseFuture getCloseFuture() {
205         return closeFuture;
206     }
207 
208     /**
209      * Tells if the session is scheduled for flushed
210      * @param true if the session is scheduled for flush
211      */
212     public final boolean isScheduledForFlush() {
213         return scheduledForFlush.get();
214     }
215 
216     /**
217      * Schedule the session for flushed
218      */
219     public final void scheduledForFlush() {
220         scheduledForFlush.set(true);
221     }
222 
223     /**
224      * Change the session's status : it's not anymore scheduled for flush
225      */
226     public final void unscheduledForFlush() {
227         scheduledForFlush.set(false);
228     }
229 
230     /**
231      * Set the scheduledForFLush flag. As we may have concurrent access
232      * to this flag, we compare and set it in one call.
233      * @param schedule the new value to set if not already set.
234      * @return true if the session flag has been set, and if 
235      * it wasn't set already.
236      */
237     public final boolean setScheduledForFlush(boolean schedule) {
238         if (schedule) {
239             // If the current tag is set to false, switch it to true,
240             // otherwise, we do nothing but return false : the session
241             // is already scheduled for flush
242             return scheduledForFlush.compareAndSet(false, schedule);
243         }
244         
245         scheduledForFlush.set(schedule);
246         return true;
247     }
248 
249     /**
250      * {@inheritDoc}
251      */
252     public final CloseFuture close(boolean rightNow) {
253         if (rightNow) {
254             return close();
255         }
256         
257         return closeOnFlush();
258     }
259 
260     /**
261      * {@inheritDoc}
262      */
263     public final CloseFuture close() {
264         synchronized (lock) {
265             if (isClosing()) {
266                 return closeFuture;
267             }
268             
269             closing = true;
270         }
271 
272         getFilterChain().fireFilterClose();
273         return closeFuture;
274     }
275 
276     private final CloseFuture closeOnFlush() {
277         getWriteRequestQueue().offer(this, CLOSE_REQUEST);
278         getProcessor().flush(this);
279         return closeFuture;
280     }
281 
282     /**
283      * {@inheritDoc}
284      */
285     public final ReadFuture read() {
286         if (!getConfig().isUseReadOperation()) {
287             throw new IllegalStateException("useReadOperation is not enabled.");
288         }
289 
290         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
291         ReadFuture future;
292         synchronized (readyReadFutures) {
293             future = readyReadFutures.poll();
294             if (future != null) {
295                 if (future.isClosed()) {
296                     // Let other readers get notified.
297                     readyReadFutures.offer(future);
298                 }
299             } else {
300                 future = new DefaultReadFuture(this);
301                 getWaitingReadFutures().offer(future);
302             }
303         }
304 
305         return future;
306     }
307 
308     /**
309      * TODO Add method documentation
310      */
311     public final void offerReadFuture(Object message) {
312         newReadFuture().setRead(message);
313     }
314 
315     /**
316      * TODO Add method documentation
317      */
318     public final void offerFailedReadFuture(Throwable exception) {
319         newReadFuture().setException(exception);
320     }
321 
322     /**
323      * TODO Add method documentation
324      */
325     public final void offerClosedReadFuture() {
326         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
327         synchronized (readyReadFutures) {
328             newReadFuture().setClosed();
329         }
330     }
331 
332     /**
333      * TODO Add method documentation
334      */
335     private ReadFuture newReadFuture() {
336         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
337         Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures();
338         ReadFuture future;
339         synchronized (readyReadFutures) {
340             future = waitingReadFutures.poll();
341             if (future == null) {
342                 future = new DefaultReadFuture(this);
343                 readyReadFutures.offer(future);
344             }
345         }
346         return future;
347     }
348 
349     /**
350      * TODO Add method documentation
351      */
352     private Queue<ReadFuture> getReadyReadFutures() {
353         Queue<ReadFuture> readyReadFutures =
354             (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
355         if (readyReadFutures == null) {
356             readyReadFutures = new ConcurrentLinkedQueue<ReadFuture>();
357 
358             Queue<ReadFuture> oldReadyReadFutures =
359                 (Queue<ReadFuture>) setAttributeIfAbsent(
360                         READY_READ_FUTURES_KEY, readyReadFutures);
361             if (oldReadyReadFutures != null) {
362                 readyReadFutures = oldReadyReadFutures;
363             }
364         }
365         return readyReadFutures;
366     }
367 
368     /**
369      * TODO Add method documentation
370      */
371     private Queue<ReadFuture> getWaitingReadFutures() {
372         Queue<ReadFuture> waitingReadyReadFutures =
373             (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES_KEY);
374         if (waitingReadyReadFutures == null) {
375             waitingReadyReadFutures = new ConcurrentLinkedQueue<ReadFuture>();
376 
377             Queue<ReadFuture> oldWaitingReadyReadFutures =
378                 (Queue<ReadFuture>) setAttributeIfAbsent(
379                         WAITING_READ_FUTURES_KEY, waitingReadyReadFutures);
380             if (oldWaitingReadyReadFutures != null) {
381                 waitingReadyReadFutures = oldWaitingReadyReadFutures;
382             }
383         }
384         return waitingReadyReadFutures;
385     }
386 
387     /**
388      * {@inheritDoc}
389      */
390     public WriteFuture write(Object message) {
391         return write(message, null);
392     }
393 
394     /**
395      * {@inheritDoc}
396      */
397     public WriteFuture write(Object message, SocketAddress remoteAddress) {
398         if (message == null) {
399             throw new IllegalArgumentException("message");
400         }
401 
402         // We can't send a message to a connected session if we don't have 
403         // the remote address
404         if (!getTransportMetadata().isConnectionless() &&
405                 remoteAddress != null) {
406             throw new UnsupportedOperationException();
407         }
408 
409         
410         // If the session has been closed or is closing, we can't either
411         // send a message to the remote side. We generate a future
412         // containing an exception.
413         if (isClosing() || !isConnected()) {
414             WriteFuture future = new DefaultWriteFuture(this);
415             WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
416             WriteException writeException = new WriteToClosedSessionException(request);
417             future.setException(writeException);
418             return future;
419         }
420 
421         FileChannel openedFileChannel = null;
422         
423         // TODO: remove this code as soon as we use InputStream
424         // instead of Object for the message.
425         try {
426             if (message instanceof IoBuffer
427                     && !((IoBuffer) message).hasRemaining()) {
428                 // Nothing to write : probably an error in the user code
429                 throw new IllegalArgumentException(
430                 "message is empty. Forgot to call flip()?");
431             } else if (message instanceof FileChannel) {
432                 FileChannel fileChannel = (FileChannel) message;
433                 message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
434             } else if (message instanceof File) {
435                 File file = (File) message;
436                 openedFileChannel = new FileInputStream(file).getChannel();
437                 message = new FilenameFileRegion(file, openedFileChannel, 0, openedFileChannel.size());
438             }
439         } catch (IOException e) {
440             ExceptionMonitor.getInstance().exceptionCaught(e);
441             return DefaultWriteFuture.newNotWrittenFuture(this, e);
442         }
443 
444         // Now, we can write the message. First, create a future
445         WriteFuture writeFuture = new DefaultWriteFuture(this);
446         WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
447         
448         // Then, get the chain and inject the WriteRequest into it
449         IoFilterChain filterChain = getFilterChain();
450         filterChain.fireFilterWrite(writeRequest);
451 
452         // TODO : This is not our business ! The caller has created a FileChannel,
453         // he has to close it !
454         if (openedFileChannel != null) {
455             // If we opened a FileChannel, it needs to be closed when the write has completed
456             final FileChannel finalChannel = openedFileChannel;
457             writeFuture.addListener(new IoFutureListener<WriteFuture>() {
458                 public void operationComplete(WriteFuture future) {
459                     try {
460                         finalChannel.close();
461                     } catch (IOException e) {
462                         ExceptionMonitor.getInstance().exceptionCaught(e);
463                     }
464                 }
465             });
466         }
467 
468         // Return the WriteFuture.
469         return writeFuture;
470     }
471 
472     /**
473      * {@inheritDoc}
474      */
475     public final Object getAttachment() {
476         return getAttribute("");
477     }
478 
479     /**
480      * {@inheritDoc}
481      */
482     public final Object setAttachment(Object attachment) {
483         return setAttribute("", attachment);
484     }
485 
486     /**
487      * {@inheritDoc}
488      */
489     public final Object getAttribute(Object key) {
490         return getAttribute(key, null);
491     }
492 
493     /**
494      * {@inheritDoc}
495      */
496     public final Object getAttribute(Object key, Object defaultValue) {
497         return attributes.getAttribute(this, key, defaultValue);
498     }
499 
500     /**
501      * {@inheritDoc}
502      */
503     public final Object setAttribute(Object key, Object value) {
504         return attributes.setAttribute(this, key, value);
505     }
506 
507     /**
508      * {@inheritDoc}
509      */
510     public final Object setAttribute(Object key) {
511         return setAttribute(key, Boolean.TRUE);
512     }
513 
514     /**
515      * {@inheritDoc}
516      */
517     public final Object setAttributeIfAbsent(Object key, Object value) {
518         return attributes.setAttributeIfAbsent(this, key, value);
519     }
520 
521     /**
522      * {@inheritDoc}
523      */
524     public final Object setAttributeIfAbsent(Object key) {
525         return setAttributeIfAbsent(key, Boolean.TRUE);
526     }
527 
528     /**
529      * {@inheritDoc}
530      */
531     public final Object removeAttribute(Object key) {
532         return attributes.removeAttribute(this, key);
533     }
534 
535     /**
536      * {@inheritDoc}
537      */
538     public final boolean removeAttribute(Object key, Object value) {
539         return attributes.removeAttribute(this, key, value);
540     }
541 
542     /**
543      * {@inheritDoc}
544      */
545     public final boolean replaceAttribute(Object key, Object oldValue, Object newValue) {
546         return attributes.replaceAttribute(this, key, oldValue, newValue);
547     }
548 
549     /**
550      * {@inheritDoc}
551      */
552     public final boolean containsAttribute(Object key) {
553         return attributes.containsAttribute(this, key);
554     }
555 
556     /**
557      * {@inheritDoc}
558      */
559     public final Set<Object> getAttributeKeys() {
560         return attributes.getAttributeKeys(this);
561     }
562 
563     /**
564      * TODO Add method documentation
565      */
566     public final IoSessionAttributeMap getAttributeMap() {
567         return attributes;
568     }
569 
570     /**
571      * TODO Add method documentation
572      */
573     public final void setAttributeMap(IoSessionAttributeMap attributes) {
574         this.attributes = attributes;
575     }
576 
577     /**
578      * Create a new close aware write queue, based on the given write queue.
579      * 
580      * @param writeRequestQueue The write request queue
581      */
582     public final void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) {
583         this.writeRequestQueue =
584             new CloseAwareWriteQueue(writeRequestQueue);
585     }
586 
587 
588     /**
589      * {@inheritDoc}
590      */
591     public final void suspendRead() {
592         readSuspended = true;
593         if (isClosing() || !isConnected()) {
594             return;
595         }
596         getProcessor().updateTrafficControl(this);
597     }
598 
599     /**
600      * {@inheritDoc}
601      */
602     public final void suspendWrite() {
603         writeSuspended = true;
604         if (isClosing() || !isConnected()) {
605             return;
606         }
607         getProcessor().updateTrafficControl(this);
608     }
609 
610     /**
611      * {@inheritDoc}
612      */
613     @SuppressWarnings("unchecked")
614     public final void resumeRead() {
615         readSuspended = false;
616         if (isClosing() || !isConnected()) {
617             return;
618         }
619         getProcessor().updateTrafficControl(this);
620     }
621 
622     /**
623      * {@inheritDoc}
624      */
625     @SuppressWarnings("unchecked")
626     public final void resumeWrite() {
627         writeSuspended = false;
628         if (isClosing() || !isConnected()) {
629             return;
630         }
631         getProcessor().updateTrafficControl(this);
632     }
633 
634     /**
635      * {@inheritDoc}
636      */
637     public boolean isReadSuspended() {
638         return readSuspended;
639     }
640 
641     /**
642      * {@inheritDoc}
643      */
644     public boolean isWriteSuspended() {
645         return writeSuspended; 
646     }
647     
648     /**
649      * {@inheritDoc}
650      */
651     public final long getReadBytes() {
652         return readBytes;
653     }
654 
655     /**
656      * {@inheritDoc}
657      */
658     public final long getWrittenBytes() {
659         return writtenBytes;
660     }
661 
662     /**
663      * {@inheritDoc}
664      */
665     public final long getReadMessages() {
666         return readMessages;
667     }
668 
669     /**
670      * {@inheritDoc}
671      */
672     public final long getWrittenMessages() {
673         return writtenMessages;
674     }
675 
676     /**
677      * {@inheritDoc}
678      */
679     public final double getReadBytesThroughput() {
680         return readBytesThroughput;
681     }
682 
683     /**
684      * {@inheritDoc}
685      */
686     public final double getWrittenBytesThroughput() {
687         return writtenBytesThroughput;
688     }
689 
690     /**
691      * {@inheritDoc}
692      */
693     public final double getReadMessagesThroughput() {
694         return readMessagesThroughput;
695     }
696 
697     /**
698      * {@inheritDoc}
699      */
700     public final double getWrittenMessagesThroughput() {
701         return writtenMessagesThroughput;
702     }
703 
704     /**
705      * {@inheritDoc}
706      */
707     public final void updateThroughput(long currentTime, boolean force) {
708         int interval = (int) (currentTime - lastThroughputCalculationTime);
709 
710         long minInterval = getConfig().getThroughputCalculationIntervalInMillis();
711         if (minInterval == 0 || interval < minInterval) {
712             if (!force) {
713                 return;
714             }
715         }
716 
717         readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval;
718         writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval;
719         readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval;
720         writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval;
721 
722         lastReadBytes = readBytes;
723         lastWrittenBytes = writtenBytes;
724         lastReadMessages = readMessages;
725         lastWrittenMessages = writtenMessages;
726 
727         lastThroughputCalculationTime = currentTime;
728     }
729 
730     /**
731      * {@inheritDoc}
732      */
733     public final long getScheduledWriteBytes() {
734         return scheduledWriteBytes.get();
735     }
736 
737     /**
738      * {@inheritDoc}
739      */
740     public final int getScheduledWriteMessages() {
741         return scheduledWriteMessages.get();
742     }
743 
744     /**
745      * TODO Add method documentation
746      */
747     protected void setScheduledWriteBytes(int byteCount){
748         scheduledWriteBytes.set(byteCount);
749     }
750 
751     /**
752      * TODO Add method documentation
753      */
754     protected void setScheduledWriteMessages(int messages) {
755         scheduledWriteMessages.set(messages);
756     }
757 
758     /**
759      * TODO Add method documentation
760      */
761     public final void increaseReadBytes(long increment, long currentTime) {
762         if (increment <= 0) {
763             return;
764         }
765 
766         readBytes += increment;
767         lastReadTime = currentTime;
768         idleCountForBoth.set(0);
769         idleCountForRead.set(0);
770 
771         if (getService() instanceof AbstractIoService) {
772             ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime);
773         }
774     }
775 
776     /**
777      * TODO Add method documentation
778      */
779     public final void increaseReadMessages(long currentTime) {
780         readMessages++;
781         lastReadTime = currentTime;
782         idleCountForBoth.set(0);
783         idleCountForRead.set(0);
784 
785         if (getService() instanceof AbstractIoService) {
786             ((AbstractIoService) getService()).getStatistics().increaseReadMessages(currentTime);
787         }
788     }
789 
790     /**
791      * TODO Add method documentation
792      */
793     public final void increaseWrittenBytes(int increment, long currentTime) {
794         if (increment <= 0) {
795             return;
796         }
797 
798         writtenBytes += increment;
799         lastWriteTime = currentTime;
800         idleCountForBoth.set(0);
801         idleCountForWrite.set(0);
802 
803         if (getService() instanceof AbstractIoService) {
804             ((AbstractIoService) getService()).getStatistics().increaseWrittenBytes(increment, currentTime);
805         }
806 
807         increaseScheduledWriteBytes(-increment);
808     }
809 
810     /**
811      * TODO Add method documentation
812      */
813     public final void increaseWrittenMessages(
814             WriteRequest request, long currentTime) {
815         Object message = request.getMessage();
816         if (message instanceof IoBuffer) {
817             IoBuffer b = (IoBuffer) message;
818             if (b.hasRemaining()) {
819                 return;
820             }
821         }
822 
823         writtenMessages++;
824         lastWriteTime = currentTime;
825         if (getService() instanceof AbstractIoService) {
826             ((AbstractIoService) getService()).getStatistics().increaseWrittenMessages(currentTime);
827         }
828 
829         decreaseScheduledWriteMessages();
830     }
831 
832     /**
833      * TODO Add method documentation
834      */
835     public final void increaseScheduledWriteBytes(int increment) {
836         scheduledWriteBytes.addAndGet(increment);
837         if (getService() instanceof AbstractIoService) {
838             ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteBytes(increment);
839         }
840     }
841 
842     /**
843      * TODO Add method documentation
844      */
845     public final void increaseScheduledWriteMessages() {
846         scheduledWriteMessages.incrementAndGet();
847         if (getService() instanceof AbstractIoService) {
848             ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteMessages();
849         }
850     }
851 
852     /**
853      * TODO Add method documentation
854      */
855     private void decreaseScheduledWriteMessages() {
856         scheduledWriteMessages.decrementAndGet();
857         if (getService() instanceof AbstractIoService) {
858             ((AbstractIoService) getService()).getStatistics().decreaseScheduledWriteMessages();
859         }
860     }
861 
862     /**
863      * TODO Add method documentation
864      */
865     public final void decreaseScheduledBytesAndMessages(WriteRequest request) {
866         Object message = request.getMessage();
867         if (message instanceof IoBuffer) {
868             IoBuffer b = (IoBuffer) message;
869             if (b.hasRemaining()) {
870                 increaseScheduledWriteBytes(-((IoBuffer) message).remaining());
871             } else {
872                 decreaseScheduledWriteMessages();
873             }
874         } else {
875             decreaseScheduledWriteMessages();
876         }
877     }
878 
879     /**
880      * {@inheritDoc}
881      */
882     public final WriteRequestQueue getWriteRequestQueue() {
883         if (writeRequestQueue == null) {
884             throw new IllegalStateException();
885         }
886         return writeRequestQueue;
887     }
888 
889     /**
890      * {@inheritDoc}
891      */
892     public final WriteRequest getCurrentWriteRequest() {
893         return currentWriteRequest;
894     }
895 
896     /**
897      * {@inheritDoc}
898      */
899     public final Object getCurrentWriteMessage() {
900         WriteRequest req = getCurrentWriteRequest();
901         if (req == null) {
902             return null;
903         }
904         return req.getMessage();
905     }
906 
907     /**
908      * {@inheritDoc}
909      */
910     public final void setCurrentWriteRequest(WriteRequest currentWriteRequest) {
911         this.currentWriteRequest = currentWriteRequest;
912     }
913 
914     /**
915      * TODO Add method documentation
916      */
917     public final void increaseReadBufferSize() {
918         int newReadBufferSize = getConfig().getReadBufferSize() << 1;
919         if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) {
920             getConfig().setReadBufferSize(newReadBufferSize);
921         } else {
922             getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize());
923         }
924 
925         deferDecreaseReadBuffer = true;
926     }
927 
928     /**
929      * TODO Add method documentation
930      */
931     public final void decreaseReadBufferSize() {
932         if (deferDecreaseReadBuffer) {
933             deferDecreaseReadBuffer = false;
934             return;
935         }
936 
937         if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) {
938             getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1);
939         }
940 
941         deferDecreaseReadBuffer = true;
942     }
943 
944     /**
945      * {@inheritDoc}
946      */
947     public final long getCreationTime() {
948         return creationTime;
949     }
950 
951     /**
952      * {@inheritDoc}
953      */
954     public final long getLastIoTime() {
955         return Math.max(lastReadTime, lastWriteTime);
956     }
957 
958     /**
959      * {@inheritDoc}
960      */
961     public final long getLastReadTime() {
962         return lastReadTime;
963     }
964 
965     /**
966      * {@inheritDoc}
967      */
968     public final long getLastWriteTime() {
969         return lastWriteTime;
970     }
971 
972     /**
973      * {@inheritDoc}
974      */
975     public final boolean isIdle(IdleStatus status) {
976         if (status == IdleStatus.BOTH_IDLE) {
977             return idleCountForBoth.get() > 0;
978         }
979 
980         if (status == IdleStatus.READER_IDLE) {
981             return idleCountForRead.get() > 0;
982         }
983 
984         if (status == IdleStatus.WRITER_IDLE) {
985             return idleCountForWrite.get() > 0;
986         }
987 
988         throw new IllegalArgumentException("Unknown idle status: " + status);
989     }
990 
991     /**
992      * {@inheritDoc}
993      */
994     public final boolean isBothIdle() {
995         return isIdle(IdleStatus.BOTH_IDLE);
996     }
997 
998     /**
999      * {@inheritDoc}
1000      */
1001     public final boolean isReaderIdle() {
1002         return isIdle(IdleStatus.READER_IDLE);
1003     }
1004 
1005     /**
1006      * {@inheritDoc}
1007      */
1008     public final boolean isWriterIdle() {
1009         return isIdle(IdleStatus.WRITER_IDLE);
1010     }
1011 
1012     /**
1013      * {@inheritDoc}
1014      */
1015     public final int getIdleCount(IdleStatus status) {
1016         if (getConfig().getIdleTime(status) == 0) {
1017             if (status == IdleStatus.BOTH_IDLE) {
1018                 idleCountForBoth.set(0);
1019             }
1020 
1021             if (status == IdleStatus.READER_IDLE) {
1022                 idleCountForRead.set(0);
1023             }
1024 
1025             if (status == IdleStatus.WRITER_IDLE) {
1026                 idleCountForWrite.set(0);
1027             }
1028         }
1029 
1030         if (status == IdleStatus.BOTH_IDLE) {
1031             return idleCountForBoth.get();
1032         }
1033 
1034         if (status == IdleStatus.READER_IDLE) {
1035             return idleCountForRead.get();
1036         }
1037 
1038         if (status == IdleStatus.WRITER_IDLE) {
1039             return idleCountForWrite.get();
1040         }
1041 
1042         throw new IllegalArgumentException("Unknown idle status: " + status);
1043     }
1044 
1045     /**
1046      * {@inheritDoc}
1047      */
1048     public final long getLastIdleTime(IdleStatus status) {
1049         if (status == IdleStatus.BOTH_IDLE) {
1050             return lastIdleTimeForBoth;
1051         }
1052 
1053         if (status == IdleStatus.READER_IDLE) {
1054             return lastIdleTimeForRead;
1055         }
1056 
1057         if (status == IdleStatus.WRITER_IDLE) {
1058             return lastIdleTimeForWrite;
1059         }
1060 
1061         throw new IllegalArgumentException("Unknown idle status: " + status);
1062     }
1063 
1064     /**
1065      * TODO Add method documentation
1066      */
1067     public final void increaseIdleCount(IdleStatus status, long currentTime) {
1068         if (status == IdleStatus.BOTH_IDLE) {
1069             idleCountForBoth.incrementAndGet();
1070             lastIdleTimeForBoth = currentTime;
1071         } else if (status == IdleStatus.READER_IDLE) {
1072             idleCountForRead.incrementAndGet();
1073             lastIdleTimeForRead = currentTime;
1074         } else if (status == IdleStatus.WRITER_IDLE) {
1075             idleCountForWrite.incrementAndGet();
1076             lastIdleTimeForWrite = currentTime;
1077         } else {
1078             throw new IllegalArgumentException("Unknown idle status: " + status);
1079         }
1080     }
1081 
1082     /**
1083      * {@inheritDoc}
1084      */
1085     public final int getBothIdleCount() {
1086         return getIdleCount(IdleStatus.BOTH_IDLE);
1087     }
1088 
1089     /**
1090      * {@inheritDoc}
1091      */
1092     public final long getLastBothIdleTime() {
1093         return getLastIdleTime(IdleStatus.BOTH_IDLE);
1094     }
1095 
1096     /**
1097      * {@inheritDoc}
1098      */
1099     public final long getLastReaderIdleTime() {
1100         return getLastIdleTime(IdleStatus.READER_IDLE);
1101     }
1102 
1103     /**
1104      * {@inheritDoc}
1105      */
1106     public final long getLastWriterIdleTime() {
1107         return getLastIdleTime(IdleStatus.WRITER_IDLE);
1108     }
1109 
1110     /**
1111      * {@inheritDoc}
1112      */
1113     public final int getReaderIdleCount() {
1114         return getIdleCount(IdleStatus.READER_IDLE);
1115     }
1116 
1117     /**
1118      * {@inheritDoc}
1119      */
1120     public final int getWriterIdleCount() {
1121         return getIdleCount(IdleStatus.WRITER_IDLE);
1122     }
1123 
1124     /**
1125      * {@inheritDoc}
1126      */
1127     public SocketAddress getServiceAddress() {
1128         IoService service = getService();
1129         if (service instanceof IoAcceptor) {
1130             return ((IoAcceptor) service).getLocalAddress();
1131         }
1132         
1133         return getRemoteAddress();
1134     }
1135 
1136     /**
1137      * {@inheritDoc}
1138      */
1139     @Override
1140     public final int hashCode() {
1141         return super.hashCode();
1142     }
1143 
1144     /**
1145      * {@inheritDoc}
1146      * TODO This is a ridiculous implementation. Need to be replaced.
1147      */
1148     @Override
1149     public final boolean equals(Object o) {
1150         return super.equals(o);
1151     }
1152 
1153     /**
1154      * {@inheritDoc}
1155      */
1156     @Override
1157     public String toString() {
1158         if (isConnected()||isClosing()) {
1159             try {
1160                 SocketAddress remote = getRemoteAddress();
1161                 SocketAddress local = getLocalAddress();
1162                 
1163                 if (getService() instanceof IoAcceptor) {
1164                     return "(" + getIdAsString() + ": " + getServiceName() + ", server, " +
1165                             remote + " => " + local + ')';
1166                 }
1167     
1168                 return "(" + getIdAsString() + ": " + getServiceName() + ", client, " +
1169                             local + " => " + remote + ')';
1170             } catch (Exception e) {
1171                 return "Session is disconnecting ...";
1172             }
1173         }
1174         
1175         return "Session disconnected ...";
1176     }
1177 
1178     /**
1179      * TODO Add method documentation
1180      */
1181     private String getIdAsString() {
1182         String id = Long.toHexString(getId()).toUpperCase();
1183 
1184         // Somewhat inefficient, but it won't happen that often
1185         // because an ID is often a big integer.
1186         while (id.length() < 8) {
1187             id = '0' + id; // padding
1188         }
1189         id = "0x" + id;
1190 
1191         return id;
1192     }
1193 
1194     /**
1195      * TODO Add method documentation
1196      */
1197     private String getServiceName() {
1198         TransportMetadata tm = getTransportMetadata();
1199         if (tm == null) {
1200             return "null";
1201         }
1202         
1203         return tm.getProviderName() + ' ' + tm.getName();
1204     }
1205 
1206     /**
1207      * Fires a {@link IoEventType#SESSION_IDLE} event to any applicable
1208      * sessions in the specified collection.
1209      *
1210      * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
1211      */
1212     public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) {
1213         IoSession s = null;
1214         while (sessions.hasNext()) {
1215             s = sessions.next();
1216             notifyIdleSession(s, currentTime);
1217         }
1218     }
1219 
1220     /**
1221      * Fires a {@link IoEventType#SESSION_IDLE} event if applicable for the
1222      * specified {@code session}.
1223      *
1224      * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
1225      */
1226     public static void notifyIdleSession(IoSession session, long currentTime) {
1227         notifyIdleSession0(
1228                 session, currentTime,
1229                 session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
1230                 IdleStatus.BOTH_IDLE, Math.max(
1231                     session.getLastIoTime(),
1232                     session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
1233 
1234         notifyIdleSession0(
1235                 session, currentTime,
1236                 session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
1237                 IdleStatus.READER_IDLE, Math.max(
1238                     session.getLastReadTime(),
1239                     session.getLastIdleTime(IdleStatus.READER_IDLE)));
1240 
1241         notifyIdleSession0(
1242                 session, currentTime,
1243                 session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
1244                 IdleStatus.WRITER_IDLE, Math.max(
1245                     session.getLastWriteTime(),
1246                     session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
1247 
1248         notifyWriteTimeout(session, currentTime);
1249     }
1250 
1251     private static void notifyIdleSession0(
1252             IoSession session, long currentTime,
1253             long idleTime, IdleStatus status, long lastIoTime) {
1254         if (idleTime > 0 && lastIoTime != 0
1255                 && currentTime - lastIoTime >= idleTime) {
1256             session.getFilterChain().fireSessionIdle(status);
1257         }
1258     }
1259 
1260     private static void notifyWriteTimeout(
1261             IoSession session, long currentTime) {
1262 
1263         long writeTimeout = session.getConfig().getWriteTimeoutInMillis();
1264         if (writeTimeout > 0 &&
1265                 currentTime - session.getLastWriteTime() >= writeTimeout &&
1266                 !session.getWriteRequestQueue().isEmpty(session)) {
1267             WriteRequest request = session.getCurrentWriteRequest();
1268             if (request != null) {
1269                 session.setCurrentWriteRequest(null);
1270                 WriteTimeoutException cause = new WriteTimeoutException(request);
1271                 request.getFuture().setException(cause);
1272                 session.getFilterChain().fireExceptionCaught(cause);
1273                 // WriteException is an IOException, so we close the session.
1274                 session.close(true);
1275             }
1276         }
1277     }
1278 
1279     
1280     
1281     /**
1282      * A queue which handles the CLOSE request.
1283      * 
1284      * TODO : Check that when closing a session, all the pending
1285      * requests are correctly sent.
1286      */
1287     private class CloseAwareWriteQueue implements WriteRequestQueue {
1288 
1289         private final WriteRequestQueue queue;
1290 
1291         /**
1292          * {@inheritDoc}
1293          */
1294         public CloseAwareWriteQueue(WriteRequestQueue queue) {
1295             this.queue = queue;
1296         }
1297 
1298         /**
1299          * {@inheritDoc}
1300          */
1301         public synchronized WriteRequest poll(IoSession session) {
1302             WriteRequest answer = queue.poll(session);
1303 
1304             if (answer == CLOSE_REQUEST) {
1305                 AbstractIoSession.this.close();
1306                 dispose(session);
1307                 answer = null;
1308             }
1309             
1310             return answer;
1311         }
1312 
1313         /**
1314          * {@inheritDoc}
1315          */
1316         public void offer(IoSession session, WriteRequest e) {
1317             queue.offer(session, e);
1318         }
1319 
1320         /**
1321          * {@inheritDoc}
1322          */
1323         public boolean isEmpty(IoSession session) {
1324             return queue.isEmpty(session);
1325         }
1326 
1327         /**
1328          * {@inheritDoc}
1329          */
1330         public void clear(IoSession session) {
1331             queue.clear(session);
1332         }
1333 
1334         /**
1335          * {@inheritDoc}
1336          */
1337         public void dispose(IoSession session) {
1338             queue.dispose(session);
1339         }
1340     }
1341 }