1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.io.IOException;
23 import java.net.Inet4Address;
24 import java.net.Inet6Address;
25 import java.net.InetAddress;
26 import java.net.InetSocketAddress;
27 import java.net.SocketAddress;
28 import java.nio.channels.ClosedSelectorException;
29 import java.nio.channels.DatagramChannel;
30 import java.nio.channels.SelectionKey;
31 import java.nio.channels.Selector;
32 import java.util.Collections;
33 import java.util.HashMap;
34 import java.util.HashSet;
35 import java.util.Iterator;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Queue;
39 import java.util.Set;
40 import java.util.concurrent.ConcurrentLinkedQueue;
41 import java.util.concurrent.Executor;
42 import java.util.concurrent.Semaphore;
43
44 import org.apache.mina.core.RuntimeIoException;
45 import org.apache.mina.core.buffer.IoBuffer;
46 import org.apache.mina.core.service.AbstractIoAcceptor;
47 import org.apache.mina.core.service.IoAcceptor;
48 import org.apache.mina.core.service.IoProcessor;
49 import org.apache.mina.core.service.TransportMetadata;
50 import org.apache.mina.core.session.AbstractIoSession;
51 import org.apache.mina.core.session.ExpiringSessionRecycler;
52 import org.apache.mina.core.session.IoSession;
53 import org.apache.mina.core.session.IoSessionConfig;
54 import org.apache.mina.core.session.IoSessionRecycler;
55 import org.apache.mina.core.write.WriteRequest;
56 import org.apache.mina.core.write.WriteRequestQueue;
57 import org.apache.mina.transport.socket.DatagramAcceptor;
58 import org.apache.mina.transport.socket.DatagramSessionConfig;
59 import org.apache.mina.transport.socket.DefaultDatagramSessionConfig;
60 import org.apache.mina.util.ExceptionMonitor;
61
62
63
64
65
66
67
68 public final class NioDatagramAcceptor extends AbstractIoAcceptor implements DatagramAcceptor, IoProcessor<NioSession> {
69
70
71
72 private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
73
74
75
76
77
78 private static final long SELECT_TIMEOUT = 1000L;
79
80
81 private final Semaphore lock = new Semaphore(1);
82
83
84 private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<>();
85
86 private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<>();
87
88 private final Queue<NioSession> flushingSessions = new ConcurrentLinkedQueue<>();
89
90 private final Map<SocketAddress, DatagramChannel> boundHandles = Collections
91 .synchronizedMap(new HashMap<SocketAddress, DatagramChannel>());
92
93 private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
94
95 private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
96
97 private volatile boolean selectable;
98
99
100 private Acceptor acceptor;
101
102 private long lastIdleCheckTime;
103
104
105 private volatile Selector selector;
106
107
108
109
110 public NioDatagramAcceptor() {
111 this(new DefaultDatagramSessionConfig(), null);
112 }
113
114
115
116
117
118
119 public NioDatagramAcceptor(Executor executor) {
120 this(new DefaultDatagramSessionConfig(), executor);
121 }
122
123
124
125
126 private NioDatagramAcceptor(IoSessionConfig sessionConfig, Executor executor) {
127 super(sessionConfig, executor);
128
129 try {
130 init();
131 selectable = true;
132 } catch (RuntimeException e) {
133 throw e;
134 } catch (Exception e) {
135 throw new RuntimeIoException("Failed to initialize.", e);
136 } finally {
137 if (!selectable) {
138 try {
139 destroy();
140 } catch (Exception e) {
141 ExceptionMonitor.getInstance().exceptionCaught(e);
142 }
143 }
144 }
145 }
146
147
148
149
150
151
152 private class Acceptor implements Runnable {
153 @Override
154 public void run() {
155 int nHandles = 0;
156 lastIdleCheckTime = System.currentTimeMillis();
157
158
159 lock.release();
160
161 while (selectable) {
162 try {
163 int selected = select(SELECT_TIMEOUT);
164
165 nHandles += registerHandles();
166
167 if (nHandles == 0) {
168 try {
169 lock.acquire();
170
171 if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
172 acceptor = null;
173 break;
174 }
175 } finally {
176 lock.release();
177 }
178 }
179
180 if (selected > 0) {
181 processReadySessions(selectedHandles());
182 }
183
184 long currentTime = System.currentTimeMillis();
185 flushSessions(currentTime);
186 nHandles -= unregisterHandles();
187
188 notifyIdleSessions(currentTime);
189 } catch (ClosedSelectorException cse) {
190
191 ExceptionMonitor.getInstance().exceptionCaught(cse);
192 break;
193 } catch (Exception e) {
194 ExceptionMonitor.getInstance().exceptionCaught(e);
195
196 try {
197 Thread.sleep(1000);
198 } catch (InterruptedException e1) {
199 }
200 }
201 }
202
203 if (selectable && isDisposing()) {
204 selectable = false;
205 try {
206 destroy();
207 } catch (Exception e) {
208 ExceptionMonitor.getInstance().exceptionCaught(e);
209 } finally {
210 disposalFuture.setValue(true);
211 }
212 }
213 }
214 }
215
216 private int registerHandles() {
217 for (;;) {
218 AcceptorOperationFuture req = registerQueue.poll();
219
220 if (req == null) {
221 break;
222 }
223
224 Map<SocketAddress, DatagramChannel> newHandles = new HashMap<>();
225 List<SocketAddress> localAddresses = req.getLocalAddresses();
226
227 try {
228 for (SocketAddress socketAddress : localAddresses) {
229 DatagramChannel handle = open(socketAddress);
230 newHandles.put(localAddress(handle), handle);
231 }
232
233 boundHandles.putAll(newHandles);
234
235 getListeners().fireServiceActivated();
236 req.setDone();
237
238 return newHandles.size();
239 } catch (Exception e) {
240 req.setException(e);
241 } finally {
242
243 if (req.getException() != null) {
244 for (DatagramChannel handle : newHandles.values()) {
245 try {
246 close(handle);
247 } catch (Exception e) {
248 ExceptionMonitor.getInstance().exceptionCaught(e);
249 }
250 }
251
252 wakeup();
253 }
254 }
255 }
256
257 return 0;
258 }
259
260 private void processReadySessions(Set<SelectionKey> handles) {
261 final Iterator<SelectionKey> iterator = handles.iterator();
262
263 while (iterator.hasNext()) {
264 try {
265 final SelectionKey key = iterator.next();
266 final DatagramChannel handle = (DatagramChannel) key.channel();
267
268 if (key.isValid()) {
269 if (key.isReadable()) {
270 readHandle(handle);
271 }
272
273 if (key.isWritable()) {
274 for (IoSession session : getManagedSessions().values()) {
275 final NioSession"../../../../../../org/apache/mina/transport/socket/nio/NioSession.html#NioSession">NioSession x = (NioSession) session;
276 if (x.getChannel() == handle) {
277 scheduleFlush(x);
278 }
279 }
280 }
281 }
282
283 } catch (Exception e) {
284 ExceptionMonitor.getInstance().exceptionCaught(e);
285 } finally {
286 iterator.remove();
287 }
288 }
289 }
290
291 private boolean scheduleFlush(NioSession session) {
292
293
294
295 if (session.setScheduledForFlush(true)) {
296 flushingSessions.add(session);
297 return true;
298 } else {
299 return false;
300 }
301 }
302
303 private void readHandle(DatagramChannel handle) throws Exception {
304 IoBuffer readBuf = IoBuffer.allocate(getSessionConfig().getReadBufferSize());
305
306 SocketAddress remoteAddress = receive(handle, readBuf);
307
308 if (remoteAddress != null) {
309 IoSession session = newSessionWithoutLock(remoteAddress, localAddress(handle));
310
311 readBuf.flip();
312
313 if (!session.isReadSuspended()) {
314 session.getFilterChain().fireMessageReceived(readBuf);
315 }
316 }
317 }
318
319 private IoSession newSessionWithoutLock(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
320 DatagramChannel handle = boundHandles.get(localAddress);
321
322 if (handle == null) {
323 throw new IllegalArgumentException("Unknown local address: " + localAddress);
324 }
325
326 IoSession session;
327
328 synchronized (sessionRecycler) {
329 session = sessionRecycler.recycle(remoteAddress);
330
331 if (session != null) {
332 return session;
333 }
334
335
336 NioSession newSession = newSession(this, handle, remoteAddress);
337 getSessionRecycler().put(newSession);
338 session = newSession;
339 }
340
341 initSession(session, null, null);
342
343 try {
344 this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
345 getListeners().fireSessionCreated(session);
346 } catch (Exception e) {
347 ExceptionMonitor.getInstance().exceptionCaught(e);
348 }
349
350 return session;
351 }
352
353 private void flushSessions(long currentTime) {
354 for (;;) {
355 NioSession session = flushingSessions.poll();
356
357 if (session == null) {
358 break;
359 }
360
361
362
363 session.unscheduledForFlush();
364
365 try {
366 boolean flushedAll = flush(session, currentTime);
367
368 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) && !session.isScheduledForFlush()) {
369 scheduleFlush(session);
370 }
371 } catch (Exception e) {
372 session.getFilterChain().fireExceptionCaught(e);
373 }
374 }
375 }
376
377 private boolean flush(NioSession session, long currentTime) throws Exception {
378 final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
379 final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
380 + (session.getConfig().getMaxReadBufferSize() >>> 1);
381
382 int writtenBytes = 0;
383
384 try {
385 for (;;) {
386 WriteRequest req = session.getCurrentWriteRequest();
387
388 if (req == null) {
389 req = writeRequestQueue.poll(session);
390
391 if (req == null) {
392 setInterestedInWrite(session, false);
393 break;
394 }
395
396 session.setCurrentWriteRequest(req);
397 }
398
399 IoBuffer"../../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer buf = (IoBuffer) req.getMessage();
400
401 if (buf.remaining() == 0) {
402
403 session.setCurrentWriteRequest(null);
404 buf.reset();
405 session.getFilterChain().fireMessageSent(req);
406 continue;
407 }
408
409 SocketAddress destination = req.getDestination();
410
411 if (destination == null) {
412 destination = session.getRemoteAddress();
413 }
414
415 int localWrittenBytes = send(session, buf, destination);
416
417 if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) {
418
419 setInterestedInWrite(session, true);
420
421 return false;
422 } else {
423 setInterestedInWrite(session, false);
424
425
426 session.setCurrentWriteRequest(null);
427 writtenBytes += localWrittenBytes;
428 buf.reset();
429 session.getFilterChain().fireMessageSent(req);
430 }
431 }
432 } finally {
433 session.increaseWrittenBytes(writtenBytes, currentTime);
434 }
435
436 return true;
437 }
438
439 private int unregisterHandles() {
440 int nHandles = 0;
441
442 for (;;) {
443 AcceptorOperationFuture request = cancelQueue.poll();
444 if (request == null) {
445 break;
446 }
447
448
449 for (SocketAddress socketAddress : request.getLocalAddresses()) {
450 DatagramChannel handle = boundHandles.remove(socketAddress);
451
452 if (handle == null) {
453 continue;
454 }
455
456 try {
457 close(handle);
458 wakeup();
459 } catch (Exception e) {
460 ExceptionMonitor.getInstance().exceptionCaught(e);
461 } finally {
462 nHandles++;
463 }
464 }
465
466 request.setDone();
467 }
468
469 return nHandles;
470 }
471
472 private void notifyIdleSessions(long currentTime) {
473
474 if (currentTime - lastIdleCheckTime >= 1000) {
475 lastIdleCheckTime = currentTime;
476 AbstractIoSession.notifyIdleness(getListeners().getManagedSessions().values().iterator(), currentTime);
477 }
478 }
479
480
481
482
483 private void startupAcceptor() throws InterruptedException {
484 if (!selectable) {
485 registerQueue.clear();
486 cancelQueue.clear();
487 flushingSessions.clear();
488 }
489
490 lock.acquire();
491
492 if (acceptor == null) {
493 acceptor = new Acceptor();
494 executeWorker(acceptor);
495 } else {
496 lock.release();
497 }
498 }
499
500 protected void init() throws Exception {
501 this.selector = Selector.open();
502 }
503
504
505
506
507 @Override
508 public void add(NioSession session) {
509
510 }
511
512
513
514
515 @Override
516 protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
517
518
519 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
520
521
522
523 registerQueue.add(request);
524
525
526
527 startupAcceptor();
528
529
530
531
532 try {
533 lock.acquire();
534
535
536 Thread.sleep(10);
537 wakeup();
538 } finally {
539 lock.release();
540 }
541
542
543 request.awaitUninterruptibly();
544
545 if (request.getException() != null) {
546 throw request.getException();
547 }
548
549
550
551
552 Set<SocketAddress> newLocalAddresses = new HashSet<>();
553
554 for (DatagramChannel handle : boundHandles.values()) {
555 newLocalAddresses.add(localAddress(handle));
556 }
557
558 return newLocalAddresses;
559 }
560
561 protected void close(DatagramChannel handle) throws Exception {
562 SelectionKey key = handle.keyFor(selector);
563
564 if (key != null) {
565 key.cancel();
566 }
567
568 handle.disconnect();
569 handle.close();
570 }
571
572 protected void destroy() throws Exception {
573 if (selector != null) {
574 selector.close();
575 }
576 }
577
578
579
580
581 @Override
582 protected void dispose0() throws Exception {
583 unbind();
584 startupAcceptor();
585 wakeup();
586 }
587
588
589
590
591 @Override
592 public void flush(NioSession session) {
593 if (scheduleFlush(session)) {
594 wakeup();
595 }
596 }
597
598 @Override
599 public InetSocketAddress getDefaultLocalAddress() {
600 return (InetSocketAddress) super.getDefaultLocalAddress();
601 }
602
603 @Override
604 public InetSocketAddress getLocalAddress() {
605 return (InetSocketAddress) super.getLocalAddress();
606 }
607
608
609
610
611 @Override
612 public DatagramSessionConfig getSessionConfig() {
613 return (DatagramSessionConfig) sessionConfig;
614 }
615
616 @Override
617 public final IoSessionRecycler getSessionRecycler() {
618 return sessionRecycler;
619 }
620
621 @Override
622 public TransportMetadata getTransportMetadata() {
623 return NioDatagramSession.METADATA;
624 }
625
626 protected boolean isReadable(DatagramChannel handle) {
627 SelectionKey key = handle.keyFor(selector);
628
629 if ((key == null) || (!key.isValid())) {
630 return false;
631 }
632
633 return key.isReadable();
634 }
635
636 protected boolean isWritable(DatagramChannel handle) {
637 SelectionKey key = handle.keyFor(selector);
638
639 if ((key == null) || (!key.isValid())) {
640 return false;
641 }
642
643 return key.isWritable();
644 }
645
646 protected SocketAddress localAddress(DatagramChannel handle) throws Exception {
647 InetSocketAddress inetSocketAddress = (InetSocketAddress) handle.socket().getLocalSocketAddress();
648 InetAddress inetAddress = inetSocketAddress.getAddress();
649
650 if ((inetAddress instanceof Inet6Address) && (((Inet6Address) inetAddress).isIPv4CompatibleAddress())) {
651
652
653
654 byte[] ipV6Address = ((Inet6Address) inetAddress).getAddress();
655 byte[] ipV4Address = new byte[4];
656
657 System.arraycopy(ipV6Address, 12, ipV4Address, 0, 4);
658
659 InetAddress inet4Adress = Inet4Address.getByAddress(ipV4Address);
660 return new InetSocketAddress(inet4Adress, inetSocketAddress.getPort());
661 } else {
662 return inetSocketAddress;
663 }
664 }
665
666 protected NioSession newSession(IoProcessor<NioSession> processor, DatagramChannel handle,
667 SocketAddress remoteAddress) {
668 SelectionKey key = handle.keyFor(selector);
669
670 if ((key == null) || (!key.isValid())) {
671 return null;
672 }
673
674 NioDatagramSessionioDatagramSession.html#NioDatagramSession">NioDatagramSession newSession = new NioDatagramSession(this, handle, processor, remoteAddress);
675 newSession.setSelectionKey(key);
676
677 return newSession;
678 }
679
680
681
682
683 @Override
684 public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
685 if (isDisposing()) {
686 throw new IllegalStateException("The Acceptor is being disposed.");
687 }
688
689 if (remoteAddress == null) {
690 throw new IllegalArgumentException("remoteAddress");
691 }
692
693 synchronized (bindLock) {
694 if (!isActive()) {
695 throw new IllegalStateException("Can't create a session from a unbound service.");
696 }
697
698 try {
699 return newSessionWithoutLock(remoteAddress, localAddress);
700 } catch (RuntimeException | Error e) {
701 throw e;
702 } catch (Exception e) {
703 throw new RuntimeIoException("Failed to create a session.", e);
704 }
705 }
706 }
707
708 protected DatagramChannel open(SocketAddress localAddress) throws Exception {
709 final DatagramChannel ch = DatagramChannel.open();
710 boolean success = false;
711 try {
712 new NioDatagramSessionConfig(ch).setAll(getSessionConfig());
713 ch.configureBlocking(false);
714
715 try {
716 ch.socket().bind(localAddress);
717 } catch (IOException ioe) {
718
719
720 String newMessage = "Error while binding on " + localAddress + "\n" + "original message : "
721 + ioe.getMessage();
722 Exception e = new IOException(newMessage);
723 e.initCause(ioe.getCause());
724
725
726 ch.close();
727
728 throw e;
729 }
730
731 ch.register(selector, SelectionKey.OP_READ);
732 success = true;
733 } finally {
734 if (!success) {
735 close(ch);
736 }
737 }
738
739 return ch;
740 }
741
742 protected SocketAddress receive(DatagramChannel handle, IoBuffer buffer) throws Exception {
743 return handle.receive(buffer.buf());
744 }
745
746
747
748
749 @Override
750 public void remove(NioSession session) {
751 getSessionRecycler().remove(session);
752 getListeners().fireSessionDestroyed(session);
753 }
754
755 protected int select() throws Exception {
756 return selector.select();
757 }
758
759 protected int select(long timeout) throws Exception {
760 return selector.select(timeout);
761 }
762
763 protected Set<SelectionKey> selectedHandles() {
764 return selector.selectedKeys();
765 }
766
767 protected int send(NioSession session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception {
768 return ((DatagramChannel) session.getChannel()).send(buffer.buf(), remoteAddress);
769 }
770
771 @Override
772 public void setDefaultLocalAddress(InetSocketAddress localAddress) {
773 setDefaultLocalAddress((SocketAddress) localAddress);
774 }
775
776 protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception {
777 SelectionKey key = session.getSelectionKey();
778
779 if (key == null) {
780 return;
781 }
782
783 int newInterestOps = key.interestOps();
784
785 if (isInterested) {
786 newInterestOps |= SelectionKey.OP_WRITE;
787 } else {
788 newInterestOps &= ~SelectionKey.OP_WRITE;
789 }
790
791 key.interestOps(newInterestOps);
792 }
793
794 @Override
795 public final void setSessionRecycler(IoSessionRecycler sessionRecycler) {
796 synchronized (bindLock) {
797 if (isActive()) {
798 throw new IllegalStateException("sessionRecycler can't be set while the acceptor is bound.");
799 }
800
801 if (sessionRecycler == null) {
802 sessionRecycler = DEFAULT_RECYCLER;
803 }
804
805 this.sessionRecycler = sessionRecycler;
806 }
807 }
808
809
810
811
812 @Override
813 protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
814 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
815
816 cancelQueue.add(request);
817 startupAcceptor();
818 wakeup();
819
820 request.awaitUninterruptibly();
821
822 if (request.getException() != null) {
823 throw request.getException();
824 }
825 }
826
827
828
829
830 @Override
831 public void updateTrafficControl(NioSession session) {
832
833 }
834
835 protected void wakeup() {
836 selector.wakeup();
837 }
838
839
840
841
842 @Override
843 public void write(NioSession session, WriteRequest writeRequest) {
844
845 long currentTime = System.currentTimeMillis();
846 final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
847 final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
848 + (session.getConfig().getMaxReadBufferSize() >>> 1);
849
850 int writtenBytes = 0;
851
852
853
854 IoBuffer"../../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer buf = (IoBuffer) writeRequest.getMessage();
855
856 if (buf.remaining() == 0) {
857
858 session.setCurrentWriteRequest(null);
859 buf.reset();
860 session.getFilterChain().fireMessageSent(writeRequest);
861 return;
862 }
863
864
865 try {
866 for (;;) {
867 if (writeRequest == null) {
868 writeRequest = writeRequestQueue.poll(session);
869
870 if (writeRequest == null) {
871 setInterestedInWrite(session, false);
872 break;
873 }
874
875 session.setCurrentWriteRequest(writeRequest);
876 }
877
878 buf = (IoBuffer) writeRequest.getMessage();
879
880 if (buf.remaining() == 0) {
881
882 session.setCurrentWriteRequest(null);
883 session.getFilterChain().fireMessageSent(writeRequest);
884 continue;
885 }
886
887 SocketAddress destination = writeRequest.getDestination();
888
889 if (destination == null) {
890 destination = session.getRemoteAddress();
891 }
892
893 int localWrittenBytes = send(session, buf, destination);
894
895 if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) {
896
897 setInterestedInWrite(session, true);
898
899 writeRequestQueue.offer(session, writeRequest);
900 scheduleFlush(session);
901
902 break;
903 } else {
904 setInterestedInWrite(session, false);
905
906
907 session.setCurrentWriteRequest(null);
908 writtenBytes += localWrittenBytes;
909 session.getFilterChain().fireMessageSent(writeRequest);
910
911 break;
912 }
913 }
914 } catch (Exception e) {
915 session.getFilterChain().fireExceptionCaught(e);
916 } finally {
917 session.increaseWrittenBytes(writtenBytes, currentTime);
918 }
919 }
920 }