1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.apr;
21
22 import java.io.IOException;
23 import java.nio.ByteBuffer;
24 import java.util.HashMap;
25 import java.util.Iterator;
26 import java.util.Map;
27 import java.util.Queue;
28 import java.util.concurrent.ConcurrentLinkedQueue;
29 import java.util.concurrent.Executor;
30
31 import org.apache.mina.core.RuntimeIoException;
32 import org.apache.mina.core.buffer.IoBuffer;
33 import org.apache.mina.core.file.FileRegion;
34 import org.apache.mina.core.polling.AbstractPollingIoProcessor;
35 import org.apache.mina.core.session.SessionState;
36 import org.apache.tomcat.jni.File;
37 import org.apache.tomcat.jni.Poll;
38 import org.apache.tomcat.jni.Pool;
39 import org.apache.tomcat.jni.Socket;
40 import org.apache.tomcat.jni.Status;
41
42
43
44
45
46
47
48 public final class AprIoProcessor extends AbstractPollingIoProcessor<AprSession> {
49 private static final int POLLSET_SIZE = 1024;
50
51 private final Map<Long, AprSession> allSessions = new HashMap<Long, AprSession>(POLLSET_SIZE);
52
53 private final Object wakeupLock = new Object();
54 private final long wakeupSocket;
55 private volatile boolean toBeWakenUp;
56
57 private final long pool;
58 private final long bufferPool;
59 private final long pollset;
60 private final long[] polledSockets = new long[POLLSET_SIZE << 1];
61 private final Queue<AprSession> polledSessions = new ConcurrentLinkedQueue<AprSession>();
62
63
64
65
66
67
68
69
70 public AprIoProcessor(Executor executor) {
71 super(executor);
72
73
74 pool = Pool.create(AprLibrary.getInstance().getRootPool());
75 bufferPool = Pool.create(AprLibrary.getInstance().getRootPool());
76
77 try {
78 wakeupSocket = Socket.create(Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
79 } catch (RuntimeException e) {
80 throw e;
81 } catch (Error e) {
82 throw e;
83 } catch (Exception e) {
84 throw new RuntimeIoException("Failed to create a wakeup socket.", e);
85 }
86
87 boolean success = false;
88 long newPollset;
89 try {
90 newPollset = Poll.create(POLLSET_SIZE, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
91
92 if (newPollset == 0) {
93 newPollset = Poll.create(62, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
94 }
95
96 pollset = newPollset;
97 if (pollset < 0) {
98 if (Status.APR_STATUS_IS_ENOTIMPL(-(int) pollset)) {
99 throw new RuntimeIoException("Thread-safe pollset is not supported in this platform.");
100 }
101 }
102 success = true;
103 } catch (RuntimeException e) {
104 throw e;
105 } catch (Error e) {
106 throw e;
107 } catch (Exception e) {
108 throw new RuntimeIoException("Failed to create a pollset.", e);
109 } finally {
110 if (!success) {
111 dispose();
112 }
113 }
114 }
115
116
117
118
119 @Override
120 protected void dispose0() {
121 Poll.destroy(pollset);
122 Socket.close(wakeupSocket);
123 Pool.destroy(bufferPool);
124 Pool.destroy(pool);
125 }
126
127
128
129
130 @Override
131 protected int select() throws Exception {
132 return select(Integer.MAX_VALUE);
133 }
134
135
136
137
138 @Override
139 protected int select(long timeout) throws Exception {
140 int rv = Poll.poll(pollset, 1000 * timeout, polledSockets, false);
141 if (rv <= 0) {
142 if (rv != -120001) {
143 throwException(rv);
144 }
145
146 rv = Poll.maintain(pollset, polledSockets, true);
147 if (rv > 0) {
148 for (int i = 0; i < rv; i++) {
149 long socket = polledSockets[i];
150 AprSession session = allSessions.get(socket);
151 if (session == null) {
152 continue;
153 }
154
155 int flag = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0)
156 | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
157
158 Poll.add(pollset, socket, flag);
159 }
160 } else if (rv < 0) {
161 throwException(rv);
162 }
163
164 return 0;
165 } else {
166 rv <<= 1;
167 if (!polledSessions.isEmpty()) {
168 polledSessions.clear();
169 }
170 for (int i = 0; i < rv; i++) {
171 long flag = polledSockets[i];
172 long socket = polledSockets[++i];
173 if (socket == wakeupSocket) {
174 synchronized (wakeupLock) {
175 Poll.remove(pollset, wakeupSocket);
176 toBeWakenUp = false;
177 }
178 continue;
179 }
180 AprSession session = allSessions.get(socket);
181 if (session == null) {
182 continue;
183 }
184
185 session.setReadable((flag & Poll.APR_POLLIN) != 0);
186 session.setWritable((flag & Poll.APR_POLLOUT) != 0);
187
188 polledSessions.add(session);
189 }
190
191 return polledSessions.size();
192 }
193 }
194
195
196
197
198 @Override
199 protected boolean isSelectorEmpty() {
200 return allSessions.isEmpty();
201 }
202
203
204
205
206 @Override
207 protected void wakeup() {
208 if (toBeWakenUp) {
209 return;
210 }
211
212
213 synchronized (wakeupLock) {
214 toBeWakenUp = true;
215 Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
216 }
217 }
218
219
220
221
222 @Override
223 protected Iterator<AprSession> allSessions() {
224 return allSessions.values().iterator();
225 }
226
227
228
229
230 @Override
231 protected Iterator<AprSession> selectedSessions() {
232 return polledSessions.iterator();
233 }
234
235 @Override
236 protected void init(AprSession session) throws Exception {
237 long s = session.getDescriptor();
238 Socket.optSet(s, Socket.APR_SO_NONBLOCK, 1);
239 Socket.timeoutSet(s, 0);
240
241 int rv = Poll.add(pollset, s, Poll.APR_POLLIN);
242 if (rv != Status.APR_SUCCESS) {
243 throwException(rv);
244 }
245
246 session.setInterestedInRead(true);
247 allSessions.put(s, session);
248 }
249
250
251
252
253 @Override
254 protected void destroy(AprSession session) throws Exception {
255 if (allSessions.remove(session.getDescriptor()) == null) {
256
257 return;
258 }
259
260 int ret = Poll.remove(pollset, session.getDescriptor());
261 try {
262 if (ret != Status.APR_SUCCESS) {
263 throwException(ret);
264 }
265 } finally {
266 ret = Socket.close(session.getDescriptor());
267
268
269
270 Socket.destroy(session.getDescriptor());
271 session.setDescriptor(0);
272
273 if (ret != Status.APR_SUCCESS) {
274 throwException(ret);
275 }
276 }
277 }
278
279
280
281
282 @Override
283 protected SessionState getState(AprSession session) {
284 long socket = session.getDescriptor();
285
286 if (socket != 0) {
287 return SessionState.OPENED;
288 } else if (allSessions.get(socket) != null) {
289 return SessionState.OPENING;
290 } else {
291 return SessionState.CLOSING;
292 }
293 }
294
295
296
297
298 @Override
299 protected boolean isReadable(AprSession session) {
300 return session.isReadable();
301 }
302
303
304
305
306 @Override
307 protected boolean isWritable(AprSession session) {
308 return session.isWritable();
309 }
310
311
312
313
314 @Override
315 protected boolean isInterestedInRead(AprSession session) {
316 return session.isInterestedInRead();
317 }
318
319
320
321
322 @Override
323 protected boolean isInterestedInWrite(AprSession session) {
324 return session.isInterestedInWrite();
325 }
326
327
328
329
330 @Override
331 protected void setInterestedInRead(AprSession session, boolean isInterested) throws Exception {
332 if (session.isInterestedInRead() == isInterested) {
333 return;
334 }
335
336 int rv = Poll.remove(pollset, session.getDescriptor());
337
338 if (rv != Status.APR_SUCCESS) {
339 throwException(rv);
340 }
341
342 int flags = (isInterested ? Poll.APR_POLLIN : 0) | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
343
344 rv = Poll.add(pollset, session.getDescriptor(), flags);
345
346 if (rv == Status.APR_SUCCESS) {
347 session.setInterestedInRead(isInterested);
348 } else {
349 throwException(rv);
350 }
351 }
352
353
354
355
356 @Override
357 protected void setInterestedInWrite(AprSession session, boolean isInterested) throws Exception {
358 if (session.isInterestedInWrite() == isInterested) {
359 return;
360 }
361
362 int rv = Poll.remove(pollset, session.getDescriptor());
363
364 if (rv != Status.APR_SUCCESS) {
365 throwException(rv);
366 }
367
368 int flags = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0) | (isInterested ? Poll.APR_POLLOUT : 0);
369
370 rv = Poll.add(pollset, session.getDescriptor(), flags);
371
372 if (rv == Status.APR_SUCCESS) {
373 session.setInterestedInWrite(isInterested);
374 } else {
375 throwException(rv);
376 }
377 }
378
379
380
381
382 @Override
383 protected int read(AprSession session, IoBuffer buffer) throws Exception {
384 int bytes;
385 int capacity = buffer.remaining();
386
387 ByteBuffer b = Pool.alloc(bufferPool, capacity);
388
389 try {
390 bytes = Socket.recvb(session.getDescriptor(), b, 0, capacity);
391
392 if (bytes > 0) {
393 b.position(0);
394 b.limit(bytes);
395 buffer.put(b);
396 } else if (bytes < 0) {
397 if (Status.APR_STATUS_IS_EOF(-bytes)) {
398 bytes = -1;
399 } else if (Status.APR_STATUS_IS_EAGAIN(-bytes)) {
400 bytes = 0;
401 } else {
402 throwException(bytes);
403 }
404 }
405 } finally {
406 Pool.clear(bufferPool);
407 }
408
409 return bytes;
410 }
411
412
413
414
415 @Override
416 protected int write(AprSession session, IoBuffer buf, int length) throws Exception {
417 int writtenBytes;
418 if (buf.isDirect()) {
419 writtenBytes = Socket.sendb(session.getDescriptor(), buf.buf(), buf.position(), length);
420 } else {
421 writtenBytes = Socket.send(session.getDescriptor(), buf.array(), buf.position(), length);
422 if (writtenBytes > 0) {
423 buf.skip(writtenBytes);
424 }
425 }
426
427 if (writtenBytes < 0) {
428 if (Status.APR_STATUS_IS_EAGAIN(-writtenBytes)) {
429 writtenBytes = 0;
430 } else if (Status.APR_STATUS_IS_EOF(-writtenBytes)) {
431 writtenBytes = 0;
432 } else {
433 throwException(writtenBytes);
434 }
435 }
436 return writtenBytes;
437 }
438
439
440
441
442 @Override
443 protected int transferFile(AprSession session, FileRegion region, int length) throws Exception {
444 if (region.getFilename() == null) {
445 throw new UnsupportedOperationException();
446 }
447
448 long fd = File.open(region.getFilename(),
449 File.APR_FOPEN_READ
450 | File.APR_FOPEN_SENDFILE_ENABLED
451 | File.APR_FOPEN_BINARY,
452 0,
453 Socket.pool(session.getDescriptor()));
454 long numWritten = Socket.sendfilen(session.getDescriptor(), fd, region.getPosition(), length, 0);
455 File.close(fd);
456
457 if (numWritten < 0) {
458 if (numWritten == -Status.EAGAIN) {
459 return 0;
460 }
461 throw new IOException(org.apache.tomcat.jni.Error.strerror((int) -numWritten) + " (code: " + numWritten + ")");
462 }
463 return (int) numWritten;
464 }
465
466 private void throwException(int code) throws IOException {
467 throw new IOException(org.apache.tomcat.jni.Error.strerror(-code) + " (code: " + code + ")");
468 }
469
470
471
472
473 protected void registerNewSelector() {
474
475 }
476
477
478
479
480 protected boolean isBrokenConnection() throws IOException {
481
482 return true;
483 }
484 }