1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.io.IOException;
23 import java.net.Inet4Address;
24 import java.net.Inet6Address;
25 import java.net.InetAddress;
26 import java.net.InetSocketAddress;
27 import java.net.SocketAddress;
28 import java.nio.channels.ClosedSelectorException;
29 import java.nio.channels.DatagramChannel;
30 import java.nio.channels.SelectionKey;
31 import java.nio.channels.Selector;
32 import java.util.Collections;
33 import java.util.HashMap;
34 import java.util.HashSet;
35 import java.util.Iterator;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Queue;
39 import java.util.Set;
40 import java.util.concurrent.ConcurrentLinkedQueue;
41 import java.util.concurrent.Executor;
42 import java.util.concurrent.Semaphore;
43
44 import org.apache.mina.core.RuntimeIoException;
45 import org.apache.mina.core.buffer.IoBuffer;
46 import org.apache.mina.core.service.AbstractIoAcceptor;
47 import org.apache.mina.core.service.IoAcceptor;
48 import org.apache.mina.core.service.IoProcessor;
49 import org.apache.mina.core.service.TransportMetadata;
50 import org.apache.mina.core.session.AbstractIoSession;
51 import org.apache.mina.core.session.ExpiringSessionRecycler;
52 import org.apache.mina.core.session.IoSession;
53 import org.apache.mina.core.session.IoSessionConfig;
54 import org.apache.mina.core.session.IoSessionRecycler;
55 import org.apache.mina.core.write.WriteRequest;
56 import org.apache.mina.core.write.WriteRequestQueue;
57 import org.apache.mina.transport.socket.DatagramAcceptor;
58 import org.apache.mina.transport.socket.DatagramSessionConfig;
59 import org.apache.mina.transport.socket.DefaultDatagramSessionConfig;
60 import org.apache.mina.util.ExceptionMonitor;
61
62
63
64
65
66
67
68 public final class NioDatagramAcceptor extends AbstractIoAcceptor implements DatagramAcceptor, IoProcessor<NioSession> {
69
70
71
72 private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
73
74
75
76
77
78 private static final long SELECT_TIMEOUT = 1000L;
79
80
81 private final Semaphore lock = new Semaphore(1);
82
83
84 private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<>();
85
86 private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<>();
87
88 private final Queue<NioSession> flushingSessions = new ConcurrentLinkedQueue<>();
89
90 private final Map<SocketAddress, DatagramChannel> boundHandles = Collections
91 .synchronizedMap(new HashMap<SocketAddress, DatagramChannel>());
92
93 private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
94
95 private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
96
97 private volatile boolean selectable;
98
99
100 private Acceptor acceptor;
101
102 private long lastIdleCheckTime;
103
104
105 private volatile Selector selector;
106
107
108
109
/**
 * Creates an acceptor with a default datagram session configuration and a
 * {@code null} executor.
 */
public NioDatagramAcceptor() {
    // Same effect as this(new DefaultDatagramSessionConfig(), null), routed
    // through the Executor-based constructor.
    this((Executor) null);
}
113
114
115
116
117
118
/**
 * Creates an acceptor with a default datagram session configuration.
 *
 * @param executor the executor forwarded to the main constructor
 */
public NioDatagramAcceptor(Executor executor) {
    this(new DefaultDatagramSessionConfig(), executor);
}
122
123
124
125
/**
 * Main constructor: delegates to the parent class, then initializes the
 * selector. If initialization fails, {@link #destroy()} is invoked before the
 * failure is propagated.
 *
 * @param sessionConfig the session configuration handed to the parent class
 * @param executor the executor handed to the parent class
 * @throws RuntimeIoException if {@link #init()} fails with a checked exception
 */
private NioDatagramAcceptor(IoSessionConfig sessionConfig, Executor executor) {
    super(sessionConfig, executor);

    try {
        init();
        selectable = true;
    } catch (Exception e) {
        // Unchecked failures are propagated untouched, checked ones are wrapped.
        if (e instanceof RuntimeException) {
            throw (RuntimeException) e;
        }

        throw new RuntimeIoException("Failed to initialize.", e);
    } finally {
        // 'selectable' is still false here only when init() did not complete.
        if (!selectable) {
            try {
                destroy();
            } catch (Exception destroyFailure) {
                ExceptionMonitor.getInstance().exceptionCaught(destroyFailure);
            }
        }
    }
}
146
147
148
149
150
151
152 private class Acceptor implements Runnable {
153 @Override
154 public void run() {
155 int nHandles = 0;
156 lastIdleCheckTime = System.currentTimeMillis();
157
158
159 lock.release();
160
161 while (selectable) {
162 try {
163 int selected = select(SELECT_TIMEOUT);
164
165 nHandles += registerHandles();
166
167 if (nHandles == 0) {
168 try {
169 lock.acquire();
170
171 if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
172 acceptor = null;
173 break;
174 }
175 } finally {
176 lock.release();
177 }
178 }
179
180 if (selected > 0) {
181 processReadySessions(selectedHandles());
182 }
183
184 long currentTime = System.currentTimeMillis();
185 flushSessions(currentTime);
186 nHandles -= unregisterHandles();
187
188 notifyIdleSessions(currentTime);
189 } catch (ClosedSelectorException cse) {
190
191 ExceptionMonitor.getInstance().exceptionCaught(cse);
192 break;
193 } catch (Exception e) {
194 ExceptionMonitor.getInstance().exceptionCaught(e);
195
196 try {
197 Thread.sleep(1000);
198 } catch (InterruptedException e1) {
199 }
200 }
201 }
202
203 if (selectable && isDisposing()) {
204 selectable = false;
205 try {
206 destroy();
207 } catch (Exception e) {
208 ExceptionMonitor.getInstance().exceptionCaught(e);
209 } finally {
210 disposalFuture.setValue(true);
211 }
212 }
213 }
214 }
215
216 private int registerHandles() {
217 for (;;) {
218 AcceptorOperationFuture req = registerQueue.poll();
219
220 if (req == null) {
221 break;
222 }
223
224 Map<SocketAddress, DatagramChannel> newHandles = new HashMap<>();
225 List<SocketAddress> localAddresses = req.getLocalAddresses();
226
227 try {
228 for (SocketAddress socketAddress : localAddresses) {
229 DatagramChannel handle = open(socketAddress);
230 newHandles.put(localAddress(handle), handle);
231 }
232
233 boundHandles.putAll(newHandles);
234
235 getListeners().fireServiceActivated();
236 req.setDone();
237
238 return newHandles.size();
239 } catch (Exception e) {
240 req.setException(e);
241 } finally {
242
243 if (req.getException() != null) {
244 for (DatagramChannel handle : newHandles.values()) {
245 try {
246 close(handle);
247 } catch (Exception e) {
248 ExceptionMonitor.getInstance().exceptionCaught(e);
249 }
250 }
251
252 wakeup();
253 }
254 }
255 }
256
257 return 0;
258 }
259
260 private void processReadySessions(Set<SelectionKey> handles) {
261 Iterator<SelectionKey> iterator = handles.iterator();
262
263 while (iterator.hasNext()) {
264 SelectionKey key = iterator.next();
265 DatagramChannel handle = (DatagramChannel) key.channel();
266 iterator.remove();
267
268 try {
269 if (key.isValid() && key.isReadable()) {
270 readHandle(handle);
271 }
272
273 if (key.isValid() && key.isWritable()) {
274 for (IoSession session : getManagedSessions().values()) {
275 scheduleFlush((NioSession) session);
276 }
277 }
278 } catch (Exception e) {
279 ExceptionMonitor.getInstance().exceptionCaught(e);
280 }
281 }
282 }
283
284 private boolean scheduleFlush(NioSession session) {
285
286
287
288 if (session.setScheduledForFlush(true)) {
289 flushingSessions.add(session);
290 return true;
291 } else {
292 return false;
293 }
294 }
295
296 private void readHandle(DatagramChannel handle) throws Exception {
297 IoBuffer readBuf = IoBuffer.allocate(getSessionConfig().getReadBufferSize());
298
299 SocketAddress remoteAddress = receive(handle, readBuf);
300
301 if (remoteAddress != null) {
302 IoSession session = newSessionWithoutLock(remoteAddress, localAddress(handle));
303
304 readBuf.flip();
305
306 if (!session.isReadSuspended()) {
307 session.getFilterChain().fireMessageReceived(readBuf);
308 }
309 }
310 }
311
312 private IoSession newSessionWithoutLock(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
313 DatagramChannel handle = boundHandles.get(localAddress);
314
315 if (handle == null) {
316 throw new IllegalArgumentException("Unknown local address: " + localAddress);
317 }
318
319 IoSession session;
320
321 synchronized (sessionRecycler) {
322 session = sessionRecycler.recycle(remoteAddress);
323
324 if (session != null) {
325 return session;
326 }
327
328
329 NioSession newSession = newSession(this, handle, remoteAddress);
330 getSessionRecycler().put(newSession);
331 session = newSession;
332 }
333
334 initSession(session, null, null);
335
336 try {
337 this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
338 getListeners().fireSessionCreated(session);
339 } catch (Exception e) {
340 ExceptionMonitor.getInstance().exceptionCaught(e);
341 }
342
343 return session;
344 }
345
346 private void flushSessions(long currentTime) {
347 for (;;) {
348 NioSession session = flushingSessions.poll();
349
350 if (session == null) {
351 break;
352 }
353
354
355
356 session.unscheduledForFlush();
357
358 try {
359 boolean flushedAll = flush(session, currentTime);
360
361 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) && !session.isScheduledForFlush()) {
362 scheduleFlush(session);
363 }
364 } catch (Exception e) {
365 session.getFilterChain().fireExceptionCaught(e);
366 }
367 }
368 }
369
370 private boolean flush(NioSession session, long currentTime) throws Exception {
371 final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
372 final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
373 + (session.getConfig().getMaxReadBufferSize() >>> 1);
374
375 int writtenBytes = 0;
376
377 try {
378 for (;;) {
379 WriteRequest req = session.getCurrentWriteRequest();
380
381 if (req == null) {
382 req = writeRequestQueue.poll(session);
383
384 if (req == null) {
385 setInterestedInWrite(session, false);
386 break;
387 }
388
389 session.setCurrentWriteRequest(req);
390 }
391
392 IoBuffer"../../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer buf = (IoBuffer) req.getMessage();
393
394 if (buf.remaining() == 0) {
395
396 session.setCurrentWriteRequest(null);
397 buf.reset();
398 session.getFilterChain().fireMessageSent(req);
399 continue;
400 }
401
402 SocketAddress destination = req.getDestination();
403
404 if (destination == null) {
405 destination = session.getRemoteAddress();
406 }
407
408 int localWrittenBytes = send(session, buf, destination);
409
410 if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) {
411
412 setInterestedInWrite(session, true);
413
414 return false;
415 } else {
416 setInterestedInWrite(session, false);
417
418
419 session.setCurrentWriteRequest(null);
420 writtenBytes += localWrittenBytes;
421 buf.reset();
422 session.getFilterChain().fireMessageSent(req);
423 }
424 }
425 } finally {
426 session.increaseWrittenBytes(writtenBytes, currentTime);
427 }
428
429 return true;
430 }
431
432 private int unregisterHandles() {
433 int nHandles = 0;
434
435 for (;;) {
436 AcceptorOperationFuture request = cancelQueue.poll();
437 if (request == null) {
438 break;
439 }
440
441
442 for (SocketAddress socketAddress : request.getLocalAddresses()) {
443 DatagramChannel handle = boundHandles.remove(socketAddress);
444
445 if (handle == null) {
446 continue;
447 }
448
449 try {
450 close(handle);
451 wakeup();
452 } catch (Exception e) {
453 ExceptionMonitor.getInstance().exceptionCaught(e);
454 } finally {
455 nHandles++;
456 }
457 }
458
459 request.setDone();
460 }
461
462 return nHandles;
463 }
464
465 private void notifyIdleSessions(long currentTime) {
466
467 if (currentTime - lastIdleCheckTime >= 1000) {
468 lastIdleCheckTime = currentTime;
469 AbstractIoSession.notifyIdleness(getListeners().getManagedSessions().values().iterator(), currentTime);
470 }
471 }
472
473
474
475
476 private void startupAcceptor() throws InterruptedException {
477 if (!selectable) {
478 registerQueue.clear();
479 cancelQueue.clear();
480 flushingSessions.clear();
481 }
482
483 lock.acquire();
484
485 if (acceptor == null) {
486 acceptor = new Acceptor();
487 executeWorker(acceptor);
488 } else {
489 lock.release();
490 }
491 }
492
493 protected void init() throws Exception {
494 this.selector = Selector.open();
495 }
496
497
498
499
500 @Override
501 public void add(NioSession session) {
502
503 }
504
505
506
507
508 @Override
509 protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
510
511
512 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
513
514
515
516 registerQueue.add(request);
517
518
519
520 startupAcceptor();
521
522
523
524
525 try {
526 lock.acquire();
527
528
529 Thread.sleep(10);
530 wakeup();
531 } finally {
532 lock.release();
533 }
534
535
536 request.awaitUninterruptibly();
537
538 if (request.getException() != null) {
539 throw request.getException();
540 }
541
542
543
544
545 Set<SocketAddress> newLocalAddresses = new HashSet<>();
546
547 for (DatagramChannel handle : boundHandles.values()) {
548 newLocalAddresses.add(localAddress(handle));
549 }
550
551 return newLocalAddresses;
552 }
553
554 protected void close(DatagramChannel handle) throws Exception {
555 SelectionKey key = handle.keyFor(selector);
556
557 if (key != null) {
558 key.cancel();
559 }
560
561 handle.disconnect();
562 handle.close();
563 }
564
565 protected void destroy() throws Exception {
566 if (selector != null) {
567 selector.close();
568 }
569 }
570
571
572
573
574 @Override
575 protected void dispose0() throws Exception {
576 unbind();
577 startupAcceptor();
578 wakeup();
579 }
580
581
582
583
584 @Override
585 public void flush(NioSession session) {
586 if (scheduleFlush(session)) {
587 wakeup();
588 }
589 }
590
591 @Override
592 public InetSocketAddress getDefaultLocalAddress() {
593 return (InetSocketAddress) super.getDefaultLocalAddress();
594 }
595
596 @Override
597 public InetSocketAddress getLocalAddress() {
598 return (InetSocketAddress) super.getLocalAddress();
599 }
600
601
602
603
604 @Override
605 public DatagramSessionConfig getSessionConfig() {
606 return (DatagramSessionConfig) sessionConfig;
607 }
608
609 @Override
610 public final IoSessionRecycler getSessionRecycler() {
611 return sessionRecycler;
612 }
613
614 @Override
615 public TransportMetadata getTransportMetadata() {
616 return NioDatagramSession.METADATA;
617 }
618
619 protected boolean isReadable(DatagramChannel handle) {
620 SelectionKey key = handle.keyFor(selector);
621
622 if ((key == null) || (!key.isValid())) {
623 return false;
624 }
625
626 return key.isReadable();
627 }
628
629 protected boolean isWritable(DatagramChannel handle) {
630 SelectionKey key = handle.keyFor(selector);
631
632 if ((key == null) || (!key.isValid())) {
633 return false;
634 }
635
636 return key.isWritable();
637 }
638
639 protected SocketAddress localAddress(DatagramChannel handle) throws Exception {
640 InetSocketAddress inetSocketAddress = (InetSocketAddress) handle.socket().getLocalSocketAddress();
641 InetAddress inetAddress = inetSocketAddress.getAddress();
642
643 if ((inetAddress instanceof Inet6Address) && (((Inet6Address) inetAddress).isIPv4CompatibleAddress())) {
644
645
646
647 byte[] ipV6Address = ((Inet6Address) inetAddress).getAddress();
648 byte[] ipV4Address = new byte[4];
649
650 System.arraycopy(ipV6Address, 12, ipV4Address, 0, 4);
651
652 InetAddress inet4Adress = Inet4Address.getByAddress(ipV4Address);
653 return new InetSocketAddress(inet4Adress, inetSocketAddress.getPort());
654 } else {
655 return inetSocketAddress;
656 }
657 }
658
659 protected NioSession newSession(IoProcessor<NioSession> processor, DatagramChannel handle,
660 SocketAddress remoteAddress) {
661 SelectionKey key = handle.keyFor(selector);
662
663 if ((key == null) || (!key.isValid())) {
664 return null;
665 }
666
667 NioDatagramSessionioDatagramSession.html#NioDatagramSession">NioDatagramSession newSession = new NioDatagramSession(this, handle, processor, remoteAddress);
668 newSession.setSelectionKey(key);
669
670 return newSession;
671 }
672
673
674
675
676 @Override
677 public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
678 if (isDisposing()) {
679 throw new IllegalStateException("The Acceptor is being disposed.");
680 }
681
682 if (remoteAddress == null) {
683 throw new IllegalArgumentException("remoteAddress");
684 }
685
686 synchronized (bindLock) {
687 if (!isActive()) {
688 throw new IllegalStateException("Can't create a session from a unbound service.");
689 }
690
691 try {
692 return newSessionWithoutLock(remoteAddress, localAddress);
693 } catch (RuntimeException | Error e) {
694 throw e;
695 } catch (Exception e) {
696 throw new RuntimeIoException("Failed to create a session.", e);
697 }
698 }
699 }
700
701 protected DatagramChannel open(SocketAddress localAddress) throws Exception {
702 final DatagramChannel ch = DatagramChannel.open();
703 boolean success = false;
704 try {
705 new NioDatagramSessionConfig(ch).setAll(getSessionConfig());
706 ch.configureBlocking(false);
707
708 try {
709 ch.socket().bind(localAddress);
710 } catch (IOException ioe) {
711
712
713 String newMessage = "Error while binding on " + localAddress + "\n" + "original message : "
714 + ioe.getMessage();
715 Exception e = new IOException(newMessage);
716 e.initCause(ioe.getCause());
717
718
719 ch.close();
720
721 throw e;
722 }
723
724 ch.register(selector, SelectionKey.OP_READ);
725 success = true;
726 } finally {
727 if (!success) {
728 close(ch);
729 }
730 }
731
732 return ch;
733 }
734
735 protected SocketAddress receive(DatagramChannel handle, IoBuffer buffer) throws Exception {
736 return handle.receive(buffer.buf());
737 }
738
739
740
741
742 @Override
743 public void remove(NioSession session) {
744 getSessionRecycler().remove(session);
745 getListeners().fireSessionDestroyed(session);
746 }
747
748 protected int select() throws Exception {
749 return selector.select();
750 }
751
752 protected int select(long timeout) throws Exception {
753 return selector.select(timeout);
754 }
755
756 protected Set<SelectionKey> selectedHandles() {
757 return selector.selectedKeys();
758 }
759
760 protected int send(NioSession session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception {
761 return ((DatagramChannel) session.getChannel()).send(buffer.buf(), remoteAddress);
762 }
763
764 @Override
765 public void setDefaultLocalAddress(InetSocketAddress localAddress) {
766 setDefaultLocalAddress((SocketAddress) localAddress);
767 }
768
769 protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception {
770 SelectionKey key = session.getSelectionKey();
771
772 if (key == null) {
773 return;
774 }
775
776 int newInterestOps = key.interestOps();
777
778 if (isInterested) {
779 newInterestOps |= SelectionKey.OP_WRITE;
780 } else {
781 newInterestOps &= ~SelectionKey.OP_WRITE;
782 }
783
784 key.interestOps(newInterestOps);
785 }
786
787 @Override
788 public final void setSessionRecycler(IoSessionRecycler sessionRecycler) {
789 synchronized (bindLock) {
790 if (isActive()) {
791 throw new IllegalStateException("sessionRecycler can't be set while the acceptor is bound.");
792 }
793
794 if (sessionRecycler == null) {
795 sessionRecycler = DEFAULT_RECYCLER;
796 }
797
798 this.sessionRecycler = sessionRecycler;
799 }
800 }
801
802
803
804
805 @Override
806 protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
807 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
808
809 cancelQueue.add(request);
810 startupAcceptor();
811 wakeup();
812
813 request.awaitUninterruptibly();
814
815 if (request.getException() != null) {
816 throw request.getException();
817 }
818 }
819
820
821
822
823 @Override
824 public void updateTrafficControl(NioSession session) {
825
826 }
827
828 protected void wakeup() {
829 selector.wakeup();
830 }
831
832
833
834
835 @Override
836 public void write(NioSession session, WriteRequest writeRequest) {
837
838 long currentTime = System.currentTimeMillis();
839 final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
840 final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
841 + (session.getConfig().getMaxReadBufferSize() >>> 1);
842
843 int writtenBytes = 0;
844
845
846
847 IoBuffer"../../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer buf = (IoBuffer) writeRequest.getMessage();
848
849 if (buf.remaining() == 0) {
850
851 session.setCurrentWriteRequest(null);
852 buf.reset();
853 session.getFilterChain().fireMessageSent(writeRequest);
854 return;
855 }
856
857
858 try {
859 for (;;) {
860 if (writeRequest == null) {
861 writeRequest = writeRequestQueue.poll(session);
862
863 if (writeRequest == null) {
864 setInterestedInWrite(session, false);
865 break;
866 }
867
868 session.setCurrentWriteRequest(writeRequest);
869 }
870
871 buf = (IoBuffer) writeRequest.getMessage();
872
873 if (buf.remaining() == 0) {
874
875 session.setCurrentWriteRequest(null);
876 session.getFilterChain().fireMessageSent(writeRequest);
877 continue;
878 }
879
880 SocketAddress destination = writeRequest.getDestination();
881
882 if (destination == null) {
883 destination = session.getRemoteAddress();
884 }
885
886 int localWrittenBytes = send(session, buf, destination);
887
888 if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) {
889
890 setInterestedInWrite(session, true);
891
892 session.getWriteRequestQueue().offer(session, writeRequest);
893 scheduleFlush(session);
894 } else {
895 setInterestedInWrite(session, false);
896
897
898 session.setCurrentWriteRequest(null);
899 writtenBytes += localWrittenBytes;
900 session.getFilterChain().fireMessageSent(writeRequest);
901
902 break;
903 }
904 }
905 } catch (Exception e) {
906 session.getFilterChain().fireExceptionCaught(e);
907 } finally {
908 session.increaseWrittenBytes(writtenBytes, currentTime);
909 }
910 }
911 }