View Javadoc

1   /*
2    *   @(#) $Id: SocketConnector.java 165586 2005-05-02 06:27:27Z trustin $
3    *
4    *   Copyright 2004 The Apache Software Foundation
5    *
6    *   Licensed under the Apache License, Version 2.0 (the "License");
7    *   you may not use this file except in compliance with the License.
8    *   You may obtain a copy of the License at
9    *
10   *       http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing, software
13   *   distributed under the License is distributed on an "AS IS" BASIS,
14   *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   *   See the License for the specific language governing permissions and
16   *   limitations under the License.
17   *
18   */
19  package org.apache.mina.io.socket;
20  
21  import java.io.IOException;
22  import java.net.ConnectException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketAddress;
25  import java.nio.channels.SelectionKey;
26  import java.nio.channels.Selector;
27  import java.nio.channels.SocketChannel;
28  import java.util.Iterator;
29  import java.util.Set;
30  
31  import org.apache.mina.common.BaseSessionManager;
32  import org.apache.mina.io.IoConnector;
33  import org.apache.mina.io.IoFilterChain;
34  import org.apache.mina.io.IoHandler;
35  import org.apache.mina.io.IoSession;
36  import org.apache.mina.io.IoSessionManagerFilterChain;
37  import org.apache.mina.util.ExceptionUtil;
38  import org.apache.mina.util.Queue;
39  
40  /***
41   * {@link IoConnector} for socket transport (TCP/IP).
42   * 
43   * @author Trustin Lee (trustin@apache.org)
44   * @version $Rev: 165586 $, $Date: 2005-05-02 15:27:27 +0900 (?, 02  5? 2005) $
45   */
46  public class SocketConnector extends BaseSessionManager implements IoConnector
47  {
48      private static volatile int nextId = 0;
49  
50      private final int id = nextId++;
51  
52      private final IoSessionManagerFilterChain filters = new SocketSessionManagerFilterChain( this );
53  
54      private final Selector selector;
55  
56      private final Queue connectQueue = new Queue();
57  
58      private Worker worker;
59  
60      /***
61       * Creates a new instance.
62       * 
63       * @throws IOException
64       */
65      public SocketConnector() throws IOException
66      {
67          selector = Selector.open();
68      }
69  
70      public IoSession connect( SocketAddress address, IoHandler handler ) throws IOException
71      {
72          return connect( address, null, Integer.MAX_VALUE, handler);
73      }
74  
75      public IoSession connect( SocketAddress address, SocketAddress localAddress, IoHandler handler ) throws IOException
76      {
77          return connect( address, localAddress, Integer.MAX_VALUE, handler);
78      }
79  
80      public IoSession connect( SocketAddress address, int timeout, IoHandler handler ) throws IOException
81      {
82          return connect( address, null, timeout, handler);
83      }
84  
85      public IoSession connect( SocketAddress address, SocketAddress localAddress,
86                                int timeout, IoHandler handler ) throws IOException
87      {
88          if( address == null )
89              throw new NullPointerException( "address" );
90          if( handler == null )
91              throw new NullPointerException( "handler" );
92  
93          if( timeout <= 0 )
94              throw new IllegalArgumentException( "Illegal timeout: " + timeout );
95  
96          if( ! ( address instanceof InetSocketAddress ) )
97              throw new IllegalArgumentException( "Unexpected address type: "
98                                                  + address.getClass() );
99  
100         if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
101             throw new IllegalArgumentException( "Unexpected local address type: "
102                                                 + localAddress.getClass() );
103 
104         SocketChannel ch = SocketChannel.open();
105         boolean success = false;
106         try
107         {
108             ch.socket().setReuseAddress( true );
109             if( localAddress != null )
110             {
111                 ch.socket().bind( localAddress );
112             }
113     
114             ch.configureBlocking( false );
115 
116             if( ch.connect( address ) )
117             {
118                 SocketSession session = newSession( ch, handler );
119                 success = true;
120                 return session;
121             }
122             
123             success = true;
124         }
125         finally
126         {
127             if( !success )
128             {
129                 ch.close();
130             }
131         }
132         
133         ConnectionRequest request = new ConnectionRequest( ch, timeout, handler );
134         synchronized( this )
135         {
136             synchronized( connectQueue )
137             {
138                 connectQueue.push( request );
139             }
140             startupWorker();
141         }
142         selector.wakeup();
143 
144         synchronized( request )
145         {
146             while( !request.done )
147             {
148                 try
149                 {
150                     request.wait();
151                 }
152                 catch( InterruptedException e )
153                 {
154                 }
155             }
156         }
157 
158         if( request.exception != null )
159         {
160             ExceptionUtil.throwException( request.exception );
161         }
162 
163         return request.session;
164     }
165     
166     private synchronized void startupWorker()
167     {
168         if( worker == null )
169         {
170             worker = new Worker();
171             worker.start();
172         }
173     }
174 
175     private void registerNew()
176     {
177         if( connectQueue.isEmpty() )
178             return;
179 
180         for( ;; )
181         {
182             ConnectionRequest req;
183             synchronized( connectQueue )
184             {
185                 req = ( ConnectionRequest ) connectQueue.pop();
186             }
187 
188             if( req == null )
189                 break;
190             
191             SocketChannel ch = req.channel;
192             try
193             {
194                 ch.register( selector, SelectionKey.OP_CONNECT, req );
195             }
196             catch( IOException e )
197             {
198                 req.exception = e;
199                 synchronized( req )
200                 {
201                     req.done = true;
202                     req.notify();
203                 }
204             }
205         }
206     }
207     
208     private void processSessions( Set keys )
209     {
210         Iterator it = keys.iterator();
211 
212         while( it.hasNext() )
213         {
214             SelectionKey key = ( SelectionKey ) it.next();
215 
216             if( !key.isConnectable() )
217                 continue;
218 
219             SocketChannel ch = ( SocketChannel ) key.channel();
220             ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
221 
222             try
223             {
224                 ch.finishConnect();
225                 SocketSession session = newSession( ch, entry.handler );
226                 entry.session = session;
227             }
228             catch( Throwable e )
229             {
230                 entry.exception = e;
231             }
232             finally
233             {
234                 key.cancel();
235                 if( entry.session == null )
236                 {
237                     try
238                     {
239                         ch.close();
240                     }
241                     catch( IOException e )
242                     {
243                         exceptionMonitor.exceptionCaught( this, e );
244                     }
245                 }
246 
247                 synchronized( entry )
248                 {
249                     entry.done = true;
250                     entry.notify();
251                 }
252             }
253         }
254 
255         keys.clear();
256     }
257 
258     private void processTimedOutSessions( Set keys )
259     {
260         long currentTime = System.currentTimeMillis();
261         Iterator it = keys.iterator();
262 
263         while( it.hasNext() )
264         {
265             SelectionKey key = ( SelectionKey ) it.next();
266 
267             if( !key.isValid() )
268                 continue;
269 
270             ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
271 
272             if( currentTime >= entry.deadline )
273             {
274                 entry.exception = new ConnectException();
275                 entry.done = true;
276 
277                 synchronized( entry )
278                 {
279                     entry.notify();
280                 }
281 
282                 key.cancel();
283             }
284         }
285     }
286 
287     private SocketSession newSession( SocketChannel ch, IoHandler handler ) throws IOException
288     {
289         SocketSession session = new SocketSession( filters, ch, handler );
290         try
291         {
292             handler.sessionCreated( session );
293         }
294         catch( Throwable e )
295         {
296             ExceptionUtil.throwException( e );
297         }
298         SocketIoProcessor.getInstance().addSession( session );
299         return session;
300     }
301 
302     private class Worker extends Thread
303     {
304         public Worker()
305         {
306             super( "SocketConnector-" + id );
307         }
308 
309         public void run()
310         {
311             for( ;; )
312             {
313                 try
314                 {
315                     int nKeys = selector.select( 1000 );
316 
317                     registerNew();
318                     
319                     if( nKeys > 0 )
320                     {
321                         processSessions( selector.selectedKeys() );
322                     }
323 
324                     processTimedOutSessions( selector.keys() );
325 
326                     if( selector.keys().isEmpty() )
327                     {
328                         synchronized( SocketConnector.this )
329                         {
330                             if( selector.keys().isEmpty() &&
331                                 connectQueue.isEmpty() )
332                             {
333                                 worker = null;
334                                 break;
335                             }
336                         }
337                     }
338                 }
339                 catch( IOException e )
340                 {
341                     exceptionMonitor.exceptionCaught( SocketConnector.this, e );
342 
343                     try
344                     {
345                         Thread.sleep( 1000 );
346                     }
347                     catch( InterruptedException e1 )
348                     {
349                     }
350                 }
351             }
352         }
353     }
354 
355     private static class ConnectionRequest
356     {
357         private final SocketChannel channel;
358         
359         private final long deadline;
360 
361         private final IoHandler handler;
362         
363         private SocketSession session;
364 
365         private boolean done;
366 
367         private Throwable exception;
368 
369         private ConnectionRequest( SocketChannel channel, int timeout, IoHandler handler )
370         {
371             this.channel = channel;
372             this.deadline = System.currentTimeMillis() + timeout * 1000L;
373             this.handler = handler;
374         }
375     }
376 
377     public IoFilterChain getFilterChain()
378     {
379         return filters;
380     }
381 }