1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.mina.io.socket;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.SocketAddress;
24 import java.nio.channels.SelectionKey;
25 import java.nio.channels.Selector;
26 import java.nio.channels.ServerSocketChannel;
27 import java.nio.channels.SocketChannel;
28 import java.util.HashMap;
29 import java.util.Iterator;
30 import java.util.Map;
31 import java.util.Set;
32
33 import org.apache.mina.common.BaseSessionManager;
34 import org.apache.mina.io.IoAcceptor;
35 import org.apache.mina.io.IoFilterChain;
36 import org.apache.mina.io.IoHandler;
37 import org.apache.mina.io.IoSession;
38 import org.apache.mina.io.IoSessionManagerFilterChain;
39 import org.apache.mina.util.Queue;
40
41 /***
42 * {@link IoAcceptor} for socket transport (TCP/IP).
43 *
44 * @author The Apache Directory Project (dev@directory.apache.org)
45 * @version $Rev: 327113 $, $Date: 2005-10-21 15:59:15 +0900 $
46 */
47 public class SocketAcceptor extends BaseSessionManager implements IoAcceptor
48 {
49 private static volatile int nextId = 0;
50
51 private final IoSessionManagerFilterChain filters = new SocketSessionManagerFilterChain( this );
52
53 private final int id = nextId ++ ;
54
55 private Selector selector;
56
57 private final Map channels = new HashMap();
58
59 private final Queue registerQueue = new Queue();
60
61 private final Queue cancelQueue = new Queue();
62
63 private int backlog = 50;
64
65 private Worker worker;
66
67
68 /***
69 * Creates a new instance.
70 */
71 public SocketAcceptor()
72 {
73 }
74
75 /***
76 * Binds to the specified <code>address</code> and handles incoming
77 * connections with the specified <code>handler</code>. Backlog value
78 * is configured to the value of <code>backlog</code> property.
79 *
80 * @throws IOException if failed to bind
81 */
82 public void bind( SocketAddress address, IoHandler handler ) throws IOException
83 {
84 if( address == null )
85 {
86 throw new NullPointerException( "address" );
87 }
88
89 if( handler == null )
90 {
91 throw new NullPointerException( "handler" );
92 }
93
94 if( !( address instanceof InetSocketAddress ) )
95 {
96 throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() );
97 }
98
99 if( ( ( InetSocketAddress ) address ).getPort() == 0 )
100 {
101 throw new IllegalArgumentException( "Unsupported port number: 0" );
102 }
103
104 RegistrationRequest request = new RegistrationRequest( address, backlog, handler );
105
106 synchronized( this )
107 {
108 synchronized( registerQueue )
109 {
110 registerQueue.push( request );
111 }
112 startupWorker();
113 }
114
115 selector.wakeup();
116
117 synchronized( request )
118 {
119 while( !request.done )
120 {
121 try
122 {
123 request.wait();
124 }
125 catch( InterruptedException e )
126 {
127 }
128 }
129 }
130
131 if( request.exception != null )
132 {
133 throw request.exception;
134 }
135 }
136
137
138 private synchronized void startupWorker() throws IOException
139 {
140 if( worker == null )
141 {
142 selector = Selector.open();
143 worker = new Worker();
144
145 worker.start();
146 }
147 }
148
149
150 public void unbind( SocketAddress address )
151 {
152 if( address == null )
153 {
154 throw new NullPointerException( "address" );
155 }
156
157 CancellationRequest request = new CancellationRequest( address );
158 synchronized( this )
159 {
160 try
161 {
162 startupWorker();
163 }
164 catch( IOException e )
165 {
166
167
168
169
170 throw new IllegalArgumentException( "Address not bound: " + address );
171 }
172
173 synchronized( cancelQueue )
174 {
175 cancelQueue.push( request );
176 }
177 }
178
179 selector.wakeup();
180
181 synchronized( request )
182 {
183 while( !request.done )
184 {
185 try
186 {
187 request.wait();
188 }
189 catch( InterruptedException e )
190 {
191 }
192 }
193 }
194
195 if( request.exception != null )
196 {
197 request.exception.fillInStackTrace();
198
199 throw request.exception;
200 }
201 }
202
203 /***
204 * Returns the default backlog value which is used when user binds.
205 */
206 public int getBacklog()
207 {
208 return backlog;
209 }
210
211 /***
212 * Sets the default backlog value which is used when user binds.
213 */
214 public void setBacklog( int defaultBacklog )
215 {
216 if( defaultBacklog <= 0 )
217 {
218 throw new IllegalArgumentException( "defaultBacklog: " + defaultBacklog );
219 }
220 this.backlog = defaultBacklog;
221 }
222
223
224 private class Worker extends Thread
225 {
226 public Worker()
227 {
228 super( "SocketAcceptor-" + id );
229 }
230
231 public void run()
232 {
233 for( ;; )
234 {
235 try
236 {
237 int nKeys = selector.select();
238
239 registerNew();
240 cancelKeys();
241
242 if( nKeys > 0 )
243 {
244 processSessions( selector.selectedKeys() );
245 }
246
247 if( selector.keys().isEmpty() )
248 {
249 synchronized( SocketAcceptor.this )
250 {
251 if( selector.keys().isEmpty() &&
252 registerQueue.isEmpty() &&
253 cancelQueue.isEmpty() )
254 {
255 worker = null;
256 try
257 {
258 selector.close();
259 }
260 catch( IOException e )
261 {
262 exceptionMonitor.exceptionCaught( SocketAcceptor.this, e );
263 }
264 finally
265 {
266 selector = null;
267 }
268 break;
269 }
270 }
271 }
272 }
273 catch( IOException e )
274 {
275 exceptionMonitor.exceptionCaught( SocketAcceptor.this, e );
276
277 try
278 {
279 Thread.sleep( 1000 );
280 }
281 catch( InterruptedException e1 )
282 {
283 }
284 }
285 }
286 }
287
288 private void processSessions( Set keys ) throws IOException
289 {
290 Iterator it = keys.iterator();
291 while( it.hasNext() )
292 {
293 SelectionKey key = ( SelectionKey ) it.next();
294
295 it.remove();
296
297 if( !key.isAcceptable() )
298 {
299 continue;
300 }
301
302 ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel();
303
304 SocketChannel ch = ssc.accept();
305
306 if( ch == null )
307 {
308 continue;
309 }
310
311 boolean success = false;
312 try
313 {
314 RegistrationRequest req = ( RegistrationRequest ) key.attachment();
315 SocketSession session = new SocketSession( filters, ch, req.handler );
316 req.handler.sessionCreated( session );
317 SocketIoProcessor.getInstance().addSession( session );
318 success = true;
319 }
320 catch( Throwable t )
321 {
322 exceptionMonitor.exceptionCaught( SocketAcceptor.this, t );
323 }
324 finally
325 {
326 if( !success )
327 {
328 ch.close();
329 }
330 }
331 }
332 }
333 }
334
335
336 private void registerNew()
337 {
338 if( registerQueue.isEmpty() )
339 {
340 return;
341 }
342
343 for( ;; )
344 {
345 RegistrationRequest req;
346
347 synchronized( registerQueue )
348 {
349 req = ( RegistrationRequest ) registerQueue.pop();
350 }
351
352 if( req == null )
353 {
354 break;
355 }
356
357 ServerSocketChannel ssc = null;
358
359 try
360 {
361 ssc = ServerSocketChannel.open();
362 ssc.configureBlocking( false );
363 ssc.socket().bind( req.address, req.backlog );
364 ssc.register( selector, SelectionKey.OP_ACCEPT, req );
365
366 channels.put( req.address, ssc );
367 }
368 catch( IOException e )
369 {
370 req.exception = e;
371 }
372 finally
373 {
374 synchronized( req )
375 {
376 req.done = true;
377
378 req.notify();
379 }
380
381 if( ssc != null && req.exception != null )
382 {
383 try
384 {
385 ssc.close();
386 }
387 catch( IOException e )
388 {
389 exceptionMonitor.exceptionCaught( this, e );
390 }
391 }
392 }
393 }
394 }
395
396
397 private void cancelKeys()
398 {
399 if( cancelQueue.isEmpty() )
400 {
401 return;
402 }
403
404 for( ;; )
405 {
406 CancellationRequest request;
407
408 synchronized( cancelQueue )
409 {
410 request = ( CancellationRequest ) cancelQueue.pop();
411 }
412
413 if( request == null )
414 {
415 break;
416 }
417
418 ServerSocketChannel ssc = ( ServerSocketChannel ) channels.remove( request.address );
419
420
421 try
422 {
423 if( ssc == null )
424 {
425 request.exception = new IllegalArgumentException( "Address not bound: " + request.address );
426 }
427 else
428 {
429 SelectionKey key = ssc.keyFor( selector );
430
431 key.cancel();
432
433 selector.wakeup();
434
435 ssc.close();
436 }
437 }
438 catch( IOException e )
439 {
440 exceptionMonitor.exceptionCaught( this, e );
441 }
442 finally
443 {
444 synchronized( request )
445 {
446 request.done = true;
447
448 request.notify();
449 }
450 }
451 }
452 }
453
454 public IoFilterChain getFilterChain()
455 {
456 return filters;
457 }
458
459 private static class RegistrationRequest
460 {
461 private final SocketAddress address;
462
463 private final int backlog;
464
465 private final IoHandler handler;
466
467 private IOException exception;
468
469 private boolean done;
470
471 private RegistrationRequest( SocketAddress address, int backlog, IoHandler handler )
472 {
473 this.address = address;
474 this.backlog = backlog;
475 this.handler = handler;
476 }
477 }
478
479
480 private static class CancellationRequest
481 {
482 private final SocketAddress address;
483
484 private boolean done;
485
486 private RuntimeException exception;
487
488 private CancellationRequest( SocketAddress address )
489 {
490 this.address = address;
491 }
492 }
493
494
495 public IoSession newSession( SocketAddress remoteAddress, SocketAddress localAddress )
496 {
497 throw new UnsupportedOperationException();
498 }
499 }