View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.handler.stream;
21  
22  import java.io.IOException;
23  import java.io.InputStream;
24  
25  import org.apache.mina.core.buffer.IoBuffer;
26  import org.apache.mina.core.service.IoHandler;
27  
28  /**
29   * An {@link InputStream} that buffers data read from
30   * {@link IoHandler#messageReceived(IoSession, Object)} events.
31   *
32   * @author The Apache MINA Project (dev@mina.apache.org)
33   */
34  class IoSessionInputStream extends InputStream {
35      private final Object mutex = new Object();
36  
37      private final IoBuffer buf;
38  
39      private volatile boolean closed;
40  
41      private volatile boolean released;
42  
43      private IOException exception;
44  
45      public IoSessionInputStream() {
46          buf = IoBuffer.allocate(16);
47          buf.setAutoExpand(true);
48          buf.limit(0);
49      }
50  
51      @Override
52      public int available() {
53          if (released) {
54              return 0;
55          }
56  
57          synchronized (mutex) {
58              return buf.remaining();
59          }
60      }
61  
62      @Override
63      public void close() {
64          if (closed) {
65              return;
66          }
67  
68          synchronized (mutex) {
69              closed = true;
70              releaseBuffer();
71  
72              mutex.notifyAll();
73          }
74      }
75  
76      @Override
77      public int read() throws IOException {
78          synchronized (mutex) {
79              if (!waitForData()) {
80                  return -1;
81              }
82  
83              return buf.get() & 0xff;
84          }
85      }
86  
87      @Override
88      public int read(byte[] b, int off, int len) throws IOException {
89          synchronized (mutex) {
90              if (!waitForData()) {
91                  return -1;
92              }
93  
94              int readBytes;
95  
96              if (len > buf.remaining()) {
97                  readBytes = buf.remaining();
98              } else {
99                  readBytes = len;
100             }
101 
102             buf.get(b, off, readBytes);
103 
104             return readBytes;
105         }
106     }
107 
108     private boolean waitForData() throws IOException {
109         if (released) {
110             return false;
111         }
112 
113         synchronized (mutex) {
114             while (!released && buf.remaining() == 0 && exception == null) {
115                 try {
116                     mutex.wait();
117                 } catch (InterruptedException e) {
118                     IOException ioe = new IOException(
119                             "Interrupted while waiting for more data");
120                     ioe.initCause(e);
121                     throw ioe;
122                 }
123             }
124         }
125 
126         if (exception != null) {
127             releaseBuffer();
128             throw exception;
129         }
130 
131         if (closed && buf.remaining() == 0) {
132             releaseBuffer();
133 
134             return false;
135         }
136 
137         return true;
138     }
139 
140     private void releaseBuffer() {
141         if (released) {
142             return;
143         }
144 
145         released = true;
146     }
147 
148     public void write(IoBuffer src) {
149         synchronized (mutex) {
150             if (closed) {
151                 return;
152             }
153 
154             if (buf.hasRemaining()) {
155                 this.buf.compact();
156                 this.buf.put(src);
157                 this.buf.flip();
158             } else {
159                 this.buf.clear();
160                 this.buf.put(src);
161                 this.buf.flip();
162                 mutex.notifyAll();
163             }
164         }
165     }
166 
167     public void throwException(IOException e) {
168         synchronized (mutex) {
169             if (exception == null) {
170                 exception = e;
171 
172                 mutex.notifyAll();
173             }
174         }
175     }
176 }