1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.io.IOException;
23 import java.nio.channels.ByteChannel;
24 import java.nio.channels.DatagramChannel;
25 import java.nio.channels.SelectableChannel;
26 import java.nio.channels.SelectionKey;
27 import java.nio.channels.Selector;
28 import java.nio.channels.SocketChannel;
29 import java.util.Iterator;
30 import java.util.Set;
31 import java.util.concurrent.Executor;
32
33 import org.apache.mina.core.RuntimeIoException;
34 import org.apache.mina.core.buffer.IoBuffer;
35 import org.apache.mina.core.file.FileRegion;
36 import org.apache.mina.core.polling.AbstractPollingIoProcessor;
37 import org.apache.mina.core.session.SessionState;
38
39
40
41
42
43
44 public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> {
45
46 private Selector selector;
47
48
49
50
51
52
53
54 public NioProcessor(Executor executor) {
55 super(executor);
56
57 try {
58
59 selector = Selector.open();
60 } catch (IOException e) {
61 throw new RuntimeIoException("Failed to open a selector.", e);
62 }
63 }
64
65 @Override
66 protected void doDispose() throws Exception {
67 selector.close();
68 }
69
70 @Override
71 protected int select(long timeout) throws Exception {
72 return selector.select(timeout);
73 }
74
75 @Override
76 protected int select() throws Exception {
77 return selector.select();
78 }
79
80 @Override
81 protected boolean isSelectorEmpty() {
82 return selector.keys().isEmpty();
83 }
84
85 @Override
86 protected void wakeup() {
87 wakeupCalled.getAndSet(true);
88 selector.wakeup();
89 }
90
91 @Override
92 protected Iterator<NioSession> allSessions() {
93 return new IoSessionIterator(selector.keys());
94 }
95
96 @SuppressWarnings("synthetic-access")
97 @Override
98 protected Iterator<NioSession> selectedSessions() {
99 return new IoSessionIterator(selector.selectedKeys());
100 }
101
102 @Override
103 protected void init(NioSession session) throws Exception {
104 SelectableChannel ch = (SelectableChannel) session.getChannel();
105 ch.configureBlocking(false);
106 session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ,
107 session));
108 }
109
110 @Override
111 protected void destroy(NioSession session) throws Exception {
112 ByteChannel ch = session.getChannel();
113 SelectionKey key = session.getSelectionKey();
114 if (key != null) {
115 key.cancel();
116 }
117 ch.close();
118 }
119
120
121
122
123
124
125
126 @Override
127 protected void registerNewSelector() throws IOException {
128 synchronized (selector) {
129 Set<SelectionKey> keys = selector.keys();
130
131
132 Selector newSelector = Selector.open();
133
134
135 for (SelectionKey key : keys) {
136 SelectableChannel ch = key.channel();
137
138
139 NioSession session = (NioSession)key.attachment();
140 SelectionKey newKey = ch.register(newSelector, key.interestOps(), session);
141 session.setSelectionKey( newKey );
142 }
143
144
145 selector.close();
146 selector = newSelector;
147 }
148 }
149
150
151
152
153 @Override
154 protected boolean isBrokenConnection() throws IOException {
155
156 boolean brokenSession = false;
157
158 synchronized (selector) {
159
160 Set<SelectionKey> keys = selector.keys();
161
162
163
164 for (SelectionKey key : keys) {
165 SelectableChannel channel = key.channel();
166
167 if ((((channel instanceof DatagramChannel) && !((DatagramChannel) channel)
168 .isConnected()))
169 || ((channel instanceof SocketChannel) && !((SocketChannel) channel)
170 .isConnected())) {
171
172
173 key.cancel();
174
175
176 brokenSession = true;
177 }
178 }
179 }
180
181 return brokenSession;
182 }
183
184
185
186
187 @Override
188 protected SessionState getState(NioSession session) {
189 SelectionKey key = session.getSelectionKey();
190
191 if (key == null) {
192
193 return SessionState.OPENING;
194 }
195
196 if (key.isValid()) {
197
198 return SessionState.OPENED;
199 } else {
200
201 return SessionState.CLOSING;
202 }
203 }
204
205 @Override
206 protected boolean isReadable(NioSession session) {
207 SelectionKey key = session.getSelectionKey();
208 return key.isValid() && key.isReadable();
209 }
210
211 @Override
212 protected boolean isWritable(NioSession session) {
213 SelectionKey key = session.getSelectionKey();
214 return key.isValid() && key.isWritable();
215 }
216
217 @Override
218 protected boolean isInterestedInRead(NioSession session) {
219 SelectionKey key = session.getSelectionKey();
220 return key.isValid() && ( (key.interestOps() & SelectionKey.OP_READ) != 0 );
221 }
222
223 @Override
224 protected boolean isInterestedInWrite(NioSession session) {
225 SelectionKey key = session.getSelectionKey();
226 return key.isValid()
227 && ( (key.interestOps() & SelectionKey.OP_WRITE) != 0 );
228 }
229
230
231
232
233 @Override
234 protected void setInterestedInRead(NioSession session, boolean isInterested)
235 throws Exception {
236 SelectionKey key = session.getSelectionKey();
237 int oldInterestOps = key.interestOps();
238 int newInterestOps = oldInterestOps;
239
240 if (isInterested) {
241 newInterestOps |= SelectionKey.OP_READ;
242 } else {
243 newInterestOps &= ~SelectionKey.OP_READ;
244 }
245
246 if (oldInterestOps != newInterestOps) {
247 key.interestOps(newInterestOps);
248 }
249 }
250
251
252
253
254 @Override
255 protected void setInterestedInWrite(NioSession session, boolean isInterested)
256 throws Exception {
257 SelectionKey key = session.getSelectionKey();
258
259 if (key == null) {
260 return;
261 }
262
263 int newInterestOps = key.interestOps();
264
265 if (isInterested) {
266 newInterestOps |= SelectionKey.OP_WRITE;
267
268 } else {
269 newInterestOps &= ~SelectionKey.OP_WRITE;
270
271 }
272
273 key.interestOps(newInterestOps);
274 }
275
276 @Override
277 protected int read(NioSession session, IoBuffer buf) throws Exception {
278 ByteChannel channel = session.getChannel();
279
280 return channel.read(buf.buf());
281 }
282
283 @Override
284 protected int write(NioSession session, IoBuffer buf, int length)
285 throws Exception {
286 if (buf.remaining() <= length) {
287 return session.getChannel().write(buf.buf());
288 }
289
290 int oldLimit = buf.limit();
291 buf.limit(buf.position() + length);
292 try {
293 return session.getChannel().write(buf.buf());
294 } finally {
295 buf.limit(oldLimit);
296 }
297 }
298
299 @Override
300 protected int transferFile(NioSession session, FileRegion region, int length)
301 throws Exception {
302 try {
303 return (int) region.getFileChannel().transferTo(
304 region.getPosition(), length, session.getChannel());
305 } catch (IOException e) {
306
307
308 String message = e.getMessage();
309 if (( message != null ) && message.contains("temporarily unavailable")) {
310 return 0;
311 }
312
313 throw e;
314 }
315 }
316
317
318
319
320
321 protected static class IoSessionIterator<NioSession> implements
322 Iterator<NioSession> {
323 private final Iterator<SelectionKey> iterator;
324
325
326
327
328
329
330
331 private IoSessionIterator(Set<SelectionKey> keys) {
332 iterator = keys.iterator();
333 }
334
335
336
337
338 public boolean hasNext() {
339 return iterator.hasNext();
340 }
341
342
343
344
345 public NioSession next() {
346 SelectionKey key = iterator.next();
347 NioSession nioSession = (NioSession) key.attachment();
348 return nioSession;
349 }
350
351
352
353
354 public void remove() {
355 iterator.remove();
356 }
357 }
358 }