View Javadoc

1   /*
2    *   @(#) $Id: SocketAcceptor.java 357871 2005-12-20 01:56:40Z trustin $
3    *
4    *   Copyright 2004 The Apache Software Foundation
5    *
6    *   Licensed under the Apache License, Version 2.0 (the "License");
7    *   you may not use this file except in compliance with the License.
8    *   You may obtain a copy of the License at
9    *
10   *       http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing, software
13   *   distributed under the License is distributed on an "AS IS" BASIS,
14   *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   *   See the License for the specific language governing permissions and
16   *   limitations under the License.
17   *
18   */
19  package org.apache.mina.io.socket;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.net.SocketAddress;
24  import java.nio.channels.SelectionKey;
25  import java.nio.channels.Selector;
26  import java.nio.channels.ServerSocketChannel;
27  import java.nio.channels.SocketChannel;
28  import java.util.HashMap;
29  import java.util.Iterator;
30  import java.util.Map;
31  import java.util.Set;
32  
33  import org.apache.mina.common.BaseSessionManager;
34  import org.apache.mina.io.IoAcceptor;
35  import org.apache.mina.io.IoFilterChain;
36  import org.apache.mina.io.IoHandler;
37  import org.apache.mina.io.IoSession;
38  import org.apache.mina.io.IoSessionManagerFilterChain;
39  import org.apache.mina.util.Queue;
40  
41  /***
42   * {@link IoAcceptor} for socket transport (TCP/IP).
43   * 
44   * @author The Apache Directory Project (dev@directory.apache.org)
45   * @version $Rev: 357871 $, $Date: 2005-12-20 10:56:40 +0900 (Tue, 20 Dec 2005) $
46   */
47  public class SocketAcceptor extends BaseSessionManager implements IoAcceptor
48  {
49      private static volatile int nextId = 0;
50  
51      private final IoSessionManagerFilterChain filters = new SocketSessionManagerFilterChain( this );
52  
53      private final int id = nextId ++ ;
54  
55      private Selector selector;
56  
57      private final Map channels = new HashMap();
58  
59      private final Queue registerQueue = new Queue();
60  
61      private final Queue cancelQueue = new Queue();
62      
63      private int backlog = 50;
64  
65      private Worker worker;
66  
67  
68      /***
69       * Creates a new instance.
70       */
71      public SocketAcceptor()
72      {
73      }
74  
75      /***
76       * Binds to the specified <code>address</code> and handles incoming
77       * connections with the specified <code>handler</code>.  Backlog value
78       * is configured to the value of <code>backlog</code> property.
79       *
80       * @throws IOException if failed to bind
81       */
82      public void bind( SocketAddress address, IoHandler handler ) throws IOException
83      {
84          if( address == null )
85          {
86              throw new NullPointerException( "address" );
87          }
88  
89          if( handler == null )
90          {
91              throw new NullPointerException( "handler" );
92          }
93  
94          if( !( address instanceof InetSocketAddress ) )
95          {
96              throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() );
97          }
98  
99          if( ( ( InetSocketAddress ) address ).getPort() == 0 )
100         {
101             throw new IllegalArgumentException( "Unsupported port number: 0" );
102         }
103         
104         RegistrationRequest request = new RegistrationRequest( address, backlog, handler );
105 
106         synchronized( this )
107         {
108             synchronized( registerQueue )
109             {
110                 registerQueue.push( request );
111             }
112             startupWorker();
113         }
114         
115         selector.wakeup();
116         
117         synchronized( request )
118         {
119             while( !request.done )
120             {
121                 try
122                 {
123                     request.wait();
124                 }
125                 catch( InterruptedException e )
126                 {
127                 }
128             }
129         }
130         
131         if( request.exception != null )
132         {
133             throw request.exception;
134         }
135     }
136 
137 
138     private synchronized void startupWorker() throws IOException
139     {
140         if( worker == null )
141         {
142             selector = Selector.open();
143             worker = new Worker();
144 
145             worker.start();
146         }
147     }
148 
149 
150     public void unbind( SocketAddress address )
151     {
152         if( address == null )
153         {
154             throw new NullPointerException( "address" );
155         }
156 
157         CancellationRequest request = new CancellationRequest( address );
158         synchronized( this )
159         {
160             try
161             {
162                 startupWorker();
163             }
164             catch( IOException e )
165             {
166                 // IOException is thrown only when Worker thread is not
167                 // running and failed to open a selector.  We simply throw
168                 // IllegalArgumentException here because we can simply
169                 // conclude that nothing is bound to the selector.
170                 throw new IllegalArgumentException( "Address not bound: " + address );
171             }
172 
173             synchronized( cancelQueue )
174             {
175                 cancelQueue.push( request );
176             }
177         }
178         
179         selector.wakeup();
180 
181         synchronized( request )
182         {
183             while( !request.done )
184             {
185                 try
186                 {
187                     request.wait();
188                 }
189                 catch( InterruptedException e )
190                 {
191                 }
192             }
193         }
194         
195         if( request.exception != null )
196         {
197             request.exception.fillInStackTrace();
198 
199             throw request.exception;
200         }
201     }
202     
203     /***
204      * Returns the default backlog value which is used when user binds. 
205      */
206     public int getBacklog()
207     {
208         return backlog;
209     }
210     
211     /***
212      * Sets the default backlog value which is used when user binds. 
213      */
214     public void setBacklog( int defaultBacklog )
215     {
216         if( defaultBacklog <= 0 )
217         {
218             throw new IllegalArgumentException( "defaultBacklog: " + defaultBacklog );
219         }
220         this.backlog = defaultBacklog;
221     }
222 
223 
224     private class Worker extends Thread
225     {
226         public Worker()
227         {
228             super( "SocketAcceptor-" + id );
229         }
230 
231         public void run()
232         {
233             for( ;; )
234             {
235                 try
236                 {
237                     int nKeys = selector.select();
238 
239                     registerNew();
240                     cancelKeys();
241 
242                     if( nKeys > 0 )
243                     {
244                         processSessions( selector.selectedKeys() );
245                     }
246 
247                     if( selector.keys().isEmpty() )
248                     {
249                         synchronized( SocketAcceptor.this )
250                         {
251                             if( selector.keys().isEmpty() &&
252                                 registerQueue.isEmpty() &&
253                                 cancelQueue.isEmpty() )
254                             {
255                                 worker = null;
256                                 try
257                                 {
258                                     selector.close();
259                                 }
260                                 catch( IOException e )
261                                 {
262                                     exceptionMonitor.exceptionCaught( SocketAcceptor.this, e );
263                                 }
264                                 finally
265                                 {
266                                     selector = null;
267                                 }
268                                 break;
269                             }
270                         }
271                     }
272                 }
273                 catch( IOException e )
274                 {
275                     exceptionMonitor.exceptionCaught( SocketAcceptor.this, e );
276 
277                     try
278                     {
279                         Thread.sleep( 1000 );
280                     }
281                     catch( InterruptedException e1 )
282                     {
283                     }
284                 }
285             }
286         }
287 
288         private void processSessions( Set keys ) throws IOException
289         {
290             Iterator it = keys.iterator();
291             while( it.hasNext() )
292             {
293                 SelectionKey key = ( SelectionKey ) it.next();
294    
295                 it.remove();
296    
297                 if( !key.isAcceptable() )
298                 {
299                     continue;
300                 }
301    
302                 ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel();
303    
304                 SocketChannel ch = ssc.accept();
305    
306                 if( ch == null )
307                 {
308                     continue;
309                 }
310    
311                 boolean success = false;
312                 try
313                 {
314                     RegistrationRequest req = ( RegistrationRequest ) key.attachment();
315                     SocketSession session = new SocketSession( filters, ch, req.handler );
316                     req.handler.sessionCreated( session );
317                     SocketIoProcessor.getInstance().addSession( session );
318                     success = true;
319                 }
320                 catch( Throwable t )
321                 {
322                     exceptionMonitor.exceptionCaught( SocketAcceptor.this, t );
323                 }
324                 finally
325                 {
326                     if( !success )
327                     {
328                         ch.close();
329                     }
330                 }
331             }
332         }
333     }
334 
335 
336     private void registerNew()
337     {
338         if( registerQueue.isEmpty() )
339         {
340             return;
341         }
342 
343         for( ;; )
344         {
345             RegistrationRequest req;
346 
347             synchronized( registerQueue )
348             {
349                 req = ( RegistrationRequest ) registerQueue.pop();
350             }
351 
352             if( req == null )
353             {
354                 break;
355             }
356 
357             ServerSocketChannel ssc = null;
358 
359             try
360             {
361                 ssc = ServerSocketChannel.open();
362                 ssc.configureBlocking( false );
363                 ssc.socket().bind( req.address, req.backlog );
364                 ssc.register( selector, SelectionKey.OP_ACCEPT, req );
365 
366                 channels.put( req.address, ssc );
367             }
368             catch( IOException e )
369             {
370                 req.exception = e;
371             }
372             finally
373             {
374                 synchronized( req )
375                 {
376                     req.done = true;
377 
378                     req.notify();
379                 }
380 
381                 if( ssc != null && req.exception != null )
382                 {
383                     try
384                     {
385                         ssc.close();
386                     }
387                     catch( IOException e )
388                     {
389                         exceptionMonitor.exceptionCaught( this, e );
390                     }
391                 }
392             }
393         }
394     }
395 
396 
397     private void cancelKeys()
398     {
399         if( cancelQueue.isEmpty() )
400         {
401             return;
402         }
403 
404         for( ;; )
405         {
406             CancellationRequest request;
407 
408             synchronized( cancelQueue )
409             {
410                 request = ( CancellationRequest ) cancelQueue.pop();
411             }
412 
413             if( request == null )
414             {
415                 break;
416             }
417 
418             ServerSocketChannel ssc = ( ServerSocketChannel ) channels.remove( request.address );
419             
420             // close the channel
421             try
422             {
423                 if( ssc == null )
424                 {
425                     request.exception = new IllegalArgumentException( "Address not bound: " + request.address );
426                 }
427                 else
428                 {
429                     SelectionKey key = ssc.keyFor( selector );
430 
431                     key.cancel();
432 
433                     selector.wakeup(); // wake up again to trigger thread death
434 
435                     ssc.close();
436                 }
437             }
438             catch( IOException e )
439             {
440                 exceptionMonitor.exceptionCaught( this, e );
441             }
442             finally
443             {
444                 synchronized( request )
445                 {
446                     request.done = true;
447 
448                     request.notify();
449                 }
450             }
451         }
452     }
453 
454     public IoFilterChain getFilterChain()
455     {
456         return filters;
457     }
458 
459     private static class RegistrationRequest
460     {
461         private final SocketAddress address;
462         
463         private final int backlog;
464 
465         private final IoHandler handler;
466         
467         private IOException exception; 
468         
469         private boolean done;
470         
471         private RegistrationRequest( SocketAddress address, int backlog, IoHandler handler )
472         {
473             this.address = address;
474             this.backlog = backlog;
475             this.handler = handler;
476         }
477     }
478 
479 
480     private static class CancellationRequest
481     {
482         private final SocketAddress address;
483 
484         private boolean done;
485 
486         private RuntimeException exception;
487         
488         private CancellationRequest( SocketAddress address )
489         {
490             this.address = address;
491         }
492     }
493 
494 
495     public IoSession newSession( SocketAddress remoteAddress, SocketAddress localAddress )
496     {
497         throw new UnsupportedOperationException();
498     }
499 }