1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketAddress;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.nio.channels.SocketChannel;
28 import java.util.Collection;
29 import java.util.Iterator;
30 import java.util.concurrent.Executor;
31
32 import org.apache.mina.core.polling.AbstractPollingIoConnector;
33 import org.apache.mina.core.service.IoConnector;
34 import org.apache.mina.core.service.IoProcessor;
35 import org.apache.mina.core.service.IoService;
36 import org.apache.mina.core.service.SimpleIoProcessorPool;
37 import org.apache.mina.core.service.TransportMetadata;
38 import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
39 import org.apache.mina.transport.socket.SocketConnector;
40 import org.apache.mina.transport.socket.SocketSessionConfig;
41
42
43
44
45
46
47 public final class NioSocketConnector extends AbstractPollingIoConnector<NioSession, SocketChannel> implements
48 SocketConnector {
49
50 private volatile Selector selector;
51
52
53
54
55 public NioSocketConnector() {
56 super(new DefaultSocketSessionConfig(), NioProcessor.class);
57 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
58 }
59
60
61
62
63
64
65
66 public NioSocketConnector(int processorCount) {
67 super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
68 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
69 }
70
71
72
73
74
75
76
77 public NioSocketConnector(IoProcessor<NioSession> processor) {
78 super(new DefaultSocketSessionConfig(), processor);
79 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
80 }
81
82
83
84
85
86
87
88
89 public NioSocketConnector(Executor executor, IoProcessor<NioSession> processor) {
90 super(new DefaultSocketSessionConfig(), executor, processor);
91 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
92 }
93
94
95
96
97
98
99
100
101
102
103
104
105 public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass, int processorCount) {
106 super(new DefaultSocketSessionConfig(), processorClass, processorCount);
107 }
108
109
110
111
112
113
114
115
116
117
118
119
120 public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass) {
121 super(new DefaultSocketSessionConfig(), processorClass);
122 }
123
124
125
126
127 @Override
128 protected void init() throws Exception {
129 this.selector = Selector.open();
130 }
131
132
133
134
135 @Override
136 protected void destroy() throws Exception {
137 if (selector != null) {
138 selector.close();
139 }
140 }
141
142
143
144
145 @Override
146 public TransportMetadata getTransportMetadata() {
147 return NioSocketSession.METADATA;
148 }
149
150
151
152
153 @Override
154 public SocketSessionConfig getSessionConfig() {
155 return (SocketSessionConfig) sessionConfig;
156 }
157
158
159
160
161 @Override
162 public InetSocketAddress getDefaultRemoteAddress() {
163 return (InetSocketAddress) super.getDefaultRemoteAddress();
164 }
165
166
167
168
169 @Override
170 public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
171 super.setDefaultRemoteAddress(defaultRemoteAddress);
172 }
173
174
175
176
177 @Override
178 protected Iterator<SocketChannel> allHandles() {
179 return new SocketChannelIterator(selector.keys());
180 }
181
182
183
184
185 @Override
186 protected boolean connect(SocketChannel handle, SocketAddress remoteAddress) throws Exception {
187 return handle.connect(remoteAddress);
188 }
189
190
191
192
193 @Override
194 protected ConnectionRequest getConnectionRequest(SocketChannel handle) {
195 SelectionKey key = handle.keyFor(selector);
196
197 if ((key == null) || (!key.isValid())) {
198 return null;
199 }
200
201 return (ConnectionRequest) key.attachment();
202 }
203
204
205
206
207 @Override
208 protected void close(SocketChannel handle) throws Exception {
209 SelectionKey key = handle.keyFor(selector);
210
211 if (key != null) {
212 key.cancel();
213 }
214
215 handle.close();
216 }
217
218
219
220
221 @Override
222 protected boolean finishConnect(SocketChannel handle) throws Exception {
223 if (handle.finishConnect()) {
224 SelectionKey key = handle.keyFor(selector);
225
226 if (key != null) {
227 key.cancel();
228 }
229
230 return true;
231 }
232
233 return false;
234 }
235
236
237
238
239 @Override
240 protected SocketChannel newHandle(SocketAddress localAddress) throws Exception {
241 SocketChannel ch = SocketChannel.open();
242
243 int receiveBufferSize = (getSessionConfig()).getReceiveBufferSize();
244
245 if (receiveBufferSize > 65535) {
246 ch.socket().setReceiveBufferSize(receiveBufferSize);
247 }
248
249 if (localAddress != null) {
250 try {
251 ch.socket().bind(localAddress);
252 } catch (IOException ioe) {
253
254
255 String newMessage = "Error while binding on " + localAddress + "\n" + "original message : "
256 + ioe.getMessage();
257 Exception e = new IOException(newMessage);
258 e.initCause(ioe.getCause());
259
260
261 ch.close();
262 throw e;
263 }
264 }
265
266 ch.configureBlocking(false);
267
268 return ch;
269 }
270
271
272
273
274 @Override
275 protected NioSession newSession(IoProcessor<NioSession> processor, SocketChannel handle) {
276 return new NioSocketSession(this, processor, handle);
277 }
278
279
280
281
282 @Override
283 protected void register(SocketChannel handle, ConnectionRequest request) throws Exception {
284 handle.register(selector, SelectionKey.OP_CONNECT, request);
285 }
286
287
288
289
290 @Override
291 protected int select(int timeout) throws Exception {
292 return selector.select(timeout);
293 }
294
295
296
297
298 @Override
299 protected Iterator<SocketChannel> selectedHandles() {
300 return new SocketChannelIterator(selector.selectedKeys());
301 }
302
303
304
305
306 @Override
307 protected void wakeup() {
308 selector.wakeup();
309 }
310
311 private static class SocketChannelIterator implements Iterator<SocketChannel> {
312
313 private final Iterator<SelectionKey> i;
314
315 private SocketChannelIterator(Collection<SelectionKey> selectedKeys) {
316 this.i = selectedKeys.iterator();
317 }
318
319
320
321
322 @Override
323 public boolean hasNext() {
324 return i.hasNext();
325 }
326
327
328
329
330 @Override
331 public SocketChannel next() {
332 SelectionKey key = i.next();
333 return (SocketChannel) key.channel();
334 }
335
336
337
338
339 @Override
340 public void remove() {
341 i.remove();
342 }
343 }
344 }