View Javadoc

1   /*
2    *   @(#) $Id: DatagramAcceptorDelegate.java 355016 2005-12-08 07:00:30Z trustin $
3    *
4    *   Copyright 2004 The Apache Software Foundation
5    *
6    *   Licensed under the Apache License, Version 2.0 (the "License");
7    *   you may not use this file except in compliance with the License.
8    *   You may obtain a copy of the License at
9    *
10   *       http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing, software
13   *   distributed under the License is distributed on an "AS IS" BASIS,
14   *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   *   See the License for the specific language governing permissions and
16   *   limitations under the License.
17   *
18   */
19  package org.apache.mina.transport.socket.nio.support;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.net.SocketAddress;
24  import java.nio.channels.DatagramChannel;
25  import java.nio.channels.SelectionKey;
26  import java.nio.channels.Selector;
27  import java.util.HashMap;
28  import java.util.Iterator;
29  import java.util.Map;
30  import java.util.Set;
31  
32  import org.apache.mina.common.ByteBuffer;
33  import org.apache.mina.common.ExceptionMonitor;
34  import org.apache.mina.common.IoAcceptor;
35  import org.apache.mina.common.IoFilterChainBuilder;
36  import org.apache.mina.common.IoHandler;
37  import org.apache.mina.common.IoSession;
38  import org.apache.mina.common.IoFilter.WriteRequest;
39  import org.apache.mina.common.support.BaseIoAcceptor;
40  import org.apache.mina.util.ExceptionUtil;
41  import org.apache.mina.util.Queue;
42  
43  /***
44   * {@link IoAcceptor} for datagram transport (UDP/IP).
45   * 
46   * @author The Apache Directory Project (dev@directory.apache.org)
47   * @version $Rev: 355016 $, $Date: 2005-12-08 16:00:30 +0900 (Thu, 08 Dec 2005) $
48   */
49  public class DatagramAcceptorDelegate extends BaseIoAcceptor implements IoAcceptor, DatagramSessionManager
50  {
51      private static volatile int nextId = 0;
52  
53      private final IoAcceptor wrapper;
54      private final int id = nextId ++ ;
55      private Selector selector;
56      private final Map channels = new HashMap();
57      private final Queue registerQueue = new Queue();
58      private final Queue cancelQueue = new Queue();
59      private final Queue flushingSessions = new Queue();
60      private Worker worker;
61  
62      /***
63       * Creates a new instance.
64       */
65      public DatagramAcceptorDelegate( IoAcceptor wrapper )
66      {
67          this.wrapper = wrapper;
68      }
69  
70      public void bind( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder )
71              throws IOException
72      {
73          if( address == null )
74              throw new NullPointerException( "address" );
75          if( handler == null )
76              throw new NullPointerException( "handler" );
77  
78          if( !( address instanceof InetSocketAddress ) )
79              throw new IllegalArgumentException( "Unexpected address type: "
80                                                  + address.getClass() );
81          if( ( ( InetSocketAddress ) address ).getPort() == 0 )
82              throw new IllegalArgumentException( "Unsupported port number: 0" );
83          
84          if( filterChainBuilder == null )
85          {
86              filterChainBuilder = IoFilterChainBuilder.NOOP;
87          }
88  
89          RegistrationRequest request = new RegistrationRequest( address, handler, filterChainBuilder );
90          synchronized( this )
91          {
92              synchronized( registerQueue )
93              {
94                  registerQueue.push( request );
95              }
96              startupWorker();
97          }
98          selector.wakeup();
99          
100         synchronized( request )
101         {
102             while( !request.done )
103             {
104                 try
105                 {
106                     request.wait();
107                 }
108                 catch( InterruptedException e )
109                 {
110                 }
111             }
112         }
113         
114         if( request.exception != null )
115         {
116             ExceptionUtil.throwException( request.exception );
117         }
118     }
119 
120     public void unbind( SocketAddress address )
121     {
122         // TODO: DIRMINA-93
123         if( address == null )
124             throw new NullPointerException( "address" );
125 
126         CancellationRequest request = new CancellationRequest( address );
127         synchronized( this )
128         {
129             try
130             {
131                 startupWorker();
132             }
133             catch( IOException e )
134             {
135                 // IOException is thrown only when Worker thread is not
136                 // running and failed to open a selector.  We simply throw
137                 // IllegalArgumentException here because we can simply
138                 // conclude that nothing is bound to the selector.
139                 throw new IllegalArgumentException( "Address not bound: " + address );
140             }
141 
142             synchronized( cancelQueue )
143             {
144                 cancelQueue.push( request );
145             }
146         }
147         selector.wakeup();
148         
149         synchronized( request )
150         {
151             while( !request.done )
152             {
153                 try
154                 {
155                     request.wait();
156                 }
157                 catch( InterruptedException e )
158                 {
159                 }
160             }
161         }
162         
163         if( request.exception != null )
164         {
165             request.exception.fillInStackTrace();
166             throw request.exception;
167         }
168     }
169     
170     public IoSession newSession( SocketAddress remoteAddress, SocketAddress localAddress )
171     {
172         if( remoteAddress == null )
173         {
174             throw new NullPointerException( "remoteAddress" );
175         }
176         if( localAddress == null )
177         {
178             throw new NullPointerException( "localAddress" );
179         }
180         
181         Selector selector = this.selector;
182         DatagramChannel ch = ( DatagramChannel ) channels.get( localAddress );
183         if( selector == null || ch == null )
184         {
185             throw new IllegalArgumentException( "Unknown localAddress: " + localAddress );
186         }
187             
188         SelectionKey key = ch.keyFor( selector );
189         if( key == null )
190         {
191             throw new IllegalArgumentException( "Unknown localAddress: " + localAddress );
192         }
193 
194         RegistrationRequest req = ( RegistrationRequest ) key.attachment();
195         DatagramSessionImpl s = new DatagramSessionImpl( wrapper, this, ch, req.handler );
196         s.setRemoteAddress( remoteAddress );
197         s.setSelectionKey( key );
198         
199         try
200         {
201             this.filterChainBuilder.buildFilterChain( s.getFilterChain() );
202             req.filterChainBuilder.buildFilterChain( s.getFilterChain() );
203             ( ( DatagramFilterChain ) s.getFilterChain() ).sessionCreated( s );
204         }
205         catch( Throwable t )
206         {
207             ExceptionMonitor.getInstance().exceptionCaught( t );
208         }
209         
210         return s;
211     }
212 
213     private synchronized void startupWorker() throws IOException
214     {
215         if( worker == null )
216         {
217             selector = Selector.open();
218             worker = new Worker();
219             worker.start();
220         }
221     }
222 
223     public void flushSession( DatagramSessionImpl session )
224     {
225         scheduleFlush( session );
226         Selector selector = this.selector;
227         if( selector != null )
228         {
229             selector.wakeup();
230         }
231     }
232 
233     public void closeSession( DatagramSessionImpl session )
234     {
235     }
236 
237     private void scheduleFlush( DatagramSessionImpl session )
238     {
239         synchronized( flushingSessions )
240         {
241             flushingSessions.push( session );
242         }
243     }
244 
245     private class Worker extends Thread
246     {
247         public Worker()
248         {
249             super( "DatagramAcceptor-" + id );
250         }
251 
252         public void run()
253         {
254             for( ;; )
255             {
256                 try
257                 {
258                     int nKeys = selector.select();
259 
260                     registerNew();
261 
262                     if( nKeys > 0 )
263                     {
264                         processReadySessions( selector.selectedKeys() );
265                     }
266 
267                     flushSessions();
268                     cancelKeys();
269 
270                     if( selector.keys().isEmpty() )
271                     {
272                         synchronized( DatagramAcceptorDelegate.this )
273                         {
274                             if( selector.keys().isEmpty() &&
275                                 registerQueue.isEmpty() &&
276                                 cancelQueue.isEmpty() )
277                             {
278                                 worker = null;
279                                 try
280                                 {
281                                     selector.close();
282                                 }
283                                 catch( IOException e )
284                                 {
285                                     ExceptionMonitor.getInstance().exceptionCaught( e );
286                                 }
287                                 finally
288                                 {
289                                     selector = null;
290                                 }
291                                 break;
292                             }
293                         }
294                     }
295                 }
296                 catch( IOException e )
297                 {
298                     ExceptionMonitor.getInstance().exceptionCaught( e );
299 
300                     try
301                     {
302                         Thread.sleep( 1000 );
303                     }
304                     catch( InterruptedException e1 )
305                     {
306                     }
307                 }
308             }
309         }
310     }
311 
312     private void processReadySessions( Set keys )
313     {
314         Iterator it = keys.iterator();
315         while( it.hasNext() )
316         {
317             SelectionKey key = ( SelectionKey ) it.next();
318             it.remove();
319 
320             DatagramChannel ch = ( DatagramChannel ) key.channel();
321 
322             RegistrationRequest req = ( RegistrationRequest ) key.attachment();
323             DatagramSessionImpl session =
324                 new DatagramSessionImpl( wrapper, this, ch, req.handler );
325             session.setSelectionKey( key );
326             
327             try
328             {
329                 ( ( DatagramFilterChain ) session.getFilterChain() ).sessionCreated( session );
330 
331                 if( key.isReadable() )
332                 {
333                     readSession( session );
334                 }
335 
336                 if( key.isWritable() )
337                 {
338                     scheduleFlush( session );
339                 }
340             }
341             catch( Throwable t )
342             {
343                 ExceptionMonitor.getInstance().exceptionCaught( t );
344             }
345         }
346     }
347 
348     private void readSession( DatagramSessionImpl session )
349     {
350 
351         ByteBuffer readBuf = ByteBuffer.allocate( 2048 );
352         try
353         {
354             SocketAddress remoteAddress = session.getChannel().receive(
355                     readBuf.buf() );
356             if( remoteAddress != null )
357             {
358                 readBuf.flip();
359                 session.setRemoteAddress( remoteAddress );
360 
361                 ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
362                 newBuf.put( readBuf );
363                 newBuf.flip();
364 
365                 session.increaseReadBytes( newBuf.remaining() );
366                 ( ( DatagramFilterChain ) session.getFilterChain() ).messageReceived( session, newBuf );
367             }
368         }
369         catch( IOException e )
370         {
371             ( ( DatagramFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
372         }
373         finally
374         {
375             readBuf.release();
376         }
377     }
378 
379     private void flushSessions()
380     {
381         if( flushingSessions.size() == 0 )
382             return;
383 
384         for( ;; )
385         {
386             DatagramSessionImpl session;
387 
388             synchronized( flushingSessions )
389             {
390                 session = ( DatagramSessionImpl ) flushingSessions.pop();
391             }
392 
393             if( session == null )
394                 break;
395 
396             try
397             {
398                 flush( session );
399             }
400             catch( IOException e )
401             {
402                 ( ( DatagramFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
403             }
404         }
405     }
406 
407     private void flush( DatagramSessionImpl session ) throws IOException
408     {
409         DatagramChannel ch = session.getChannel();
410 
411         Queue writeRequestQueue = session.getWriteRequestQueue();
412 
413         WriteRequest req;
414         for( ;; )
415         {
416             synchronized( writeRequestQueue )
417             {
418                 req = ( WriteRequest ) writeRequestQueue.first();
419             }
420 
421             if( req == null )
422                 break;
423 
424             ByteBuffer buf = ( ByteBuffer ) req.getMessage();
425             if( buf.remaining() == 0 )
426             {
427                 // pop and fire event
428                 synchronized( writeRequestQueue )
429                 {
430                     writeRequestQueue.pop();
431                 }
432 
433                 req.getFuture().setWritten( true );
434                 session.increaseWrittenWriteRequests();
435                 ( ( DatagramFilterChain ) session.getFilterChain() ).messageSent( session, buf );
436                 continue;
437             }
438 
439             SelectionKey key = session.getSelectionKey();
440             if( key == null )
441             {
442                 scheduleFlush( session );
443                 break;
444             }
445             if( !key.isValid() )
446             {
447                 continue;
448             }
449 
450             int pos = buf.position();
451             int writtenBytes = ch
452                     .send( buf.buf(), session.getRemoteAddress() );
453 
454             if( writtenBytes == 0 )
455             {
456                 // Kernel buffer is full
457                 key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
458             }
459             else if( writtenBytes > 0 )
460             {
461                 key.interestOps( key.interestOps()
462                                  & ( ~SelectionKey.OP_WRITE ) );
463 
464                 // pop and fire event
465                 synchronized( writeRequestQueue )
466                 {
467                     writeRequestQueue.pop();
468                 }
469 
470                 session.increaseWrittenBytes( writtenBytes );
471                 req.getFuture().setWritten( true );
472                 session.increaseWrittenWriteRequests();
473                 ( ( DatagramFilterChain ) session.getFilterChain() ).messageSent( session, buf.position( pos ) );
474             }
475         }
476     }
477 
478     private void registerNew()
479     {
480         if( registerQueue.isEmpty() )
481             return;
482 
483         for( ;; )
484         {
485             RegistrationRequest req;
486             synchronized( registerQueue )
487             {
488                 req = ( RegistrationRequest ) registerQueue.pop();
489             }
490 
491             if( req == null )
492                 break;
493 
494             DatagramChannel ch = null;
495             try
496             {
497                 ch = DatagramChannel.open();
498                 ch.configureBlocking( false );
499                 ch.socket().bind( req.address );
500                 ch.register( selector, SelectionKey.OP_READ, req );
501                 channels.put( req.address, ch );
502             }
503             catch( Throwable t )
504             {
505                 req.exception = t;
506             }
507             finally
508             {
509                 synchronized( req )
510                 {
511                     req.done = true;
512                     req.notify();
513                 }
514 
515                 if( ch != null && req.exception != null )
516                 {
517                     try
518                     {
519                         ch.close();
520                     }
521                     catch( Throwable e )
522                     {
523                         ExceptionMonitor.getInstance().exceptionCaught( e );
524                     }
525                 }
526             }
527         }
528     }
529 
530     private void cancelKeys()
531     {
532         if( cancelQueue.isEmpty() )
533             return;
534 
535         for( ;; )
536         {
537             CancellationRequest request;
538             synchronized( cancelQueue )
539             {
540                 request = ( CancellationRequest ) cancelQueue.pop();
541             }
542             
543             if( request == null )
544             {
545                 break;
546             }
547 
548             DatagramChannel ch = ( DatagramChannel ) channels.remove( request.address );
549             // close the channel
550             try
551             {
552                 if( ch == null )
553                 {
554                     request.exception = new IllegalArgumentException(
555                             "Address not bound: " + request.address );
556                 }
557                 else
558                 {
559                     SelectionKey key = ch.keyFor( selector );
560                     key.cancel();
561                     selector.wakeup(); // wake up again to trigger thread death
562                     ch.close();
563                 }
564             }
565             catch( Throwable t )
566             {
567                 ExceptionMonitor.getInstance().exceptionCaught( t );
568             }
569             finally
570             {
571                 synchronized( request )
572                 {
573                     request.done = true;
574                     request.notify();
575                 }
576             }
577         }
578     }
579     
580     public void updateTrafficMask( DatagramSessionImpl session )
581     {
582         // There's no point in changing the traffic mask for sessions originating
583         // from this acceptor since new sessions are created every time data is
584         // received.
585     }
586 
587     private static class RegistrationRequest
588     {
589         private final SocketAddress address;
590         private final IoHandler handler;
591         private final IoFilterChainBuilder filterChainBuilder;
592 
593         private Throwable exception; 
594         private boolean done;
595         
596         private RegistrationRequest( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder )
597         {
598             this.address = address;
599             this.handler = handler;
600             this.filterChainBuilder = filterChainBuilder;
601         }
602     }
603 
604     private static class CancellationRequest
605     {
606         private final SocketAddress address;
607         private boolean done;
608         private RuntimeException exception;
609         
610         private CancellationRequest( SocketAddress address )
611         {
612             this.address = address;
613         }
614     }
615 }