1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.io.IOException;
23 import java.nio.channels.ByteChannel;
24 import java.nio.channels.DatagramChannel;
25 import java.nio.channels.SelectableChannel;
26 import java.nio.channels.SelectionKey;
27 import java.nio.channels.Selector;
28 import java.nio.channels.SocketChannel;
29 import java.nio.channels.spi.SelectorProvider;
30 import java.util.Iterator;
31 import java.util.Set;
32 import java.util.concurrent.Executor;
33 import java.util.concurrent.locks.ReadWriteLock;
34 import java.util.concurrent.locks.ReentrantReadWriteLock;
35
36 import org.apache.mina.core.RuntimeIoException;
37 import org.apache.mina.core.buffer.IoBuffer;
38 import org.apache.mina.core.file.FileRegion;
39 import org.apache.mina.core.polling.AbstractPollingIoProcessor;
40 import org.apache.mina.core.session.SessionState;
41
42
43
44
45
46
47 public class NioProcessor extends AbstractPollingIoProcessor<NioSession> {
48
49 protected Selector selector;
50
51
52 protected ReadWriteLock selectorLock = new ReentrantReadWriteLock();
53
54 protected SelectorProvider selectorProvider = null;
55
56
57
58
59
60
61
62 public NioProcessor(Executor executor) {
63 super(executor);
64
65 try {
66
67 selector = Selector.open();
68 } catch (IOException e) {
69 throw new RuntimeIoException("Failed to open a selector.", e);
70 }
71 }
72
73
74
75
76
77
78
79
80 public NioProcessor(Executor executor, SelectorProvider selectorProvider) {
81 super(executor);
82
83 try {
84
85 if (selectorProvider == null) {
86 selector = Selector.open();
87 } else {
88 this.selectorProvider = selectorProvider;
89 selector = selectorProvider.openSelector();
90 }
91 } catch (IOException e) {
92 throw new RuntimeIoException("Failed to open a selector.", e);
93 }
94 }
95
96 @Override
97 protected void doDispose() throws Exception {
98 selectorLock.readLock().lock();
99
100 try {
101 selector.close();
102 } finally {
103 selectorLock.readLock().unlock();
104 }
105 }
106
107 @Override
108 protected int select(long timeout) throws Exception {
109 selectorLock.readLock().lock();
110
111 try {
112 return selector.select(timeout);
113 } finally {
114 selectorLock.readLock().unlock();
115 }
116 }
117
118 @Override
119 protected int select() throws Exception {
120 selectorLock.readLock().lock();
121
122 try {
123 return selector.select();
124 } finally {
125 selectorLock.readLock().unlock();
126 }
127 }
128
129 @Override
130 protected boolean isSelectorEmpty() {
131 selectorLock.readLock().lock();
132
133 try {
134 return selector.keys().isEmpty();
135 } finally {
136 selectorLock.readLock().unlock();
137 }
138 }
139
140 @Override
141 protected void wakeup() {
142 wakeupCalled.getAndSet(true);
143 selectorLock.readLock().lock();
144
145 try {
146 selector.wakeup();
147 } finally {
148 selectorLock.readLock().unlock();
149 }
150 }
151
152 @Override
153 protected Iterator<NioSession> allSessions() {
154 selectorLock.readLock().lock();
155
156 try {
157 return new IoSessionIterator(selector.keys());
158 } finally {
159 selectorLock.readLock().unlock();
160 }
161 }
162
163 @Override
164 protected int allSessionsCount()
165 {
166 return selector.keys().size();
167 }
168
169 @SuppressWarnings("synthetic-access")
170 @Override
171 protected Iterator<NioSession> selectedSessions() {
172 return new IoSessionIterator(selector.selectedKeys());
173 }
174
175 @Override
176 protected void init(NioSession session) throws Exception {
177 SelectableChannel ch = (SelectableChannel) session.getChannel();
178 ch.configureBlocking(false);
179 selectorLock.readLock().lock();
180
181 try {
182 session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
183 } finally {
184 selectorLock.readLock().unlock();
185 }
186 }
187
188 @Override
189 protected void destroy(NioSession session) throws Exception {
190 ByteChannel ch = session.getChannel();
191
192 SelectionKey key = session.getSelectionKey();
193
194 if (key != null) {
195 key.cancel();
196 }
197
198 if ( ch.isOpen() ) {
199 ch.close();
200 }
201 }
202
203
204
205
206
207
208 @Override
209 protected void registerNewSelector() throws IOException {
210 selectorLock.writeLock().lock();
211
212 try {
213 Set<SelectionKey> keys = selector.keys();
214 Selector newSelector;
215
216
217 if (selectorProvider == null) {
218 newSelector = Selector.open();
219 } else {
220 newSelector = selectorProvider.openSelector();
221 }
222
223
224 for (SelectionKey key : keys) {
225 SelectableChannel ch = key.channel();
226
227
228 NioSession/../../../../org/apache/mina/transport/socket/nio/NioSession.html#NioSession">NioSession session = (NioSession) key.attachment();
229 SelectionKey newKey = ch.register(newSelector, key.interestOps(), session);
230 session.setSelectionKey(newKey);
231 }
232
233
234 selector.close();
235 selector = newSelector;
236 } finally {
237 selectorLock.writeLock().unlock();
238 }
239
240 }
241
242
243
244
245 @Override
246 protected boolean isBrokenConnection() throws IOException {
247
248 boolean brokenSession = false;
249
250 selectorLock.readLock().lock();
251
252 try {
253
254 Set<SelectionKey> keys = selector.keys();
255
256
257
258 for (SelectionKey key : keys) {
259 SelectableChannel channel = key.channel();
260
261 if (((channel instanceof DatagramChannel) && !((DatagramChannel) channel).isConnected())
262 || ((channel instanceof SocketChannel) && !((SocketChannel) channel).isConnected())) {
263
264
265 key.cancel();
266
267
268 brokenSession = true;
269 }
270 }
271 } finally {
272 selectorLock.readLock().unlock();
273 }
274
275 return brokenSession;
276 }
277
278
279
280
281 @Override
282 protected SessionState getState(NioSession session) {
283 SelectionKey key = session.getSelectionKey();
284
285 if (key == null) {
286
287 return SessionState.OPENING;
288 }
289
290 if (key.isValid()) {
291
292 return SessionState.OPENED;
293 } else {
294
295 return SessionState.CLOSING;
296 }
297 }
298
299 @Override
300 protected boolean isReadable(NioSession session) {
301 SelectionKey key = session.getSelectionKey();
302
303 return (key != null) && key.isValid() && key.isReadable();
304 }
305
306 @Override
307 protected boolean isWritable(NioSession session) {
308 SelectionKey key = session.getSelectionKey();
309
310 return (key != null) && key.isValid() && key.isWritable();
311 }
312
313 @Override
314 protected boolean isInterestedInRead(NioSession session) {
315 SelectionKey key = session.getSelectionKey();
316
317 return (key != null) && key.isValid() && ((key.interestOps() & SelectionKey.OP_READ) != 0);
318 }
319
320 @Override
321 protected boolean isInterestedInWrite(NioSession session) {
322 SelectionKey key = session.getSelectionKey();
323
324 return (key != null) && key.isValid() && ((key.interestOps() & SelectionKey.OP_WRITE) != 0);
325 }
326
327
328
329
330 @Override
331 protected void setInterestedInRead(NioSession session, boolean isInterested) throws Exception {
332 SelectionKey key = session.getSelectionKey();
333
334 if ((key == null) || !key.isValid()) {
335 return;
336 }
337
338 int oldInterestOps = key.interestOps();
339 int newInterestOps = oldInterestOps;
340
341 if (isInterested) {
342 newInterestOps |= SelectionKey.OP_READ;
343 } else {
344 newInterestOps &= ~SelectionKey.OP_READ;
345 }
346
347 if (oldInterestOps != newInterestOps) {
348 key.interestOps(newInterestOps);
349 }
350 }
351
352
353
354
355 @Override
356 protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception {
357 SelectionKey key = session.getSelectionKey();
358
359 if ((key == null) || !key.isValid()) {
360 return;
361 }
362
363 int newInterestOps = key.interestOps();
364
365 if (isInterested) {
366 newInterestOps |= SelectionKey.OP_WRITE;
367 } else {
368 newInterestOps &= ~SelectionKey.OP_WRITE;
369 }
370
371 key.interestOps(newInterestOps);
372 }
373
374 @Override
375 protected int read(NioSession session, IoBuffer buf) throws Exception {
376 ByteChannel channel = session.getChannel();
377
378 return channel.read(buf.buf());
379 }
380
381 @Override
382 protected int write(NioSession session, IoBuffer buf, int length) throws IOException {
383 if (buf.remaining() <= length) {
384 return session.getChannel().write(buf.buf());
385 }
386
387 int oldLimit = buf.limit();
388 buf.limit(buf.position() + length);
389 try {
390 return session.getChannel().write(buf.buf());
391 } finally {
392 buf.limit(oldLimit);
393 }
394 }
395
396 @Override
397 protected int transferFile(NioSession session, FileRegion region, int length) throws Exception {
398 try {
399 return (int) region.getFileChannel().transferTo(region.getPosition(), length, session.getChannel());
400 } catch (IOException e) {
401
402
403 String message = e.getMessage();
404 if ((message != null) && message.contains("temporarily unavailable")) {
405 return 0;
406 }
407
408 throw e;
409 }
410 }
411
412
413
414
415
416 protected static class IoSessionIterator<NioSession> implements Iterator<NioSession> {
417 private final Iterator<SelectionKey> iterator;
418
419
420
421
422
423
424
425 private IoSessionIterator(Set<SelectionKey> keys) {
426 iterator = keys.iterator();
427 }
428
429
430
431
432 @Override
433 public boolean hasNext() {
434 return iterator.hasNext();
435 }
436
437
438
439
440 @Override
441 public NioSession next() {
442 SelectionKey key = iterator.next();
443
444 return (NioSession) key.attachment();
445 }
446
447
448
449
450 @Override
451 public void remove() {
452 iterator.remove();
453 }
454 }
455 }