1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.apr;
21
22 import java.io.IOException;
23 import java.nio.ByteBuffer;
24 import java.util.HashMap;
25 import java.util.Iterator;
26 import java.util.Map;
27 import java.util.Queue;
28 import java.util.concurrent.ConcurrentLinkedQueue;
29 import java.util.concurrent.Executor;
30
31 import org.apache.mina.core.RuntimeIoException;
32 import org.apache.mina.core.buffer.IoBuffer;
33 import org.apache.mina.core.file.FileRegion;
34 import org.apache.mina.core.polling.AbstractPollingIoProcessor;
35 import org.apache.mina.core.session.SessionState;
36 import org.apache.tomcat.jni.File;
37 import org.apache.tomcat.jni.Poll;
38 import org.apache.tomcat.jni.Pool;
39 import org.apache.tomcat.jni.Socket;
40 import org.apache.tomcat.jni.Status;
41
42
43
44
45
46
47
48 public final class AprIoProcessor extends AbstractPollingIoProcessor<AprSession> {
    /** Size requested for the APR pollset; also used as the initial capacity of {@link #allSessions}. */
    private static final int POLLSET_SIZE = 1024;

    /** Sessions registered by {@code init}, keyed by their socket descriptor. */
    private final Map<Long, AprSession> allSessions = new HashMap<Long, AprSession>(POLLSET_SIZE);

    /** Guards {@link #toBeWakenUp} together with adding/removing the wakeup socket to/from the pollset. */
    private final Object wakeupLock = new Object();

    /** Descriptor of the UDP socket that {@code wakeup()} adds to the pollset to make a pending poll return. */
    private final long wakeupSocket;

    /** Set by {@code wakeup()} when the wakeup socket is registered; cleared by {@code select(long)} when it fires. */
    private volatile boolean toBeWakenUp;

    /** APR pool from which the wakeup socket and the pollset are created. */
    private final long pool;

    /** APR pool from which temporary read buffers are allocated; cleared after every {@code read}. */
    private final long bufferPool;

    /** Handle of the pollset that all session sockets are registered with. */
    private final long pollset;

    /**
     * Scratch array filled by Poll.poll and Poll.maintain. It is twice POLLSET_SIZE because
     * {@code select(long)} reads poll results as (flag, socket) pairs.
     */
    private final long[] polledSockets = new long[POLLSET_SIZE << 1];

    /** Sessions found ready by the most recent {@code select(long)} call. */
    private final Queue<AprSession> polledSessions = new ConcurrentLinkedQueue<AprSession>();
68
69
70
71
72
73
74
75
76 public AprIoProcessor(Executor executor) {
77 super(executor);
78
79
80 pool = Pool.create(AprLibrary.getInstance().getRootPool());
81 bufferPool = Pool.create(AprLibrary.getInstance().getRootPool());
82
83 try {
84 wakeupSocket = Socket.create(Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
85 } catch (RuntimeException e) {
86 throw e;
87 } catch (Error e) {
88 throw e;
89 } catch (Exception e) {
90 throw new RuntimeIoException("Failed to create a wakeup socket.", e);
91 }
92
93 boolean success = false;
94 long newPollset;
95 try {
96 newPollset = Poll.create(POLLSET_SIZE, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
97
98 if (newPollset == 0) {
99 newPollset = Poll.create(62, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
100 }
101
102 pollset = newPollset;
103 if (pollset < 0) {
104 if (Status.APR_STATUS_IS_ENOTIMPL(-(int) pollset)) {
105 throw new RuntimeIoException("Thread-safe pollset is not supported in this platform.");
106 }
107 }
108 success = true;
109 } catch (RuntimeException e) {
110 throw e;
111 } catch (Error e) {
112 throw e;
113 } catch (Exception e) {
114 throw new RuntimeIoException("Failed to create a pollset.", e);
115 } finally {
116 if (!success) {
117 dispose();
118 }
119 }
120 }
121
122
123
124
125 @Override
126 protected void doDispose() {
127 Poll.destroy(pollset);
128 Socket.close(wakeupSocket);
129 Pool.destroy(bufferPool);
130 Pool.destroy(pool);
131 }
132
133
134
135
136 @Override
137 protected int select() throws Exception {
138 return select(Integer.MAX_VALUE);
139 }
140
141
142
143
144 @Override
145 protected int select(long timeout) throws Exception {
146 int rv = Poll.poll(pollset, 1000 * timeout, polledSockets, false);
147 if (rv <= 0) {
148 if (rv != -120001) {
149 throwException(rv);
150 }
151
152 rv = Poll.maintain(pollset, polledSockets, true);
153 if (rv > 0) {
154 for (int i = 0; i < rv; i++) {
155 long socket = polledSockets[i];
156 AprSession session = allSessions.get(socket);
157 if (session == null) {
158 continue;
159 }
160
161 int flag = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0)
162 | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
163
164 Poll.add(pollset, socket, flag);
165 }
166 } else if (rv < 0) {
167 throwException(rv);
168 }
169
170 return 0;
171 } else {
172 rv <<= 1;
173 if (!polledSessions.isEmpty()) {
174 polledSessions.clear();
175 }
176 for (int i = 0; i < rv; i++) {
177 long flag = polledSockets[i];
178 long socket = polledSockets[++i];
179 if (socket == wakeupSocket) {
180 synchronized (wakeupLock) {
181 Poll.remove(pollset, wakeupSocket);
182 toBeWakenUp = false;
183 wakeupCalled.set(true);
184 }
185 continue;
186 }
187 AprSession session = allSessions.get(socket);
188 if (session == null) {
189 continue;
190 }
191
192 session.setReadable((flag & Poll.APR_POLLIN) != 0);
193 session.setWritable((flag & Poll.APR_POLLOUT) != 0);
194
195 polledSessions.add(session);
196 }
197
198 return polledSessions.size();
199 }
200 }
201
202
203
204
205 @Override
206 protected boolean isSelectorEmpty() {
207 return allSessions.isEmpty();
208 }
209
210
211
212
213 @Override
214 protected void wakeup() {
215 if (toBeWakenUp) {
216 return;
217 }
218
219
220 synchronized (wakeupLock) {
221 toBeWakenUp = true;
222 Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
223 }
224 }
225
226
227
228
229 @Override
230 protected Iterator<AprSession> allSessions() {
231 return allSessions.values().iterator();
232 }
233
234
235
236
237 @Override
238 protected int allSessionsCount() {
239 return allSessions.size();
240 }
241
242
243
244
245 @Override
246 protected Iterator<AprSession> selectedSessions() {
247 return polledSessions.iterator();
248 }
249
250 @Override
251 protected void init(AprSession session) throws Exception {
252 long s = session.getDescriptor();
253 Socket.optSet(s, Socket.APR_SO_NONBLOCK, 1);
254 Socket.timeoutSet(s, 0);
255
256 int rv = Poll.add(pollset, s, Poll.APR_POLLIN);
257 if (rv != Status.APR_SUCCESS) {
258 throwException(rv);
259 }
260
261 session.setInterestedInRead(true);
262 allSessions.put(s, session);
263 }
264
265
266
267
268 @Override
269 protected void destroy(AprSession session) throws Exception {
270 if (allSessions.remove(session.getDescriptor()) == null) {
271
272 return;
273 }
274
275 int ret = Poll.remove(pollset, session.getDescriptor());
276 try {
277 if (ret != Status.APR_SUCCESS) {
278 throwException(ret);
279 }
280 } finally {
281 ret = Socket.close(session.getDescriptor());
282
283
284
285 Socket.destroy(session.getDescriptor());
286 session.setDescriptor(0);
287
288 if (ret != Status.APR_SUCCESS) {
289 throwException(ret);
290 }
291 }
292 }
293
294
295
296
297 @Override
298 protected SessionState getState(AprSession session) {
299 long socket = session.getDescriptor();
300
301 if (socket != 0) {
302 return SessionState.OPENED;
303 } else if (allSessions.get(socket) != null) {
304 return SessionState.OPENING;
305 } else {
306 return SessionState.CLOSING;
307 }
308 }
309
310
311
312
313 @Override
314 protected boolean isReadable(AprSession session) {
315 return session.isReadable();
316 }
317
318
319
320
321 @Override
322 protected boolean isWritable(AprSession session) {
323 return session.isWritable();
324 }
325
326
327
328
329 @Override
330 protected boolean isInterestedInRead(AprSession session) {
331 return session.isInterestedInRead();
332 }
333
334
335
336
337 @Override
338 protected boolean isInterestedInWrite(AprSession session) {
339 return session.isInterestedInWrite();
340 }
341
342
343
344
345 @Override
346 protected void setInterestedInRead(AprSession session, boolean isInterested) throws Exception {
347 if (session.isInterestedInRead() == isInterested) {
348 return;
349 }
350
351 int rv = Poll.remove(pollset, session.getDescriptor());
352
353 if (rv != Status.APR_SUCCESS) {
354 throwException(rv);
355 }
356
357 int flags = (isInterested ? Poll.APR_POLLIN : 0) | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
358
359 rv = Poll.add(pollset, session.getDescriptor(), flags);
360
361 if (rv == Status.APR_SUCCESS) {
362 session.setInterestedInRead(isInterested);
363 } else {
364 throwException(rv);
365 }
366 }
367
368
369
370
371 @Override
372 protected void setInterestedInWrite(AprSession session, boolean isInterested) throws Exception {
373 if (session.isInterestedInWrite() == isInterested) {
374 return;
375 }
376
377 int rv = Poll.remove(pollset, session.getDescriptor());
378
379 if (rv != Status.APR_SUCCESS) {
380 throwException(rv);
381 }
382
383 int flags = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0) | (isInterested ? Poll.APR_POLLOUT : 0);
384
385 rv = Poll.add(pollset, session.getDescriptor(), flags);
386
387 if (rv == Status.APR_SUCCESS) {
388 session.setInterestedInWrite(isInterested);
389 } else {
390 throwException(rv);
391 }
392 }
393
394
395
396
397 @Override
398 protected int read(AprSession session, IoBuffer buffer) throws Exception {
399 int bytes;
400 int capacity = buffer.remaining();
401
402 ByteBuffer b = Pool.alloc(bufferPool, capacity);
403
404 try {
405 bytes = Socket.recvb(session.getDescriptor(), b, 0, capacity);
406
407 if (bytes > 0) {
408 b.position(0);
409 b.limit(bytes);
410 buffer.put(b);
411 } else if (bytes < 0) {
412 if (Status.APR_STATUS_IS_EOF(-bytes)) {
413 bytes = -1;
414 } else if (Status.APR_STATUS_IS_EAGAIN(-bytes)) {
415 bytes = 0;
416 } else {
417 throwException(bytes);
418 }
419 }
420 } finally {
421 Pool.clear(bufferPool);
422 }
423
424 return bytes;
425 }
426
427
428
429
430 @Override
431 protected int write(AprSession session, IoBuffer buf, int length) throws IOException {
432 int writtenBytes;
433 if (buf.isDirect()) {
434 writtenBytes = Socket.sendb(session.getDescriptor(), buf.buf(), buf.position(), length);
435 } else {
436 writtenBytes = Socket.send(session.getDescriptor(), buf.array(), buf.position(), length);
437 if (writtenBytes > 0) {
438 buf.skip(writtenBytes);
439 }
440 }
441
442 if (writtenBytes < 0) {
443 if (Status.APR_STATUS_IS_EAGAIN(-writtenBytes)) {
444 writtenBytes = 0;
445 } else if (Status.APR_STATUS_IS_EOF(-writtenBytes)) {
446 writtenBytes = 0;
447 } else {
448 throwException(writtenBytes);
449 }
450 }
451 return writtenBytes;
452 }
453
454
455
456
457 @Override
458 protected int transferFile(AprSession session, FileRegion region, int length) throws Exception {
459 if (region.getFilename() == null) {
460 throw new UnsupportedOperationException();
461 }
462
463 long fd = File.open(region.getFilename(), File.APR_FOPEN_READ | File.APR_FOPEN_SENDFILE_ENABLED
464 | File.APR_FOPEN_BINARY, 0, Socket.pool(session.getDescriptor()));
465 long numWritten = Socket.sendfilen(session.getDescriptor(), fd, region.getPosition(), length, 0);
466 File.close(fd);
467
468 if (numWritten < 0) {
469 if (numWritten == -Status.EAGAIN) {
470 return 0;
471 }
472 throw new IOException(org.apache.tomcat.jni.Error.strerror((int) -numWritten) + " (code: " + numWritten
473 + ")");
474 }
475 return (int) numWritten;
476 }
477
478 private void throwException(int code) throws IOException {
479 throw new IOException(org.apache.tomcat.jni.Error.strerror(-code) + " (code: " + code + ")");
480 }
481
482
483
484
    /**
     * Intentionally does nothing in this implementation.
     */
    @Override
    protected void registerNewSelector() {
        // No-op: this processor does not replace its pollset here.
    }
489
490
491
492
    /**
     * Always reports the connection as broken; no actual check is performed.
     *
     * @return {@code true} unconditionally
     * @throws IOException never thrown by this implementation
     */
    @Override
    protected boolean isBrokenConnection() throws IOException {
        // NOTE(review): constant answer -- confirm callers are fine with this for the APR transport.
        return true;
    }
498 }