1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.net.InetSocketAddress;
23 import java.net.SocketAddress;
24 import java.nio.channels.SelectionKey;
25 import java.nio.channels.Selector;
26 import java.nio.channels.SocketChannel;
27 import java.util.Collection;
28 import java.util.Iterator;
29 import java.util.concurrent.Executor;
30
31 import org.apache.mina.core.polling.AbstractPollingIoConnector;
32 import org.apache.mina.core.service.IoConnector;
33 import org.apache.mina.core.service.IoProcessor;
34 import org.apache.mina.core.service.IoService;
35 import org.apache.mina.core.service.SimpleIoProcessorPool;
36 import org.apache.mina.core.service.TransportMetadata;
37 import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
38 import org.apache.mina.transport.socket.SocketConnector;
39 import org.apache.mina.transport.socket.SocketSessionConfig;
40
41
42
43
44
45
46 public final class NioSocketConnector
47 extends AbstractPollingIoConnector<NioSession, SocketChannel>
48 implements SocketConnector {
49
50 private volatile Selector selector;
51
52
53
54
55 public NioSocketConnector() {
56 super(new DefaultSocketSessionConfig(), NioProcessor.class);
57 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
58 }
59
60
61
62
63
64
65
66 public NioSocketConnector(int processorCount) {
67 super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
68 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
69 }
70
71
72
73
74
75
76
77 public NioSocketConnector(IoProcessor<NioSession> processor) {
78 super(new DefaultSocketSessionConfig(), processor);
79 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
80 }
81
82
83
84
85
86
87
88
89 public NioSocketConnector(Executor executor, IoProcessor<NioSession> processor) {
90 super(new DefaultSocketSessionConfig(), executor, processor);
91 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
92 }
93
94
95
96
97
98
99
100
101
102
103
104
105 public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass,
106 int processorCount) {
107 super(new DefaultSocketSessionConfig(), processorClass, processorCount);
108 }
109
110
111
112
113
114
115
116
117
118
119
120
121
122 public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass) {
123 super(new DefaultSocketSessionConfig(), processorClass);
124 }
125
126
127
128
129 @Override
130 protected void init() throws Exception {
131 this.selector = Selector.open();
132 }
133
134
135
136
137 @Override
138 protected void destroy() throws Exception {
139 if (selector != null) {
140 selector.close();
141 }
142 }
143
144
145
146
147 public TransportMetadata getTransportMetadata() {
148 return NioSocketSession.METADATA;
149 }
150
151
152
153
154 @Override
155 public SocketSessionConfig getSessionConfig() {
156 return (SocketSessionConfig) super.getSessionConfig();
157 }
158
159
160
161
162 @Override
163 public InetSocketAddress getDefaultRemoteAddress() {
164 return (InetSocketAddress) super.getDefaultRemoteAddress();
165 }
166
167
168
169
170 public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
171 super.setDefaultRemoteAddress(defaultRemoteAddress);
172 }
173
174
175
176
177 @Override
178 protected Iterator<SocketChannel> allHandles() {
179 return new SocketChannelIterator(selector.keys());
180 }
181
182
183
184
185 @Override
186 protected boolean connect(SocketChannel handle, SocketAddress remoteAddress)
187 throws Exception {
188 return handle.connect(remoteAddress);
189 }
190
191
192
193
194 @Override
195 protected ConnectionRequest getConnectionRequest(SocketChannel handle) {
196 SelectionKey key = handle.keyFor(selector);
197
198 if ((key == null) || (!key.isValid())) {
199 return null;
200 }
201
202 return (ConnectionRequest) key.attachment();
203 }
204
205
206
207
208 @Override
209 protected void close(SocketChannel handle) throws Exception {
210 SelectionKey key = handle.keyFor(selector);
211
212 if (key != null) {
213 key.cancel();
214 }
215
216 handle.close();
217 }
218
219
220
221
222 @Override
223 protected boolean finishConnect(SocketChannel handle) throws Exception {
224 if (handle.finishConnect()) {
225 SelectionKey key = handle.keyFor(selector);
226
227 if (key != null) {
228 key.cancel();
229 }
230
231 return true;
232 }
233
234 return false;
235 }
236
237
238
239
240 @Override
241 protected SocketChannel newHandle(SocketAddress localAddress)
242 throws Exception {
243 SocketChannel ch = SocketChannel.open();
244
245 int receiveBufferSize =
246 (getSessionConfig()).getReceiveBufferSize();
247 if (receiveBufferSize > 65535) {
248 ch.socket().setReceiveBufferSize(receiveBufferSize);
249 }
250
251 if (localAddress != null) {
252 ch.socket().bind(localAddress);
253 }
254 ch.configureBlocking(false);
255 return ch;
256 }
257
258
259
260
261 @Override
262 protected NioSession newSession(IoProcessor<NioSession> processor, SocketChannel handle) {
263 return new NioSocketSession(this, processor, handle);
264 }
265
266
267
268
269 @Override
270 protected void register(SocketChannel handle, ConnectionRequest request)
271 throws Exception {
272 handle.register(selector, SelectionKey.OP_CONNECT, request);
273 }
274
275
276
277
278 @Override
279 protected int select(int timeout) throws Exception {
280 return selector.select(timeout);
281 }
282
283
284
285
286 @Override
287 protected Iterator<SocketChannel> selectedHandles() {
288 return new SocketChannelIterator(selector.selectedKeys());
289 }
290
291
292
293
294 @Override
295 protected void wakeup() {
296 selector.wakeup();
297 }
298
299 private static class SocketChannelIterator implements Iterator<SocketChannel> {
300
301 private final Iterator<SelectionKey> i;
302
303 private SocketChannelIterator(Collection<SelectionKey> selectedKeys) {
304 this.i = selectedKeys.iterator();
305 }
306
307
308
309
310 public boolean hasNext() {
311 return i.hasNext();
312 }
313
314
315
316
317 public SocketChannel next() {
318 SelectionKey key = i.next();
319 return (SocketChannel) key.channel();
320 }
321
322
323
324
325 public void remove() {
326 i.remove();
327 }
328 }
329 }