1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.mina.io.datagram;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.SocketAddress;
24 import java.nio.channels.DatagramChannel;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.util.HashMap;
28 import java.util.Iterator;
29 import java.util.Map;
30 import java.util.Set;
31
32 import org.apache.mina.common.ByteBuffer;
33 import org.apache.mina.io.IoAcceptor;
34 import org.apache.mina.io.IoFilterChain;
35 import org.apache.mina.io.IoHandler;
36 import org.apache.mina.io.IoSessionManagerFilterChain;
37 import org.apache.mina.util.ExceptionUtil;
38 import org.apache.mina.util.Queue;
39
40 /***
41 * {@link IoAcceptor} for datagram transport (UDP/IP).
42 *
43 * @author Trustin Lee (trustin@apache.org)
44 * @version $Rev: 210062 $, $Date: 2005-07-11 12:52:38 +0900 $
45 */
46 public class DatagramAcceptor extends DatagramSessionManager implements IoAcceptor
47 {
/**
 * Source of the numbers used to name worker threads.  Only read and
 * written inside {@link #allocateId()}, which holds the class lock.
 */
private static int nextId = 0;

/**
 * Hands out the next acceptor number, starting at 0.
 * <p>
 * The increment runs under the class lock because <code>nextId ++</code>
 * is a separate read and write even on a volatile field, so two acceptors
 * constructed at the same time could otherwise receive the same number.
 *
 * @return the next number in sequence
 */
private static synchronized int allocateId()
{
    return nextId ++;
}

/** Filter chain handed to every session created by this acceptor. */
private final IoSessionManagerFilterChain filters =
    new DatagramSessionManagerFilterChain( this );

/** Number of this acceptor; used as the suffix of the worker thread name. */
private final int id = allocateId();

/**
 * Selector polled by the worker.  Opened by <code>startupWorker()</code>
 * and reset to <code>null</code> by the worker when it shuts down.
 */
private Selector selector;

/**
 * Bound local address -> its <code>DatagramChannel</code>.  Entries are
 * added by <code>registerNew()</code> and removed by
 * <code>cancelKeys()</code>.
 */
private final Map channels = new HashMap();

/** Pending <code>RegistrationRequest</code>s pushed by <code>bind()</code>. */
private final Queue registerQueue = new Queue();

/** Pending <code>CancellationRequest</code>s pushed by <code>unbind()</code>. */
private final Queue cancelQueue = new Queue();

/** Sessions that have been scheduled for a flush of their write queue. */
private final Queue flushingSessions = new Queue();

/** The running worker thread, or <code>null</code> when none is running. */
private Worker worker;

/**
 * Creates a new instance.  No selector or thread is created until the
 * first call that needs the worker.
 */
public DatagramAcceptor()
{
}
73
74 public void bind( SocketAddress address, IoHandler handler )
75 throws IOException
76 {
77 if( address == null )
78 throw new NullPointerException( "address" );
79 if( handler == null )
80 throw new NullPointerException( "handler" );
81
82 if( !( address instanceof InetSocketAddress ) )
83 throw new IllegalArgumentException( "Unexpected address type: "
84 + address.getClass() );
85 if( ( ( InetSocketAddress ) address ).getPort() == 0 )
86 throw new IllegalArgumentException( "Unsupported port number: 0" );
87
88 RegistrationRequest request = new RegistrationRequest( address, handler );
89 synchronized( this )
90 {
91 synchronized( registerQueue )
92 {
93 registerQueue.push( request );
94 }
95 startupWorker();
96 }
97 selector.wakeup();
98
99 synchronized( request )
100 {
101 while( !request.done )
102 {
103 try
104 {
105 request.wait();
106 }
107 catch( InterruptedException e )
108 {
109 }
110 }
111 }
112
113 if( request.exception != null )
114 {
115 ExceptionUtil.throwException( request.exception );
116 }
117 }
118
119 public void unbind( SocketAddress address )
120 {
121 if( address == null )
122 throw new NullPointerException( "address" );
123
124 CancellationRequest request = new CancellationRequest( address );
125 synchronized( this )
126 {
127 try
128 {
129 startupWorker();
130 }
131 catch( IOException e )
132 {
133
134
135
136
137 throw new IllegalArgumentException( "Address not bound: " + address );
138 }
139
140 synchronized( cancelQueue )
141 {
142 cancelQueue.push( request );
143 }
144 }
145 selector.wakeup();
146
147 synchronized( request )
148 {
149 while( !request.done )
150 {
151 try
152 {
153 request.wait();
154 }
155 catch( InterruptedException e )
156 {
157 }
158 }
159 }
160
161 if( request.exception != null )
162 {
163 request.exception.fillInStackTrace();
164 throw request.exception;
165 }
166 }
167
168 private synchronized void startupWorker() throws IOException
169 {
170 if( worker == null )
171 {
172 selector = Selector.open();
173 worker = new Worker();
174 worker.start();
175 }
176 }
177
178 void flushSession( DatagramSession session )
179 {
180 scheduleFlush( session );
181 selector.wakeup();
182 }
183
/**
 * Does nothing.
 * <p>
 * NOTE(review): presumably a deliberate no-op because sessions created by
 * this acceptor share the bound channel, which is closed only by
 * <code>cancelKeys()</code> -- confirm against the callers in
 * <code>DatagramSession</code>.
 *
 * @param session the session being closed (ignored)
 */
void closeSession( DatagramSession session )
{
}
187
188 private void scheduleFlush( DatagramSession session )
189 {
190 synchronized( flushingSessions )
191 {
192 flushingSessions.push( session );
193 }
194 }
195
196 private class Worker extends Thread
197 {
198 public Worker()
199 {
200 super( "DatagramAcceptor-" + id );
201 }
202
203 public void run()
204 {
205 for( ;; )
206 {
207 try
208 {
209 int nKeys = selector.select();
210
211 registerNew();
212
213 if( nKeys > 0 )
214 {
215 processReadySessions( selector.selectedKeys() );
216 }
217
218 flushSessions();
219 cancelKeys();
220
221 if( selector.keys().isEmpty() )
222 {
223 synchronized( DatagramAcceptor.this )
224 {
225 if( selector.keys().isEmpty() &&
226 registerQueue.isEmpty() &&
227 cancelQueue.isEmpty() )
228 {
229 worker = null;
230 try
231 {
232 selector.close();
233 }
234 catch( IOException e )
235 {
236 exceptionMonitor.exceptionCaught( DatagramAcceptor.this, e );
237 }
238 finally
239 {
240 selector = null;
241 }
242 break;
243 }
244 }
245 }
246 }
247 catch( IOException e )
248 {
249 exceptionMonitor.exceptionCaught( DatagramAcceptor.this,
250 e );
251
252 try
253 {
254 Thread.sleep( 1000 );
255 }
256 catch( InterruptedException e1 )
257 {
258 }
259 }
260 }
261 }
262 }
263
264 private void processReadySessions( Set keys )
265 {
266 Iterator it = keys.iterator();
267 while( it.hasNext() )
268 {
269 SelectionKey key = ( SelectionKey ) it.next();
270 it.remove();
271
272 DatagramChannel ch = ( DatagramChannel ) key.channel();
273
274 RegistrationRequest req = ( RegistrationRequest ) key.attachment();
275 DatagramSession session = new DatagramSession(
276 filters, ch, req.handler );
277 session.setSelectionKey( key );
278
279 try
280 {
281 req.handler.sessionCreated( session );
282
283 if( key.isReadable() )
284 {
285 readSession( session );
286 }
287
288 if( key.isWritable() )
289 {
290 scheduleFlush( session );
291 }
292 }
293 catch( Throwable t )
294 {
295 exceptionMonitor.exceptionCaught( this, t );
296 }
297 }
298 }
299
300 private void readSession( DatagramSession session )
301 {
302
303 ByteBuffer readBuf = ByteBuffer.allocate( 2048 );
304 try
305 {
306 SocketAddress remoteAddress = session.getChannel().receive(
307 readBuf.buf() );
308 if( remoteAddress != null )
309 {
310 readBuf.flip();
311 session.setRemoteAddress( remoteAddress );
312
313 ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
314 newBuf.put( readBuf );
315 newBuf.flip();
316
317 session.increaseReadBytes( newBuf.remaining() );
318 filters.dataRead( session, newBuf );
319 }
320 }
321 catch( IOException e )
322 {
323 filters.exceptionCaught( session, e );
324 }
325 finally
326 {
327 readBuf.release();
328 }
329 }
330
331 private void flushSessions()
332 {
333 if( flushingSessions.size() == 0 )
334 return;
335
336 for( ;; )
337 {
338 DatagramSession session;
339
340 synchronized( flushingSessions )
341 {
342 session = ( DatagramSession ) flushingSessions.pop();
343 }
344
345 if( session == null )
346 break;
347
348 try
349 {
350 flush( session );
351 }
352 catch( IOException e )
353 {
354 session.getManagerFilterChain().exceptionCaught( session, e );
355 }
356 }
357 }
358
359 private void flush( DatagramSession session ) throws IOException
360 {
361 DatagramChannel ch = session.getChannel();
362
363 Queue writeBufferQueue = session.getWriteBufferQueue();
364 Queue writeMarkerQueue = session.getWriteMarkerQueue();
365
366 ByteBuffer buf;
367 Object marker;
368 for( ;; )
369 {
370 synchronized( writeBufferQueue )
371 {
372 buf = ( ByteBuffer ) writeBufferQueue.first();
373 marker = writeMarkerQueue.first();
374 }
375
376 if( buf == null )
377 break;
378
379 if( buf.remaining() == 0 )
380 {
381
382 synchronized( writeBufferQueue )
383 {
384 writeBufferQueue.pop();
385 writeMarkerQueue.pop();
386 }
387
388 try
389 {
390 buf.release();
391 }
392 catch( IllegalStateException e )
393 {
394 session.getManagerFilterChain().exceptionCaught( session, e );
395 }
396
397 session.increaseWrittenWriteRequests();
398 session.getManagerFilterChain().dataWritten( session, marker );
399 continue;
400 }
401
402 int writtenBytes = ch
403 .send( buf.buf(), session.getRemoteAddress() );
404
405 SelectionKey key = session.getSelectionKey();
406 if( writtenBytes == 0 )
407 {
408
409 key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
410 }
411 else if( writtenBytes > 0 )
412 {
413 key.interestOps( key.interestOps()
414 & ( ~SelectionKey.OP_WRITE ) );
415
416
417 synchronized( writeBufferQueue )
418 {
419 writeBufferQueue.pop();
420 writeMarkerQueue.pop();
421 }
422
423 session.increaseWrittenBytes( writtenBytes );
424 session.increaseWrittenWriteRequests();
425 session.getManagerFilterChain().dataWritten( session, marker );
426 }
427 }
428 }
429
430 private void registerNew()
431 {
432 if( registerQueue.isEmpty() )
433 return;
434
435 for( ;; )
436 {
437 RegistrationRequest req;
438 synchronized( registerQueue )
439 {
440 req = ( RegistrationRequest ) registerQueue.pop();
441 }
442
443 if( req == null )
444 break;
445
446 DatagramChannel ch = null;
447 try
448 {
449 ch = DatagramChannel.open();
450 ch.configureBlocking( false );
451 ch.socket().bind( req.address );
452 ch.register( selector, SelectionKey.OP_READ, req );
453 channels.put( req.address, ch );
454 }
455 catch( Throwable t )
456 {
457 req.exception = t;
458 }
459 finally
460 {
461 synchronized( req )
462 {
463 req.done = true;
464 req.notify();
465 }
466
467 if( ch != null && req.exception != null )
468 {
469 try
470 {
471 ch.close();
472 }
473 catch( Throwable e )
474 {
475 exceptionMonitor.exceptionCaught( this, e );
476 }
477 }
478 }
479 }
480 }
481
482 private void cancelKeys()
483 {
484 if( cancelQueue.isEmpty() )
485 return;
486
487 for( ;; )
488 {
489 CancellationRequest request;
490 synchronized( cancelQueue )
491 {
492 request = ( CancellationRequest ) cancelQueue.pop();
493 }
494
495 if( request == null )
496 {
497 break;
498 }
499
500 DatagramChannel ch = ( DatagramChannel ) channels.remove( request.address );
501
502 try
503 {
504 if( ch == null )
505 {
506 request.exception = new IllegalArgumentException(
507 "Address not bound: " + request.address );
508 }
509 else
510 {
511 SelectionKey key = ch.keyFor( selector );
512 key.cancel();
513 selector.wakeup();
514 ch.close();
515 }
516 }
517 catch( Throwable t )
518 {
519 exceptionMonitor.exceptionCaught( this, t );
520 }
521 finally
522 {
523 synchronized( request )
524 {
525 request.done = true;
526 request.notify();
527 }
528 }
529 }
530 }
531
/**
 * Returns the filter chain of this acceptor, the same chain that is
 * passed to every session the acceptor creates.
 *
 * @return this acceptor's filter chain
 */
public IoFilterChain getFilterChain()
{
    return filters;
}
536
537 private static class RegistrationRequest
538 {
539 private final SocketAddress address;
540
541 private final IoHandler handler;
542
543 private Throwable exception;
544
545 private boolean done;
546
547 private RegistrationRequest( SocketAddress address, IoHandler handler )
548 {
549 this.address = address;
550 this.handler = handler;
551 }
552 }
553
554 private static class CancellationRequest
555 {
556 private final SocketAddress address;
557 private boolean done;
558 private RuntimeException exception;
559
560 private CancellationRequest( SocketAddress address )
561 {
562 this.address = address;
563 }
564 }
565 }