1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.handler.stream;
21
22 import java.io.IOException;
23 import java.io.InputStream;
24
25 import org.apache.mina.core.buffer.IoBuffer;
26 import org.apache.mina.core.service.IoHandler;
27
28
29
30
31
32
33
34 class IoSessionInputStream extends InputStream {
35 private final Object mutex = new Object();
36
37 private final IoBuffer buf;
38
39 private volatile boolean closed;
40
41 private volatile boolean released;
42
43 private IOException exception;
44
45 public IoSessionInputStream() {
46 buf = IoBuffer.allocate(16);
47 buf.setAutoExpand(true);
48 buf.limit(0);
49 }
50
51 @Override
52 public int available() {
53 if (released) {
54 return 0;
55 }
56
57 synchronized (mutex) {
58 return buf.remaining();
59 }
60 }
61
62 @Override
63 public void close() {
64 if (closed) {
65 return;
66 }
67
68 synchronized (mutex) {
69 closed = true;
70 releaseBuffer();
71
72 mutex.notifyAll();
73 }
74 }
75
76 @Override
77 public int read() throws IOException {
78 synchronized (mutex) {
79 if (!waitForData()) {
80 return -1;
81 }
82
83 return buf.get() & 0xff;
84 }
85 }
86
87 @Override
88 public int read(byte[] b, int off, int len) throws IOException {
89 synchronized (mutex) {
90 if (!waitForData()) {
91 return -1;
92 }
93
94 int readBytes;
95
96 if (len > buf.remaining()) {
97 readBytes = buf.remaining();
98 } else {
99 readBytes = len;
100 }
101
102 buf.get(b, off, readBytes);
103
104 return readBytes;
105 }
106 }
107
108 private boolean waitForData() throws IOException {
109 if (released) {
110 return false;
111 }
112
113 synchronized (mutex) {
114 while (!released && buf.remaining() == 0 && exception == null) {
115 try {
116 mutex.wait();
117 } catch (InterruptedException e) {
118 IOException ioe = new IOException(
119 "Interrupted while waiting for more data");
120 ioe.initCause(e);
121 throw ioe;
122 }
123 }
124 }
125
126 if (exception != null) {
127 releaseBuffer();
128 throw exception;
129 }
130
131 if (closed && buf.remaining() == 0) {
132 releaseBuffer();
133
134 return false;
135 }
136
137 return true;
138 }
139
140 private void releaseBuffer() {
141 if (released) {
142 return;
143 }
144
145 released = true;
146 }
147
148 public void write(IoBuffer src) {
149 synchronized (mutex) {
150 if (closed) {
151 return;
152 }
153
154 if (buf.hasRemaining()) {
155 this.buf.compact();
156 this.buf.put(src);
157 this.buf.flip();
158 } else {
159 this.buf.clear();
160 this.buf.put(src);
161 this.buf.flip();
162 mutex.notifyAll();
163 }
164 }
165 }
166
167 public void throwException(IOException e) {
168 synchronized (mutex) {
169 if (exception == null) {
170 exception = e;
171
172 mutex.notifyAll();
173 }
174 }
175 }
176 }