View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.session;
21  
22  import java.io.File;
23  import java.io.FileInputStream;
24  import java.io.IOException;
25  import java.net.SocketAddress;
26  import java.nio.channels.FileChannel;
27  import java.util.Queue;
28  import java.util.Set;
29  import java.util.concurrent.atomic.AtomicBoolean;
30  import java.util.concurrent.atomic.AtomicInteger;
31  
32  import org.apache.mina.core.buffer.IoBuffer;
33  import org.apache.mina.core.file.DefaultFileRegion;
34  import org.apache.mina.core.future.CloseFuture;
35  import org.apache.mina.core.future.DefaultCloseFuture;
36  import org.apache.mina.core.future.DefaultReadFuture;
37  import org.apache.mina.core.future.DefaultWriteFuture;
38  import org.apache.mina.core.future.IoFutureListener;
39  import org.apache.mina.core.future.ReadFuture;
40  import org.apache.mina.core.future.WriteFuture;
41  import org.apache.mina.core.service.AbstractIoService;
42  import org.apache.mina.core.service.IoAcceptor;
43  import org.apache.mina.core.service.IoProcessor;
44  import org.apache.mina.core.service.IoService;
45  import org.apache.mina.core.service.TransportMetadata;
46  import org.apache.mina.core.write.DefaultWriteRequest;
47  import org.apache.mina.core.write.WriteRequest;
48  import org.apache.mina.core.write.WriteRequestQueue;
49  import org.apache.mina.core.write.WriteToClosedSessionException;
50  import org.apache.mina.util.CircularQueue;
51  import org.apache.mina.util.ExceptionMonitor;
52  
53  
54  /**
55   * Base implementation of {@link IoSession}.
56   *
57   * @author The Apache MINA Project (dev@mina.apache.org)
58   * @version $Rev: 684597 $, $Date: 2008-08-10 23:20:07 +0200 (dim, 10 août 2008) $
59   */
60  public abstract class AbstractIoSession implements IoSession {
61  
62      private static final AttributeKey READY_READ_FUTURES_KEY =
63          new AttributeKey(AbstractIoSession.class, "readyReadFutures");
64      
65      private static final AttributeKey WAITING_READ_FUTURES_KEY =
66          new AttributeKey(AbstractIoSession.class, "waitingReadFutures");
67  
68      private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER =
69          new IoFutureListener<CloseFuture>() {
70              public void operationComplete(CloseFuture future) {
71                  AbstractIoSession s = (AbstractIoSession) future.getSession();
72                  s.scheduledWriteBytes.set(0);
73                  s.scheduledWriteMessages.set(0);
74                  s.readBytesThroughput = 0;
75                  s.readMessagesThroughput = 0;
76                  s.writtenBytesThroughput = 0;
77                  s.writtenMessagesThroughput = 0;
78              }
79      };
80  
81      /**
82       * An internal write request object that triggers session close.
83       * @see #writeRequestQueue
84       */
85      private static final WriteRequest CLOSE_REQUEST =
86          new DefaultWriteRequest(new Object());
87  
88      private final Object lock = new Object();
89  
90      private IoSessionAttributeMap attributes;
91      private WriteRequestQueue writeRequestQueue;
92      private WriteRequest currentWriteRequest;
93      
94      // The Session creation's time */
95      private final long creationTime;
96  
97      /**
98       * A future that will be set 'closed' when the connection is closed.
99       */
100     private final CloseFuture closeFuture = new DefaultCloseFuture(this);
101 
102     private volatile boolean closing;
103     private volatile TrafficMask trafficMask = TrafficMask.ALL;
104 
105     // Status variables
106     private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
107     private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
108     private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
109 
110     private long readBytes;
111     private long writtenBytes;
112     private long readMessages;
113     private long writtenMessages;
114     private long lastReadTime;
115     private long lastWriteTime;
116 
117     private long lastThroughputCalculationTime;
118     private long lastReadBytes;
119     private long lastWrittenBytes;
120     private long lastReadMessages;
121     private long lastWrittenMessages;
122     private double readBytesThroughput;
123     private double writtenBytesThroughput;
124     private double readMessagesThroughput;
125     private double writtenMessagesThroughput;
126 
127     private int idleCountForBoth;
128     private int idleCountForRead;
129     private int idleCountForWrite;
130 
131     private long lastIdleTimeForBoth;
132     private long lastIdleTimeForRead;
133     private long lastIdleTimeForWrite;
134 
135     private boolean deferDecreaseReadBuffer = true;
136 
137     /**
138      * TODO Add method documentation
139      */
140     protected AbstractIoSession() {
141         // Initialize all the Session counters to the current time 
142         long currentTime = System.currentTimeMillis();
143         creationTime = currentTime;
144         lastThroughputCalculationTime = currentTime;
145         lastReadTime = currentTime;
146         lastWriteTime = currentTime;
147         lastIdleTimeForBoth = currentTime;
148         lastIdleTimeForRead = currentTime;
149         lastIdleTimeForWrite = currentTime;
150         
151         // TODO add documentation
152         closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
153     }
154 
155     /**
156      * {@inheritDoc}
157      * 
158      * TODO this method implementation is totally wrong. It has to
159      * be rewritten.
160      */
161     public final long getId() {
162         return hashCode() & 0xFFFFFFFFL;
163     }
164 
/**
 * Returns the {@link IoProcessor} associated with this session.  Within
 * this class it is used by {@link #closeOnFlush()}, which asks it to
 * flush this session after the close request has been queued.
 *
 * @return the processor supplied by the concrete session implementation
 */
public abstract IoProcessor getProcessor();
169 
170     /**
171      * {@inheritDoc}
172      */
173     public final boolean isConnected() {
174         return !closeFuture.isClosed();
175     }
176 
177     /**
178      * {@inheritDoc}
179      */
180     public final boolean isClosing() {
181         return closing || closeFuture.isClosed();
182     }
183 
184     /**
185      * {@inheritDoc}
186      */
187     public final CloseFuture getCloseFuture() {
188         return closeFuture;
189     }
190 
191     /**
192      * TODO Add method documentation
193      */
194     public final boolean isScheduledForFlush() {
195         return scheduledForFlush.get();
196     }
197 
198     /**
199      * TODO Add method documentation
200      */
201     public final boolean setScheduledForFlush(boolean flag) {
202         if (flag) {
203             return scheduledForFlush.compareAndSet(false, true);
204         } else {
205             scheduledForFlush.set(false);
206             return true;
207         }
208     }
209 
210     /**
211      * {@inheritDoc}
212      */
213     public final CloseFuture close(boolean rightNow) {
214         if (rightNow) {
215             return close();
216         } else {
217             return closeOnFlush();
218         }
219     }
220 
221     /**
222      * {@inheritDoc}
223      */
224     public final CloseFuture close() {
225         synchronized (lock) {
226             if (isClosing()) {
227                 return closeFuture;
228             } else {
229                 closing = true;
230             }
231         }
232 
233         getFilterChain().fireFilterClose();
234         return closeFuture;
235     }
236 
237     /**
238      * {@inheritDoc}
239      */
240     public final CloseFuture closeOnFlush() {
241         getWriteRequestQueue().offer(this, CLOSE_REQUEST);
242         getProcessor().flush(this);
243         return closeFuture;
244     }
245 
246     /**
247      * {@inheritDoc}
248      */
249     public final ReadFuture read() {
250         if (!getConfig().isUseReadOperation()) {
251             throw new IllegalStateException("useReadOperation is not enabled.");
252         }
253 
254         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
255         ReadFuture future;
256         synchronized (readyReadFutures) {
257             future = readyReadFutures.poll();
258             if (future != null) {
259                 if (future.isClosed()) {
260                     // Let other readers get notified.
261                     readyReadFutures.offer(future);
262                 }
263             } else {
264                 future = new DefaultReadFuture(this);
265                 getWaitingReadFutures().offer(future);
266             }
267         }
268 
269         return future;
270     }
271 
272     /**
273      * TODO Add method documentation
274      */
275     public final void offerReadFuture(Object message) {
276         newReadFuture().setRead(message);
277     }
278 
279     /**
280      * TODO Add method documentation
281      */
282     public final void offerFailedReadFuture(Throwable exception) {
283         newReadFuture().setException(exception);
284     }
285 
286     /**
287      * TODO Add method documentation
288      */
289     public final void offerClosedReadFuture() {
290         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
291         synchronized (readyReadFutures) {
292             newReadFuture().setClosed();
293         }
294     }
295 
296     /**
297      * TODO Add method documentation
298      */
299     private ReadFuture newReadFuture() {
300         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
301         Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures();
302         ReadFuture future;
303         synchronized (readyReadFutures) {
304             future = waitingReadFutures.poll();
305             if (future == null) {
306                 future = new DefaultReadFuture(this);
307                 readyReadFutures.offer(future);
308             }
309         }
310         return future;
311     }
312 
313     /**
314      * TODO Add method documentation
315      */
316     private Queue<ReadFuture> getReadyReadFutures() {
317         Queue<ReadFuture> readyReadFutures =
318             (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
319         if (readyReadFutures == null) {
320             readyReadFutures = new CircularQueue<ReadFuture>();
321 
322             Queue<ReadFuture> oldReadyReadFutures =
323                 (Queue<ReadFuture>) setAttributeIfAbsent(
324                         READY_READ_FUTURES_KEY, readyReadFutures);
325             if (oldReadyReadFutures != null) {
326                 readyReadFutures = oldReadyReadFutures;
327             }
328         }
329         return readyReadFutures;
330     }
331 
332     /**
333      * TODO Add method documentation
334      */
335     private Queue<ReadFuture> getWaitingReadFutures() {
336         Queue<ReadFuture> waitingReadyReadFutures =
337             (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES_KEY);
338         if (waitingReadyReadFutures == null) {
339             waitingReadyReadFutures = new CircularQueue<ReadFuture>();
340 
341             Queue<ReadFuture> oldWaitingReadyReadFutures =
342                 (Queue<ReadFuture>) setAttributeIfAbsent(
343                         WAITING_READ_FUTURES_KEY, waitingReadyReadFutures);
344             if (oldWaitingReadyReadFutures != null) {
345                 waitingReadyReadFutures = oldWaitingReadyReadFutures;
346             }
347         }
348         return waitingReadyReadFutures;
349     }
350 
351     /**
352      * {@inheritDoc}
353      */
354     public final WriteFuture write(Object message) {
355         return write(message, null);
356     }
357 
358     /**
359      * {@inheritDoc}
360      */
361     public final WriteFuture write(Object message, SocketAddress remoteAddress) {
362         if (message == null) {
363             throw new NullPointerException("message");
364         }
365 
366         if (!getTransportMetadata().isConnectionless() &&
367                 remoteAddress != null) {
368             throw new UnsupportedOperationException();
369         }
370 
371         if (isClosing() || !isConnected()) {
372             WriteFuture future = new DefaultWriteFuture(this);
373             WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
374             future.setException(new WriteToClosedSessionException(request));
375             return future;
376         }
377 
378         FileChannel openedFileChannel = null;
379         try {
380             if (message instanceof IoBuffer
381                     && !((IoBuffer) message).hasRemaining()) {
382                 throw new IllegalArgumentException(
383                 "message is empty. Forgot to call flip()?");
384             } else if (message instanceof FileChannel) {
385                 FileChannel fileChannel = (FileChannel) message;
386                 message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
387             } else if (message instanceof File) {
388                 File file = (File) message;
389                 openedFileChannel = new FileInputStream(file).getChannel();
390                 message = new DefaultFileRegion(openedFileChannel, 0, openedFileChannel.size());
391             }
392         } catch (IOException e) {
393             ExceptionMonitor.getInstance().exceptionCaught(e);
394             return DefaultWriteFuture.newNotWrittenFuture(this, e);
395         }
396 
397         WriteFuture future = new DefaultWriteFuture(this);
398         getFilterChain().fireFilterWrite(
399                 new DefaultWriteRequest(message, future, remoteAddress));
400 
401         if (openedFileChannel != null) {
402             // If we opened a FileChannel, it needs to be closed when the write has completed
403             final FileChannel finalChannel = openedFileChannel;
404             future.addListener(new IoFutureListener<WriteFuture>() {
405                 public void operationComplete(WriteFuture future) {
406                     try {
407                         finalChannel.close();
408                     } catch (IOException e) {
409                         ExceptionMonitor.getInstance().exceptionCaught(e);
410                     }
411                 }
412             });
413         }
414 
415         return future;
416     }
417 
418     /**
419      * {@inheritDoc}
420      */
421     public final Object getAttachment() {
422         return getAttribute("");
423     }
424 
425     /**
426      * {@inheritDoc}
427      */
428     public final Object setAttachment(Object attachment) {
429         return setAttribute("", attachment);
430     }
431 
432     /**
433      * {@inheritDoc}
434      */
435     public final Object getAttribute(Object key) {
436         return getAttribute(key, null);
437     }
438 
439     /**
440      * {@inheritDoc}
441      */
442     public final Object getAttribute(Object key, Object defaultValue) {
443         return attributes.getAttribute(this, key, defaultValue);
444     }
445 
446     /**
447      * {@inheritDoc}
448      */
449     public final Object setAttribute(Object key, Object value) {
450         return attributes.setAttribute(this, key, value);
451     }
452 
453     /**
454      * {@inheritDoc}
455      */
456     public final Object setAttribute(Object key) {
457         return setAttribute(key, Boolean.TRUE);
458     }
459 
460     /**
461      * {@inheritDoc}
462      */
463     public final Object setAttributeIfAbsent(Object key, Object value) {
464         return attributes.setAttributeIfAbsent(this, key, value);
465     }
466 
467     /**
468      * {@inheritDoc}
469      */
470     public final Object setAttributeIfAbsent(Object key) {
471         return setAttributeIfAbsent(key, Boolean.TRUE);
472     }
473 
474     /**
475      * {@inheritDoc}
476      */
477     public final Object removeAttribute(Object key) {
478         return attributes.removeAttribute(this, key);
479     }
480 
481     /**
482      * {@inheritDoc}
483      */
484     public final boolean removeAttribute(Object key, Object value) {
485         return attributes.removeAttribute(this, key, value);
486     }
487 
488     /**
489      * {@inheritDoc}
490      */
491     public final boolean replaceAttribute(Object key, Object oldValue, Object newValue) {
492         return attributes.replaceAttribute(this, key, oldValue, newValue);
493     }
494 
495     /**
496      * {@inheritDoc}
497      */
498     public final boolean containsAttribute(Object key) {
499         return attributes.containsAttribute(this, key);
500     }
501 
502     /**
503      * {@inheritDoc}
504      */
505     public final Set<Object> getAttributeKeys() {
506         return attributes.getAttributeKeys(this);
507     }
508 
509     /**
510      * TODO Add method documentation
511      */
512     public final IoSessionAttributeMap getAttributeMap() {
513         return attributes;
514     }
515 
516     /**
517      * TODO Add method documentation
518      */
519     public final void setAttributeMap(IoSessionAttributeMap attributes) {
520         this.attributes = attributes;
521     }
522 
523     /**
524      * TODO Add method documentation
525      */
526     public final void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) {
527         this.writeRequestQueue =
528             new CloseRequestAwareWriteRequestQueue(writeRequestQueue);
529     }
530 
531     /**
532      * {@inheritDoc}
533      */
534     public final TrafficMask getTrafficMask() {
535         return trafficMask;
536     }
537 
538     /**
539      * {@inheritDoc}
540      */
541     public final void setTrafficMask(TrafficMask trafficMask) {
542         if (trafficMask == null) {
543             throw new NullPointerException("trafficMask");
544         }
545 
546         if (isClosing() || !isConnected()) {
547             return;
548         }
549 
550         getFilterChain().fireFilterSetTrafficMask(trafficMask);
551     }
552 
553     /**
554      * TODO Add method documentation
555      */
556     public final void setTrafficMaskNow(TrafficMask trafficMask) {
557         this.trafficMask = trafficMask;
558     }
559 
560     /**
561      * {@inheritDoc}
562      */
563     public final void suspendRead() {
564         setTrafficMask(getTrafficMask().and(TrafficMask.READ.not()));
565     }
566 
567     /**
568      * {@inheritDoc}
569      */
570     public final void suspendWrite() {
571         setTrafficMask(getTrafficMask().and(TrafficMask.WRITE.not()));
572     }
573 
574     /**
575      * {@inheritDoc}
576      */
577     public final void resumeRead() {
578         setTrafficMask(getTrafficMask().or(TrafficMask.READ));
579     }
580 
581     /**
582      * {@inheritDoc}
583      */
584     public final void resumeWrite() {
585         setTrafficMask(getTrafficMask().or(TrafficMask.WRITE));
586     }
587 
588     /**
589      * {@inheritDoc}
590      */
591     public final long getReadBytes() {
592         return readBytes;
593     }
594 
595     /**
596      * {@inheritDoc}
597      */
598     public final long getWrittenBytes() {
599         return writtenBytes;
600     }
601 
602     /**
603      * {@inheritDoc}
604      */
605     public final long getReadMessages() {
606         return readMessages;
607     }
608 
609     /**
610      * {@inheritDoc}
611      */
612     public final long getWrittenMessages() {
613         return writtenMessages;
614     }
615 
616     /**
617      * {@inheritDoc}
618      */
619     public final double getReadBytesThroughput() {
620         return readBytesThroughput;
621     }
622 
623     /**
624      * {@inheritDoc}
625      */
626     public final double getWrittenBytesThroughput() {
627         return writtenBytesThroughput;
628     }
629 
630     /**
631      * {@inheritDoc}
632      */
633     public final double getReadMessagesThroughput() {
634         return readMessagesThroughput;
635     }
636 
637     /**
638      * {@inheritDoc}
639      */
640     public final double getWrittenMessagesThroughput() {
641         return writtenMessagesThroughput;
642     }
643 
644     /**
645      * Update all statistical properties related with throughput assuming
646      * the specified time is the current time.  By default this method returns
647      * silently without updating the throughput properties if they were
648      * calculated already within last
649      * {@link IoSessionConfig#getThroughputCalculationInterval() calculation interval}.
650      * If, however, <tt>force</tt> is specified as <tt>true</tt>, this method
651      * updates the throughput properties immediately.
652 
653      * @param currentTime the current time in milliseconds
654      */
655     public final void updateThroughput(long currentTime, boolean force) {
656         int interval = (int) (currentTime - lastThroughputCalculationTime);
657 
658         long minInterval = getConfig().getThroughputCalculationIntervalInMillis();
659         if (minInterval == 0 || interval < minInterval) {
660             if (!force) {
661                 return;
662             }
663         }
664 
665         readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval;
666         writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval;
667         readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval;
668         writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval;
669 
670         lastReadBytes = readBytes;
671         lastWrittenBytes = writtenBytes;
672         lastReadMessages = readMessages;
673         lastWrittenMessages = writtenMessages;
674 
675         lastThroughputCalculationTime = currentTime;
676     }
677 
678     /**
679      * {@inheritDoc}
680      */
681     public final long getScheduledWriteBytes() {
682         return scheduledWriteBytes.get();
683     }
684 
685     /**
686      * {@inheritDoc}
687      */
688     public final int getScheduledWriteMessages() {
689         return scheduledWriteMessages.get();
690     }
691 
692     /**
693      * TODO Add method documentation
694      */
695     protected void setScheduledWriteBytes(int byteCount){
696         scheduledWriteBytes.set(byteCount);
697     }
698 
699     /**
700      * TODO Add method documentation
701      */
702     protected void setScheduledWriteMessages(int messages) {
703         scheduledWriteMessages.set(messages);
704     }
705 
706     /**
707      * TODO Add method documentation
708      */
709     public final void increaseReadBytes(long increment, long currentTime) {
710         if (increment <= 0) {
711             return;
712         }
713 
714         readBytes += increment;
715         lastReadTime = currentTime;
716         idleCountForBoth = 0;
717         idleCountForRead = 0;
718 
719         if (getService() instanceof AbstractIoService) {
720             ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime);
721         }
722     }
723 
724     /**
725      * TODO Add method documentation
726      */
727     public final void increaseReadMessages(long currentTime) {
728         readMessages++;
729         lastReadTime = currentTime;
730         idleCountForBoth = 0;
731         idleCountForRead = 0;
732 
733         if (getService() instanceof AbstractIoService) {
734             ((AbstractIoService) getService()).getStatistics().increaseReadMessages(currentTime);
735         }
736     }
737 
738     /**
739      * TODO Add method documentation
740      */
741     public final void increaseWrittenBytes(int increment, long currentTime) {
742         if (increment <= 0) {
743             return;
744         }
745 
746         writtenBytes += increment;
747         lastWriteTime = currentTime;
748         idleCountForBoth = 0;
749         idleCountForWrite = 0;
750 
751         if (getService() instanceof AbstractIoService) {
752             ((AbstractIoService) getService()).getStatistics().increaseWrittenBytes(increment, currentTime);
753         }
754 
755         increaseScheduledWriteBytes(-increment);
756     }
757 
758     /**
759      * TODO Add method documentation
760      */
761     public final void increaseWrittenMessages(
762             WriteRequest request, long currentTime) {
763         Object message = request.getMessage();
764         if (message instanceof IoBuffer) {
765             IoBuffer b = (IoBuffer) message;
766             if (b.hasRemaining()) {
767                 return;
768             }
769         }
770 
771         writtenMessages++;
772         lastWriteTime = currentTime;
773         if (getService() instanceof AbstractIoService) {
774             ((AbstractIoService) getService()).getStatistics().increaseWrittenMessages(currentTime);
775         }
776 
777         decreaseScheduledWriteMessages();
778     }
779 
780     /**
781      * TODO Add method documentation
782      */
783     public final void increaseScheduledWriteBytes(int increment) {
784         scheduledWriteBytes.addAndGet(increment);
785         if (getService() instanceof AbstractIoService) {
786             ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteBytes(increment);
787         }
788     }
789 
790     /**
791      * TODO Add method documentation
792      */
793     public final void increaseScheduledWriteMessages() {
794         scheduledWriteMessages.incrementAndGet();
795         if (getService() instanceof AbstractIoService) {
796             ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteMessages();
797         }
798     }
799 
800     /**
801      * TODO Add method documentation
802      */
803     private void decreaseScheduledWriteMessages() {
804         scheduledWriteMessages.decrementAndGet();
805         if (getService() instanceof AbstractIoService) {
806             ((AbstractIoService) getService()).getStatistics().decreaseScheduledWriteMessages();
807         }
808     }
809 
810     /**
811      * TODO Add method documentation
812      */
813     public final void decreaseScheduledBytesAndMessages(WriteRequest request) {
814         Object message = request.getMessage();
815         if (message instanceof IoBuffer) {
816             IoBuffer b = (IoBuffer) message;
817             if (b.hasRemaining()) {
818                 increaseScheduledWriteBytes(-((IoBuffer) message).remaining());
819             } else {
820                 decreaseScheduledWriteMessages();
821             }
822         } else {
823             decreaseScheduledWriteMessages();
824         }
825     }
826 
827     /**
828      * TODO Add method documentation
829      */
830     public final WriteRequestQueue getWriteRequestQueue() {
831         if (writeRequestQueue == null) {
832             throw new IllegalStateException();
833         }
834         return writeRequestQueue;
835     }
836 
837     /**
838      * {@inheritDoc}
839      */
840     public final WriteRequest getCurrentWriteRequest() {
841         return currentWriteRequest;
842     }
843 
844     /**
845      * {@inheritDoc}
846      */
847     public final Object getCurrentWriteMessage() {
848         WriteRequest req = getCurrentWriteRequest();
849         if (req == null) {
850             return null;
851         }
852         return req.getMessage();
853     }
854 
855     /**
856      * TODO Add method documentation
857      */
858     public final void setCurrentWriteRequest(WriteRequest currentWriteRequest) {
859         this.currentWriteRequest = currentWriteRequest;
860     }
861 
862     /**
863      * TODO Add method documentation
864      */
865     public final void increaseReadBufferSize() {
866         int newReadBufferSize = getConfig().getReadBufferSize() << 1;
867         if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) {
868             getConfig().setReadBufferSize(newReadBufferSize);
869         } else {
870             getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize());
871         }
872 
873         deferDecreaseReadBuffer = true;
874     }
875 
876     /**
877      * TODO Add method documentation
878      */
879     public final void decreaseReadBufferSize() {
880         if (deferDecreaseReadBuffer) {
881             deferDecreaseReadBuffer = false;
882             return;
883         }
884 
885         if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) {
886             getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1);
887         }
888 
889         deferDecreaseReadBuffer = true;
890     }
891 
892     /**
893      * {@inheritDoc}
894      */
895     public final long getCreationTime() {
896         return creationTime;
897     }
898 
899     /**
900      * {@inheritDoc}
901      */
902     public final long getLastIoTime() {
903         return Math.max(lastReadTime, lastWriteTime);
904     }
905 
906     /**
907      * {@inheritDoc}
908      */
909     public final long getLastReadTime() {
910         return lastReadTime;
911     }
912 
913     /**
914      * {@inheritDoc}
915      */
916     public final long getLastWriteTime() {
917         return lastWriteTime;
918     }
919 
920     /**
921      * {@inheritDoc}
922      */
923     public final boolean isIdle(IdleStatus status) {
924         if (status == IdleStatus.BOTH_IDLE) {
925             return idleCountForBoth > 0;
926         }
927 
928         if (status == IdleStatus.READER_IDLE) {
929             return idleCountForRead > 0;
930         }
931 
932         if (status == IdleStatus.WRITER_IDLE) {
933             return idleCountForWrite > 0;
934         }
935 
936         throw new IllegalArgumentException("Unknown idle status: " + status);
937     }
938 
939     /**
940      * {@inheritDoc}
941      */
942     public final boolean isBothIdle() {
943         return isIdle(IdleStatus.BOTH_IDLE);
944     }
945 
946     /**
947      * {@inheritDoc}
948      */
949     public final boolean isReaderIdle() {
950         return isIdle(IdleStatus.READER_IDLE);
951     }
952 
953     /**
954      * {@inheritDoc}
955      */
956     public final boolean isWriterIdle() {
957         return isIdle(IdleStatus.WRITER_IDLE);
958     }
959 
960     /**
961      * {@inheritDoc}
962      */
963     public final int getIdleCount(IdleStatus status) {
964         if (getConfig().getIdleTime(status) == 0) {
965             if (status == IdleStatus.BOTH_IDLE) {
966                 idleCountForBoth = 0;
967             }
968 
969             if (status == IdleStatus.READER_IDLE) {
970                 idleCountForRead = 0;
971             }
972 
973             if (status == IdleStatus.WRITER_IDLE) {
974                 idleCountForWrite = 0;
975             }
976         }
977 
978         if (status == IdleStatus.BOTH_IDLE) {
979             return idleCountForBoth;
980         }
981 
982         if (status == IdleStatus.READER_IDLE) {
983             return idleCountForRead;
984         }
985 
986         if (status == IdleStatus.WRITER_IDLE) {
987             return idleCountForWrite;
988         }
989 
990         throw new IllegalArgumentException("Unknown idle status: " + status);
991     }
992 
993     /**
994      * {@inheritDoc}
995      */
996     public final long getLastIdleTime(IdleStatus status) {
997         if (status == IdleStatus.BOTH_IDLE) {
998             return lastIdleTimeForBoth;
999         }
1000 
1001         if (status == IdleStatus.READER_IDLE) {
1002             return lastIdleTimeForRead;
1003         }
1004 
1005         if (status == IdleStatus.WRITER_IDLE) {
1006             return lastIdleTimeForWrite;
1007         }
1008 
1009         throw new IllegalArgumentException("Unknown idle status: " + status);
1010     }
1011 
1012     /**
1013      * TODO Add method documentation
1014      */
1015     public final void increaseIdleCount(IdleStatus status, long currentTime) {
1016         if (status == IdleStatus.BOTH_IDLE) {
1017             idleCountForBoth++;
1018             lastIdleTimeForBoth = currentTime;
1019         } else if (status == IdleStatus.READER_IDLE) {
1020             idleCountForRead++;
1021             lastIdleTimeForRead = currentTime;
1022         } else if (status == IdleStatus.WRITER_IDLE) {
1023             idleCountForWrite++;
1024             lastIdleTimeForWrite = currentTime;
1025         } else {
1026             throw new IllegalArgumentException("Unknown idle status: " + status);
1027         }
1028     }
1029 
    /**
     * {@inheritDoc}
     * <p>
     * Shortcut for {@code getIdleCount(IdleStatus.BOTH_IDLE)}; it therefore
     * also resets the both-idle counter when the configured idle time for
     * that status is zero.
     */
    public final int getBothIdleCount() {
        return getIdleCount(IdleStatus.BOTH_IDLE);
    }
1036 
1037     /**
1038      * {@inheritDoc}
1039      */
1040     public final long getLastBothIdleTime() {
1041         return getLastIdleTime(IdleStatus.BOTH_IDLE);
1042     }
1043 
1044     /**
1045      * {@inheritDoc}
1046      */
1047     public final long getLastReaderIdleTime() {
1048         return getLastIdleTime(IdleStatus.READER_IDLE);
1049     }
1050 
1051     /**
1052      * {@inheritDoc}
1053      */
1054     public final long getLastWriterIdleTime() {
1055         return getLastIdleTime(IdleStatus.WRITER_IDLE);
1056     }
1057 
    /**
     * {@inheritDoc}
     * <p>
     * Shortcut for {@code getIdleCount(IdleStatus.READER_IDLE)}; it therefore
     * also resets the reader-idle counter when the configured idle time for
     * that status is zero.
     */
    public final int getReaderIdleCount() {
        return getIdleCount(IdleStatus.READER_IDLE);
    }
1064 
    /**
     * {@inheritDoc}
     * <p>
     * Shortcut for {@code getIdleCount(IdleStatus.WRITER_IDLE)}; it therefore
     * also resets the writer-idle counter when the configured idle time for
     * that status is zero.
     */
    public final int getWriterIdleCount() {
        return getIdleCount(IdleStatus.WRITER_IDLE);
    }
1071 
1072     /**
1073      * {@inheritDoc}
1074      */
1075     public SocketAddress getServiceAddress() {
1076         IoService service = getService();
1077         if (service instanceof IoAcceptor) {
1078             return ((IoAcceptor) service).getLocalAddress();
1079         } else {
1080             return getRemoteAddress();
1081         }
1082     }
1083 
    /**
     * {@inheritDoc}
     * <p>
     * Delegates unchanged to the superclass implementation. The override is
     * {@code final}, so subclasses cannot supply their own hash code.
     */
    @Override
    public final int hashCode() {
        return super.hashCode();
    }
1091 
    /**
     * {@inheritDoc}
     * <p>
     * Delegates unchanged to the superclass implementation. The override is
     * {@code final}, so subclasses cannot redefine equality; this keeps it
     * in step with the equally final {@link #hashCode()}.
     * <p>
     * TODO Revisit whether this pass-through override should be replaced by
     * a more meaningful implementation.
     */
    @Override
    public final boolean equals(Object o) {
        return super.equals(o);
    }
1100 
1101     /**
1102      * {@inheritDoc}
1103      */
1104     @Override
1105     public String toString() {
1106         if (getService() instanceof IoAcceptor) {
1107             return "(" + getIdAsString() + ": " + getServiceName() + ", server, " +
1108                     getRemoteAddress() + " => " + getLocalAddress() + ')';
1109         } else {
1110             return "(" + getIdAsString() + ": " + getServiceName() + ", client, " +
1111                     getLocalAddress() + " => " + getRemoteAddress() + ')';
1112         }
1113     }
1114 
1115     /**
1116      * TODO Add method documentation
1117      */
1118     private String getIdAsString() {
1119         String id = Long.toHexString(getId()).toUpperCase();
1120 
1121         // Somewhat inefficient, but it won't happen that often
1122         // because an ID is often a big integer.
1123         while (id.length() < 8) {
1124             id = '0' + id; // padding
1125         }
1126         id = "0x" + id;
1127 
1128         return id;
1129     }
1130 
1131     /**
1132      * TODO Add method documentation
1133      */
1134     private String getServiceName() {
1135         TransportMetadata tm = getTransportMetadata();
1136         if (tm == null) {
1137             return "null";
1138         } else {
1139             return tm.getProviderName() + ' ' + tm.getName();
1140         }
1141     }
1142 
1143     /**
1144      * TODO Add method documentation. Name is ridiculously too long.
1145      */
1146     private class CloseRequestAwareWriteRequestQueue implements WriteRequestQueue {
1147 
1148         private final WriteRequestQueue q;
1149 
1150         public CloseRequestAwareWriteRequestQueue(WriteRequestQueue q) {
1151             this.q = q;
1152         }
1153 
1154         public synchronized WriteRequest poll(IoSession session) {
1155             WriteRequest answer = q.poll(session);
1156             if (answer == CLOSE_REQUEST) {
1157                 AbstractIoSession.this.close();
1158                 dispose(session);
1159                 answer = null;
1160             }
1161             return answer;
1162         }
1163 
1164         public void offer(IoSession session, WriteRequest e) {
1165             q.offer(session, e);
1166         }
1167 
1168         public boolean isEmpty(IoSession session) {
1169             return q.isEmpty(session);
1170         }
1171 
1172         public void clear(IoSession session) {
1173             q.clear(session);
1174         }
1175 
1176         public void dispose(IoSession session) {
1177             q.dispose(session);
1178         }
1179     }
1180 }