1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.io.IOException;
23 import java.nio.channels.ByteChannel;
24 import java.nio.channels.SelectableChannel;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.util.Iterator;
28 import java.util.Set;
29 import java.util.concurrent.Executor;
30
31 import org.apache.mina.core.RuntimeIoException;
32 import org.apache.mina.core.buffer.IoBuffer;
33 import org.apache.mina.core.file.FileRegion;
34 import org.apache.mina.core.polling.AbstractPollingIoProcessor;
35
36
37
38
39
40
41 public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> {
42
43 private final Selector selector;
44
45
46
47
48
49
50
51 public NioProcessor(Executor executor) {
52 super(executor);
53 try {
54
55 selector = Selector.open();
56 } catch (IOException e) {
57 throw new RuntimeIoException("Failed to open a selector.", e);
58 }
59 }
60
61 @Override
62 protected void dispose0() throws Exception {
63 selector.close();
64 }
65
66 @Override
67 protected int select(long timeout) throws Exception {
68 return selector.select(timeout);
69 }
70
71 @Override
72 protected int select() throws Exception {
73 return selector.select();
74 }
75
76 @Override
77 protected boolean isSelectorEmpty() {
78 return selector.keys().isEmpty();
79 }
80
81 @Override
82 protected void wakeup() {
83 selector.wakeup();
84 }
85
86 @Override
87 protected Iterator<NioSession> allSessions() {
88 return new IoSessionIterator(selector.keys());
89 }
90
91 @Override
92 protected Iterator<NioSession> selectedSessions() {
93 return new IoSessionIterator(selector.selectedKeys());
94 }
95
96 @Override
97 protected void init(NioSession session) throws Exception {
98 SelectableChannel ch = (SelectableChannel) session.getChannel();
99 ch.configureBlocking(false);
100 session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
101 }
102
103 @Override
104 protected void destroy(NioSession session) throws Exception {
105 ByteChannel ch = session.getChannel();
106 SelectionKey key = session.getSelectionKey();
107 if (key != null) {
108 key.cancel();
109 }
110 ch.close();
111 }
112
113 @Override
114 protected SessionState state(NioSession session) {
115 SelectionKey key = session.getSelectionKey();
116 if (key == null) {
117 return SessionState.PREPARING;
118 }
119
120 return key.isValid()? SessionState.OPEN : SessionState.CLOSED;
121 }
122
123 @Override
124 protected boolean isReadable(NioSession session) {
125 SelectionKey key = session.getSelectionKey();
126 return key.isValid() && key.isReadable();
127 }
128
129 @Override
130 protected boolean isWritable(NioSession session) {
131 SelectionKey key = session.getSelectionKey();
132 return key.isValid() && key.isWritable();
133 }
134
135 @Override
136 protected boolean isInterestedInRead(NioSession session) {
137 SelectionKey key = session.getSelectionKey();
138 return key.isValid() && (key.interestOps() & SelectionKey.OP_READ) != 0;
139 }
140
141 @Override
142 protected boolean isInterestedInWrite(NioSession session) {
143 SelectionKey key = session.getSelectionKey();
144 return key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0;
145 }
146
147 @Override
148 protected void setInterestedInRead(NioSession session, boolean value) throws Exception {
149 SelectionKey key = session.getSelectionKey();
150 int oldInterestOps = key.interestOps();
151 int newInterestOps;
152 if (value) {
153 newInterestOps = oldInterestOps | SelectionKey.OP_READ;
154 } else {
155 newInterestOps = oldInterestOps & ~SelectionKey.OP_READ;
156 }
157 if (oldInterestOps != newInterestOps) {
158 key.interestOps(newInterestOps);
159 }
160 }
161
162 @Override
163 protected void setInterestedInWrite(NioSession session, boolean value) throws Exception {
164 SelectionKey key = session.getSelectionKey();
165 int oldInterestOps = key.interestOps();
166 int newInterestOps;
167 if (value) {
168 newInterestOps = oldInterestOps | SelectionKey.OP_WRITE;
169 } else {
170 newInterestOps = oldInterestOps & ~SelectionKey.OP_WRITE;
171 }
172 if (oldInterestOps != newInterestOps) {
173 key.interestOps(newInterestOps);
174 }
175 }
176
177 @Override
178 protected int read(NioSession session, IoBuffer buf) throws Exception {
179 return session.getChannel().read(buf.buf());
180 }
181
182 @Override
183 protected int write(NioSession session, IoBuffer buf, int length) throws Exception {
184 if (buf.remaining() <= length) {
185 return session.getChannel().write(buf.buf());
186 }
187
188 int oldLimit = buf.limit();
189 buf.limit(buf.position() + length);
190 try {
191 return session.getChannel().write(buf.buf());
192 } finally {
193 buf.limit(oldLimit);
194 }
195 }
196
197 @Override
198 protected int transferFile(NioSession session, FileRegion region, int length) throws Exception {
199 try {
200 return (int) region.getFileChannel().transferTo(region.getPosition(), length, session.getChannel());
201 } catch (IOException e) {
202
203
204 String message = e.getMessage();
205 if (message != null && message.contains("temporarily unavailable")) {
206 return 0;
207 }
208
209 throw e;
210 }
211 }
212
213
214
215
216
217 protected static class IoSessionIterator implements Iterator<NioSession> {
218 private final Iterator<SelectionKey> iterator;
219
220
221
222
223
224
225 private IoSessionIterator(Set<SelectionKey> keys) {
226 iterator = keys.iterator();
227 }
228
229
230
231
232 public boolean hasNext() {
233 return iterator.hasNext();
234 }
235
236
237
238
239 public NioSession next() {
240 SelectionKey key = iterator.next();
241 NioSession nioSession = (NioSession) key.attachment();
242 return nioSession;
243 }
244
245
246
247
248 public void remove() {
249 iterator.remove();
250 }
251 }
252 }