1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.net.InetSocketAddress;
23 import java.net.SocketAddress;
24 import java.nio.channels.SelectionKey;
25 import java.nio.channels.Selector;
26 import java.nio.channels.SocketChannel;
27 import java.util.Collection;
28 import java.util.Iterator;
29 import java.util.concurrent.Executor;
30
31 import org.apache.mina.core.polling.AbstractPollingIoConnector;
32 import org.apache.mina.core.service.IoConnector;
33 import org.apache.mina.core.service.IoProcessor;
34 import org.apache.mina.core.service.SimpleIoProcessorPool;
35 import org.apache.mina.core.service.TransportMetadata;
36 import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
37 import org.apache.mina.transport.socket.SocketConnector;
38 import org.apache.mina.transport.socket.SocketSessionConfig;
39
40
41
42
43
44
45
46 public final class NioSocketConnector
47 extends AbstractPollingIoConnector<NioSession, SocketChannel>
48 implements SocketConnector {
49
50 private volatile Selector selector;
51
52
53
54
55 public NioSocketConnector() {
56 super(new DefaultSocketSessionConfig(), NioProcessor.class);
57 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
58 }
59
60
61
62
63
64
65
66 public NioSocketConnector(int processorCount) {
67 super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
68 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
69 }
70
71
72
73
74
75
76
77 public NioSocketConnector(IoProcessor<NioSession> processor) {
78 super(new DefaultSocketSessionConfig(), processor);
79 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
80 }
81
82
83
84
85
86
87
88
89 public NioSocketConnector(Executor executor, IoProcessor<NioSession> processor) {
90 super(new DefaultSocketSessionConfig(), executor, processor);
91 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
92 }
93
94
95
96
97 @Override
98 protected void init() throws Exception {
99 this.selector = Selector.open();
100 }
101
102
103
104
105 @Override
106 protected void destroy() throws Exception {
107 if (selector != null) {
108 selector.close();
109 }
110 }
111
112
113
114
115 public TransportMetadata getTransportMetadata() {
116 return NioSocketSession.METADATA;
117 }
118
119
120
121
122 @Override
123 public SocketSessionConfig getSessionConfig() {
124 return (SocketSessionConfig) super.getSessionConfig();
125 }
126
127
128
129
130 @Override
131 public InetSocketAddress getDefaultRemoteAddress() {
132 return (InetSocketAddress) super.getDefaultRemoteAddress();
133 }
134
135
136
137
138 public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
139 super.setDefaultRemoteAddress(defaultRemoteAddress);
140 }
141
142
143
144
145 @Override
146 protected Iterator<SocketChannel> allHandles() {
147 return new SocketChannelIterator(selector.keys());
148 }
149
150
151
152
153 @Override
154 protected boolean connect(SocketChannel handle, SocketAddress remoteAddress)
155 throws Exception {
156 return handle.connect(remoteAddress);
157 }
158
159
160
161
162 @Override
163 protected ConnectionRequest connectionRequest(SocketChannel handle) {
164 SelectionKey key = handle.keyFor(selector);
165 if (key == null) {
166 return null;
167 }
168
169 return (ConnectionRequest) key.attachment();
170 }
171
172
173
174
175 @Override
176 protected void close(SocketChannel handle) throws Exception {
177 SelectionKey key = handle.keyFor(selector);
178 if (key != null) {
179 key.cancel();
180 }
181 handle.close();
182 }
183
184
185
186
187 @Override
188 protected boolean finishConnect(SocketChannel handle) throws Exception {
189 SelectionKey key = handle.keyFor(selector);
190 if (handle.finishConnect()) {
191 if (key != null) {
192 key.cancel();
193 }
194 return true;
195 }
196
197 return false;
198 }
199
200
201
202
203 @Override
204 protected SocketChannel newHandle(SocketAddress localAddress)
205 throws Exception {
206 SocketChannel ch = SocketChannel.open();
207
208 int receiveBufferSize =
209 (getSessionConfig()).getReceiveBufferSize();
210 if (receiveBufferSize > 65535) {
211 ch.socket().setReceiveBufferSize(receiveBufferSize);
212 }
213
214 if (localAddress != null) {
215 ch.socket().bind(localAddress);
216 }
217 ch.configureBlocking(false);
218 return ch;
219 }
220
221
222
223
224 @Override
225 protected NioSession newSession(IoProcessor<NioSession> processor, SocketChannel handle) {
226 return new NioSocketSession(this, processor, handle);
227 }
228
229
230
231
232 @Override
233 protected void register(SocketChannel handle, ConnectionRequest request)
234 throws Exception {
235 handle.register(selector, SelectionKey.OP_CONNECT, request);
236 }
237
238
239
240
241 @Override
242 protected boolean select(int timeout) throws Exception {
243 return selector.select(timeout) > 0;
244 }
245
246
247
248
249 @Override
250 protected Iterator<SocketChannel> selectedHandles() {
251 return new SocketChannelIterator(selector.selectedKeys());
252 }
253
254
255
256
257 @Override
258 protected void wakeup() {
259 selector.wakeup();
260 }
261
262 private static class SocketChannelIterator implements Iterator<SocketChannel> {
263
264 private final Iterator<SelectionKey> i;
265
266 private SocketChannelIterator(Collection<SelectionKey> selectedKeys) {
267 this.i = selectedKeys.iterator();
268 }
269
270
271
272
273 public boolean hasNext() {
274 return i.hasNext();
275 }
276
277
278
279
280 public SocketChannel next() {
281 SelectionKey key = i.next();
282 return (SocketChannel) key.channel();
283 }
284
285
286
287
288 public void remove() {
289 i.remove();
290 }
291 }
292 }