1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.mina.transport.socket.nio.support;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.SocketAddress;
24 import java.nio.channels.DatagramChannel;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.util.HashMap;
28 import java.util.Iterator;
29 import java.util.Map;
30 import java.util.Set;
31
32 import org.apache.mina.common.ByteBuffer;
33 import org.apache.mina.common.ExceptionMonitor;
34 import org.apache.mina.common.IoAcceptor;
35 import org.apache.mina.common.IoFilterChainBuilder;
36 import org.apache.mina.common.IoHandler;
37 import org.apache.mina.common.IoSession;
38 import org.apache.mina.common.IoFilter.WriteRequest;
39 import org.apache.mina.common.support.BaseIoAcceptor;
40 import org.apache.mina.util.ExceptionUtil;
41 import org.apache.mina.util.Queue;
42
43 /***
44 * {@link IoAcceptor} for datagram transport (UDP/IP).
45 *
46 * @author The Apache Directory Project (dev@directory.apache.org)
47 * @version $Rev: 355016 $, $Date: 2005-12-08 16:00:30 +0900 (Thu, 08 Dec 2005) $
48 */
49 public class DatagramAcceptorDelegate extends BaseIoAcceptor implements IoAcceptor, DatagramSessionManager
50 {
51 private static volatile int nextId = 0;
52
53 private final IoAcceptor wrapper;
54 private final int id = nextId ++ ;
55 private Selector selector;
56 private final Map channels = new HashMap();
57 private final Queue registerQueue = new Queue();
58 private final Queue cancelQueue = new Queue();
59 private final Queue flushingSessions = new Queue();
60 private Worker worker;
61
/**
 * Creates a delegate that works on behalf of the given acceptor.
 *
 * @param wrapper the acceptor that is passed to every session created by
 *                this delegate
 */
public DatagramAcceptorDelegate( IoAcceptor wrapper )
{
    this.wrapper = wrapper;
}
69
70 public void bind( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder )
71 throws IOException
72 {
73 if( address == null )
74 throw new NullPointerException( "address" );
75 if( handler == null )
76 throw new NullPointerException( "handler" );
77
78 if( !( address instanceof InetSocketAddress ) )
79 throw new IllegalArgumentException( "Unexpected address type: "
80 + address.getClass() );
81 if( ( ( InetSocketAddress ) address ).getPort() == 0 )
82 throw new IllegalArgumentException( "Unsupported port number: 0" );
83
84 if( filterChainBuilder == null )
85 {
86 filterChainBuilder = IoFilterChainBuilder.NOOP;
87 }
88
89 RegistrationRequest request = new RegistrationRequest( address, handler, filterChainBuilder );
90 synchronized( this )
91 {
92 synchronized( registerQueue )
93 {
94 registerQueue.push( request );
95 }
96 startupWorker();
97 }
98 selector.wakeup();
99
100 synchronized( request )
101 {
102 while( !request.done )
103 {
104 try
105 {
106 request.wait();
107 }
108 catch( InterruptedException e )
109 {
110 }
111 }
112 }
113
114 if( request.exception != null )
115 {
116 ExceptionUtil.throwException( request.exception );
117 }
118 }
119
120 public void unbind( SocketAddress address )
121 {
122
123 if( address == null )
124 throw new NullPointerException( "address" );
125
126 CancellationRequest request = new CancellationRequest( address );
127 synchronized( this )
128 {
129 try
130 {
131 startupWorker();
132 }
133 catch( IOException e )
134 {
135
136
137
138
139 throw new IllegalArgumentException( "Address not bound: " + address );
140 }
141
142 synchronized( cancelQueue )
143 {
144 cancelQueue.push( request );
145 }
146 }
147 selector.wakeup();
148
149 synchronized( request )
150 {
151 while( !request.done )
152 {
153 try
154 {
155 request.wait();
156 }
157 catch( InterruptedException e )
158 {
159 }
160 }
161 }
162
163 if( request.exception != null )
164 {
165 request.exception.fillInStackTrace();
166 throw request.exception;
167 }
168 }
169
170 public IoSession newSession( SocketAddress remoteAddress, SocketAddress localAddress )
171 {
172 if( remoteAddress == null )
173 {
174 throw new NullPointerException( "remoteAddress" );
175 }
176 if( localAddress == null )
177 {
178 throw new NullPointerException( "localAddress" );
179 }
180
181 Selector selector = this.selector;
182 DatagramChannel ch = ( DatagramChannel ) channels.get( localAddress );
183 if( selector == null || ch == null )
184 {
185 throw new IllegalArgumentException( "Unknown localAddress: " + localAddress );
186 }
187
188 SelectionKey key = ch.keyFor( selector );
189 if( key == null )
190 {
191 throw new IllegalArgumentException( "Unknown localAddress: " + localAddress );
192 }
193
194 RegistrationRequest req = ( RegistrationRequest ) key.attachment();
195 DatagramSessionImpl s = new DatagramSessionImpl( wrapper, this, ch, req.handler );
196 s.setRemoteAddress( remoteAddress );
197 s.setSelectionKey( key );
198
199 try
200 {
201 this.filterChainBuilder.buildFilterChain( s.getFilterChain() );
202 req.filterChainBuilder.buildFilterChain( s.getFilterChain() );
203 ( ( DatagramFilterChain ) s.getFilterChain() ).sessionCreated( s );
204 }
205 catch( Throwable t )
206 {
207 ExceptionMonitor.getInstance().exceptionCaught( t );
208 }
209
210 return s;
211 }
212
213 private synchronized void startupWorker() throws IOException
214 {
215 if( worker == null )
216 {
217 selector = Selector.open();
218 worker = new Worker();
219 worker.start();
220 }
221 }
222
223 public void flushSession( DatagramSessionImpl session )
224 {
225 scheduleFlush( session );
226 Selector selector = this.selector;
227 if( selector != null )
228 {
229 selector.wakeup();
230 }
231 }
232
233 public void closeSession( DatagramSessionImpl session )
234 {
235 }
236
237 private void scheduleFlush( DatagramSessionImpl session )
238 {
239 synchronized( flushingSessions )
240 {
241 flushingSessions.push( session );
242 }
243 }
244
245 private class Worker extends Thread
246 {
247 public Worker()
248 {
249 super( "DatagramAcceptor-" + id );
250 }
251
252 public void run()
253 {
254 for( ;; )
255 {
256 try
257 {
258 int nKeys = selector.select();
259
260 registerNew();
261
262 if( nKeys > 0 )
263 {
264 processReadySessions( selector.selectedKeys() );
265 }
266
267 flushSessions();
268 cancelKeys();
269
270 if( selector.keys().isEmpty() )
271 {
272 synchronized( DatagramAcceptorDelegate.this )
273 {
274 if( selector.keys().isEmpty() &&
275 registerQueue.isEmpty() &&
276 cancelQueue.isEmpty() )
277 {
278 worker = null;
279 try
280 {
281 selector.close();
282 }
283 catch( IOException e )
284 {
285 ExceptionMonitor.getInstance().exceptionCaught( e );
286 }
287 finally
288 {
289 selector = null;
290 }
291 break;
292 }
293 }
294 }
295 }
296 catch( IOException e )
297 {
298 ExceptionMonitor.getInstance().exceptionCaught( e );
299
300 try
301 {
302 Thread.sleep( 1000 );
303 }
304 catch( InterruptedException e1 )
305 {
306 }
307 }
308 }
309 }
310 }
311
312 private void processReadySessions( Set keys )
313 {
314 Iterator it = keys.iterator();
315 while( it.hasNext() )
316 {
317 SelectionKey key = ( SelectionKey ) it.next();
318 it.remove();
319
320 DatagramChannel ch = ( DatagramChannel ) key.channel();
321
322 RegistrationRequest req = ( RegistrationRequest ) key.attachment();
323 DatagramSessionImpl session =
324 new DatagramSessionImpl( wrapper, this, ch, req.handler );
325 session.setSelectionKey( key );
326
327 try
328 {
329 ( ( DatagramFilterChain ) session.getFilterChain() ).sessionCreated( session );
330
331 if( key.isReadable() )
332 {
333 readSession( session );
334 }
335
336 if( key.isWritable() )
337 {
338 scheduleFlush( session );
339 }
340 }
341 catch( Throwable t )
342 {
343 ExceptionMonitor.getInstance().exceptionCaught( t );
344 }
345 }
346 }
347
348 private void readSession( DatagramSessionImpl session )
349 {
350
351 ByteBuffer readBuf = ByteBuffer.allocate( 2048 );
352 try
353 {
354 SocketAddress remoteAddress = session.getChannel().receive(
355 readBuf.buf() );
356 if( remoteAddress != null )
357 {
358 readBuf.flip();
359 session.setRemoteAddress( remoteAddress );
360
361 ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
362 newBuf.put( readBuf );
363 newBuf.flip();
364
365 session.increaseReadBytes( newBuf.remaining() );
366 ( ( DatagramFilterChain ) session.getFilterChain() ).messageReceived( session, newBuf );
367 }
368 }
369 catch( IOException e )
370 {
371 ( ( DatagramFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
372 }
373 finally
374 {
375 readBuf.release();
376 }
377 }
378
379 private void flushSessions()
380 {
381 if( flushingSessions.size() == 0 )
382 return;
383
384 for( ;; )
385 {
386 DatagramSessionImpl session;
387
388 synchronized( flushingSessions )
389 {
390 session = ( DatagramSessionImpl ) flushingSessions.pop();
391 }
392
393 if( session == null )
394 break;
395
396 try
397 {
398 flush( session );
399 }
400 catch( IOException e )
401 {
402 ( ( DatagramFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
403 }
404 }
405 }
406
407 private void flush( DatagramSessionImpl session ) throws IOException
408 {
409 DatagramChannel ch = session.getChannel();
410
411 Queue writeRequestQueue = session.getWriteRequestQueue();
412
413 WriteRequest req;
414 for( ;; )
415 {
416 synchronized( writeRequestQueue )
417 {
418 req = ( WriteRequest ) writeRequestQueue.first();
419 }
420
421 if( req == null )
422 break;
423
424 ByteBuffer buf = ( ByteBuffer ) req.getMessage();
425 if( buf.remaining() == 0 )
426 {
427
428 synchronized( writeRequestQueue )
429 {
430 writeRequestQueue.pop();
431 }
432
433 req.getFuture().setWritten( true );
434 session.increaseWrittenWriteRequests();
435 ( ( DatagramFilterChain ) session.getFilterChain() ).messageSent( session, buf );
436 continue;
437 }
438
439 SelectionKey key = session.getSelectionKey();
440 if( key == null )
441 {
442 scheduleFlush( session );
443 break;
444 }
445 if( !key.isValid() )
446 {
447 continue;
448 }
449
450 int pos = buf.position();
451 int writtenBytes = ch
452 .send( buf.buf(), session.getRemoteAddress() );
453
454 if( writtenBytes == 0 )
455 {
456
457 key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
458 }
459 else if( writtenBytes > 0 )
460 {
461 key.interestOps( key.interestOps()
462 & ( ~SelectionKey.OP_WRITE ) );
463
464
465 synchronized( writeRequestQueue )
466 {
467 writeRequestQueue.pop();
468 }
469
470 session.increaseWrittenBytes( writtenBytes );
471 req.getFuture().setWritten( true );
472 session.increaseWrittenWriteRequests();
473 ( ( DatagramFilterChain ) session.getFilterChain() ).messageSent( session, buf.position( pos ) );
474 }
475 }
476 }
477
478 private void registerNew()
479 {
480 if( registerQueue.isEmpty() )
481 return;
482
483 for( ;; )
484 {
485 RegistrationRequest req;
486 synchronized( registerQueue )
487 {
488 req = ( RegistrationRequest ) registerQueue.pop();
489 }
490
491 if( req == null )
492 break;
493
494 DatagramChannel ch = null;
495 try
496 {
497 ch = DatagramChannel.open();
498 ch.configureBlocking( false );
499 ch.socket().bind( req.address );
500 ch.register( selector, SelectionKey.OP_READ, req );
501 channels.put( req.address, ch );
502 }
503 catch( Throwable t )
504 {
505 req.exception = t;
506 }
507 finally
508 {
509 synchronized( req )
510 {
511 req.done = true;
512 req.notify();
513 }
514
515 if( ch != null && req.exception != null )
516 {
517 try
518 {
519 ch.close();
520 }
521 catch( Throwable e )
522 {
523 ExceptionMonitor.getInstance().exceptionCaught( e );
524 }
525 }
526 }
527 }
528 }
529
530 private void cancelKeys()
531 {
532 if( cancelQueue.isEmpty() )
533 return;
534
535 for( ;; )
536 {
537 CancellationRequest request;
538 synchronized( cancelQueue )
539 {
540 request = ( CancellationRequest ) cancelQueue.pop();
541 }
542
543 if( request == null )
544 {
545 break;
546 }
547
548 DatagramChannel ch = ( DatagramChannel ) channels.remove( request.address );
549
550 try
551 {
552 if( ch == null )
553 {
554 request.exception = new IllegalArgumentException(
555 "Address not bound: " + request.address );
556 }
557 else
558 {
559 SelectionKey key = ch.keyFor( selector );
560 key.cancel();
561 selector.wakeup();
562 ch.close();
563 }
564 }
565 catch( Throwable t )
566 {
567 ExceptionMonitor.getInstance().exceptionCaught( t );
568 }
569 finally
570 {
571 synchronized( request )
572 {
573 request.done = true;
574 request.notify();
575 }
576 }
577 }
578 }
579
580 public void updateTrafficMask( DatagramSessionImpl session )
581 {
582
583
584
585 }
586
587 private static class RegistrationRequest
588 {
589 private final SocketAddress address;
590 private final IoHandler handler;
591 private final IoFilterChainBuilder filterChainBuilder;
592
593 private Throwable exception;
594 private boolean done;
595
596 private RegistrationRequest( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder )
597 {
598 this.address = address;
599 this.handler = handler;
600 this.filterChainBuilder = filterChainBuilder;
601 }
602 }
603
604 private static class CancellationRequest
605 {
606 private final SocketAddress address;
607 private boolean done;
608 private RuntimeException exception;
609
610 private CancellationRequest( SocketAddress address )
611 {
612 this.address = address;
613 }
614 }
615 }