1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.polling;
21
22 import java.net.Inet4Address;
23 import java.net.Inet6Address;
24 import java.net.InetAddress;
25 import java.net.InetSocketAddress;
26 import java.net.SocketAddress;
27 import java.nio.channels.ClosedSelectorException;
28 import java.util.Collections;
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.Iterator;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Queue;
35 import java.util.Set;
36 import java.util.concurrent.ConcurrentLinkedQueue;
37 import java.util.concurrent.Executor;
38
39 import org.apache.mina.core.RuntimeIoException;
40 import org.apache.mina.core.buffer.IoBuffer;
41 import org.apache.mina.core.service.AbstractIoAcceptor;
42 import org.apache.mina.core.service.IoAcceptor;
43 import org.apache.mina.core.service.IoProcessor;
44 import org.apache.mina.core.session.AbstractIoSession;
45 import org.apache.mina.core.session.ExpiringSessionRecycler;
46 import org.apache.mina.core.session.IoSession;
47 import org.apache.mina.core.session.IoSessionConfig;
48 import org.apache.mina.core.session.IoSessionRecycler;
49 import org.apache.mina.core.write.WriteRequest;
50 import org.apache.mina.core.write.WriteRequestQueue;
51 import org.apache.mina.util.ExceptionMonitor;
52
53
54
55
56
57
58
59
60
61 public abstract class AbstractPollingConnectionlessIoAcceptor<S extends AbstractIoSession, H>
62 extends AbstractIoAcceptor {
63
64 private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
65
66
67
68
69
70 private static final long SELECT_TIMEOUT = 1000L;
71
72 private final Object lock = new Object();
73
74 private final IoProcessor<S> processor = new ConnectionlessAcceptorProcessor();
75 private final Queue<AcceptorOperationFuture> registerQueue =
76 new ConcurrentLinkedQueue<AcceptorOperationFuture>();
77 private final Queue<AcceptorOperationFuture> cancelQueue =
78 new ConcurrentLinkedQueue<AcceptorOperationFuture>();
79
80 private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>();
81 private final Map<String, H> boundHandles =
82 Collections.synchronizedMap(new HashMap<String, H>());
83
84 private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
85
86 private final ServiceOperationFuture disposalFuture =
87 new ServiceOperationFuture();
88 private volatile boolean selectable;
89
90
91 private Acceptor acceptor;
92
93 private long lastIdleCheckTime;
94
95 private String getAddressAsString(SocketAddress address) {
96 InetAddress inetAddress = ((InetSocketAddress)address).getAddress();
97 int port = ((InetSocketAddress)address).getPort();
98
99 String result = null;
100
101 if ( inetAddress instanceof Inet4Address ) {
102 result = "/" + inetAddress.getHostAddress() + ":" + port;
103 } else {
104
105 if ( ((Inet6Address)inetAddress).isIPv4CompatibleAddress() ) {
106 byte[] bytes = inetAddress.getAddress();
107
108 result = "/" + bytes[12] + "." + bytes[13] + "." + bytes[14] + "." + bytes[15] + ":" + port;
109 } else {
110 result = inetAddress.toString();
111 }
112 }
113
114 return result;
115 }
116
117
118
119
120 protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig) {
121 this(sessionConfig, null);
122 }
123
124
125
126
127 protected AbstractPollingConnectionlessIoAcceptor(IoSessionConfig sessionConfig, Executor executor) {
128 super(sessionConfig, executor);
129
130 try {
131 init();
132 selectable = true;
133 } catch (RuntimeException e) {
134 throw e;
135 } catch (Exception e) {
136 throw new RuntimeIoException("Failed to initialize.", e);
137 } finally {
138 if (!selectable) {
139 try {
140 destroy();
141 } catch (Exception e) {
142 ExceptionMonitor.getInstance().exceptionCaught(e);
143 }
144 }
145 }
146 }
147
148 protected abstract void init() throws Exception;
149 protected abstract void destroy() throws Exception;
150 protected abstract int select() throws Exception;
151 protected abstract int select(long timeout) throws Exception;
152 protected abstract void wakeup();
153 protected abstract Iterator<H> selectedHandles();
154 protected abstract H open(SocketAddress localAddress) throws Exception;
155 protected abstract void close(H handle) throws Exception;
156 protected abstract SocketAddress localAddress(H handle) throws Exception;
157 protected abstract boolean isReadable(H handle);
158 protected abstract boolean isWritable(H handle);
159 protected abstract SocketAddress receive(H handle, IoBuffer buffer) throws Exception;
160
161 protected abstract int send(S session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception;
162
163 protected abstract S newSession(IoProcessor<S> processor, H handle, SocketAddress remoteAddress) throws Exception;
164
165 protected abstract void setInterestedInWrite(S session, boolean interested) throws Exception;
166
167
168
169
170 @Override
171 protected void dispose0() throws Exception {
172 unbind();
173 startupAcceptor();
174 wakeup();
175 }
176
177
178
179
180 @Override
181 protected final Set<SocketAddress> bindInternal(
182 List<? extends SocketAddress> localAddresses) throws Exception {
183
184
185 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
186
187
188
189 registerQueue.add(request);
190
191
192
193 startupAcceptor();
194
195
196
197
198 wakeup();
199
200
201 request.awaitUninterruptibly();
202
203 if (request.getException() != null) {
204 throw request.getException();
205 }
206
207
208
209
210 Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
211
212 for (H handle : boundHandles.values()) {
213 newLocalAddresses.add(localAddress(handle));
214 }
215
216 return newLocalAddresses;
217 }
218
219
220
221
222 @Override
223 protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
224 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
225
226 cancelQueue.add(request);
227 startupAcceptor();
228 wakeup();
229
230 request.awaitUninterruptibly();
231
232 if (request.getException() != null) {
233 throw request.getException();
234 }
235 }
236
237
238
239
240 public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
241 if (isDisposing()) {
242 throw new IllegalStateException("Already disposed.");
243 }
244
245 if (remoteAddress == null) {
246 throw new IllegalArgumentException("remoteAddress");
247 }
248
249 synchronized (bindLock) {
250 if (!isActive()) {
251 throw new IllegalStateException(
252 "Can't create a session from a unbound service.");
253 }
254
255 try {
256 return newSessionWithoutLock(remoteAddress, localAddress);
257 } catch (RuntimeException e) {
258 throw e;
259 } catch (Error e) {
260 throw e;
261 } catch (Exception e) {
262 throw new RuntimeIoException("Failed to create a session.", e);
263 }
264 }
265 }
266
267 private IoSession newSessionWithoutLock(
268 SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
269 H handle = boundHandles.get(getAddressAsString(localAddress));
270
271 if (handle == null) {
272 throw new IllegalArgumentException("Unknown local address: " + localAddress);
273 }
274
275 IoSession session;
276 IoSessionRecycler sessionRecycler = getSessionRecycler();
277
278 synchronized (sessionRecycler) {
279 session = sessionRecycler.recycle(localAddress, remoteAddress);
280
281 if (session != null) {
282 return session;
283 }
284
285
286 S newSession = newSession(processor, handle, remoteAddress);
287 getSessionRecycler().put(newSession);
288 session = newSession;
289 }
290
291 initSession(session, null, null);
292
293 try {
294 this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
295 getListeners().fireSessionCreated(session);
296 } catch (Throwable t) {
297 ExceptionMonitor.getInstance().exceptionCaught(t);
298 }
299
300 return session;
301 }
302
303 public final IoSessionRecycler getSessionRecycler() {
304 return sessionRecycler;
305 }
306
307 public final void setSessionRecycler(IoSessionRecycler sessionRecycler) {
308 synchronized (bindLock) {
309 if (isActive()) {
310 throw new IllegalStateException(
311 "sessionRecycler can't be set while the acceptor is bound.");
312 }
313
314 if (sessionRecycler == null) {
315 sessionRecycler = DEFAULT_RECYCLER;
316 }
317
318 this.sessionRecycler = sessionRecycler;
319 }
320 }
321
322 private class ConnectionlessAcceptorProcessor implements IoProcessor<S> {
323
324 public void add(S session) {
325 }
326
327 public void flush(S session) {
328 if (scheduleFlush(session)) {
329 wakeup();
330 }
331 }
332
333 public void remove(S session) {
334 getSessionRecycler().remove(session);
335 getListeners().fireSessionDestroyed(session);
336 }
337
338 public void updateTrafficControl(S session) {
339 throw new UnsupportedOperationException();
340 }
341
342 public void dispose() {
343 }
344
345 public boolean isDisposed() {
346 return false;
347 }
348
349 public boolean isDisposing() {
350 return false;
351 }
352 }
353
354
355
356
357 private void startupAcceptor() {
358 if (!selectable) {
359 registerQueue.clear();
360 cancelQueue.clear();
361 flushingSessions.clear();
362 }
363
364 synchronized (lock) {
365 if (acceptor == null) {
366 acceptor = new Acceptor();
367 executeWorker(acceptor);
368 }
369 }
370 }
371
372 private boolean scheduleFlush(S session) {
373
374
375
376 if (session.setScheduledForFlush(true)) {
377 flushingSessions.add(session);
378 return true;
379 } else {
380 return false;
381 }
382 }
383
384
385
386
387
388
389 private class Acceptor implements Runnable {
390 public void run() {
391 int nHandles = 0;
392 lastIdleCheckTime = System.currentTimeMillis();
393
394 while (selectable) {
395 try {
396 int selected = select(SELECT_TIMEOUT);
397
398 nHandles += registerHandles();
399
400 if (selected > 0) {
401 processReadySessions(selectedHandles());
402 }
403
404 long currentTime = System.currentTimeMillis();
405 flushSessions(currentTime);
406 nHandles -= unregisterHandles();
407
408 notifyIdleSessions(currentTime);
409
410 if (nHandles == 0) {
411 synchronized (lock) {
412 if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
413 acceptor = null;
414 break;
415 }
416 }
417 }
418 } catch (ClosedSelectorException cse) {
419
420 break;
421 } catch (Exception e) {
422 ExceptionMonitor.getInstance().exceptionCaught(e);
423
424 try {
425 Thread.sleep(1000);
426 } catch (InterruptedException e1) {
427 }
428 }
429 }
430
431 if (selectable && isDisposing()) {
432 selectable = false;
433 try {
434 destroy();
435 } catch (Exception e) {
436 ExceptionMonitor.getInstance().exceptionCaught(e);
437 } finally {
438 disposalFuture.setValue(true);
439 }
440 }
441 }
442 }
443
444 @SuppressWarnings("unchecked")
445 private void processReadySessions(Iterator<H> handles) {
446 while (handles.hasNext()) {
447 H h = handles.next();
448 handles.remove();
449
450 try {
451 if (isReadable(h)) {
452 readHandle(h);
453 }
454
455 if (isWritable(h)) {
456 for (IoSession session : getManagedSessions().values()) {
457 scheduleFlush((S) session);
458 }
459 }
460 } catch (Throwable t) {
461 ExceptionMonitor.getInstance().exceptionCaught(t);
462 }
463 }
464 }
465
466 private void readHandle(H handle) throws Exception {
467 IoBuffer readBuf = IoBuffer.allocate(
468 getSessionConfig().getReadBufferSize());
469
470 SocketAddress remoteAddress = receive(handle, readBuf);
471
472 if (remoteAddress != null) {
473 IoSession session = newSessionWithoutLock(
474 remoteAddress, localAddress(handle));
475
476 readBuf.flip();
477
478 IoBuffer newBuf = IoBuffer.allocate(readBuf.limit());
479 newBuf.put(readBuf);
480 newBuf.flip();
481
482 session.getFilterChain().fireMessageReceived(newBuf);
483 }
484 }
485
486 private void flushSessions(long currentTime) {
487 for (;;) {
488 S session = flushingSessions.poll();
489
490 if (session == null) {
491 break;
492 }
493
494
495
496 session.unscheduledForFlush();
497
498 try {
499 boolean flushedAll = flush(session, currentTime);
500 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) &&
501 !session.isScheduledForFlush()) {
502 scheduleFlush(session);
503 }
504 } catch (Exception e) {
505 session.getFilterChain().fireExceptionCaught(e);
506 }
507 }
508 }
509
510 private boolean flush(S session, long currentTime) throws Exception {
511
512 setInterestedInWrite(session, false);
513
514 final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
515 final int maxWrittenBytes =
516 session.getConfig().getMaxReadBufferSize() +
517 (session.getConfig().getMaxReadBufferSize() >>> 1);
518
519 int writtenBytes = 0;
520
521 try {
522 for (;;) {
523 WriteRequest req = session.getCurrentWriteRequest();
524
525 if (req == null) {
526 req = writeRequestQueue.poll(session);
527 if (req == null) {
528 break;
529 }
530 session.setCurrentWriteRequest(req);
531 }
532
533 IoBuffer buf = (IoBuffer) req.getMessage();
534
535 if (buf.remaining() == 0) {
536
537 session.setCurrentWriteRequest(null);
538 buf.reset();
539 session.getFilterChain().fireMessageSent(req);
540 continue;
541 }
542
543 SocketAddress destination = req.getDestination();
544
545 if (destination == null) {
546 destination = session.getRemoteAddress();
547 }
548
549 int localWrittenBytes = send(session, buf, destination);
550
551 if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
552
553 setInterestedInWrite(session, true);
554 return false;
555 } else {
556 setInterestedInWrite(session, false);
557
558
559 session.setCurrentWriteRequest(null);
560 writtenBytes += localWrittenBytes;
561 buf.reset();
562 session.getFilterChain().fireMessageSent(req);
563 }
564 }
565 } finally {
566 session.increaseWrittenBytes(writtenBytes, currentTime);
567 }
568
569 return true;
570 }
571
572 private int registerHandles() {
573 for (;;) {
574 AcceptorOperationFuture req = registerQueue.poll();
575
576 if (req == null) {
577 break;
578 }
579
580 Map<String, H> newHandles = new HashMap<String, H>();
581 List<SocketAddress> localAddresses = req.getLocalAddresses();
582
583 try {
584 for (SocketAddress socketAddress : localAddresses) {
585 H handle = open(socketAddress);
586 newHandles.put(getAddressAsString(localAddress(handle)), handle);
587 }
588
589 boundHandles.putAll(newHandles);
590
591 getListeners().fireServiceActivated();
592 req.setDone();
593
594 return newHandles.size();
595 } catch (Exception e) {
596 req.setException(e);
597 } finally {
598
599 if (req.getException() != null) {
600 for (H handle : newHandles.values()) {
601 try {
602 close(handle);
603 } catch (Exception e) {
604 ExceptionMonitor.getInstance().exceptionCaught(e);
605 }
606 }
607
608 wakeup();
609 }
610 }
611 }
612
613 return 0;
614 }
615
616 private int unregisterHandles() {
617 int nHandles = 0;
618
619 for (;;) {
620 AcceptorOperationFuture request = cancelQueue.poll();
621 if (request == null) {
622 break;
623 }
624
625
626 for (SocketAddress socketAddress : request.getLocalAddresses()) {
627 H handle = boundHandles.remove(getAddressAsString(socketAddress));
628
629 if (handle == null) {
630 continue;
631 }
632
633 try {
634 close(handle);
635 wakeup();
636 } catch (Throwable e) {
637 ExceptionMonitor.getInstance().exceptionCaught(e);
638 } finally {
639 nHandles++;
640 }
641 }
642
643 request.setDone();
644 }
645
646 return nHandles;
647 }
648
649 private void notifyIdleSessions(long currentTime) {
650
651 if (currentTime - lastIdleCheckTime >= 1000) {
652 lastIdleCheckTime = currentTime;
653 AbstractIoSession.notifyIdleness(
654 getListeners().getManagedSessions().values().iterator(),
655 currentTime);
656 }
657 }
658 }