1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.mina.io.socket;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.SocketAddress;
24 import java.nio.channels.SelectionKey;
25 import java.nio.channels.Selector;
26 import java.nio.channels.ServerSocketChannel;
27 import java.nio.channels.SocketChannel;
28 import java.util.HashMap;
29 import java.util.Iterator;
30 import java.util.Map;
31 import java.util.Set;
32
33 import org.apache.mina.common.BaseSessionManager;
34 import org.apache.mina.io.IoAcceptor;
35 import org.apache.mina.io.IoFilterChain;
36 import org.apache.mina.io.IoHandler;
37 import org.apache.mina.io.IoSessionManagerFilterChain;
38 import org.apache.mina.util.Queue;
39
40 /***
41 * {@link IoAcceptor} for socket transport (TCP/IP).
42 *
43 * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
44 * @version $Rev: 210062 $, $Date: 2005-07-11 12:52:38 +0900 $
45 */
46 public class SocketAcceptor extends BaseSessionManager implements IoAcceptor
47 {
48 private static volatile int nextId = 0;
49
50 private final IoSessionManagerFilterChain filters = new SocketSessionManagerFilterChain( this );
51
52 private final int id = nextId ++ ;
53
54 private Selector selector;
55
56 private final Map channels = new HashMap();
57
58 private final Queue registerQueue = new Queue();
59
60 private final Queue cancelQueue = new Queue();
61
62 private int backlog = 50;
63
64 private Worker worker;
65
66
67 /***
68 * Creates a new instance.
69 */
70 public SocketAcceptor()
71 {
72 }
73
74 /***
75 * Binds to the specified <code>address</code> and handles incoming
76 * connections with the specified <code>handler</code>. Backlog value
77 * is configured to the value of <code>backlog</code> property.
78 *
79 * @throws IOException if failed to bind
80 */
81 public void bind( SocketAddress address, IoHandler handler ) throws IOException
82 {
83 if( address == null )
84 {
85 throw new NullPointerException( "address" );
86 }
87
88 if( handler == null )
89 {
90 throw new NullPointerException( "handler" );
91 }
92
93 if( !( address instanceof InetSocketAddress ) )
94 {
95 throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() );
96 }
97
98 if( ( ( InetSocketAddress ) address ).getPort() == 0 )
99 {
100 throw new IllegalArgumentException( "Unsupported port number: 0" );
101 }
102
103 RegistrationRequest request = new RegistrationRequest( address, backlog, handler );
104
105 synchronized( this )
106 {
107 synchronized( registerQueue )
108 {
109 registerQueue.push( request );
110 }
111 startupWorker();
112 }
113
114 selector.wakeup();
115
116 synchronized( request )
117 {
118 while( !request.done )
119 {
120 try
121 {
122 request.wait();
123 }
124 catch( InterruptedException e )
125 {
126 }
127 }
128 }
129
130 if( request.exception != null )
131 {
132 throw request.exception;
133 }
134 }
135
136
137 private synchronized void startupWorker() throws IOException
138 {
139 if( worker == null )
140 {
141 selector = Selector.open();
142 worker = new Worker();
143
144 worker.start();
145 }
146 }
147
148
149 public void unbind( SocketAddress address )
150 {
151 if( address == null )
152 {
153 throw new NullPointerException( "address" );
154 }
155
156 CancellationRequest request = new CancellationRequest( address );
157 synchronized( this )
158 {
159 try
160 {
161 startupWorker();
162 }
163 catch( IOException e )
164 {
165
166
167
168
169 throw new IllegalArgumentException( "Address not bound: " + address );
170 }
171
172 synchronized( cancelQueue )
173 {
174 cancelQueue.push( request );
175 }
176 }
177
178 selector.wakeup();
179
180 synchronized( request )
181 {
182 while( !request.done )
183 {
184 try
185 {
186 request.wait();
187 }
188 catch( InterruptedException e )
189 {
190 }
191 }
192 }
193
194 if( request.exception != null )
195 {
196 request.exception.fillInStackTrace();
197
198 throw request.exception;
199 }
200 }
201
202 /***
203 * Returns the default backlog value which is used when user binds.
204 */
205 public int getBacklog()
206 {
207 return backlog;
208 }
209
210 /***
211 * Sets the default backlog value which is used when user binds.
212 */
213 public void setBacklog( int defaultBacklog )
214 {
215 if( defaultBacklog <= 0 )
216 {
217 throw new IllegalArgumentException( "defaultBacklog: " + defaultBacklog );
218 }
219 this.backlog = defaultBacklog;
220 }
221
222
223 private class Worker extends Thread
224 {
225 public Worker()
226 {
227 super( "SocketAcceptor-" + id );
228 }
229
230 public void run()
231 {
232 for( ;; )
233 {
234 try
235 {
236 int nKeys = selector.select();
237
238 registerNew();
239 cancelKeys();
240
241 if( nKeys > 0 )
242 {
243 processSessions( selector.selectedKeys() );
244 }
245
246 if( selector.keys().isEmpty() )
247 {
248 synchronized( SocketAcceptor.this )
249 {
250 if( selector.keys().isEmpty() &&
251 registerQueue.isEmpty() &&
252 cancelQueue.isEmpty() )
253 {
254 worker = null;
255 try
256 {
257 selector.close();
258 }
259 catch( IOException e )
260 {
261 exceptionMonitor.exceptionCaught( SocketAcceptor.this, e );
262 }
263 finally
264 {
265 selector = null;
266 }
267 break;
268 }
269 }
270 }
271 }
272 catch( IOException e )
273 {
274 exceptionMonitor.exceptionCaught( SocketAcceptor.this, e );
275
276 try
277 {
278 Thread.sleep( 1000 );
279 }
280 catch( InterruptedException e1 )
281 {
282 }
283 }
284 }
285 }
286
287 private void processSessions( Set keys ) throws IOException
288 {
289 Iterator it = keys.iterator();
290 while( it.hasNext() )
291 {
292 SelectionKey key = ( SelectionKey ) it.next();
293
294 it.remove();
295
296 if( !key.isAcceptable() )
297 {
298 continue;
299 }
300
301 ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel();
302
303 SocketChannel ch = ssc.accept();
304
305 if( ch == null )
306 {
307 continue;
308 }
309
310 boolean success = false;
311 try
312 {
313 RegistrationRequest req = ( RegistrationRequest ) key.attachment();
314 SocketSession session = new SocketSession( filters, ch, req.handler );
315 req.handler.sessionCreated( session );
316 SocketIoProcessor.getInstance().addSession( session );
317 success = true;
318 }
319 catch( Throwable t )
320 {
321 exceptionMonitor.exceptionCaught( SocketAcceptor.this, t );
322 }
323 finally
324 {
325 if( !success )
326 {
327 ch.close();
328 }
329 }
330 }
331 }
332 }
333
334
335 private void registerNew()
336 {
337 if( registerQueue.isEmpty() )
338 {
339 return;
340 }
341
342 for( ;; )
343 {
344 RegistrationRequest req;
345
346 synchronized( registerQueue )
347 {
348 req = ( RegistrationRequest ) registerQueue.pop();
349 }
350
351 if( req == null )
352 {
353 break;
354 }
355
356 ServerSocketChannel ssc = null;
357
358 try
359 {
360 ssc = ServerSocketChannel.open();
361 ssc.configureBlocking( false );
362 ssc.socket().bind( req.address, req.backlog );
363 ssc.register( selector, SelectionKey.OP_ACCEPT, req );
364
365 channels.put( req.address, ssc );
366 }
367 catch( IOException e )
368 {
369 req.exception = e;
370 }
371 finally
372 {
373 synchronized( req )
374 {
375 req.done = true;
376
377 req.notify();
378 }
379
380 if( ssc != null && req.exception != null )
381 {
382 try
383 {
384 ssc.close();
385 }
386 catch( IOException e )
387 {
388 exceptionMonitor.exceptionCaught( this, e );
389 }
390 }
391 }
392 }
393 }
394
395
396 private void cancelKeys()
397 {
398 if( cancelQueue.isEmpty() )
399 {
400 return;
401 }
402
403 for( ;; )
404 {
405 CancellationRequest request;
406
407 synchronized( cancelQueue )
408 {
409 request = ( CancellationRequest ) cancelQueue.pop();
410 }
411
412 if( request == null )
413 {
414 break;
415 }
416
417 ServerSocketChannel ssc = ( ServerSocketChannel ) channels.remove( request.address );
418
419
420 try
421 {
422 if( ssc == null )
423 {
424 request.exception = new IllegalArgumentException( "Address not bound: " + request.address );
425 }
426 else
427 {
428 SelectionKey key = ssc.keyFor( selector );
429
430 key.cancel();
431
432 selector.wakeup();
433
434 ssc.close();
435 }
436 }
437 catch( IOException e )
438 {
439 exceptionMonitor.exceptionCaught( this, e );
440 }
441 finally
442 {
443 synchronized( request )
444 {
445 request.done = true;
446
447 request.notify();
448 }
449 }
450 }
451 }
452
453 public IoFilterChain getFilterChain()
454 {
455 return filters;
456 }
457
458 private static class RegistrationRequest
459 {
460 private final SocketAddress address;
461
462 private final int backlog;
463
464 private final IoHandler handler;
465
466 private IOException exception;
467
468 private boolean done;
469
470 private RegistrationRequest( SocketAddress address, int backlog, IoHandler handler )
471 {
472 this.address = address;
473 this.backlog = backlog;
474 this.handler = handler;
475 }
476 }
477
478
479 private static class CancellationRequest
480 {
481 private final SocketAddress address;
482
483 private boolean done;
484
485 private RuntimeException exception;
486
487 private CancellationRequest( SocketAddress address )
488 {
489 this.address = address;
490 }
491 }
492 }