View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.handler.stream;
21  
22  import java.io.IOException;
23  import java.io.InputStream;
24  
25  import org.apache.mina.core.buffer.IoBuffer;
26  import org.apache.mina.core.service.IoHandler;
27  
28  /**
29   * An {@link InputStream} that buffers data read from
30   * {@link IoHandler#messageReceived(IoSession, Object)} events.
31   *
32   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
33   */
34  class IoSessionInputStream extends InputStream {
35      private final Object mutex = new Object();
36  
37      private final IoBuffer buf;
38  
39      private volatile boolean closed;
40  
41      private volatile boolean released;
42  
43      private IOException exception;
44  
45      public IoSessionInputStream() {
46          buf = IoBuffer.allocate(16);
47          buf.setAutoExpand(true);
48          buf.limit(0);
49      }
50  
51      @Override
52      public int available() {
53          if (released) {
54              return 0;
55          }
56  
57          synchronized (mutex) {
58              return buf.remaining();
59          }
60      }
61  
62      @Override
63      public void close() {
64          if (closed) {
65              return;
66          }
67  
68          synchronized (mutex) {
69              closed = true;
70              releaseBuffer();
71  
72              mutex.notifyAll();
73          }
74      }
75  
76      @Override
77      public int read() throws IOException {
78          synchronized (mutex) {
79              if (!waitForData()) {
80                  return -1;
81              }
82  
83              return buf.get() & 0xff;
84          }
85      }
86  
87      @Override
88      public int read(byte[] b, int off, int len) throws IOException {
89          synchronized (mutex) {
90              if (!waitForData()) {
91                  return -1;
92              }
93  
94              int readBytes;
95  
96              if (len > buf.remaining()) {
97                  readBytes = buf.remaining();
98              } else {
99                  readBytes = len;
100             }
101 
102             buf.get(b, off, readBytes);
103 
104             return readBytes;
105         }
106     }
107 
108     private boolean waitForData() throws IOException {
109         if (released) {
110             return false;
111         }
112 
113         synchronized (mutex) {
114             while (!released && buf.remaining() == 0 && exception == null) {
115                 try {
116                     mutex.wait();
117                 } catch (InterruptedException e) {
118                     IOException ioe = new IOException("Interrupted while waiting for more data");
119                     ioe.initCause(e);
120                     throw ioe;
121                 }
122             }
123         }
124 
125         if (exception != null) {
126             releaseBuffer();
127             throw exception;
128         }
129 
130         if (closed && buf.remaining() == 0) {
131             releaseBuffer();
132 
133             return false;
134         }
135 
136         return true;
137     }
138 
139     private void releaseBuffer() {
140         if (released) {
141             return;
142         }
143 
144         released = true;
145     }
146 
147     public void write(IoBuffer src) {
148         synchronized (mutex) {
149             if (closed) {
150                 return;
151             }
152 
153             if (buf.hasRemaining()) {
154                 this.buf.compact();
155                 this.buf.put(src);
156                 this.buf.flip();
157             } else {
158                 this.buf.clear();
159                 this.buf.put(src);
160                 this.buf.flip();
161                 mutex.notifyAll();
162             }
163         }
164     }
165 
166     public void throwException(IOException e) {
167         synchronized (mutex) {
168             if (exception == null) {
169                 exception = e;
170 
171                 mutex.notifyAll();
172             }
173         }
174     }
175 }