1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.handler.stream;
21
22 import java.io.IOException;
23 import java.io.InputStream;
24
25 import org.apache.mina.core.buffer.IoBuffer;
26 import org.apache.mina.core.service.IoHandler;
27
28
29
30
31
32
33
34 class IoSessionInputStream extends InputStream {
35 private final Object mutex = new Object();
36
37 private final IoBuffer buf;
38
39 private volatile boolean closed;
40
41 private volatile boolean released;
42
43 private IOException exception;
44
45 public IoSessionInputStream() {
46 buf = IoBuffer.allocate(16);
47 buf.setAutoExpand(true);
48 buf.limit(0);
49 }
50
51 @Override
52 public int available() {
53 if (released) {
54 return 0;
55 }
56
57 synchronized (mutex) {
58 return buf.remaining();
59 }
60 }
61
62 @Override
63 public void close() {
64 if (closed) {
65 return;
66 }
67
68 synchronized (mutex) {
69 closed = true;
70 releaseBuffer();
71
72 mutex.notifyAll();
73 }
74 }
75
76 @Override
77 public int read() throws IOException {
78 synchronized (mutex) {
79 if (!waitForData()) {
80 return -1;
81 }
82
83 return buf.get() & 0xff;
84 }
85 }
86
87 @Override
88 public int read(byte[] b, int off, int len) throws IOException {
89 synchronized (mutex) {
90 if (!waitForData()) {
91 return -1;
92 }
93
94 int readBytes;
95
96 if (len > buf.remaining()) {
97 readBytes = buf.remaining();
98 } else {
99 readBytes = len;
100 }
101
102 buf.get(b, off, readBytes);
103
104 return readBytes;
105 }
106 }
107
108 private boolean waitForData() throws IOException {
109 if (released) {
110 return false;
111 }
112
113 synchronized (mutex) {
114 while (!released && buf.remaining() == 0 && exception == null) {
115 try {
116 mutex.wait();
117 } catch (InterruptedException e) {
118 IOException ioe = new IOException("Interrupted while waiting for more data");
119 ioe.initCause(e);
120 throw ioe;
121 }
122 }
123 }
124
125 if (exception != null) {
126 releaseBuffer();
127 throw exception;
128 }
129
130 if (closed && buf.remaining() == 0) {
131 releaseBuffer();
132
133 return false;
134 }
135
136 return true;
137 }
138
139 private void releaseBuffer() {
140 if (released) {
141 return;
142 }
143
144 released = true;
145 }
146
147 public void write(IoBuffer src) {
148 synchronized (mutex) {
149 if (closed) {
150 return;
151 }
152
153 if (buf.hasRemaining()) {
154 this.buf.compact();
155 this.buf.put(src);
156 this.buf.flip();
157 } else {
158 this.buf.clear();
159 this.buf.put(src);
160 this.buf.flip();
161 mutex.notifyAll();
162 }
163 }
164 }
165
166 public void throwException(IOException e) {
167 synchronized (mutex) {
168 if (exception == null) {
169 exception = e;
170
171 mutex.notifyAll();
172 }
173 }
174 }
175 }