1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.apr;
21
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketAddress;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.concurrent.Executor;
28
29 import org.apache.mina.core.RuntimeIoException;
30 import org.apache.mina.core.polling.AbstractPollingIoAcceptor;
31 import org.apache.mina.core.service.IoProcessor;
32 import org.apache.mina.core.service.TransportMetadata;
33 import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
34 import org.apache.mina.transport.socket.SocketAcceptor;
35 import org.apache.mina.transport.socket.SocketSessionConfig;
36 import org.apache.mina.util.CircularQueue;
37 import org.apache.tomcat.jni.Address;
38 import org.apache.tomcat.jni.Poll;
39 import org.apache.tomcat.jni.Pool;
40 import org.apache.tomcat.jni.Socket;
41 import org.apache.tomcat.jni.Status;
42
43
44
45
46
47
48
49 public final class AprSocketAcceptor extends AbstractPollingIoAcceptor<AprSession, Long> implements SocketAcceptor {
50
51 private static final int POLLSET_SIZE = 1024;
52
53 private final Object wakeupLock = new Object();
54 private volatile long wakeupSocket;
55 private volatile boolean toBeWakenUp;
56
57 private int backlog = 50;
58 private boolean reuseAddress = false;
59
60 private volatile long pool;
61 private volatile long pollset;
62 private final long[] polledSockets = new long[POLLSET_SIZE << 1];
63 private final List<Long> polledHandles =
64 new CircularQueue<Long>(POLLSET_SIZE);
65
66 public AprSocketAcceptor() {
67 super(new DefaultSocketSessionConfig(), AprIoProcessor.class);
68 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
69 }
70
71 public AprSocketAcceptor(int processorCount) {
72 super(new DefaultSocketSessionConfig(), AprIoProcessor.class, processorCount);
73 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
74 }
75
76 public AprSocketAcceptor(IoProcessor<AprSession> processor) {
77 super(new DefaultSocketSessionConfig(), processor);
78 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
79 }
80
81 public AprSocketAcceptor(Executor executor,
82 IoProcessor<AprSession> processor) {
83 super(new DefaultSocketSessionConfig(), executor, processor);
84 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
85 }
86
87 @Override
88 protected AprSession accept(IoProcessor<AprSession> processor, Long handle) throws Exception {
89 long s = Socket.accept(handle);
90 boolean success = false;
91 try {
92 AprSession result = new AprSocketSession(this, processor, s);
93 success = true;
94 return result;
95 } finally {
96 if (!success) {
97 Socket.close(s);
98 }
99 }
100 }
101
102 @Override
103 protected Long open(SocketAddress localAddress) throws Exception {
104 InetSocketAddress la = (InetSocketAddress) localAddress;
105 long handle = Socket.create(
106 Socket.APR_INET, Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool);
107
108 boolean success = false;
109 try {
110 int result = Socket.optSet(handle, Socket.APR_SO_NONBLOCK, 1);
111 if (result != Status.APR_SUCCESS) {
112 throwException(result);
113 }
114 result = Socket.timeoutSet(handle, 0);
115 if (result != Status.APR_SUCCESS) {
116 throwException(result);
117 }
118
119
120 result = Socket.optSet(handle, Socket.APR_SO_REUSEADDR, isReuseAddress()? 1 : 0);
121 if (result != Status.APR_SUCCESS) {
122 throwException(result);
123 }
124 result = Socket.optSet(handle, Socket.APR_SO_RCVBUF, getSessionConfig().getReceiveBufferSize());
125 if (result != Status.APR_SUCCESS) {
126 throwException(result);
127 }
128
129
130 long sa;
131 if (la != null) {
132 if (la.getAddress() == null) {
133 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, la.getPort(), 0, pool);
134 } else {
135 sa = Address.info(la.getAddress().getHostAddress(), Socket.APR_INET, la.getPort(), 0, pool);
136 }
137 } else {
138 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, 0, 0, pool);
139 }
140
141 result = Socket.bind(handle, sa);
142 if (result != Status.APR_SUCCESS) {
143 throwException(result);
144 }
145 result = Socket.listen(handle, getBacklog());
146 if (result != Status.APR_SUCCESS) {
147 throwException(result);
148 }
149
150 result = Poll.add(pollset, handle, Poll.APR_POLLIN);
151 if (result != Status.APR_SUCCESS) {
152 throwException(result);
153 }
154 success = true;
155 } finally {
156 if (!success) {
157 close(handle);
158 }
159 }
160 return handle;
161 }
162
163 @Override
164 protected void init() throws Exception {
165
166 pool = Pool.create(AprLibrary.getInstance().getRootPool());
167
168 wakeupSocket = Socket.create(
169 Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
170
171 pollset = Poll.create(
172 POLLSET_SIZE,
173 pool,
174 Poll.APR_POLLSET_THREADSAFE,
175 Long.MAX_VALUE);
176
177 if (pollset <= 0) {
178 pollset = Poll.create(
179 62,
180 pool,
181 Poll.APR_POLLSET_THREADSAFE,
182 Long.MAX_VALUE);
183 }
184
185 if (pollset <= 0) {
186 if (Status.APR_STATUS_IS_ENOTIMPL(- (int) pollset)) {
187 throw new RuntimeIoException(
188 "Thread-safe pollset is not supported in this platform.");
189 }
190 }
191 }
192
193 @Override
194 protected void destroy() throws Exception {
195 if (wakeupSocket > 0) {
196 Socket.close(wakeupSocket);
197 }
198 if (pollset > 0) {
199 Poll.destroy(pollset);
200 }
201 if (pool > 0) {
202 Pool.destroy(pool);
203 }
204 }
205
206 @Override
207 protected SocketAddress localAddress(Long handle) throws Exception {
208 long la = Address.get(Socket.APR_LOCAL, handle);
209 return new InetSocketAddress(Address.getip(la), Address.getInfo(la).port);
210 }
211
212 @Override
213 protected boolean select() throws Exception {
214 int rv = Poll.poll(pollset, Integer.MAX_VALUE, polledSockets, false);
215 if (rv <= 0) {
216 if (rv != -120001) {
217 throwException(rv);
218 }
219
220 rv = Poll.maintain(pollset, polledSockets, true);
221 if (rv > 0) {
222 for (int i = 0; i < rv; i ++) {
223 Poll.add(pollset, polledSockets[i], Poll.APR_POLLIN);
224 }
225 } else if (rv < 0) {
226 throwException(rv);
227 }
228
229 return false;
230 } else {
231 rv <<= 1;
232 if (!polledHandles.isEmpty()) {
233 polledHandles.clear();
234 }
235
236 for (int i = 0; i < rv; i ++) {
237 long flag = polledSockets[i];
238 long socket = polledSockets[++i];
239 if (socket == wakeupSocket) {
240 synchronized (wakeupLock) {
241 Poll.remove(pollset, wakeupSocket);
242 toBeWakenUp = false;
243 }
244 continue;
245 }
246
247 if ((flag & Poll.APR_POLLIN) != 0) {
248 polledHandles.add(socket);
249 }
250 }
251 return !polledHandles.isEmpty();
252 }
253 }
254
255 @Override
256 protected Iterator<Long> selectedHandles() {
257 return polledHandles.iterator();
258 }
259
260 @Override
261 protected void close(Long handle) throws Exception {
262 Poll.remove(pollset, handle);
263 int result = Socket.close(handle);
264 if (result != Status.APR_SUCCESS) {
265 throwException(result);
266 }
267 }
268
269 @Override
270 protected void wakeup() {
271 if (toBeWakenUp) {
272 return;
273 }
274
275
276 synchronized (wakeupLock) {
277 toBeWakenUp = true;
278 Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
279 }
280 }
281
282 public int getBacklog() {
283 return backlog;
284 }
285
286 public boolean isReuseAddress() {
287 return reuseAddress;
288 }
289
290 public void setBacklog(int backlog) {
291 synchronized (bindLock) {
292 if (isActive()) {
293 throw new IllegalStateException(
294 "backlog can't be set while the acceptor is bound.");
295 }
296
297 this.backlog = backlog;
298 }
299 }
300
301 @Override
302 public InetSocketAddress getLocalAddress() {
303 return (InetSocketAddress) super.getLocalAddress();
304 }
305
306 @Override
307 public InetSocketAddress getDefaultLocalAddress() {
308 return (InetSocketAddress) super.getDefaultLocalAddress();
309 }
310
311 public void setDefaultLocalAddress(InetSocketAddress localAddress) {
312 super.setDefaultLocalAddress(localAddress);
313 }
314
315 public void setReuseAddress(boolean reuseAddress) {
316 synchronized (bindLock) {
317 if (isActive()) {
318 throw new IllegalStateException(
319 "backlog can't be set while the acceptor is bound.");
320 }
321
322 this.reuseAddress = reuseAddress;
323 }
324 }
325
326 public TransportMetadata getTransportMetadata() {
327 return AprSocketSession.METADATA;
328 }
329
330 @Override
331 public SocketSessionConfig getSessionConfig() {
332 return (SocketSessionConfig) super.getSessionConfig();
333 }
334
335 private void throwException(int code) throws IOException {
336 throw new IOException(
337 org.apache.tomcat.jni.Error.strerror(-code) +
338 " (code: " + code + ")");
339 }
340 }