1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.handler.stream;
21
22 import java.io.IOException;
23 import java.io.InputStream;
24
25 import org.apache.mina.core.buffer.IoBuffer;
26 import org.apache.mina.core.service.IoHandler;
27 import org.apache.mina.core.session.IoSession;
28
29
30
31
32
33
34
35
36 class IoSessionInputStream extends InputStream {
37 private final Object mutex = new Object();
38
39 private final IoBuffer buf;
40
41 private volatile boolean closed;
42
43 private volatile boolean released;
44
45 private IOException exception;
46
47 public IoSessionInputStream() {
48 buf = IoBuffer.allocate(16);
49 buf.setAutoExpand(true);
50 buf.limit(0);
51 }
52
53 @Override
54 public int available() {
55 if (released) {
56 return 0;
57 } else {
58 synchronized (mutex) {
59 return buf.remaining();
60 }
61 }
62 }
63
64 @Override
65 public void close() {
66 if (closed) {
67 return;
68 }
69
70 synchronized (mutex) {
71 closed = true;
72 releaseBuffer();
73
74 mutex.notifyAll();
75 }
76 }
77
78 @Override
79 public int read() throws IOException {
80 synchronized (mutex) {
81 if (!waitForData()) {
82 return -1;
83 }
84
85 return buf.get() & 0xff;
86 }
87 }
88
89 @Override
90 public int read(byte[] b, int off, int len) throws IOException {
91 synchronized (mutex) {
92 if (!waitForData()) {
93 return -1;
94 }
95
96 int readBytes;
97
98 if (len > buf.remaining()) {
99 readBytes = buf.remaining();
100 } else {
101 readBytes = len;
102 }
103
104 buf.get(b, off, readBytes);
105
106 return readBytes;
107 }
108 }
109
110 private boolean waitForData() throws IOException {
111 if (released) {
112 return false;
113 }
114
115 synchronized (mutex) {
116 while (!released && buf.remaining() == 0 && exception == null) {
117 try {
118 mutex.wait();
119 } catch (InterruptedException e) {
120 IOException ioe = new IOException(
121 "Interrupted while waiting for more data");
122 ioe.initCause(e);
123 throw ioe;
124 }
125 }
126 }
127
128 if (exception != null) {
129 releaseBuffer();
130 throw exception;
131 }
132
133 if (closed && buf.remaining() == 0) {
134 releaseBuffer();
135
136 return false;
137 }
138
139 return true;
140 }
141
142 private void releaseBuffer() {
143 if (released) {
144 return;
145 }
146
147 released = true;
148 }
149
150 public void write(IoBuffer src) {
151 synchronized (mutex) {
152 if (closed) {
153 return;
154 }
155
156 if (buf.hasRemaining()) {
157 this.buf.compact();
158 this.buf.put(src);
159 this.buf.flip();
160 } else {
161 this.buf.clear();
162 this.buf.put(src);
163 this.buf.flip();
164 mutex.notifyAll();
165 }
166 }
167 }
168
169 public void throwException(IOException e) {
170 synchronized (mutex) {
171 if (exception == null) {
172 exception = e;
173
174 mutex.notifyAll();
175 }
176 }
177 }
178 }