1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.mina.io.socket;
20
21 import java.io.IOException;
22 import java.net.ConnectException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketAddress;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.nio.channels.SocketChannel;
28 import java.util.Iterator;
29 import java.util.Set;
30
31 import org.apache.mina.common.BaseSessionManager;
32 import org.apache.mina.io.IoConnector;
33 import org.apache.mina.io.IoFilterChain;
34 import org.apache.mina.io.IoHandler;
35 import org.apache.mina.io.IoSession;
36 import org.apache.mina.io.IoSessionManagerFilterChain;
37 import org.apache.mina.util.ExceptionUtil;
38 import org.apache.mina.util.Queue;
39
40 /***
41 * {@link IoConnector} for socket transport (TCP/IP).
42 *
43 * @author Trustin Lee (trustin@apache.org)
44 * @version $Rev: 210062 $, $Date: 2005-07-11 12:52:38 +0900 $
45 */
46 public class SocketConnector extends BaseSessionManager implements IoConnector
47 {
48 private static volatile int nextId = 0;
49
50 private final int id = nextId++;
51
52 private final IoSessionManagerFilterChain filters = new SocketSessionManagerFilterChain( this );
53
54 private Selector selector;
55
56 private final Queue connectQueue = new Queue();
57
58 private Worker worker;
59
60 /***
61 * Creates a new instance.
62 */
63 public SocketConnector()
64 {
65 }
66
67 public IoSession connect( SocketAddress address, IoHandler handler ) throws IOException
68 {
69 return connect( address, null, Integer.MAX_VALUE, handler);
70 }
71
72 public IoSession connect( SocketAddress address, SocketAddress localAddress, IoHandler handler ) throws IOException
73 {
74 return connect( address, localAddress, Integer.MAX_VALUE, handler);
75 }
76
77 public IoSession connect( SocketAddress address, int timeout, IoHandler handler ) throws IOException
78 {
79 return connect( address, null, timeout, handler);
80 }
81
82 public IoSession connect( SocketAddress address, SocketAddress localAddress,
83 int timeout, IoHandler handler ) throws IOException
84 {
85 if( address == null )
86 throw new NullPointerException( "address" );
87 if( handler == null )
88 throw new NullPointerException( "handler" );
89
90 if( timeout <= 0 )
91 throw new IllegalArgumentException( "Illegal timeout: " + timeout );
92
93 if( ! ( address instanceof InetSocketAddress ) )
94 throw new IllegalArgumentException( "Unexpected address type: "
95 + address.getClass() );
96
97 if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
98 throw new IllegalArgumentException( "Unexpected local address type: "
99 + localAddress.getClass() );
100
101 SocketChannel ch = SocketChannel.open();
102 boolean success = false;
103 try
104 {
105 ch.socket().setReuseAddress( true );
106 if( localAddress != null )
107 {
108 ch.socket().bind( localAddress );
109 }
110
111 ch.configureBlocking( false );
112
113 if( ch.connect( address ) )
114 {
115 SocketSession session = newSession( ch, handler );
116 success = true;
117 return session;
118 }
119
120 success = true;
121 }
122 finally
123 {
124 if( !success )
125 {
126 ch.close();
127 }
128 }
129
130 ConnectionRequest request = new ConnectionRequest( ch, timeout, handler );
131 synchronized( this )
132 {
133 synchronized( connectQueue )
134 {
135 connectQueue.push( request );
136 }
137 startupWorker();
138 }
139 selector.wakeup();
140
141 synchronized( request )
142 {
143 while( !request.done )
144 {
145 try
146 {
147 request.wait();
148 }
149 catch( InterruptedException e )
150 {
151 }
152 }
153 }
154
155 if( request.exception != null )
156 {
157 ExceptionUtil.throwException( request.exception );
158 }
159
160 return request.session;
161 }
162
163 private synchronized void startupWorker() throws IOException
164 {
165 if( worker == null )
166 {
167 selector = Selector.open();
168 worker = new Worker();
169 worker.start();
170 }
171 }
172
173 private void registerNew()
174 {
175 if( connectQueue.isEmpty() )
176 return;
177
178 for( ;; )
179 {
180 ConnectionRequest req;
181 synchronized( connectQueue )
182 {
183 req = ( ConnectionRequest ) connectQueue.pop();
184 }
185
186 if( req == null )
187 break;
188
189 SocketChannel ch = req.channel;
190 try
191 {
192 ch.register( selector, SelectionKey.OP_CONNECT, req );
193 }
194 catch( IOException e )
195 {
196 req.exception = e;
197 synchronized( req )
198 {
199 req.done = true;
200 req.notify();
201 }
202 }
203 }
204 }
205
206 private void processSessions( Set keys )
207 {
208 Iterator it = keys.iterator();
209
210 while( it.hasNext() )
211 {
212 SelectionKey key = ( SelectionKey ) it.next();
213
214 if( !key.isConnectable() )
215 continue;
216
217 SocketChannel ch = ( SocketChannel ) key.channel();
218 ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
219
220 try
221 {
222 ch.finishConnect();
223 SocketSession session = newSession( ch, entry.handler );
224 entry.session = session;
225 }
226 catch( Throwable e )
227 {
228 entry.exception = e;
229 }
230 finally
231 {
232 key.cancel();
233 if( entry.session == null )
234 {
235 try
236 {
237 ch.close();
238 }
239 catch( IOException e )
240 {
241 exceptionMonitor.exceptionCaught( this, e );
242 }
243 }
244
245 synchronized( entry )
246 {
247 entry.done = true;
248 entry.notify();
249 }
250 }
251 }
252
253 keys.clear();
254 }
255
256 private void processTimedOutSessions( Set keys )
257 {
258 long currentTime = System.currentTimeMillis();
259 Iterator it = keys.iterator();
260
261 while( it.hasNext() )
262 {
263 SelectionKey key = ( SelectionKey ) it.next();
264
265 if( !key.isValid() )
266 continue;
267
268 ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
269
270 if( currentTime >= entry.deadline )
271 {
272 entry.exception = new ConnectException();
273 entry.done = true;
274
275 synchronized( entry )
276 {
277 entry.notify();
278 }
279
280 key.cancel();
281 }
282 }
283 }
284
285 private SocketSession newSession( SocketChannel ch, IoHandler handler ) throws IOException
286 {
287 SocketSession session = new SocketSession( filters, ch, handler );
288 try
289 {
290 handler.sessionCreated( session );
291 }
292 catch( Throwable e )
293 {
294 ExceptionUtil.throwException( e );
295 }
296 SocketIoProcessor.getInstance().addSession( session );
297 return session;
298 }
299
300 private class Worker extends Thread
301 {
302 public Worker()
303 {
304 super( "SocketConnector-" + id );
305 }
306
307 public void run()
308 {
309 for( ;; )
310 {
311 try
312 {
313 int nKeys = selector.select( 1000 );
314
315 registerNew();
316
317 if( nKeys > 0 )
318 {
319 processSessions( selector.selectedKeys() );
320 }
321
322 processTimedOutSessions( selector.keys() );
323
324 if( selector.keys().isEmpty() )
325 {
326 synchronized( SocketConnector.this )
327 {
328 if( selector.keys().isEmpty() &&
329 connectQueue.isEmpty() )
330 {
331 worker = null;
332 try
333 {
334 selector.close();
335 }
336 catch( IOException e )
337 {
338 exceptionMonitor.exceptionCaught( SocketConnector.this, e );
339 }
340 finally
341 {
342 selector = null;
343 }
344 break;
345 }
346 }
347 }
348 }
349 catch( IOException e )
350 {
351 exceptionMonitor.exceptionCaught( SocketConnector.this, e );
352
353 try
354 {
355 Thread.sleep( 1000 );
356 }
357 catch( InterruptedException e1 )
358 {
359 }
360 }
361 }
362 }
363 }
364
365 private static class ConnectionRequest
366 {
367 private final SocketChannel channel;
368
369 private final long deadline;
370
371 private final IoHandler handler;
372
373 private SocketSession session;
374
375 private boolean done;
376
377 private Throwable exception;
378
379 private ConnectionRequest( SocketChannel channel, int timeout, IoHandler handler )
380 {
381 this.channel = channel;
382 this.deadline = System.currentTimeMillis() + timeout * 1000L;
383 this.handler = handler;
384 }
385 }
386
387 public IoFilterChain getFilterChain()
388 {
389 return filters;
390 }
391 }