1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.apr;
21
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketAddress;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.concurrent.Executor;
28
29 import org.apache.mina.core.RuntimeIoException;
30 import org.apache.mina.core.polling.AbstractPollingIoAcceptor;
31 import org.apache.mina.core.service.IoAcceptor;
32 import org.apache.mina.core.service.IoProcessor;
33 import org.apache.mina.core.service.IoService;
34 import org.apache.mina.core.service.SimpleIoProcessorPool;
35 import org.apache.mina.core.service.TransportMetadata;
36 import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
37 import org.apache.mina.transport.socket.SocketAcceptor;
38 import org.apache.mina.transport.socket.SocketSessionConfig;
39 import org.apache.mina.util.CircularQueue;
40 import org.apache.tomcat.jni.Address;
41 import org.apache.tomcat.jni.Poll;
42 import org.apache.tomcat.jni.Pool;
43 import org.apache.tomcat.jni.Socket;
44 import org.apache.tomcat.jni.Status;
45
46
47
48
49
50
51 public final class AprSocketAcceptor extends AbstractPollingIoAcceptor<AprSession, Long> implements SocketAcceptor {
52
53
54
55
56 private static final int APR_TIMEUP_ERROR = -120001;
57
58 private static final int POLLSET_SIZE = 1024;
59
60 private final Object wakeupLock = new Object();
61 private volatile long wakeupSocket;
62 private volatile boolean toBeWakenUp;
63
64 private int backlog = 50;
65 private boolean reuseAddress = false;
66
67 private volatile long pool;
68 private volatile long pollset;
69 private final long[] polledSockets = new long[POLLSET_SIZE << 1];
70 private final List<Long> polledHandles =
71 new CircularQueue<Long>(POLLSET_SIZE);
72
73
74
75
76 public AprSocketAcceptor() {
77 super(new DefaultSocketSessionConfig(), AprIoProcessor.class);
78 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
79 }
80
81
82
83
84
85
86
87
88 public AprSocketAcceptor(int processorCount) {
89 super(new DefaultSocketSessionConfig(), AprIoProcessor.class, processorCount);
90 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
91 }
92
93
94
95
96
97
98
99 public AprSocketAcceptor(IoProcessor<AprSession> processor) {
100 super(new DefaultSocketSessionConfig(), processor);
101 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
102 }
103
104
105
106
107
108
109
110
111 public AprSocketAcceptor(Executor executor,
112 IoProcessor<AprSession> processor) {
113 super(new DefaultSocketSessionConfig(), executor, processor);
114 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
115 }
116
117
118
119
120 @Override
121 protected AprSession accept(IoProcessor<AprSession> processor, Long handle) throws Exception {
122 long s = Socket.accept(handle);
123 boolean success = false;
124 try {
125 AprSession result = new AprSocketSession(this, processor, s);
126 success = true;
127 return result;
128 } finally {
129 if (!success) {
130 Socket.close(s);
131 }
132 }
133 }
134
135
136
137
138 @Override
139 protected Long open(SocketAddress localAddress) throws Exception {
140 InetSocketAddress la = (InetSocketAddress) localAddress;
141 long handle = Socket.create(
142 Socket.APR_INET, Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool);
143
144 boolean success = false;
145 try {
146 int result = Socket.optSet(handle, Socket.APR_SO_NONBLOCK, 1);
147 if (result != Status.APR_SUCCESS) {
148 throwException(result);
149 }
150 result = Socket.timeoutSet(handle, 0);
151 if (result != Status.APR_SUCCESS) {
152 throwException(result);
153 }
154
155
156 result = Socket.optSet(handle, Socket.APR_SO_REUSEADDR, isReuseAddress()? 1 : 0);
157 if (result != Status.APR_SUCCESS) {
158 throwException(result);
159 }
160 result = Socket.optSet(handle, Socket.APR_SO_RCVBUF, getSessionConfig().getReceiveBufferSize());
161 if (result != Status.APR_SUCCESS) {
162 throwException(result);
163 }
164
165
166 long sa;
167 if (la != null) {
168 if (la.getAddress() == null) {
169 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, la.getPort(), 0, pool);
170 } else {
171 sa = Address.info(la.getAddress().getHostAddress(), Socket.APR_INET, la.getPort(), 0, pool);
172 }
173 } else {
174 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, 0, 0, pool);
175 }
176
177 result = Socket.bind(handle, sa);
178 if (result != Status.APR_SUCCESS) {
179 throwException(result);
180 }
181 result = Socket.listen(handle, getBacklog());
182 if (result != Status.APR_SUCCESS) {
183 throwException(result);
184 }
185
186 result = Poll.add(pollset, handle, Poll.APR_POLLIN);
187 if (result != Status.APR_SUCCESS) {
188 throwException(result);
189 }
190 success = true;
191 } finally {
192 if (!success) {
193 close(handle);
194 }
195 }
196 return handle;
197 }
198
199
200
201
202 @Override
203 protected void init() throws Exception {
204
205 pool = Pool.create(AprLibrary.getInstance().getRootPool());
206
207 wakeupSocket = Socket.create(
208 Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
209
210 pollset = Poll.create(
211 POLLSET_SIZE,
212 pool,
213 Poll.APR_POLLSET_THREADSAFE,
214 Long.MAX_VALUE);
215
216 if (pollset <= 0) {
217 pollset = Poll.create(
218 62,
219 pool,
220 Poll.APR_POLLSET_THREADSAFE,
221 Long.MAX_VALUE);
222 }
223
224 if (pollset <= 0) {
225 if (Status.APR_STATUS_IS_ENOTIMPL(- (int) pollset)) {
226 throw new RuntimeIoException(
227 "Thread-safe pollset is not supported in this platform.");
228 }
229 }
230 }
231
232
233
234
235 @Override
236 protected void destroy() throws Exception {
237 if (wakeupSocket > 0) {
238 Socket.close(wakeupSocket);
239 }
240 if (pollset > 0) {
241 Poll.destroy(pollset);
242 }
243 if (pool > 0) {
244 Pool.destroy(pool);
245 }
246 }
247
248
249
250
251 @Override
252 protected SocketAddress localAddress(Long handle) throws Exception {
253 long la = Address.get(Socket.APR_LOCAL, handle);
254 return new InetSocketAddress(Address.getip(la), Address.getInfo(la).port);
255 }
256
257
258
259
260 @Override
261 protected int select() throws Exception {
262 int rv = Poll.poll(pollset, Integer.MAX_VALUE, polledSockets, false);
263 if (rv <= 0) {
264
265
266 if (rv != APR_TIMEUP_ERROR) {
267
268 throwException(rv);
269 }
270
271 rv = Poll.maintain(pollset, polledSockets, true);
272 if (rv > 0) {
273 for (int i = 0; i < rv; i ++) {
274 Poll.add(pollset, polledSockets[i], Poll.APR_POLLIN);
275 }
276 } else if (rv < 0) {
277 throwException(rv);
278 }
279
280 return 0;
281 } else {
282 rv <<= 1;
283 if (!polledHandles.isEmpty()) {
284 polledHandles.clear();
285 }
286
287 for (int i = 0; i < rv; i ++) {
288 long flag = polledSockets[i];
289 long socket = polledSockets[++i];
290 if (socket == wakeupSocket) {
291 synchronized (wakeupLock) {
292 Poll.remove(pollset, wakeupSocket);
293 toBeWakenUp = false;
294 }
295 continue;
296 }
297
298 if ((flag & Poll.APR_POLLIN) != 0) {
299 polledHandles.add(socket);
300 }
301 }
302 return polledHandles.size();
303 }
304 }
305
306
307
308
309 @Override
310 protected Iterator<Long> selectedHandles() {
311 return polledHandles.iterator();
312 }
313
314
315
316
317 @Override
318 protected void close(Long handle) throws Exception {
319 Poll.remove(pollset, handle);
320 int result = Socket.close(handle);
321 if (result != Status.APR_SUCCESS) {
322 throwException(result);
323 }
324 }
325
326
327
328
329 @Override
330 protected void wakeup() {
331 if (toBeWakenUp) {
332 return;
333 }
334
335
336 synchronized (wakeupLock) {
337 toBeWakenUp = true;
338 Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
339 }
340 }
341
342
343
344
345 public int getBacklog() {
346 return backlog;
347 }
348
349
350
351
352 public boolean isReuseAddress() {
353 return reuseAddress;
354 }
355
356
357
358
359 public void setBacklog(int backlog) {
360 synchronized (bindLock) {
361 if (isActive()) {
362 throw new IllegalStateException(
363 "backlog can't be set while the acceptor is bound.");
364 }
365
366 this.backlog = backlog;
367 }
368 }
369
370
371
372
373 @Override
374 public InetSocketAddress getLocalAddress() {
375 return (InetSocketAddress) super.getLocalAddress();
376 }
377
378
379
380
381 @Override
382 public InetSocketAddress getDefaultLocalAddress() {
383 return (InetSocketAddress) super.getDefaultLocalAddress();
384 }
385
386
387
388
389 public void setDefaultLocalAddress(InetSocketAddress localAddress) {
390 super.setDefaultLocalAddress(localAddress);
391 }
392
393 public void setReuseAddress(boolean reuseAddress) {
394 synchronized (bindLock) {
395 if (isActive()) {
396 throw new IllegalStateException(
397 "backlog can't be set while the acceptor is bound.");
398 }
399
400 this.reuseAddress = reuseAddress;
401 }
402 }
403
404
405
406
407 public TransportMetadata getTransportMetadata() {
408 return AprSocketSession.METADATA;
409 }
410
411
412
413
414 @Override
415 public SocketSessionConfig getSessionConfig() {
416 return (SocketSessionConfig) super.getSessionConfig();
417 }
418
419
420
421
422
423
424 private void throwException(int code) throws IOException {
425 throw new IOException(
426 org.apache.tomcat.jni.Error.strerror(-code) +
427 " (code: " + code + ")");
428 }
429 }