1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.apr;
21
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketAddress;
25 import java.nio.ByteBuffer;
26 import java.util.HashMap;
27 import java.util.HashSet;
28 import java.util.Iterator;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.concurrent.Executor;
33
34 import org.apache.mina.core.RuntimeIoException;
35 import org.apache.mina.core.polling.AbstractPollingIoConnector;
36 import org.apache.mina.core.service.IoProcessor;
37 import org.apache.mina.core.service.TransportMetadata;
38 import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
39 import org.apache.mina.transport.socket.SocketConnector;
40 import org.apache.mina.transport.socket.SocketSessionConfig;
41 import org.apache.mina.util.CircularQueue;
42 import org.apache.tomcat.jni.Address;
43 import org.apache.tomcat.jni.Poll;
44 import org.apache.tomcat.jni.Pool;
45 import org.apache.tomcat.jni.Socket;
46 import org.apache.tomcat.jni.Status;
47
48
49
50
51
52
53
54 public final class AprSocketConnector extends AbstractPollingIoConnector<AprSession, Long> implements SocketConnector {
55
56 private static final int POLLSET_SIZE = 1024;
57
58 private final Map<Long, ConnectionRequest> requests =
59 new HashMap<Long, ConnectionRequest>(POLLSET_SIZE);
60
61 private final Object wakeupLock = new Object();
62 private volatile long wakeupSocket;
63 private volatile boolean toBeWakenUp;
64
65 private volatile long pool;
66 private volatile long pollset;
67 private final long[] polledSockets = new long[POLLSET_SIZE << 1];
68 private final List<Long> polledHandles = new CircularQueue<Long>(POLLSET_SIZE);
69 private final Set<Long> failedHandles = new HashSet<Long>(POLLSET_SIZE);
70 private volatile ByteBuffer dummyBuffer;
71
72
73
74
75 public AprSocketConnector() {
76 super(new DefaultSocketSessionConfig(), AprIoProcessor.class);
77 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
78 }
79
80
81
82
83 public AprSocketConnector(int processorCount) {
84 super(new DefaultSocketSessionConfig(), AprIoProcessor.class, processorCount);
85 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
86 }
87
88
89
90
91 public AprSocketConnector(IoProcessor<AprSession> processor) {
92 super(new DefaultSocketSessionConfig(), processor);
93 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
94 }
95
96
97
98
99 public AprSocketConnector(Executor executor, IoProcessor<AprSession> processor) {
100 super(new DefaultSocketSessionConfig(), executor, processor);
101 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
102 }
103
104
105
106
107 @Override
108 protected void init() throws Exception {
109
110 pool = Pool.create(AprLibrary.getInstance().getRootPool());
111
112 wakeupSocket = Socket.create(
113 Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
114
115 dummyBuffer = Pool.alloc(pool, 1);
116
117 pollset = Poll.create(
118 POLLSET_SIZE,
119 pool,
120 Poll.APR_POLLSET_THREADSAFE,
121 Long.MAX_VALUE);
122
123 if (pollset <= 0) {
124 pollset = Poll.create(
125 62,
126 pool,
127 Poll.APR_POLLSET_THREADSAFE,
128 Long.MAX_VALUE);
129 }
130
131 if (pollset <= 0) {
132 if (Status.APR_STATUS_IS_ENOTIMPL(- (int) pollset)) {
133 throw new RuntimeIoException(
134 "Thread-safe pollset is not supported in this platform.");
135 }
136 }
137 }
138
139
140
141
142 @Override
143 protected void destroy() throws Exception {
144 if (wakeupSocket > 0) {
145 Socket.close(wakeupSocket);
146 }
147 if (pollset > 0) {
148 Poll.destroy(pollset);
149 }
150 if (pool > 0) {
151 Pool.destroy(pool);
152 }
153 }
154
155
156
157
158 @Override
159 protected Iterator<Long> allHandles() {
160 return polledHandles.iterator();
161 }
162
163
164
165
166 @Override
167 protected boolean connect(Long handle, SocketAddress remoteAddress)
168 throws Exception {
169 InetSocketAddress ra = (InetSocketAddress) remoteAddress;
170 long sa;
171 if (ra != null) {
172 if (ra.getAddress() == null) {
173 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, ra.getPort(), 0, pool);
174 } else {
175 sa = Address.info(ra.getAddress().getHostAddress(), Socket.APR_INET, ra.getPort(), 0, pool);
176 }
177 } else {
178 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, 0, 0, pool);
179 }
180
181 int rv = Socket.connect(handle, sa);
182 if (rv == Status.APR_SUCCESS) {
183 return true;
184 }
185
186 if (Status.APR_STATUS_IS_EINPROGRESS(rv)) {
187 return false;
188 }
189
190 throwException(rv);
191 throw new InternalError();
192 }
193
194
195
196
197 @Override
198 protected ConnectionRequest connectionRequest(Long handle) {
199 return requests.get(handle);
200 }
201
202
203
204
205 @Override
206 protected void close(Long handle) throws Exception {
207 finishConnect(handle);
208 int rv = Socket.close(handle);
209 if (rv != Status.APR_SUCCESS) {
210 throwException(rv);
211 }
212 }
213
214
215
216
217 @Override
218 protected boolean finishConnect(Long handle) throws Exception {
219 Poll.remove(pollset, handle);
220 requests.remove(handle);
221 if (failedHandles.remove(handle)) {
222 int rv = Socket.recvb(handle, dummyBuffer, 0, 1);
223 throwException(rv);
224 throw new InternalError("Shouldn't reach here.");
225 }
226 return true;
227 }
228
229
230
231
232 @Override
233 protected Long newHandle(SocketAddress localAddress) throws Exception {
234 long handle = Socket.create(
235 Socket.APR_INET, Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool);
236 boolean success = false;
237 try {
238 int result = Socket.optSet(handle, Socket.APR_SO_NONBLOCK, 1);
239 if (result != Status.APR_SUCCESS) {
240 throwException(result);
241 }
242 result = Socket.timeoutSet(handle, 0);
243 if (result != Status.APR_SUCCESS) {
244 throwException(result);
245 }
246
247 if (localAddress != null) {
248 InetSocketAddress la = (InetSocketAddress) localAddress;
249 long sa;
250
251 if (la.getAddress() == null) {
252 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, la.getPort(), 0, pool);
253 } else {
254 sa = Address.info(la.getAddress().getHostAddress(), Socket.APR_INET, la.getPort(), 0, pool);
255 }
256
257 result = Socket.bind(handle, sa);
258 if (result != Status.APR_SUCCESS) {
259 throwException(result);
260 }
261 }
262
263 success = true;
264 return handle;
265 } finally {
266 if (!success) {
267 int rv = Socket.close(handle);
268 if (rv != Status.APR_SUCCESS) {
269 throwException(rv);
270 }
271 }
272 }
273 }
274
275
276
277
278 @Override
279 protected AprSession newSession(IoProcessor<AprSession> processor,
280 Long handle) throws Exception {
281 return new AprSocketSession(this, processor, handle);
282 }
283
284
285
286
287 @Override
288 protected void register(Long handle, ConnectionRequest request)
289 throws Exception {
290 int rv = Poll.add(pollset, handle, Poll.APR_POLLOUT);
291 if (rv != Status.APR_SUCCESS) {
292 throwException(rv);
293 }
294
295 requests.put(handle, request);
296 }
297
298
299
300
301 @Override
302 protected boolean select(int timeout) throws Exception {
303 int rv = Poll.poll(pollset, timeout * 1000, polledSockets, false);
304 if (rv <= 0) {
305 if (rv != -120001) {
306 throwException(rv);
307 }
308
309 rv = Poll.maintain(pollset, polledSockets, true);
310 if (rv > 0) {
311 for (int i = 0; i < rv; i ++) {
312 Poll.add(pollset, polledSockets[i], Poll.APR_POLLOUT);
313 }
314 } else if (rv < 0) {
315 throwException(rv);
316 }
317
318 return false;
319 } else {
320 rv <<= 1;
321 if (!polledHandles.isEmpty()) {
322 polledHandles.clear();
323 }
324
325 for (int i = 0; i < rv; i ++) {
326 long flag = polledSockets[i];
327 long socket = polledSockets[++i];
328 if (socket == wakeupSocket) {
329 synchronized (wakeupLock) {
330 Poll.remove(pollset, wakeupSocket);
331 toBeWakenUp = false;
332 }
333 continue;
334 }
335 polledHandles.add(socket);
336 if ((flag & Poll.APR_POLLOUT) == 0) {
337 failedHandles.add(socket);
338 }
339 }
340 return !polledHandles.isEmpty();
341 }
342 }
343
344
345
346
347 @Override
348 protected Iterator<Long> selectedHandles() {
349 return polledHandles.iterator();
350 }
351
352
353
354
355 @Override
356 protected void wakeup() {
357 if (toBeWakenUp) {
358 return;
359 }
360
361
362 synchronized (wakeupLock) {
363 toBeWakenUp = true;
364 Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
365 }
366 }
367
368
369
370
371 public TransportMetadata getTransportMetadata() {
372 return AprSocketSession.METADATA;
373 }
374
375
376
377
378 @Override
379 public SocketSessionConfig getSessionConfig() {
380 return (SocketSessionConfig) super.getSessionConfig();
381 }
382
383
384
385
386 @Override
387 public InetSocketAddress getDefaultRemoteAddress() {
388 return (InetSocketAddress) super.getDefaultRemoteAddress();
389 }
390
391
392
393
394 public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
395 super.setDefaultRemoteAddress(defaultRemoteAddress);
396 }
397
398
399
400
401
402
403 private void throwException(int code) throws IOException {
404 throw new IOException(
405 org.apache.tomcat.jni.Error.strerror(-code) +
406 " (code: " + code + ")");
407 }
408 }