View Javadoc

1   /*
2    *   @(#) $Id: SocketConnector.java 210062 2005-07-11 03:52:38Z trustin $
3    *
4    *   Copyright 2004 The Apache Software Foundation
5    *
6    *   Licensed under the Apache License, Version 2.0 (the "License");
7    *   you may not use this file except in compliance with the License.
8    *   You may obtain a copy of the License at
9    *
10   *       http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing, software
13   *   distributed under the License is distributed on an "AS IS" BASIS,
14   *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   *   See the License for the specific language governing permissions and
16   *   limitations under the License.
17   *
18   */
19  package org.apache.mina.io.socket;
20  
21  import java.io.IOException;
22  import java.net.ConnectException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketAddress;
25  import java.nio.channels.SelectionKey;
26  import java.nio.channels.Selector;
27  import java.nio.channels.SocketChannel;
28  import java.util.Iterator;
29  import java.util.Set;
30  
31  import org.apache.mina.common.BaseSessionManager;
32  import org.apache.mina.io.IoConnector;
33  import org.apache.mina.io.IoFilterChain;
34  import org.apache.mina.io.IoHandler;
35  import org.apache.mina.io.IoSession;
36  import org.apache.mina.io.IoSessionManagerFilterChain;
37  import org.apache.mina.util.ExceptionUtil;
38  import org.apache.mina.util.Queue;
39  
40  /***
41   * {@link IoConnector} for socket transport (TCP/IP).
42   * 
43   * @author Trustin Lee (trustin@apache.org)
44   * @version $Rev: 210062 $, $Date: 2005-07-11 12:52:38 +0900 $
45   */
46  public class SocketConnector extends BaseSessionManager implements IoConnector
47  {
48      private static volatile int nextId = 0;
49  
50      private final int id = nextId++;
51  
52      private final IoSessionManagerFilterChain filters = new SocketSessionManagerFilterChain( this );
53  
54      private Selector selector;
55  
56      private final Queue connectQueue = new Queue();
57  
58      private Worker worker;
59  
60      /***
61       * Creates a new instance.
62       */
63      public SocketConnector()
64      {
65      }
66  
67      public IoSession connect( SocketAddress address, IoHandler handler ) throws IOException
68      {
69          return connect( address, null, Integer.MAX_VALUE, handler);
70      }
71  
72      public IoSession connect( SocketAddress address, SocketAddress localAddress, IoHandler handler ) throws IOException
73      {
74          return connect( address, localAddress, Integer.MAX_VALUE, handler);
75      }
76  
77      public IoSession connect( SocketAddress address, int timeout, IoHandler handler ) throws IOException
78      {
79          return connect( address, null, timeout, handler);
80      }
81  
82      public IoSession connect( SocketAddress address, SocketAddress localAddress,
83                                int timeout, IoHandler handler ) throws IOException
84      {
85          if( address == null )
86              throw new NullPointerException( "address" );
87          if( handler == null )
88              throw new NullPointerException( "handler" );
89  
90          if( timeout <= 0 )
91              throw new IllegalArgumentException( "Illegal timeout: " + timeout );
92  
93          if( ! ( address instanceof InetSocketAddress ) )
94              throw new IllegalArgumentException( "Unexpected address type: "
95                                                  + address.getClass() );
96  
97          if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
98              throw new IllegalArgumentException( "Unexpected local address type: "
99                                                  + localAddress.getClass() );
100 
101         SocketChannel ch = SocketChannel.open();
102         boolean success = false;
103         try
104         {
105             ch.socket().setReuseAddress( true );
106             if( localAddress != null )
107             {
108                 ch.socket().bind( localAddress );
109             }
110     
111             ch.configureBlocking( false );
112 
113             if( ch.connect( address ) )
114             {
115                 SocketSession session = newSession( ch, handler );
116                 success = true;
117                 return session;
118             }
119             
120             success = true;
121         }
122         finally
123         {
124             if( !success )
125             {
126                 ch.close();
127             }
128         }
129         
130         ConnectionRequest request = new ConnectionRequest( ch, timeout, handler );
131         synchronized( this )
132         {
133             synchronized( connectQueue )
134             {
135                 connectQueue.push( request );
136             }
137             startupWorker();
138         }
139         selector.wakeup();
140 
141         synchronized( request )
142         {
143             while( !request.done )
144             {
145                 try
146                 {
147                     request.wait();
148                 }
149                 catch( InterruptedException e )
150                 {
151                 }
152             }
153         }
154 
155         if( request.exception != null )
156         {
157             ExceptionUtil.throwException( request.exception );
158         }
159 
160         return request.session;
161     }
162     
163     private synchronized void startupWorker() throws IOException
164     {
165         if( worker == null )
166         {
167             selector = Selector.open();
168             worker = new Worker();
169             worker.start();
170         }
171     }
172 
173     private void registerNew()
174     {
175         if( connectQueue.isEmpty() )
176             return;
177 
178         for( ;; )
179         {
180             ConnectionRequest req;
181             synchronized( connectQueue )
182             {
183                 req = ( ConnectionRequest ) connectQueue.pop();
184             }
185 
186             if( req == null )
187                 break;
188             
189             SocketChannel ch = req.channel;
190             try
191             {
192                 ch.register( selector, SelectionKey.OP_CONNECT, req );
193             }
194             catch( IOException e )
195             {
196                 req.exception = e;
197                 synchronized( req )
198                 {
199                     req.done = true;
200                     req.notify();
201                 }
202             }
203         }
204     }
205     
206     private void processSessions( Set keys )
207     {
208         Iterator it = keys.iterator();
209 
210         while( it.hasNext() )
211         {
212             SelectionKey key = ( SelectionKey ) it.next();
213 
214             if( !key.isConnectable() )
215                 continue;
216 
217             SocketChannel ch = ( SocketChannel ) key.channel();
218             ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
219 
220             try
221             {
222                 ch.finishConnect();
223                 SocketSession session = newSession( ch, entry.handler );
224                 entry.session = session;
225             }
226             catch( Throwable e )
227             {
228                 entry.exception = e;
229             }
230             finally
231             {
232                 key.cancel();
233                 if( entry.session == null )
234                 {
235                     try
236                     {
237                         ch.close();
238                     }
239                     catch( IOException e )
240                     {
241                         exceptionMonitor.exceptionCaught( this, e );
242                     }
243                 }
244 
245                 synchronized( entry )
246                 {
247                     entry.done = true;
248                     entry.notify();
249                 }
250             }
251         }
252 
253         keys.clear();
254     }
255 
256     private void processTimedOutSessions( Set keys )
257     {
258         long currentTime = System.currentTimeMillis();
259         Iterator it = keys.iterator();
260 
261         while( it.hasNext() )
262         {
263             SelectionKey key = ( SelectionKey ) it.next();
264 
265             if( !key.isValid() )
266                 continue;
267 
268             ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
269 
270             if( currentTime >= entry.deadline )
271             {
272                 entry.exception = new ConnectException();
273                 entry.done = true;
274 
275                 synchronized( entry )
276                 {
277                     entry.notify();
278                 }
279 
280                 key.cancel();
281             }
282         }
283     }
284 
285     private SocketSession newSession( SocketChannel ch, IoHandler handler ) throws IOException
286     {
287         SocketSession session = new SocketSession( filters, ch, handler );
288         try
289         {
290             handler.sessionCreated( session );
291         }
292         catch( Throwable e )
293         {
294             ExceptionUtil.throwException( e );
295         }
296         SocketIoProcessor.getInstance().addSession( session );
297         return session;
298     }
299 
300     private class Worker extends Thread
301     {
302         public Worker()
303         {
304             super( "SocketConnector-" + id );
305         }
306 
307         public void run()
308         {
309             for( ;; )
310             {
311                 try
312                 {
313                     int nKeys = selector.select( 1000 );
314 
315                     registerNew();
316                     
317                     if( nKeys > 0 )
318                     {
319                         processSessions( selector.selectedKeys() );
320                     }
321 
322                     processTimedOutSessions( selector.keys() );
323 
324                     if( selector.keys().isEmpty() )
325                     {
326                         synchronized( SocketConnector.this )
327                         {
328                             if( selector.keys().isEmpty() &&
329                                 connectQueue.isEmpty() )
330                             {
331                                 worker = null;
332                                 try
333                                 {
334                                     selector.close();
335                                 }
336                                 catch( IOException e )
337                                 {
338                                     exceptionMonitor.exceptionCaught( SocketConnector.this, e );
339                                 }
340                                 finally
341                                 {
342                                     selector = null;
343                                 }
344                                 break;
345                             }
346                         }
347                     }
348                 }
349                 catch( IOException e )
350                 {
351                     exceptionMonitor.exceptionCaught( SocketConnector.this, e );
352 
353                     try
354                     {
355                         Thread.sleep( 1000 );
356                     }
357                     catch( InterruptedException e1 )
358                     {
359                     }
360                 }
361             }
362         }
363     }
364 
365     private static class ConnectionRequest
366     {
367         private final SocketChannel channel;
368         
369         private final long deadline;
370 
371         private final IoHandler handler;
372         
373         private SocketSession session;
374 
375         private boolean done;
376 
377         private Throwable exception;
378 
379         private ConnectionRequest( SocketChannel channel, int timeout, IoHandler handler )
380         {
381             this.channel = channel;
382             this.deadline = System.currentTimeMillis() + timeout * 1000L;
383             this.handler = handler;
384         }
385     }
386 
387     public IoFilterChain getFilterChain()
388     {
389         return filters;
390     }
391 }