View Javadoc

1   /*
2    *   @(#) $Id: SocketAcceptor.java 210062 2005-07-11 03:52:38Z trustin $
3    *
4    *   Copyright 2004 The Apache Software Foundation
5    *
6    *   Licensed under the Apache License, Version 2.0 (the "License");
7    *   you may not use this file except in compliance with the License.
8    *   You may obtain a copy of the License at
9    *
10   *       http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing, software
13   *   distributed under the License is distributed on an "AS IS" BASIS,
14   *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   *   See the License for the specific language governing permissions and
16   *   limitations under the License.
17   *
18   */
19  package org.apache.mina.io.socket;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.net.SocketAddress;
24  import java.nio.channels.SelectionKey;
25  import java.nio.channels.Selector;
26  import java.nio.channels.ServerSocketChannel;
27  import java.nio.channels.SocketChannel;
28  import java.util.HashMap;
29  import java.util.Iterator;
30  import java.util.Map;
31  import java.util.Set;
32  
33  import org.apache.mina.common.BaseSessionManager;
34  import org.apache.mina.io.IoAcceptor;
35  import org.apache.mina.io.IoFilterChain;
36  import org.apache.mina.io.IoHandler;
37  import org.apache.mina.io.IoSessionManagerFilterChain;
38  import org.apache.mina.util.Queue;
39  
40  /***
41   * {@link IoAcceptor} for socket transport (TCP/IP).
42   * 
43   * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
44   * @version $Rev: 210062 $, $Date: 2005-07-11 12:52:38 +0900 $
45   */
46  public class SocketAcceptor extends BaseSessionManager implements IoAcceptor
47  {
48      private static volatile int nextId = 0;
49  
50      private final IoSessionManagerFilterChain filters = new SocketSessionManagerFilterChain( this );
51  
52      private final int id = nextId ++ ;
53  
54      private Selector selector;
55  
56      private final Map channels = new HashMap();
57  
58      private final Queue registerQueue = new Queue();
59  
60      private final Queue cancelQueue = new Queue();
61      
62      private int backlog = 50;
63  
64      private Worker worker;
65  
66  
67      /***
68       * Creates a new instance.
69       */
70      public SocketAcceptor()
71      {
72      }
73  
74      /***
75       * Binds to the specified <code>address</code> and handles incoming
76       * connections with the specified <code>handler</code>.  Backlog value
77       * is configured to the value of <code>backlog</code> property.
78       *
79       * @throws IOException if failed to bind
80       */
81      public void bind( SocketAddress address, IoHandler handler ) throws IOException
82      {
83          if( address == null )
84          {
85              throw new NullPointerException( "address" );
86          }
87  
88          if( handler == null )
89          {
90              throw new NullPointerException( "handler" );
91          }
92  
93          if( !( address instanceof InetSocketAddress ) )
94          {
95              throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() );
96          }
97  
98          if( ( ( InetSocketAddress ) address ).getPort() == 0 )
99          {
100             throw new IllegalArgumentException( "Unsupported port number: 0" );
101         }
102         
103         RegistrationRequest request = new RegistrationRequest( address, backlog, handler );
104 
105         synchronized( this )
106         {
107             synchronized( registerQueue )
108             {
109                 registerQueue.push( request );
110             }
111             startupWorker();
112         }
113         
114         selector.wakeup();
115         
116         synchronized( request )
117         {
118             while( !request.done )
119             {
120                 try
121                 {
122                     request.wait();
123                 }
124                 catch( InterruptedException e )
125                 {
126                 }
127             }
128         }
129         
130         if( request.exception != null )
131         {
132             throw request.exception;
133         }
134     }
135 
136 
137     private synchronized void startupWorker() throws IOException
138     {
139         if( worker == null )
140         {
141             selector = Selector.open();
142             worker = new Worker();
143 
144             worker.start();
145         }
146     }
147 
148 
149     public void unbind( SocketAddress address )
150     {
151         if( address == null )
152         {
153             throw new NullPointerException( "address" );
154         }
155 
156         CancellationRequest request = new CancellationRequest( address );
157         synchronized( this )
158         {
159             try
160             {
161                 startupWorker();
162             }
163             catch( IOException e )
164             {
165                 // IOException is thrown only when Worker thread is not
166                 // running and failed to open a selector.  We simply throw
167                 // IllegalArgumentException here because we can simply
168                 // conclude that nothing is bound to the selector.
169                 throw new IllegalArgumentException( "Address not bound: " + address );
170             }
171 
172             synchronized( cancelQueue )
173             {
174                 cancelQueue.push( request );
175             }
176         }
177         
178         selector.wakeup();
179 
180         synchronized( request )
181         {
182             while( !request.done )
183             {
184                 try
185                 {
186                     request.wait();
187                 }
188                 catch( InterruptedException e )
189                 {
190                 }
191             }
192         }
193         
194         if( request.exception != null )
195         {
196             request.exception.fillInStackTrace();
197 
198             throw request.exception;
199         }
200     }
201     
202     /***
203      * Returns the default backlog value which is used when user binds. 
204      */
205     public int getBacklog()
206     {
207         return backlog;
208     }
209     
210     /***
211      * Sets the default backlog value which is used when user binds. 
212      */
213     public void setBacklog( int defaultBacklog )
214     {
215         if( defaultBacklog <= 0 )
216         {
217             throw new IllegalArgumentException( "defaultBacklog: " + defaultBacklog );
218         }
219         this.backlog = defaultBacklog;
220     }
221 
222 
223     private class Worker extends Thread
224     {
225         public Worker()
226         {
227             super( "SocketAcceptor-" + id );
228         }
229 
230         public void run()
231         {
232             for( ;; )
233             {
234                 try
235                 {
236                     int nKeys = selector.select();
237 
238                     registerNew();
239                     cancelKeys();
240 
241                     if( nKeys > 0 )
242                     {
243                         processSessions( selector.selectedKeys() );
244                     }
245 
246                     if( selector.keys().isEmpty() )
247                     {
248                         synchronized( SocketAcceptor.this )
249                         {
250                             if( selector.keys().isEmpty() &&
251                                 registerQueue.isEmpty() &&
252                                 cancelQueue.isEmpty() )
253                             {
254                                 worker = null;
255                                 try
256                                 {
257                                     selector.close();
258                                 }
259                                 catch( IOException e )
260                                 {
261                                     exceptionMonitor.exceptionCaught( SocketAcceptor.this, e );
262                                 }
263                                 finally
264                                 {
265                                     selector = null;
266                                 }
267                                 break;
268                             }
269                         }
270                     }
271                 }
272                 catch( IOException e )
273                 {
274                     exceptionMonitor.exceptionCaught( SocketAcceptor.this, e );
275 
276                     try
277                     {
278                         Thread.sleep( 1000 );
279                     }
280                     catch( InterruptedException e1 )
281                     {
282                     }
283                 }
284             }
285         }
286 
287         private void processSessions( Set keys ) throws IOException
288         {
289             Iterator it = keys.iterator();
290             while( it.hasNext() )
291             {
292                 SelectionKey key = ( SelectionKey ) it.next();
293    
294                 it.remove();
295    
296                 if( !key.isAcceptable() )
297                 {
298                     continue;
299                 }
300    
301                 ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel();
302    
303                 SocketChannel ch = ssc.accept();
304    
305                 if( ch == null )
306                 {
307                     continue;
308                 }
309    
310                 boolean success = false;
311                 try
312                 {
313                     RegistrationRequest req = ( RegistrationRequest ) key.attachment();
314                     SocketSession session = new SocketSession( filters, ch, req.handler );
315                     req.handler.sessionCreated( session );
316                     SocketIoProcessor.getInstance().addSession( session );
317                     success = true;
318                 }
319                 catch( Throwable t )
320                 {
321                     exceptionMonitor.exceptionCaught( SocketAcceptor.this, t );
322                 }
323                 finally
324                 {
325                     if( !success )
326                     {
327                         ch.close();
328                     }
329                 }
330             }
331         }
332     }
333 
334 
335     private void registerNew()
336     {
337         if( registerQueue.isEmpty() )
338         {
339             return;
340         }
341 
342         for( ;; )
343         {
344             RegistrationRequest req;
345 
346             synchronized( registerQueue )
347             {
348                 req = ( RegistrationRequest ) registerQueue.pop();
349             }
350 
351             if( req == null )
352             {
353                 break;
354             }
355 
356             ServerSocketChannel ssc = null;
357 
358             try
359             {
360                 ssc = ServerSocketChannel.open();
361                 ssc.configureBlocking( false );
362                 ssc.socket().bind( req.address, req.backlog );
363                 ssc.register( selector, SelectionKey.OP_ACCEPT, req );
364 
365                 channels.put( req.address, ssc );
366             }
367             catch( IOException e )
368             {
369                 req.exception = e;
370             }
371             finally
372             {
373                 synchronized( req )
374                 {
375                     req.done = true;
376 
377                     req.notify();
378                 }
379 
380                 if( ssc != null && req.exception != null )
381                 {
382                     try
383                     {
384                         ssc.close();
385                     }
386                     catch( IOException e )
387                     {
388                         exceptionMonitor.exceptionCaught( this, e );
389                     }
390                 }
391             }
392         }
393     }
394 
395 
396     private void cancelKeys()
397     {
398         if( cancelQueue.isEmpty() )
399         {
400             return;
401         }
402 
403         for( ;; )
404         {
405             CancellationRequest request;
406 
407             synchronized( cancelQueue )
408             {
409                 request = ( CancellationRequest ) cancelQueue.pop();
410             }
411 
412             if( request == null )
413             {
414                 break;
415             }
416 
417             ServerSocketChannel ssc = ( ServerSocketChannel ) channels.remove( request.address );
418             
419             // close the channel
420             try
421             {
422                 if( ssc == null )
423                 {
424                     request.exception = new IllegalArgumentException( "Address not bound: " + request.address );
425                 }
426                 else
427                 {
428                     SelectionKey key = ssc.keyFor( selector );
429 
430                     key.cancel();
431 
432                     selector.wakeup(); // wake up again to trigger thread death
433 
434                     ssc.close();
435                 }
436             }
437             catch( IOException e )
438             {
439                 exceptionMonitor.exceptionCaught( this, e );
440             }
441             finally
442             {
443                 synchronized( request )
444                 {
445                     request.done = true;
446 
447                     request.notify();
448                 }
449             }
450         }
451     }
452 
453     public IoFilterChain getFilterChain()
454     {
455         return filters;
456     }
457 
458     private static class RegistrationRequest
459     {
460         private final SocketAddress address;
461         
462         private final int backlog;
463 
464         private final IoHandler handler;
465         
466         private IOException exception; 
467         
468         private boolean done;
469         
470         private RegistrationRequest( SocketAddress address, int backlog, IoHandler handler )
471         {
472             this.address = address;
473             this.backlog = backlog;
474             this.handler = handler;
475         }
476     }
477 
478 
479     private static class CancellationRequest
480     {
481         private final SocketAddress address;
482 
483         private boolean done;
484 
485         private RuntimeException exception;
486         
487         private CancellationRequest( SocketAddress address )
488         {
489             this.address = address;
490         }
491     }
492 }