1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.apr;
21
22 import java.io.IOException;
23 import java.nio.ByteBuffer;
24 import java.util.HashMap;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.concurrent.Executor;
29
30 import org.apache.mina.core.RuntimeIoException;
31 import org.apache.mina.core.buffer.IoBuffer;
32 import org.apache.mina.core.file.FileRegion;
33 import org.apache.mina.core.polling.AbstractPollingIoProcessor;
34 import org.apache.mina.util.CircularQueue;
35 import org.apache.tomcat.jni.Poll;
36 import org.apache.tomcat.jni.Pool;
37 import org.apache.tomcat.jni.Socket;
38 import org.apache.tomcat.jni.Status;
39
40
41
42
43
44
45
46 public final class AprIoProcessor extends AbstractPollingIoProcessor<AprSession> {
47 private static final int POLLSET_SIZE = 1024;
48
49 private final Map<Long, AprSession> allSessions =
50 new HashMap<Long, AprSession>(POLLSET_SIZE);
51
52 private final Object wakeupLock = new Object();
53 private final long wakeupSocket;
54 private volatile boolean toBeWakenUp;
55
56 private final long pool;
57 private final long bufferPool;
58 private final long pollset;
59 private final long[] polledSockets = new long[POLLSET_SIZE << 1];
60 private final List<AprSession> polledSessions =
61 new CircularQueue<AprSession>(POLLSET_SIZE);
62
63 public AprIoProcessor(Executor executor) {
64 super(executor);
65
66
67 pool = Pool.create(AprLibrary.getInstance().getRootPool());
68 bufferPool = Pool.create(AprLibrary.getInstance().getRootPool());
69
70 try {
71 wakeupSocket = Socket.create(
72 Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
73 } catch (RuntimeException e) {
74 throw e;
75 } catch (Error e) {
76 throw e;
77 } catch (Exception e) {
78 throw new RuntimeIoException("Failed to create a wakeup socket.", e);
79 }
80
81 boolean success = false;
82 long newPollset;
83 try {
84 newPollset = Poll.create(
85 POLLSET_SIZE,
86 pool,
87 Poll.APR_POLLSET_THREADSAFE,
88 Long.MAX_VALUE);
89
90 if (newPollset == 0) {
91 newPollset = Poll.create(
92 62,
93 pool,
94 Poll.APR_POLLSET_THREADSAFE,
95 Long.MAX_VALUE);
96 }
97
98 pollset = newPollset;
99 if (pollset < 0) {
100 if (Status.APR_STATUS_IS_ENOTIMPL(- (int) pollset)) {
101 throw new RuntimeIoException(
102 "Thread-safe pollset is not supported in this platform.");
103 }
104 }
105 success = true;
106 } catch (RuntimeException e) {
107 throw e;
108 } catch (Error e) {
109 throw e;
110 } catch (Exception e) {
111 throw new RuntimeIoException("Failed to create a pollset.", e);
112 } finally {
113 if (!success) {
114 dispose();
115 }
116 }
117 }
118
119 @Override
120 protected void dispose0() {
121 Poll.destroy(pollset);
122 Socket.close(wakeupSocket);
123 Pool.destroy(bufferPool);
124 Pool.destroy(pool);
125 }
126
127 @Override
128 protected boolean select(int timeout) throws Exception {
129 int rv = Poll.poll(pollset, 1000 * timeout, polledSockets, false);
130 if (rv <= 0) {
131 if (rv != -120001) {
132 throwException(rv);
133 }
134
135 rv = Poll.maintain(pollset, polledSockets, true);
136 if (rv > 0) {
137 for (int i = 0; i < rv; i ++) {
138 long socket = polledSockets[i];
139 AprSession session = allSessions.get(socket);
140 if (session == null) {
141 continue;
142 }
143
144 int flag = (session.isInterestedInRead()? Poll.APR_POLLIN : 0) |
145 (session.isInterestedInWrite()? Poll.APR_POLLOUT : 0);
146
147 Poll.add(pollset, socket, flag);
148 }
149 } else if (rv < 0) {
150 throwException(rv);
151 }
152
153 return false;
154 } else {
155 rv <<= 1;
156 if (!polledSessions.isEmpty()) {
157 polledSessions.clear();
158 }
159 for (int i = 0; i < rv; i ++) {
160 long flag = polledSockets[i];
161 long socket = polledSockets[++i];
162 if (socket == wakeupSocket) {
163 synchronized (wakeupLock) {
164 Poll.remove(pollset, wakeupSocket);
165 toBeWakenUp = false;
166 }
167 continue;
168 }
169 AprSession session = allSessions.get(socket);
170 if (session == null) {
171 continue;
172 }
173
174 session.setReadable((flag & Poll.APR_POLLIN) != 0);
175 session.setWritable((flag & Poll.APR_POLLOUT) != 0);
176
177 polledSessions.add(session);
178 }
179
180 return !polledSessions.isEmpty();
181 }
182 }
183
184 @Override
185 protected boolean isSelectorEmpty() {
186 return allSessions.isEmpty();
187 }
188
189 @Override
190 protected void wakeup() {
191 if (toBeWakenUp) {
192 return;
193 }
194
195
196 synchronized (wakeupLock) {
197 toBeWakenUp = true;
198 Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
199 }
200 }
201
202 @Override
203 protected Iterator<AprSession> allSessions() {
204 return allSessions.values().iterator();
205 }
206
207 @Override
208 protected Iterator<AprSession> selectedSessions() {
209 return polledSessions.iterator();
210 }
211
212 @Override
213 protected void init(AprSession session) throws Exception {
214 long s = session.getDescriptor();
215 Socket.optSet(s, Socket.APR_SO_NONBLOCK, 1);
216 Socket.timeoutSet(s, 0);
217
218 int rv = Poll.add(pollset, s, Poll.APR_POLLIN);
219 if (rv != Status.APR_SUCCESS) {
220 throwException(rv);
221 }
222
223 session.setInterestedInRead(true);
224 allSessions.put(s, session);
225 }
226
227 @Override
228 protected void destroy(AprSession session) throws Exception {
229 if (allSessions.remove(session.getDescriptor()) == null) {
230
231 return;
232 }
233
234 int ret = Poll.remove(pollset, session.getDescriptor());
235 try {
236 if (ret != Status.APR_SUCCESS) {
237 throwException(ret);
238 }
239 } finally {
240 ret = Socket.close(session.getDescriptor());
241
242
243
244 Socket.destroy(session.getDescriptor());
245 session.setDescriptor(0);
246
247 if (ret != Status.APR_SUCCESS) {
248 throwException(ret);
249 }
250 }
251 }
252
253 @Override
254 protected SessionState state(AprSession session) {
255 long socket = session.getDescriptor();
256 if (socket != 0) {
257 return SessionState.OPEN;
258 } else if (allSessions.get(socket) != null) {
259 return SessionState.PREPARING;
260 } else {
261 return SessionState.CLOSED;
262 }
263 }
264
265 @Override
266 protected boolean isReadable(AprSession session) {
267 return session.isReadable();
268 }
269
270 @Override
271 protected boolean isWritable(AprSession session) {
272 return session.isWritable();
273 }
274
275 @Override
276 protected boolean isInterestedInRead(AprSession session) {
277 return session.isInterestedInRead();
278 }
279
280 @Override
281 protected boolean isInterestedInWrite(AprSession session) {
282 return session.isInterestedInWrite();
283 }
284
285 @Override
286 protected void setInterestedInRead(AprSession session, boolean value) throws Exception {
287 if (session.isInterestedInRead() == value) {
288 return;
289 }
290
291 int rv = Poll.remove(pollset, session.getDescriptor());
292 if (rv != Status.APR_SUCCESS) {
293 throwException(rv);
294 }
295
296 int flags = (value ? Poll.APR_POLLIN : 0)
297 | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
298
299 rv = Poll.add(pollset, session.getDescriptor(), flags);
300 if (rv == Status.APR_SUCCESS) {
301 session.setInterestedInRead(value);
302 } else {
303 throwException(rv);
304 }
305 }
306
307 @Override
308 protected void setInterestedInWrite(AprSession session, boolean value) throws Exception {
309 if (session.isInterestedInWrite() == value) {
310 return;
311 }
312
313 int rv = Poll.remove(pollset, session.getDescriptor());
314 if (rv != Status.APR_SUCCESS) {
315 throwException(rv);
316 }
317
318 int flags = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0)
319 | (value ? Poll.APR_POLLOUT : 0);
320
321 rv = Poll.add(pollset, session.getDescriptor(), flags);
322 if (rv == Status.APR_SUCCESS) {
323 session.setInterestedInWrite(value);
324 } else {
325 throwException(rv);
326 }
327 }
328
329 @Override
330 protected int read(AprSession session, IoBuffer buffer) throws Exception {
331 int bytes;
332 int capacity = buffer.remaining();
333
334 ByteBuffer b = Pool.alloc(bufferPool, capacity);
335 try {
336 bytes = Socket.recvb(
337 session.getDescriptor(), b, 0, capacity);
338 if (bytes > 0) {
339 b.position(0);
340 b.limit(bytes);
341 buffer.put(b);
342 } else if (bytes < 0) {
343 if (Status.APR_STATUS_IS_EOF(-bytes)) {
344 bytes = -1;
345 } else if (Status.APR_STATUS_IS_EAGAIN(-bytes)) {
346 bytes = 0;
347 } else {
348 throwException(bytes);
349 }
350 }
351 } finally {
352 Pool.clear(bufferPool);
353 }
354 return bytes;
355 }
356
357 @Override
358 protected int write(AprSession session, IoBuffer buf, int length) throws Exception {
359 int writtenBytes;
360 if (buf.isDirect()) {
361 writtenBytes = Socket.sendb(
362 session.getDescriptor(), buf.buf(), buf.position(), length);
363 } else {
364 writtenBytes = Socket.send(
365 session.getDescriptor(), buf.array(), buf.position(), length);
366 if (writtenBytes > 0) {
367 buf.skip(writtenBytes);
368 }
369 }
370
371 if (writtenBytes < 0) {
372 if (Status.APR_STATUS_IS_EAGAIN(-writtenBytes)) {
373 writtenBytes = 0;
374 } else if (Status.APR_STATUS_IS_EOF(-writtenBytes)) {
375 writtenBytes = 0;
376 } else {
377 throwException(writtenBytes);
378 }
379 }
380 return writtenBytes;
381 }
382
383 @Override
384 protected int transferFile(AprSession session, FileRegion region, int length)
385 throws Exception {
386 throw new UnsupportedOperationException();
387 }
388
389 private void throwException(int code) throws IOException {
390 throw new IOException(
391 org.apache.tomcat.jni.Error.strerror(-code) +
392 " (code: " + code + ")");
393 }
394 }