1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.net.InetSocketAddress;
23 import java.net.SocketAddress;
24 import java.nio.channels.SelectionKey;
25 import java.nio.channels.Selector;
26 import java.nio.channels.SocketChannel;
27 import java.util.Collection;
28 import java.util.Iterator;
29 import java.util.concurrent.Executor;
30
31 import org.apache.mina.core.polling.AbstractPollingIoConnector;
32 import org.apache.mina.core.service.IoConnector;
33 import org.apache.mina.core.service.IoProcessor;
34 import org.apache.mina.core.service.SimpleIoProcessorPool;
35 import org.apache.mina.core.service.TransportMetadata;
36 import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
37 import org.apache.mina.transport.socket.SocketConnector;
38 import org.apache.mina.transport.socket.SocketSessionConfig;
39
40
41
42
43
44
45 public final class NioSocketConnector
46 extends AbstractPollingIoConnector<NioSession, SocketChannel>
47 implements SocketConnector {
48
49 private volatile Selector selector;
50
51
52
53
54 public NioSocketConnector() {
55 super(new DefaultSocketSessionConfig(), NioProcessor.class);
56 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
57 }
58
59
60
61
62
63
64
65 public NioSocketConnector(int processorCount) {
66 super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
67 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
68 }
69
70
71
72
73
74
75
76 public NioSocketConnector(IoProcessor<NioSession> processor) {
77 super(new DefaultSocketSessionConfig(), processor);
78 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
79 }
80
81
82
83
84
85
86
87
88 public NioSocketConnector(Executor executor, IoProcessor<NioSession> processor) {
89 super(new DefaultSocketSessionConfig(), executor, processor);
90 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
91 }
92
93
94
95
96
97
98
99
100
101
102
103
104 public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass,
105 int processorCount) {
106 super(new DefaultSocketSessionConfig(), processorClass, processorCount);
107 }
108
109
110
111
112
113
114
115
116
117
118
119
120
121 public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass) {
122 super(new DefaultSocketSessionConfig(), processorClass);
123 }
124
125
126
127
128 @Override
129 protected void init() throws Exception {
130 this.selector = Selector.open();
131 }
132
133
134
135
136 @Override
137 protected void destroy() throws Exception {
138 if (selector != null) {
139 selector.close();
140 }
141 }
142
143
144
145
146 public TransportMetadata getTransportMetadata() {
147 return NioSocketSession.METADATA;
148 }
149
150
151
152
153 @Override
154 public SocketSessionConfig getSessionConfig() {
155 return (SocketSessionConfig) super.getSessionConfig();
156 }
157
158
159
160
161 @Override
162 public InetSocketAddress getDefaultRemoteAddress() {
163 return (InetSocketAddress) super.getDefaultRemoteAddress();
164 }
165
166
167
168
169 public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
170 super.setDefaultRemoteAddress(defaultRemoteAddress);
171 }
172
173
174
175
176 @Override
177 protected Iterator<SocketChannel> allHandles() {
178 return new SocketChannelIterator(selector.keys());
179 }
180
181
182
183
184 @Override
185 protected boolean connect(SocketChannel handle, SocketAddress remoteAddress)
186 throws Exception {
187 return handle.connect(remoteAddress);
188 }
189
190
191
192
193 @Override
194 protected ConnectionRequest getConnectionRequest(SocketChannel handle) {
195 SelectionKey key = handle.keyFor(selector);
196
197 if ((key == null) || (!key.isValid())) {
198 return null;
199 }
200
201 return (ConnectionRequest) key.attachment();
202 }
203
204
205
206
207 @Override
208 protected void close(SocketChannel handle) throws Exception {
209 SelectionKey key = handle.keyFor(selector);
210
211 if (key != null) {
212 key.cancel();
213 }
214
215 handle.close();
216 }
217
218
219
220
221 @Override
222 protected boolean finishConnect(SocketChannel handle) throws Exception {
223 if (handle.finishConnect()) {
224 SelectionKey key = handle.keyFor(selector);
225
226 if (key != null) {
227 key.cancel();
228 }
229
230 return true;
231 }
232
233 return false;
234 }
235
236
237
238
239 @Override
240 protected SocketChannel newHandle(SocketAddress localAddress)
241 throws Exception {
242 SocketChannel ch = SocketChannel.open();
243
244 int receiveBufferSize =
245 (getSessionConfig()).getReceiveBufferSize();
246 if (receiveBufferSize > 65535) {
247 ch.socket().setReceiveBufferSize(receiveBufferSize);
248 }
249
250 if (localAddress != null) {
251 ch.socket().bind(localAddress);
252 }
253 ch.configureBlocking(false);
254 return ch;
255 }
256
257
258
259
260 @Override
261 protected NioSession newSession(IoProcessor<NioSession> processor, SocketChannel handle) {
262 return new NioSocketSession(this, processor, handle);
263 }
264
265
266
267
268 @Override
269 protected void register(SocketChannel handle, ConnectionRequest request)
270 throws Exception {
271 handle.register(selector, SelectionKey.OP_CONNECT, request);
272 }
273
274
275
276
277 @Override
278 protected int select(int timeout) throws Exception {
279 return selector.select(timeout);
280 }
281
282
283
284
285 @Override
286 protected Iterator<SocketChannel> selectedHandles() {
287 return new SocketChannelIterator(selector.selectedKeys());
288 }
289
290
291
292
293 @Override
294 protected void wakeup() {
295 selector.wakeup();
296 }
297
298 private static class SocketChannelIterator implements Iterator<SocketChannel> {
299
300 private final Iterator<SelectionKey> i;
301
302 private SocketChannelIterator(Collection<SelectionKey> selectedKeys) {
303 this.i = selectedKeys.iterator();
304 }
305
306
307
308
309 public boolean hasNext() {
310 return i.hasNext();
311 }
312
313
314
315
316 public SocketChannel next() {
317 SelectionKey key = i.next();
318 return (SocketChannel) key.channel();
319 }
320
321
322
323
324 public void remove() {
325 i.remove();
326 }
327 }
328 }