1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.apr;
21
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketAddress;
25 import java.nio.ByteBuffer;
26 import java.util.HashMap;
27 import java.util.HashSet;
28 import java.util.Iterator;
29 import java.util.Map;
30 import java.util.Queue;
31 import java.util.Set;
32 import java.util.concurrent.ConcurrentLinkedQueue;
33 import java.util.concurrent.Executor;
34
35 import org.apache.mina.core.RuntimeIoException;
36 import org.apache.mina.core.polling.AbstractPollingIoConnector;
37 import org.apache.mina.core.service.IoConnector;
38 import org.apache.mina.core.service.IoProcessor;
39 import org.apache.mina.core.service.IoService;
40 import org.apache.mina.core.service.SimpleIoProcessorPool;
41 import org.apache.mina.core.service.TransportMetadata;
42 import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
43 import org.apache.mina.transport.socket.SocketConnector;
44 import org.apache.mina.transport.socket.SocketSessionConfig;
45 import org.apache.tomcat.jni.Address;
46 import org.apache.tomcat.jni.Poll;
47 import org.apache.tomcat.jni.Pool;
48 import org.apache.tomcat.jni.Socket;
49 import org.apache.tomcat.jni.Status;
50
51
52
53
54
55
56 public final class AprSocketConnector extends AbstractPollingIoConnector<AprSession, Long> implements SocketConnector {
57
58
59
60
61
62 private static final int APR_TIMEUP_ERROR = -120001;
63
64 private static final int POLLSET_SIZE = 1024;
65
66 private final Map<Long, ConnectionRequest> requests = new HashMap<Long, ConnectionRequest>(POLLSET_SIZE);
67
68 private final Object wakeupLock = new Object();
69
70 private volatile long wakeupSocket;
71
72 private volatile boolean toBeWakenUp;
73
74 private volatile long pool;
75
76 private volatile long pollset;
77
78 private final long[] polledSockets = new long[POLLSET_SIZE << 1];
79
80 private final Queue<Long> polledHandles = new ConcurrentLinkedQueue<Long>();
81
82 private final Set<Long> failedHandles = new HashSet<Long>(POLLSET_SIZE);
83
84 private volatile ByteBuffer dummyBuffer;
85
86
87
88
89 public AprSocketConnector() {
90 super(new DefaultSocketSessionConfig(), AprIoProcessor.class);
91 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
92 }
93
94
95
96
97
98
99
100 public AprSocketConnector(int processorCount) {
101 super(new DefaultSocketSessionConfig(), AprIoProcessor.class, processorCount);
102 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
103 }
104
105
106
107
108
109
110
111 public AprSocketConnector(IoProcessor<AprSession> processor) {
112 super(new DefaultSocketSessionConfig(), processor);
113 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
114 }
115
116
117
118
119
120
121
122
123 public AprSocketConnector(Executor executor, IoProcessor<AprSession> processor) {
124 super(new DefaultSocketSessionConfig(), executor, processor);
125 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
126 }
127
128
129
130
131 @Override
132 protected void init() throws Exception {
133
134 pool = Pool.create(AprLibrary.getInstance().getRootPool());
135
136 wakeupSocket = Socket.create(Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
137
138 dummyBuffer = Pool.alloc(pool, 1);
139
140 pollset = Poll.create(POLLSET_SIZE, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
141
142 if (pollset <= 0) {
143 pollset = Poll.create(62, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
144 }
145
146 if (pollset <= 0) {
147 if (Status.APR_STATUS_IS_ENOTIMPL(-(int) pollset)) {
148 throw new RuntimeIoException("Thread-safe pollset is not supported in this platform.");
149 }
150 }
151 }
152
153
154
155
156 @Override
157 protected void destroy() throws Exception {
158 if (wakeupSocket > 0) {
159 Socket.close(wakeupSocket);
160 }
161 if (pollset > 0) {
162 Poll.destroy(pollset);
163 }
164 if (pool > 0) {
165 Pool.destroy(pool);
166 }
167 }
168
169
170
171
172 @Override
173 protected Iterator<Long> allHandles() {
174 return polledHandles.iterator();
175 }
176
177
178
179
180 @Override
181 protected boolean connect(Long handle, SocketAddress remoteAddress) throws Exception {
182 InetSocketAddress ra = (InetSocketAddress) remoteAddress;
183 long sa;
184 if (ra != null) {
185 if (ra.getAddress() == null) {
186 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, ra.getPort(), 0, pool);
187 } else {
188 sa = Address.info(ra.getAddress().getHostAddress(), Socket.APR_INET, ra.getPort(), 0, pool);
189 }
190 } else {
191 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, 0, 0, pool);
192 }
193
194 int rv = Socket.connect(handle, sa);
195 if (rv == Status.APR_SUCCESS) {
196 return true;
197 }
198
199 if (Status.APR_STATUS_IS_EINPROGRESS(rv)) {
200 return false;
201 }
202
203 throwException(rv);
204 throw new InternalError();
205 }
206
207
208
209
210 @Override
211 protected ConnectionRequest getConnectionRequest(Long handle) {
212 return requests.get(handle);
213 }
214
215
216
217
218 @Override
219 protected void close(Long handle) throws Exception {
220 finishConnect(handle);
221 int rv = Socket.close(handle);
222 if (rv != Status.APR_SUCCESS) {
223 throwException(rv);
224 }
225 }
226
227
228
229
230 @Override
231 protected boolean finishConnect(Long handle) throws Exception {
232 Poll.remove(pollset, handle);
233 requests.remove(handle);
234 if (failedHandles.remove(handle)) {
235 int rv = Socket.recvb(handle, dummyBuffer, 0, 1);
236 throwException(rv);
237 throw new InternalError("Shouldn't reach here.");
238 }
239 return true;
240 }
241
242
243
244
245 @Override
246 protected Long newHandle(SocketAddress localAddress) throws Exception {
247 long handle = Socket.create(Socket.APR_INET, Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool);
248 boolean success = false;
249 try {
250 int result = Socket.optSet(handle, Socket.APR_SO_NONBLOCK, 1);
251 if (result != Status.APR_SUCCESS) {
252 throwException(result);
253 }
254 result = Socket.timeoutSet(handle, 0);
255 if (result != Status.APR_SUCCESS) {
256 throwException(result);
257 }
258
259 if (localAddress != null) {
260 InetSocketAddress la = (InetSocketAddress) localAddress;
261 long sa;
262
263 if (la.getAddress() == null) {
264 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, la.getPort(), 0, pool);
265 } else {
266 sa = Address.info(la.getAddress().getHostAddress(), Socket.APR_INET, la.getPort(), 0, pool);
267 }
268
269 result = Socket.bind(handle, sa);
270 if (result != Status.APR_SUCCESS) {
271 throwException(result);
272 }
273 }
274
275 success = true;
276 return handle;
277 } finally {
278 if (!success) {
279 int rv = Socket.close(handle);
280 if (rv != Status.APR_SUCCESS) {
281 throwException(rv);
282 }
283 }
284 }
285 }
286
287
288
289
290 @Override
291 protected AprSession newSession(IoProcessor<AprSession> processor, Long handle) throws Exception {
292 return new AprSocketSession(this, processor, handle);
293 }
294
295
296
297
298 @Override
299 protected void register(Long handle, ConnectionRequest request) throws Exception {
300 int rv = Poll.add(pollset, handle, Poll.APR_POLLOUT);
301 if (rv != Status.APR_SUCCESS) {
302 throwException(rv);
303 }
304
305 requests.put(handle, request);
306 }
307
308
309
310
311 @Override
312 protected int select(int timeout) throws Exception {
313 int rv = Poll.poll(pollset, timeout * 1000, polledSockets, false);
314 if (rv <= 0) {
315 if (rv != APR_TIMEUP_ERROR) {
316 throwException(rv);
317 }
318
319 rv = Poll.maintain(pollset, polledSockets, true);
320 if (rv > 0) {
321 for (int i = 0; i < rv; i++) {
322 Poll.add(pollset, polledSockets[i], Poll.APR_POLLOUT);
323 }
324 } else if (rv < 0) {
325 throwException(rv);
326 }
327
328 return 0;
329 } else {
330 rv <<= 1;
331 if (!polledHandles.isEmpty()) {
332 polledHandles.clear();
333 }
334
335 for (int i = 0; i < rv; i++) {
336 long flag = polledSockets[i];
337 long socket = polledSockets[++i];
338 if (socket == wakeupSocket) {
339 synchronized (wakeupLock) {
340 Poll.remove(pollset, wakeupSocket);
341 toBeWakenUp = false;
342 }
343 continue;
344 }
345 polledHandles.add(socket);
346 if ((flag & Poll.APR_POLLOUT) == 0) {
347 failedHandles.add(socket);
348 }
349 }
350 return polledHandles.size();
351 }
352 }
353
354
355
356
357 @Override
358 protected Iterator<Long> selectedHandles() {
359 return polledHandles.iterator();
360 }
361
362
363
364
365 @Override
366 protected void wakeup() {
367 if (toBeWakenUp) {
368 return;
369 }
370
371
372 synchronized (wakeupLock) {
373 toBeWakenUp = true;
374 Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
375 }
376 }
377
378
379
380
381 public TransportMetadata getTransportMetadata() {
382 return AprSocketSession.METADATA;
383 }
384
385
386
387
388 public SocketSessionConfig getSessionConfig() {
389 return (SocketSessionConfig) sessionConfig;
390 }
391
392
393
394
395 @Override
396 public InetSocketAddress getDefaultRemoteAddress() {
397 return (InetSocketAddress) super.getDefaultRemoteAddress();
398 }
399
400
401
402
403 public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
404 super.setDefaultRemoteAddress(defaultRemoteAddress);
405 }
406
407
408
409
410
411
412 private void throwException(int code) throws IOException {
413 throw new IOException(org.apache.tomcat.jni.Error.strerror(-code) + " (code: " + code + ")");
414 }
415 }