1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.apr;
21
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketAddress;
25 import java.nio.ByteBuffer;
26 import java.util.HashMap;
27 import java.util.HashSet;
28 import java.util.Iterator;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.concurrent.Executor;
33
34 import org.apache.mina.core.RuntimeIoException;
35 import org.apache.mina.core.polling.AbstractPollingIoConnector;
36 import org.apache.mina.core.service.IoConnector;
37 import org.apache.mina.core.service.IoProcessor;
38 import org.apache.mina.core.service.IoService;
39 import org.apache.mina.core.service.SimpleIoProcessorPool;
40 import org.apache.mina.core.service.TransportMetadata;
41 import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
42 import org.apache.mina.transport.socket.SocketConnector;
43 import org.apache.mina.transport.socket.SocketSessionConfig;
44 import org.apache.mina.util.CircularQueue;
45 import org.apache.tomcat.jni.Address;
46 import org.apache.tomcat.jni.Poll;
47 import org.apache.tomcat.jni.Pool;
48 import org.apache.tomcat.jni.Socket;
49 import org.apache.tomcat.jni.Status;
50
51
52
53
54
55
56 public final class AprSocketConnector extends AbstractPollingIoConnector<AprSession, Long> implements SocketConnector {
57
58
59
60
61
62 private static final int APR_TIMEUP_ERROR = -120001;
63
64 private static final int POLLSET_SIZE = 1024;
65
66 private final Map<Long, ConnectionRequest> requests =
67 new HashMap<Long, ConnectionRequest>(POLLSET_SIZE);
68
69 private final Object wakeupLock = new Object();
70 private volatile long wakeupSocket;
71 private volatile boolean toBeWakenUp;
72
73 private volatile long pool;
74 private volatile long pollset;
75 private final long[] polledSockets = new long[POLLSET_SIZE << 1];
76 private final List<Long> polledHandles = new CircularQueue<Long>(POLLSET_SIZE);
77 private final Set<Long> failedHandles = new HashSet<Long>(POLLSET_SIZE);
78 private volatile ByteBuffer dummyBuffer;
79
80
81
82
83 public AprSocketConnector() {
84 super(new DefaultSocketSessionConfig(), AprIoProcessor.class);
85 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
86 }
87
88
89
90
91
92
93
94 public AprSocketConnector(int processorCount) {
95 super(new DefaultSocketSessionConfig(), AprIoProcessor.class, processorCount);
96 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
97 }
98
99
100
101
102
103
104
105 public AprSocketConnector(IoProcessor<AprSession> processor) {
106 super(new DefaultSocketSessionConfig(), processor);
107 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
108 }
109
110
111
112
113
114
115
116
117 public AprSocketConnector(Executor executor, IoProcessor<AprSession> processor) {
118 super(new DefaultSocketSessionConfig(), executor, processor);
119 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
120 }
121
122
123
124
125 @Override
126 protected void init() throws Exception {
127
128 pool = Pool.create(AprLibrary.getInstance().getRootPool());
129
130 wakeupSocket = Socket.create(
131 Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
132
133 dummyBuffer = Pool.alloc(pool, 1);
134
135 pollset = Poll.create(
136 POLLSET_SIZE,
137 pool,
138 Poll.APR_POLLSET_THREADSAFE,
139 Long.MAX_VALUE);
140
141 if (pollset <= 0) {
142 pollset = Poll.create(
143 62,
144 pool,
145 Poll.APR_POLLSET_THREADSAFE,
146 Long.MAX_VALUE);
147 }
148
149 if (pollset <= 0) {
150 if (Status.APR_STATUS_IS_ENOTIMPL(- (int) pollset)) {
151 throw new RuntimeIoException(
152 "Thread-safe pollset is not supported in this platform.");
153 }
154 }
155 }
156
157
158
159
160 @Override
161 protected void destroy() throws Exception {
162 if (wakeupSocket > 0) {
163 Socket.close(wakeupSocket);
164 }
165 if (pollset > 0) {
166 Poll.destroy(pollset);
167 }
168 if (pool > 0) {
169 Pool.destroy(pool);
170 }
171 }
172
173
174
175
176 @Override
177 protected Iterator<Long> allHandles() {
178 return polledHandles.iterator();
179 }
180
181
182
183
184 @Override
185 protected boolean connect(Long handle, SocketAddress remoteAddress)
186 throws Exception {
187 InetSocketAddress ra = (InetSocketAddress) remoteAddress;
188 long sa;
189 if (ra != null) {
190 if (ra.getAddress() == null) {
191 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, ra.getPort(), 0, pool);
192 } else {
193 sa = Address.info(ra.getAddress().getHostAddress(), Socket.APR_INET, ra.getPort(), 0, pool);
194 }
195 } else {
196 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, 0, 0, pool);
197 }
198
199 int rv = Socket.connect(handle, sa);
200 if (rv == Status.APR_SUCCESS) {
201 return true;
202 }
203
204 if (Status.APR_STATUS_IS_EINPROGRESS(rv)) {
205 return false;
206 }
207
208 throwException(rv);
209 throw new InternalError();
210 }
211
212
213
214
215 @Override
216 protected ConnectionRequest getConnectionRequest(Long handle) {
217 return requests.get(handle);
218 }
219
220
221
222
223 @Override
224 protected void close(Long handle) throws Exception {
225 finishConnect(handle);
226 int rv = Socket.close(handle);
227 if (rv != Status.APR_SUCCESS) {
228 throwException(rv);
229 }
230 }
231
232
233
234
235 @Override
236 protected boolean finishConnect(Long handle) throws Exception {
237 Poll.remove(pollset, handle);
238 requests.remove(handle);
239 if (failedHandles.remove(handle)) {
240 int rv = Socket.recvb(handle, dummyBuffer, 0, 1);
241 throwException(rv);
242 throw new InternalError("Shouldn't reach here.");
243 }
244 return true;
245 }
246
247
248
249
250 @Override
251 protected Long newHandle(SocketAddress localAddress) throws Exception {
252 long handle = Socket.create(
253 Socket.APR_INET, Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool);
254 boolean success = false;
255 try {
256 int result = Socket.optSet(handle, Socket.APR_SO_NONBLOCK, 1);
257 if (result != Status.APR_SUCCESS) {
258 throwException(result);
259 }
260 result = Socket.timeoutSet(handle, 0);
261 if (result != Status.APR_SUCCESS) {
262 throwException(result);
263 }
264
265 if (localAddress != null) {
266 InetSocketAddress la = (InetSocketAddress) localAddress;
267 long sa;
268
269 if (la.getAddress() == null) {
270 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, la.getPort(), 0, pool);
271 } else {
272 sa = Address.info(la.getAddress().getHostAddress(), Socket.APR_INET, la.getPort(), 0, pool);
273 }
274
275 result = Socket.bind(handle, sa);
276 if (result != Status.APR_SUCCESS) {
277 throwException(result);
278 }
279 }
280
281 success = true;
282 return handle;
283 } finally {
284 if (!success) {
285 int rv = Socket.close(handle);
286 if (rv != Status.APR_SUCCESS) {
287 throwException(rv);
288 }
289 }
290 }
291 }
292
293
294
295
296 @Override
297 protected AprSession newSession(IoProcessor<AprSession> processor,
298 Long handle) throws Exception {
299 return new AprSocketSession(this, processor, handle);
300 }
301
302
303
304
305 @Override
306 protected void register(Long handle, ConnectionRequest request)
307 throws Exception {
308 int rv = Poll.add(pollset, handle, Poll.APR_POLLOUT);
309 if (rv != Status.APR_SUCCESS) {
310 throwException(rv);
311 }
312
313 requests.put(handle, request);
314 }
315
316
317
318
319 @Override
320 protected int select(int timeout) throws Exception {
321 int rv = Poll.poll(pollset, timeout * 1000, polledSockets, false);
322 if (rv <= 0) {
323 if (rv != APR_TIMEUP_ERROR) {
324 throwException(rv);
325 }
326
327 rv = Poll.maintain(pollset, polledSockets, true);
328 if (rv > 0) {
329 for (int i = 0; i < rv; i ++) {
330 Poll.add(pollset, polledSockets[i], Poll.APR_POLLOUT);
331 }
332 } else if (rv < 0) {
333 throwException(rv);
334 }
335
336 return 0;
337 } else {
338 rv <<= 1;
339 if (!polledHandles.isEmpty()) {
340 polledHandles.clear();
341 }
342
343 for (int i = 0; i < rv; i ++) {
344 long flag = polledSockets[i];
345 long socket = polledSockets[++i];
346 if (socket == wakeupSocket) {
347 synchronized (wakeupLock) {
348 Poll.remove(pollset, wakeupSocket);
349 toBeWakenUp = false;
350 }
351 continue;
352 }
353 polledHandles.add(socket);
354 if ((flag & Poll.APR_POLLOUT) == 0) {
355 failedHandles.add(socket);
356 }
357 }
358 return polledHandles.size();
359 }
360 }
361
362
363
364
365 @Override
366 protected Iterator<Long> selectedHandles() {
367 return polledHandles.iterator();
368 }
369
370
371
372
373 @Override
374 protected void wakeup() {
375 if (toBeWakenUp) {
376 return;
377 }
378
379
380 synchronized (wakeupLock) {
381 toBeWakenUp = true;
382 Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
383 }
384 }
385
386
387
388
389 public TransportMetadata getTransportMetadata() {
390 return AprSocketSession.METADATA;
391 }
392
393
394
395
396 @Override
397 public SocketSessionConfig getSessionConfig() {
398 return (SocketSessionConfig) super.getSessionConfig();
399 }
400
401
402
403
404 @Override
405 public InetSocketAddress getDefaultRemoteAddress() {
406 return (InetSocketAddress) super.getDefaultRemoteAddress();
407 }
408
409
410
411
412 public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
413 super.setDefaultRemoteAddress(defaultRemoteAddress);
414 }
415
416
417
418
419
420
421 private void throwException(int code) throws IOException {
422 throw new IOException(
423 org.apache.tomcat.jni.Error.strerror(-code) +
424 " (code: " + code + ")");
425 }
426 }