1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.net.InetSocketAddress;
23 import java.net.SocketAddress;
24 import java.nio.channels.DatagramChannel;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.util.Collection;
28 import java.util.Iterator;
29 import java.util.concurrent.Executor;
30
31 import org.apache.mina.core.buffer.IoBuffer;
32 import org.apache.mina.core.polling.AbstractPollingConnectionlessIoAcceptor;
33 import org.apache.mina.core.service.IoAcceptor;
34 import org.apache.mina.core.service.IoProcessor;
35 import org.apache.mina.core.service.TransportMetadata;
36 import org.apache.mina.transport.socket.DatagramAcceptor;
37 import org.apache.mina.transport.socket.DatagramSessionConfig;
38 import org.apache.mina.transport.socket.DefaultDatagramSessionConfig;
39
40
41
42
43
44
45
46 public final class NioDatagramAcceptor
47 extends AbstractPollingConnectionlessIoAcceptor<NioSession, DatagramChannel>
48 implements DatagramAcceptor {
49
50 private volatile Selector selector;
51
52
53
54
55 public NioDatagramAcceptor() {
56 super(new DefaultDatagramSessionConfig());
57 }
58
59
60
61
62 public NioDatagramAcceptor(Executor executor) {
63 super(new DefaultDatagramSessionConfig(), executor);
64 }
65
66 @Override
67 protected void init() throws Exception {
68 this.selector = Selector.open();
69 }
70
71 @Override
72 protected void destroy() throws Exception {
73 if (selector != null) {
74 selector.close();
75 }
76 }
77
78 public TransportMetadata getTransportMetadata() {
79 return NioDatagramSession.METADATA;
80 }
81
82 @Override
83 public DatagramSessionConfig getSessionConfig() {
84 return (DatagramSessionConfig) super.getSessionConfig();
85 }
86
87 @Override
88 public InetSocketAddress getLocalAddress() {
89 return (InetSocketAddress) super.getLocalAddress();
90 }
91
92 @Override
93 public InetSocketAddress getDefaultLocalAddress() {
94 return (InetSocketAddress) super.getDefaultLocalAddress();
95 }
96
97 public void setDefaultLocalAddress(InetSocketAddress localAddress) {
98 setDefaultLocalAddress((SocketAddress) localAddress);
99 }
100
101 @Override
102 protected DatagramChannel open(SocketAddress localAddress) throws Exception {
103 final DatagramChannel c = DatagramChannel.open();
104 boolean success = false;
105 try {
106 new NioDatagramSessionConfig(c).setAll(getSessionConfig());
107 c.configureBlocking(false);
108 c.socket().bind(localAddress);
109 c.register(selector, SelectionKey.OP_READ);
110 success = true;
111 } finally {
112 if (!success) {
113 close(c);
114 }
115 }
116
117 return c;
118 }
119
120 @Override
121 protected boolean isReadable(DatagramChannel handle) {
122 SelectionKey key = handle.keyFor(selector);
123 if (key == null) {
124 return false;
125 }
126 if (!key.isValid()) {
127 return false;
128 }
129 return key.isReadable();
130 }
131
132 @Override
133 protected boolean isWritable(DatagramChannel handle) {
134 SelectionKey key = handle.keyFor(selector);
135 if (key == null) {
136 return false;
137 }
138 if (!key.isValid()) {
139 return false;
140 }
141 return key.isWritable();
142 }
143
144 @Override
145 protected SocketAddress localAddress(DatagramChannel handle)
146 throws Exception {
147 return handle.socket().getLocalSocketAddress();
148 }
149
150 @Override
151 protected NioSession newSession(
152 IoProcessor<NioSession> processor, DatagramChannel handle,
153 SocketAddress remoteAddress) {
154 SelectionKey key = handle.keyFor(selector);
155 if (key == null) {
156 return null;
157 }
158 NioDatagramSession newSession = new NioDatagramSession(
159 this, handle, processor, remoteAddress);
160 newSession.setSelectionKey(key);
161
162 return newSession;
163 }
164
165 @Override
166 protected SocketAddress receive(DatagramChannel handle, IoBuffer buffer)
167 throws Exception {
168 return handle.receive(buffer.buf());
169 }
170
171 @Override
172 protected boolean select(int timeout) throws Exception {
173 return selector.select(timeout) > 0;
174 }
175
176 @Override
177 protected Iterator<DatagramChannel> selectedHandles() {
178 return new DatagramChannelIterator(selector.selectedKeys());
179 }
180
181 @Override
182 protected int send(NioSession session, IoBuffer buffer,
183 SocketAddress remoteAddress) throws Exception {
184 return ((DatagramChannel) session.getChannel()).send(
185 buffer.buf(), remoteAddress);
186 }
187
188 @Override
189 protected void setInterestedInWrite(NioSession session, boolean interested)
190 throws Exception {
191 SelectionKey key = session.getSelectionKey();
192 if (key == null) {
193 return;
194 }
195
196 if (interested) {
197 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
198 } else {
199 key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
200 }
201 }
202
203 @Override
204 protected void close(DatagramChannel handle) throws Exception {
205 SelectionKey key = handle.keyFor(selector);
206 if (key != null) {
207 key.cancel();
208 }
209 handle.disconnect();
210 handle.close();
211 }
212
213 @Override
214 protected void wakeup() {
215 selector.wakeup();
216 }
217
218 private static class DatagramChannelIterator implements Iterator<DatagramChannel> {
219
220 private final Iterator<SelectionKey> i;
221
222 private DatagramChannelIterator(Collection<SelectionKey> keys) {
223 this.i = keys.iterator();
224 }
225
226 public boolean hasNext() {
227 return i.hasNext();
228 }
229
230 public DatagramChannel next() {
231 return (DatagramChannel) i.next().channel();
232 }
233
234 public void remove() {
235 i.remove();
236 }
237
238 }
239 }