1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.net.InetSocketAddress;
23 import java.net.SocketAddress;
24 import java.nio.channels.SelectionKey;
25 import java.nio.channels.Selector;
26 import java.nio.channels.SocketChannel;
27 import java.util.Collection;
28 import java.util.Iterator;
29 import java.util.concurrent.Executor;
30
31 import org.apache.mina.core.polling.AbstractPollingIoConnector;
32 import org.apache.mina.core.service.IoConnector;
33 import org.apache.mina.core.service.IoProcessor;
34 import org.apache.mina.core.service.IoService;
35 import org.apache.mina.core.service.SimpleIoProcessorPool;
36 import org.apache.mina.core.service.TransportMetadata;
37 import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
38 import org.apache.mina.transport.socket.SocketConnector;
39 import org.apache.mina.transport.socket.SocketSessionConfig;
40
41
42
43
44
45
46 public final class NioSocketConnector extends AbstractPollingIoConnector<NioSession, SocketChannel> implements
47 SocketConnector {
48
49 private volatile Selector selector;
50
51
52
53
54 public NioSocketConnector() {
55 super(new DefaultSocketSessionConfig(), NioProcessor.class);
56 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
57 }
58
59
60
61
62
63
64
65 public NioSocketConnector(int processorCount) {
66 super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
67 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
68 }
69
70
71
72
73
74
75
76 public NioSocketConnector(IoProcessor<NioSession> processor) {
77 super(new DefaultSocketSessionConfig(), processor);
78 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
79 }
80
81
82
83
84
85
86
87
88 public NioSocketConnector(Executor executor, IoProcessor<NioSession> processor) {
89 super(new DefaultSocketSessionConfig(), executor, processor);
90 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
91 }
92
93
94
95
96
97
98
99
100
101
102
103
104 public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass, int processorCount) {
105 super(new DefaultSocketSessionConfig(), processorClass, processorCount);
106 }
107
108
109
110
111
112
113
114
115
116
117
118
119
120 public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass) {
121 super(new DefaultSocketSessionConfig(), processorClass);
122 }
123
124
125
126
127 @Override
128 protected void init() throws Exception {
129 this.selector = Selector.open();
130 }
131
132
133
134
135 @Override
136 protected void destroy() throws Exception {
137 if (selector != null) {
138 selector.close();
139 }
140 }
141
142
143
144
145 public TransportMetadata getTransportMetadata() {
146 return NioSocketSession.METADATA;
147 }
148
149
150
151
152 @Override
153 public SocketSessionConfig getSessionConfig() {
154 return (SocketSessionConfig) super.getSessionConfig();
155 }
156
157
158
159
160 @Override
161 public InetSocketAddress getDefaultRemoteAddress() {
162 return (InetSocketAddress) super.getDefaultRemoteAddress();
163 }
164
165
166
167
168 public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
169 super.setDefaultRemoteAddress(defaultRemoteAddress);
170 }
171
172
173
174
175 @Override
176 protected Iterator<SocketChannel> allHandles() {
177 return new SocketChannelIterator(selector.keys());
178 }
179
180
181
182
183 @Override
184 protected boolean connect(SocketChannel handle, SocketAddress remoteAddress) throws Exception {
185 return handle.connect(remoteAddress);
186 }
187
188
189
190
191 @Override
192 protected ConnectionRequest getConnectionRequest(SocketChannel handle) {
193 SelectionKey key = handle.keyFor(selector);
194
195 if ((key == null) || (!key.isValid())) {
196 return null;
197 }
198
199 return (ConnectionRequest) key.attachment();
200 }
201
202
203
204
205 @Override
206 protected void close(SocketChannel handle) throws Exception {
207 SelectionKey key = handle.keyFor(selector);
208
209 if (key != null) {
210 key.cancel();
211 }
212
213 handle.close();
214 }
215
216
217
218
219 @Override
220 protected boolean finishConnect(SocketChannel handle) throws Exception {
221 if (handle.finishConnect()) {
222 SelectionKey key = handle.keyFor(selector);
223
224 if (key != null) {
225 key.cancel();
226 }
227
228 return true;
229 }
230
231 return false;
232 }
233
234
235
236
237 @Override
238 protected SocketChannel newHandle(SocketAddress localAddress) throws Exception {
239 SocketChannel ch = SocketChannel.open();
240
241 int receiveBufferSize = (getSessionConfig()).getReceiveBufferSize();
242 if (receiveBufferSize > 65535) {
243 ch.socket().setReceiveBufferSize(receiveBufferSize);
244 }
245
246 if (localAddress != null) {
247 ch.socket().bind(localAddress);
248 }
249 ch.configureBlocking(false);
250 return ch;
251 }
252
253
254
255
256 @Override
257 protected NioSession newSession(IoProcessor<NioSession> processor, SocketChannel handle) {
258 return new NioSocketSession(this, processor, handle);
259 }
260
261
262
263
264 @Override
265 protected void register(SocketChannel handle, ConnectionRequest request) throws Exception {
266 handle.register(selector, SelectionKey.OP_CONNECT, request);
267 }
268
269
270
271
272 @Override
273 protected int select(int timeout) throws Exception {
274 return selector.select(timeout);
275 }
276
277
278
279
280 @Override
281 protected Iterator<SocketChannel> selectedHandles() {
282 return new SocketChannelIterator(selector.selectedKeys());
283 }
284
285
286
287
288 @Override
289 protected void wakeup() {
290 selector.wakeup();
291 }
292
293 private static class SocketChannelIterator implements Iterator<SocketChannel> {
294
295 private final Iterator<SelectionKey> i;
296
297 private SocketChannelIterator(Collection<SelectionKey> selectedKeys) {
298 this.i = selectedKeys.iterator();
299 }
300
301
302
303
304 public boolean hasNext() {
305 return i.hasNext();
306 }
307
308
309
310
311 public SocketChannel next() {
312 SelectionKey key = i.next();
313 return (SocketChannel) key.channel();
314 }
315
316
317
318
319 public void remove() {
320 i.remove();
321 }
322 }
323 }