1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.mina.io.socket;
20
21 import java.io.IOException;
22 import java.net.ConnectException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketAddress;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.nio.channels.SocketChannel;
28 import java.util.Iterator;
29 import java.util.Set;
30
31 import org.apache.mina.common.BaseSessionManager;
32 import org.apache.mina.io.IoConnector;
33 import org.apache.mina.io.IoFilterChain;
34 import org.apache.mina.io.IoHandler;
35 import org.apache.mina.io.IoSession;
36 import org.apache.mina.io.IoSessionManagerFilterChain;
37 import org.apache.mina.util.ExceptionUtil;
38 import org.apache.mina.util.Queue;
39
40 /***
41 * {@link IoConnector} for socket transport (TCP/IP).
42 *
43 * @author Trustin Lee (trustin@apache.org)
44 * @version $Rev: 165586 $, $Date: 2005-05-02 15:27:27 +0900 (?, 02 5? 2005) $
45 */
46 public class SocketConnector extends BaseSessionManager implements IoConnector
47 {
48 private static volatile int nextId = 0;
49
50 private final int id = nextId++;
51
52 private final IoSessionManagerFilterChain filters = new SocketSessionManagerFilterChain( this );
53
54 private final Selector selector;
55
56 private final Queue connectQueue = new Queue();
57
58 private Worker worker;
59
60 /***
61 * Creates a new instance.
62 *
63 * @throws IOException
64 */
65 public SocketConnector() throws IOException
66 {
67 selector = Selector.open();
68 }
69
70 public IoSession connect( SocketAddress address, IoHandler handler ) throws IOException
71 {
72 return connect( address, null, Integer.MAX_VALUE, handler);
73 }
74
75 public IoSession connect( SocketAddress address, SocketAddress localAddress, IoHandler handler ) throws IOException
76 {
77 return connect( address, localAddress, Integer.MAX_VALUE, handler);
78 }
79
80 public IoSession connect( SocketAddress address, int timeout, IoHandler handler ) throws IOException
81 {
82 return connect( address, null, timeout, handler);
83 }
84
85 public IoSession connect( SocketAddress address, SocketAddress localAddress,
86 int timeout, IoHandler handler ) throws IOException
87 {
88 if( address == null )
89 throw new NullPointerException( "address" );
90 if( handler == null )
91 throw new NullPointerException( "handler" );
92
93 if( timeout <= 0 )
94 throw new IllegalArgumentException( "Illegal timeout: " + timeout );
95
96 if( ! ( address instanceof InetSocketAddress ) )
97 throw new IllegalArgumentException( "Unexpected address type: "
98 + address.getClass() );
99
100 if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
101 throw new IllegalArgumentException( "Unexpected local address type: "
102 + localAddress.getClass() );
103
104 SocketChannel ch = SocketChannel.open();
105 boolean success = false;
106 try
107 {
108 ch.socket().setReuseAddress( true );
109 if( localAddress != null )
110 {
111 ch.socket().bind( localAddress );
112 }
113
114 ch.configureBlocking( false );
115
116 if( ch.connect( address ) )
117 {
118 SocketSession session = newSession( ch, handler );
119 success = true;
120 return session;
121 }
122
123 success = true;
124 }
125 finally
126 {
127 if( !success )
128 {
129 ch.close();
130 }
131 }
132
133 ConnectionRequest request = new ConnectionRequest( ch, timeout, handler );
134 synchronized( this )
135 {
136 synchronized( connectQueue )
137 {
138 connectQueue.push( request );
139 }
140 startupWorker();
141 }
142 selector.wakeup();
143
144 synchronized( request )
145 {
146 while( !request.done )
147 {
148 try
149 {
150 request.wait();
151 }
152 catch( InterruptedException e )
153 {
154 }
155 }
156 }
157
158 if( request.exception != null )
159 {
160 ExceptionUtil.throwException( request.exception );
161 }
162
163 return request.session;
164 }
165
166 private synchronized void startupWorker()
167 {
168 if( worker == null )
169 {
170 worker = new Worker();
171 worker.start();
172 }
173 }
174
175 private void registerNew()
176 {
177 if( connectQueue.isEmpty() )
178 return;
179
180 for( ;; )
181 {
182 ConnectionRequest req;
183 synchronized( connectQueue )
184 {
185 req = ( ConnectionRequest ) connectQueue.pop();
186 }
187
188 if( req == null )
189 break;
190
191 SocketChannel ch = req.channel;
192 try
193 {
194 ch.register( selector, SelectionKey.OP_CONNECT, req );
195 }
196 catch( IOException e )
197 {
198 req.exception = e;
199 synchronized( req )
200 {
201 req.done = true;
202 req.notify();
203 }
204 }
205 }
206 }
207
208 private void processSessions( Set keys )
209 {
210 Iterator it = keys.iterator();
211
212 while( it.hasNext() )
213 {
214 SelectionKey key = ( SelectionKey ) it.next();
215
216 if( !key.isConnectable() )
217 continue;
218
219 SocketChannel ch = ( SocketChannel ) key.channel();
220 ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
221
222 try
223 {
224 ch.finishConnect();
225 SocketSession session = newSession( ch, entry.handler );
226 entry.session = session;
227 }
228 catch( Throwable e )
229 {
230 entry.exception = e;
231 }
232 finally
233 {
234 key.cancel();
235 if( entry.session == null )
236 {
237 try
238 {
239 ch.close();
240 }
241 catch( IOException e )
242 {
243 exceptionMonitor.exceptionCaught( this, e );
244 }
245 }
246
247 synchronized( entry )
248 {
249 entry.done = true;
250 entry.notify();
251 }
252 }
253 }
254
255 keys.clear();
256 }
257
258 private void processTimedOutSessions( Set keys )
259 {
260 long currentTime = System.currentTimeMillis();
261 Iterator it = keys.iterator();
262
263 while( it.hasNext() )
264 {
265 SelectionKey key = ( SelectionKey ) it.next();
266
267 if( !key.isValid() )
268 continue;
269
270 ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
271
272 if( currentTime >= entry.deadline )
273 {
274 entry.exception = new ConnectException();
275 entry.done = true;
276
277 synchronized( entry )
278 {
279 entry.notify();
280 }
281
282 key.cancel();
283 }
284 }
285 }
286
287 private SocketSession newSession( SocketChannel ch, IoHandler handler ) throws IOException
288 {
289 SocketSession session = new SocketSession( filters, ch, handler );
290 try
291 {
292 handler.sessionCreated( session );
293 }
294 catch( Throwable e )
295 {
296 ExceptionUtil.throwException( e );
297 }
298 SocketIoProcessor.getInstance().addSession( session );
299 return session;
300 }
301
302 private class Worker extends Thread
303 {
304 public Worker()
305 {
306 super( "SocketConnector-" + id );
307 }
308
309 public void run()
310 {
311 for( ;; )
312 {
313 try
314 {
315 int nKeys = selector.select( 1000 );
316
317 registerNew();
318
319 if( nKeys > 0 )
320 {
321 processSessions( selector.selectedKeys() );
322 }
323
324 processTimedOutSessions( selector.keys() );
325
326 if( selector.keys().isEmpty() )
327 {
328 synchronized( SocketConnector.this )
329 {
330 if( selector.keys().isEmpty() &&
331 connectQueue.isEmpty() )
332 {
333 worker = null;
334 break;
335 }
336 }
337 }
338 }
339 catch( IOException e )
340 {
341 exceptionMonitor.exceptionCaught( SocketConnector.this, e );
342
343 try
344 {
345 Thread.sleep( 1000 );
346 }
347 catch( InterruptedException e1 )
348 {
349 }
350 }
351 }
352 }
353 }
354
355 private static class ConnectionRequest
356 {
357 private final SocketChannel channel;
358
359 private final long deadline;
360
361 private final IoHandler handler;
362
363 private SocketSession session;
364
365 private boolean done;
366
367 private Throwable exception;
368
369 private ConnectionRequest( SocketChannel channel, int timeout, IoHandler handler )
370 {
371 this.channel = channel;
372 this.deadline = System.currentTimeMillis() + timeout * 1000L;
373 this.handler = handler;
374 }
375 }
376
377 public IoFilterChain getFilterChain()
378 {
379 return filters;
380 }
381 }