1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.net.ServerSocket;
25 import java.net.SocketAddress;
26 import java.net.StandardSocketOptions;
27 import java.nio.channels.SelectionKey;
28 import java.nio.channels.Selector;
29 import java.nio.channels.ServerSocketChannel;
30 import java.nio.channels.SocketChannel;
31 import java.nio.channels.spi.SelectorProvider;
32 import java.util.Collection;
33 import java.util.Iterator;
34 import java.util.concurrent.Executor;
35
36 import org.apache.mina.core.polling.AbstractPollingIoAcceptor;
37 import org.apache.mina.core.service.IoAcceptor;
38 import org.apache.mina.core.service.IoProcessor;
39 import org.apache.mina.core.service.IoService;
40 import org.apache.mina.core.service.SimpleIoProcessorPool;
41 import org.apache.mina.core.service.TransportMetadata;
42 import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
43 import org.apache.mina.transport.socket.SocketAcceptor;
44 import org.apache.mina.transport.socket.SocketSessionConfig;
45
46
47
48
49
50
51
52 public class NioSocketAcceptor extends AbstractPollingIoAcceptor<NioSession, ServerSocketChannel>
53 implements SocketAcceptor {
54
55 protected volatile Selector selector;
56 protected volatile SelectorProvider selectorProvider = null;
57
58
59
60
61 public NioSocketAcceptor() {
62 super(new DefaultSocketSessionConfig(), NioProcessor.class);
63 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
64 }
65
66
67
68
69
70
71
72
73 public NioSocketAcceptor(int processorCount) {
74 super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
75 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
76 }
77
78
79
80
81
82
83
84 public NioSocketAcceptor(IoProcessor<NioSession> processor) {
85 super(new DefaultSocketSessionConfig(), processor);
86 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
87 }
88
89
90
91
92
93
94
95
96 public NioSocketAcceptor(Executor executor, IoProcessor<NioSession> processor) {
97 super(new DefaultSocketSessionConfig(), executor, processor);
98 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
99 }
100
101
102
103
104
105
106
107
108
109
110 public NioSocketAcceptor(int processorCount, SelectorProvider selectorProvider) {
111 super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount, selectorProvider);
112 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
113 this.selectorProvider = selectorProvider;
114 }
115
116
117
118
119 @Override
120 protected void init() throws Exception {
121 selector = Selector.open();
122 }
123
124
125
126
127 @Override
128 protected void init(SelectorProvider selectorProvider) throws Exception {
129 this.selectorProvider = selectorProvider;
130
131 if (selectorProvider == null) {
132 selector = Selector.open();
133 } else {
134 selector = selectorProvider.openSelector();
135 }
136 }
137
138
139
140
141 @Override
142 protected void destroy() throws Exception {
143 if (selector != null) {
144 selector.close();
145 }
146 }
147
148
149
150
151 public TransportMetadata getTransportMetadata() {
152 return NioSocketSession.METADATA;
153 }
154
155
156
157
158 @Override
159 public InetSocketAddress getLocalAddress() {
160 return (InetSocketAddress) super.getLocalAddress();
161 }
162
163
164
165
166 @Override
167 public InetSocketAddress getDefaultLocalAddress() {
168 return (InetSocketAddress) super.getDefaultLocalAddress();
169 }
170
171
172
173
174 public void setDefaultLocalAddress(InetSocketAddress localAddress) {
175 setDefaultLocalAddress((SocketAddress) localAddress);
176 }
177
178
179
180
181 @Override
182 protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {
183
184 SelectionKey key = null;
185
186 if (handle != null) {
187 key = handle.keyFor(selector);
188 }
189
190 if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
191 return null;
192 }
193
194
195 try {
196 SocketChannel ch = handle.accept();
197
198 if (ch == null) {
199 return null;
200 }
201
202 return new NioSocketSession(this, processor, ch);
203 } catch (Throwable t) {
204 if(t.getMessage().equals("Too many open files")) {
205 LOGGER.error("Error Calling Accept on Socket - Sleeping Acceptor Thread. Check the ulimit parameter", t);
206 try {
207
208
209
210
211 Thread.sleep(50L);
212 } catch (InterruptedException ie) {
213
214 }
215 } else {
216 throw t;
217 }
218
219
220 return null;
221 }
222 }
223
224
225
226
227 @Override
228 protected ServerSocketChannel open(SocketAddress localAddress) throws Exception {
229
230
231 SocketSessionConfig config = this.getSessionConfig();
232
233 ServerSocketChannel channel = null;
234
235 if (selectorProvider != null) {
236 channel = selectorProvider.openServerSocketChannel();
237 } else {
238 channel = ServerSocketChannel.open();
239 }
240
241 boolean success = false;
242
243 try {
244
245 channel.configureBlocking(false);
246
247
248 ServerSocket socket = channel.socket();
249
250
251 socket.setReuseAddress(isReuseAddress());
252
253
254 if (config.getSendBufferSize() != -1) {
255 channel.setOption(StandardSocketOptions.SO_SNDBUF, config.getSendBufferSize());
256 }
257
258
259 if (config.getReceiveBufferSize() != -1) {
260 channel.setOption(StandardSocketOptions.SO_RCVBUF, config.getReceiveBufferSize());
261 }
262
263
264 try {
265 socket.bind(localAddress, getBacklog());
266 } catch (IOException ioe) {
267
268
269 String newMessage = "Error while binding on " + localAddress;
270 Exception e = new IOException(newMessage, ioe);
271
272
273 channel.close();
274
275 throw e;
276 }
277
278
279 channel.register(selector, SelectionKey.OP_ACCEPT);
280 success = true;
281 } finally {
282 if (!success) {
283 close(channel);
284 }
285 }
286 return channel;
287 }
288
289
290
291
292 @Override
293 protected SocketAddress localAddress(ServerSocketChannel handle) throws Exception {
294 return handle.socket().getLocalSocketAddress();
295 }
296
297
298
299
300
301
302
303
304
305
306
307
308
309 @Override
310 protected int select() throws Exception {
311 return selector.select();
312 }
313
314
315
316
317 @Override
318 protected Iterator<ServerSocketChannel> selectedHandles() {
319 return new ServerSocketChannelIterator(selector.selectedKeys());
320 }
321
322
323
324
325 @Override
326 protected void close(ServerSocketChannel handle) throws Exception {
327 SelectionKey key = handle.keyFor(selector);
328
329 if (key != null) {
330 key.cancel();
331 }
332
333 handle.close();
334 }
335
336
337
338
339 @Override
340 protected void wakeup() {
341 selector.wakeup();
342 }
343
344
345
346
347
348 private static class ServerSocketChannelIterator implements Iterator<ServerSocketChannel> {
349
350 private final Iterator<SelectionKey> iterator;
351
352
353
354
355
356
357
358 private ServerSocketChannelIterator(Collection<SelectionKey> selectedKeys) {
359 iterator = selectedKeys.iterator();
360 }
361
362
363
364
365
366
367 public boolean hasNext() {
368 return iterator.hasNext();
369 }
370
371
372
373
374
375
376
377 public ServerSocketChannel next() {
378 SelectionKey key = iterator.next();
379
380 if (key.isValid() && key.isAcceptable()) {
381 return (ServerSocketChannel) key.channel();
382 }
383
384 return null;
385 }
386
387
388
389
390 public void remove() {
391 iterator.remove();
392 }
393 }
394 }