1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.apr;
21
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketAddress;
25 import java.nio.channels.spi.SelectorProvider;
26 import java.util.Iterator;
27 import java.util.Queue;
28 import java.util.concurrent.ConcurrentLinkedQueue;
29 import java.util.concurrent.Executor;
30
31 import org.apache.mina.core.RuntimeIoException;
32 import org.apache.mina.core.polling.AbstractPollingIoAcceptor;
33 import org.apache.mina.core.service.IoAcceptor;
34 import org.apache.mina.core.service.IoProcessor;
35 import org.apache.mina.core.service.IoService;
36 import org.apache.mina.core.service.SimpleIoProcessorPool;
37 import org.apache.mina.core.service.TransportMetadata;
38 import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
39 import org.apache.tomcat.jni.Address;
40 import org.apache.tomcat.jni.Poll;
41 import org.apache.tomcat.jni.Pool;
42 import org.apache.tomcat.jni.Socket;
43 import org.apache.tomcat.jni.Status;
44
45
46
47
48
49
50 public final class AprSocketAcceptor extends AbstractPollingIoAcceptor<AprSession, Long> {
51
52
53
54
55 private static final int APR_TIMEUP_ERROR = -120001;
56
57 private static final int POLLSET_SIZE = 1024;
58
59 private final Object wakeupLock = new Object();
60
61 private volatile long wakeupSocket;
62
63 private volatile boolean toBeWakenUp;
64
65 private volatile long pool;
66
67 private volatile long pollset;
68
69 private final long[] polledSockets = new long[POLLSET_SIZE << 1];
70
71 private final Queue<Long> polledHandles = new ConcurrentLinkedQueue<Long>();
72
73
74
75
76 public AprSocketAcceptor() {
77 super(new DefaultSocketSessionConfig(), AprIoProcessor.class);
78 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
79 }
80
81
82
83
84
85
86
87
88 public AprSocketAcceptor(int processorCount) {
89 super(new DefaultSocketSessionConfig(), AprIoProcessor.class, processorCount);
90 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
91 }
92
93
94
95
96
97
98
99 public AprSocketAcceptor(IoProcessor<AprSession> processor) {
100 super(new DefaultSocketSessionConfig(), processor);
101 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
102 }
103
104
105
106
107
108
109
110
111 public AprSocketAcceptor(Executor executor, IoProcessor<AprSession> processor) {
112 super(new DefaultSocketSessionConfig(), executor, processor);
113 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
114 }
115
116
117
118
119 @Override
120 protected AprSession accept(IoProcessor<AprSession> processor, Long handle) throws Exception {
121 long s = Socket.accept(handle);
122 boolean success = false;
123 try {
124 AprSession result = new AprSocketSession(this, processor, s);
125 success = true;
126 return result;
127 } finally {
128 if (!success) {
129 Socket.close(s);
130 }
131 }
132 }
133
134
135
136
137 @Override
138 protected Long open(SocketAddress localAddress) throws Exception {
139 InetSocketAddress la = (InetSocketAddress) localAddress;
140 long handle = Socket.create(Socket.APR_INET, Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool);
141
142 boolean success = false;
143 try {
144 int result = Socket.optSet(handle, Socket.APR_SO_NONBLOCK, 1);
145 if (result != Status.APR_SUCCESS) {
146 throwException(result);
147 }
148 result = Socket.timeoutSet(handle, 0);
149 if (result != Status.APR_SUCCESS) {
150 throwException(result);
151 }
152
153
154 result = Socket.optSet(handle, Socket.APR_SO_REUSEADDR, isReuseAddress() ? 1 : 0);
155 if (result != Status.APR_SUCCESS) {
156 throwException(result);
157 }
158 result = Socket.optSet(handle, Socket.APR_SO_RCVBUF, getSessionConfig().getReceiveBufferSize());
159 if (result != Status.APR_SUCCESS) {
160 throwException(result);
161 }
162
163
164 long sa;
165 if (la != null) {
166 if (la.getAddress() == null) {
167 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, la.getPort(), 0, pool);
168 } else {
169 sa = Address.info(la.getAddress().getHostAddress(), Socket.APR_INET, la.getPort(), 0, pool);
170 }
171 } else {
172 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, 0, 0, pool);
173 }
174
175 result = Socket.bind(handle, sa);
176 if (result != Status.APR_SUCCESS) {
177 throwException(result);
178 }
179 result = Socket.listen(handle, getBacklog());
180 if (result != Status.APR_SUCCESS) {
181 throwException(result);
182 }
183
184 result = Poll.add(pollset, handle, Poll.APR_POLLIN);
185 if (result != Status.APR_SUCCESS) {
186 throwException(result);
187 }
188 success = true;
189 } finally {
190 if (!success) {
191 close(handle);
192 }
193 }
194 return handle;
195 }
196
197
198
199
200 @Override
201 protected void init() throws Exception {
202
203 pool = Pool.create(AprLibrary.getInstance().getRootPool());
204
205 wakeupSocket = Socket.create(Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
206
207 pollset = Poll.create(POLLSET_SIZE, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
208
209 if (pollset <= 0) {
210 pollset = Poll.create(62, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
211 }
212
213 if (pollset <= 0) {
214 if (Status.APR_STATUS_IS_ENOTIMPL(-(int) pollset)) {
215 throw new RuntimeIoException("Thread-safe pollset is not supported in this platform.");
216 }
217 }
218 }
219
220
221
222
223 @Override
224 protected void destroy() throws Exception {
225 if (wakeupSocket > 0) {
226 Socket.close(wakeupSocket);
227 }
228 if (pollset > 0) {
229 Poll.destroy(pollset);
230 }
231 if (pool > 0) {
232 Pool.destroy(pool);
233 }
234 }
235
236
237
238
239 @Override
240 protected SocketAddress localAddress(Long handle) throws Exception {
241 long la = Address.get(Socket.APR_LOCAL, handle);
242 return new InetSocketAddress(Address.getip(la), Address.getInfo(la).port);
243 }
244
245
246
247
248 @Override
249 protected int select() throws Exception {
250 int rv = Poll.poll(pollset, Integer.MAX_VALUE, polledSockets, false);
251 if (rv <= 0) {
252
253
254 if (rv != APR_TIMEUP_ERROR) {
255
256 throwException(rv);
257 }
258
259 rv = Poll.maintain(pollset, polledSockets, true);
260 if (rv > 0) {
261 for (int i = 0; i < rv; i++) {
262 Poll.add(pollset, polledSockets[i], Poll.APR_POLLIN);
263 }
264 } else if (rv < 0) {
265 throwException(rv);
266 }
267
268 return 0;
269 } else {
270 rv <<= 1;
271 if (!polledHandles.isEmpty()) {
272 polledHandles.clear();
273 }
274
275 for (int i = 0; i < rv; i++) {
276 long flag = polledSockets[i];
277 long socket = polledSockets[++i];
278 if (socket == wakeupSocket) {
279 synchronized (wakeupLock) {
280 Poll.remove(pollset, wakeupSocket);
281 toBeWakenUp = false;
282 }
283 continue;
284 }
285
286 if ((flag & Poll.APR_POLLIN) != 0) {
287 polledHandles.add(socket);
288 }
289 }
290 return polledHandles.size();
291 }
292 }
293
294
295
296
297 @Override
298 protected Iterator<Long> selectedHandles() {
299 return polledHandles.iterator();
300 }
301
302
303
304
305 @Override
306 protected void close(Long handle) throws Exception {
307 Poll.remove(pollset, handle);
308 int result = Socket.close(handle);
309 if (result != Status.APR_SUCCESS) {
310 throwException(result);
311 }
312 }
313
314
315
316
317 @Override
318 protected void wakeup() {
319 if (toBeWakenUp) {
320 return;
321 }
322
323
324 synchronized (wakeupLock) {
325 toBeWakenUp = true;
326 Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
327 }
328 }
329
330
331
332
333 @Override
334 public InetSocketAddress getLocalAddress() {
335 return (InetSocketAddress) super.getLocalAddress();
336 }
337
338
339
340
341 @Override
342 public InetSocketAddress getDefaultLocalAddress() {
343 return (InetSocketAddress) super.getDefaultLocalAddress();
344 }
345
346
347
348
349
350
351 public void setDefaultLocalAddress(InetSocketAddress localAddress) {
352 super.setDefaultLocalAddress(localAddress);
353 }
354
355
356
357
358 public TransportMetadata getTransportMetadata() {
359 return AprSocketSession.METADATA;
360 }
361
362
363
364
365
366
367 private void throwException(int code) throws IOException {
368 throw new IOException(org.apache.tomcat.jni.Error.strerror(-code) + " (code: " + code + ")");
369 }
370
371 @Override
372 protected void init(SelectorProvider selectorProvider) throws Exception {
373 init();
374 }
375 }