1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.mina.io.datagram;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.SocketAddress;
24 import java.nio.channels.DatagramChannel;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.util.Iterator;
28 import java.util.Set;
29
30 import org.apache.mina.common.ByteBuffer;
31 import org.apache.mina.io.IoConnector;
32 import org.apache.mina.io.IoFilterChain;
33 import org.apache.mina.io.IoHandler;
34 import org.apache.mina.io.IoSession;
35 import org.apache.mina.io.IoSessionManagerFilterChain;
36 import org.apache.mina.util.ExceptionUtil;
37 import org.apache.mina.util.Queue;
38
39 /***
40 * {@link IoConnector} for datagram transport (UDP/IP).
41 *
42 * @author Trustin Lee (trustin@apache.org)
43 * @version $Rev: 165586 $, $Date: 2005-05-02 15:27:27 +0900 (?, 02 5? 2005) $
44 */
45 public class DatagramConnector extends DatagramSessionManager implements IoConnector
46 {
47 private static volatile int nextId = 0;
48
49 private final IoSessionManagerFilterChain filters =
50 new DatagramSessionManagerFilterChain( this );
51
52 private final int id = nextId ++ ;
53
54 private final Selector selector;
55
56 private final Queue registerQueue = new Queue();
57
58 private final Queue cancelQueue = new Queue();
59
60 private final Queue flushingSessions = new Queue();
61
62 private Worker worker;
63
64 /***
65 * Creates a new instance.
66 *
67 * @throws IOException if failed to open a selector
68 */
69 public DatagramConnector() throws IOException
70 {
71 selector = Selector.open();
72 }
73
74 public IoSession connect( SocketAddress address, IoHandler handler ) throws IOException
75 {
76 return connect( address, null, handler);
77 }
78
79 public IoSession connect( SocketAddress address, int timeout, IoHandler handler ) throws IOException
80 {
81 return connect( address, null, handler );
82 }
83
84 public IoSession connect( SocketAddress address, SocketAddress localAddress, int timeout, IoHandler handler ) throws IOException
85 {
86 return connect( address, localAddress, handler );
87 }
88
89 public IoSession connect( SocketAddress address, SocketAddress localAddress,
90 IoHandler handler ) throws IOException
91 {
92 if( address == null )
93 throw new NullPointerException( "address" );
94 if( handler == null )
95 throw new NullPointerException( "handler" );
96
97 if( !( address instanceof InetSocketAddress ) )
98 throw new IllegalArgumentException( "Unexpected address type: "
99 + address.getClass() );
100
101 if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
102 {
103 throw new IllegalArgumentException( "Unexpected local address type: "
104 + localAddress.getClass() );
105 }
106
107 DatagramChannel ch = DatagramChannel.open();
108 boolean initialized = false;
109 try
110 {
111 ch.socket().setReuseAddress( true );
112 if( localAddress != null )
113 {
114 ch.socket().bind( localAddress );
115 }
116 ch.connect( address );
117 ch.configureBlocking( false );
118 initialized = true;
119 }
120 finally
121 {
122 if( !initialized )
123 {
124 ch.close();
125 }
126 }
127
128 RegistrationRequest request = new RegistrationRequest( ch, handler );
129 synchronized( this )
130 {
131 synchronized( registerQueue )
132 {
133 registerQueue.push( request );
134 }
135 startupWorker();
136 }
137
138 selector.wakeup();
139
140 synchronized( request )
141 {
142 while( !request.done )
143 {
144 try
145 {
146 request.wait();
147 }
148 catch( InterruptedException e )
149 {
150 }
151 }
152 }
153
154 if( request.exception != null )
155 {
156 ExceptionUtil.throwException( request.exception );
157 }
158
159 return request.session;
160 }
161
162 private synchronized void startupWorker()
163 {
164 if( worker == null )
165 {
166 worker = new Worker();
167 worker.start();
168 }
169 }
170
171 void closeSession( DatagramSession session )
172 {
173 synchronized( this )
174 {
175 SelectionKey key = session.getSelectionKey();
176 synchronized( cancelQueue )
177 {
178 cancelQueue.push( key );
179 }
180 startupWorker();
181 }
182
183 selector.wakeup();
184 }
185
186 void flushSession( DatagramSession session )
187 {
188 scheduleFlush( session );
189 selector.wakeup();
190 }
191
192 private void scheduleFlush( DatagramSession session )
193 {
194 synchronized( flushingSessions )
195 {
196 flushingSessions.push( session );
197 }
198 }
199
200 private class Worker extends Thread
201 {
202 public Worker()
203 {
204 super( "DatagramAcceptor-" + id );
205 }
206
207 public void run()
208 {
209 for( ;; )
210 {
211 try
212 {
213 int nKeys = selector.select();
214
215 registerNew();
216
217 if( nKeys > 0 )
218 {
219 processReadySessions( selector.selectedKeys() );
220 }
221
222 flushSessions();
223 cancelKeys();
224
225 if( selector.keys().isEmpty() )
226 {
227 synchronized( DatagramConnector.this )
228 {
229 if( selector.keys().isEmpty() &&
230 registerQueue.isEmpty() &&
231 cancelQueue.isEmpty() )
232 {
233 worker = null;
234 break;
235 }
236 }
237 }
238 }
239 catch( IOException e )
240 {
241 exceptionMonitor.exceptionCaught( DatagramConnector.this,
242 e );
243
244 try
245 {
246 Thread.sleep( 1000 );
247 }
248 catch( InterruptedException e1 )
249 {
250 }
251 }
252 }
253 }
254 }
255
256 private void processReadySessions( Set keys )
257 {
258 Iterator it = keys.iterator();
259 while( it.hasNext() )
260 {
261 SelectionKey key = ( SelectionKey ) it.next();
262 it.remove();
263
264 DatagramSession session = ( DatagramSession ) key.attachment();
265
266 if( key.isReadable() )
267 {
268 readSession( session );
269 }
270
271 if( key.isWritable() )
272 {
273 scheduleFlush( session );
274 }
275 }
276 }
277
278 private void readSession( DatagramSession session )
279 {
280
281 ByteBuffer readBuf = ByteBuffer.allocate( 2048 );
282 try
283 {
284 int readBytes = session.getChannel().read( readBuf.buf() );
285 if( readBytes > 0 )
286 {
287 readBuf.flip();
288 ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
289 newBuf.put( readBuf );
290 newBuf.flip();
291
292 session.increaseReadBytes( readBytes );
293 filters.dataRead( session, newBuf );
294 }
295 }
296 catch( IOException e )
297 {
298 filters.exceptionCaught( session, e );
299 }
300 finally
301 {
302 readBuf.release();
303 }
304 }
305
306 private void flushSessions()
307 {
308 if( flushingSessions.size() == 0 )
309 return;
310
311 for( ;; )
312 {
313 DatagramSession session;
314
315 synchronized( flushingSessions )
316 {
317 session = ( DatagramSession ) flushingSessions.pop();
318 }
319
320 if( session == null )
321 break;
322
323 try
324 {
325 flush( session );
326 }
327 catch( IOException e )
328 {
329 session.getManagerFilterChain().exceptionCaught( session, e );
330 }
331 }
332 }
333
334 private void flush( DatagramSession session ) throws IOException
335 {
336 DatagramChannel ch = session.getChannel();
337
338 Queue writeBufferQueue = session.getWriteBufferQueue();
339 Queue writeMarkerQueue = session.getWriteMarkerQueue();
340
341 ByteBuffer buf;
342 Object marker;
343 for( ;; )
344 {
345 synchronized( writeBufferQueue )
346 {
347 buf = ( ByteBuffer ) writeBufferQueue.first();
348 marker = writeMarkerQueue.first();
349 }
350
351 if( buf == null )
352 break;
353
354 if( buf.remaining() == 0 )
355 {
356
357 synchronized( writeBufferQueue )
358 {
359 writeBufferQueue.pop();
360 writeMarkerQueue.pop();
361 }
362
363 try
364 {
365 buf.release();
366 }
367 catch( IllegalStateException e )
368 {
369 session.getManagerFilterChain().exceptionCaught( session, e );
370 }
371
372 session.getManagerFilterChain().dataWritten( session, marker );
373 continue;
374 }
375
376 int writtenBytes = ch.write( buf.buf() );
377
378 SelectionKey key = session.getSelectionKey();
379 if( writtenBytes == 0 )
380 {
381
382 key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
383 }
384 else if( writtenBytes > 0 )
385 {
386 key.interestOps( key.interestOps()
387 & ( ~SelectionKey.OP_WRITE ) );
388
389
390 synchronized( writeBufferQueue )
391 {
392 writeBufferQueue.pop();
393 writeMarkerQueue.pop();
394 }
395
396 session.increaseWrittenBytes( writtenBytes );
397 session.getManagerFilterChain().dataWritten( session, marker );
398 }
399 }
400 }
401
402 private void registerNew()
403 {
404 if( registerQueue.isEmpty() )
405 return;
406
407 for( ;; )
408 {
409 RegistrationRequest req;
410 synchronized( registerQueue )
411 {
412 req = ( RegistrationRequest ) registerQueue.pop();
413 }
414
415 if( req == null )
416 break;
417
418 DatagramSession session = new DatagramSession(
419 filters, req.channel, req.handler );
420
421 try
422 {
423 req.handler.sessionCreated( session );
424
425 SelectionKey key = req.channel.register( selector,
426 SelectionKey.OP_READ, session );
427
428 session.setSelectionKey( key );
429 }
430 catch( Throwable t )
431 {
432 req.exception = t;
433 }
434 finally
435 {
436 synchronized( req )
437 {
438 req.done = true;
439 req.session = session;
440 req.notify();
441 }
442
443 if( req.exception != null )
444 {
445 try
446 {
447 req.channel.close();
448 }
449 catch (IOException e)
450 {
451 exceptionMonitor.exceptionCaught( this, e );
452 }
453 }
454 }
455 }
456 }
457
458 private void cancelKeys()
459 {
460 if( cancelQueue.isEmpty() )
461 return;
462
463 for( ;; )
464 {
465 SelectionKey key;
466 synchronized( cancelQueue )
467 {
468 key = ( SelectionKey ) cancelQueue.pop();
469 }
470
471 if( key == null )
472 break;
473 else
474 {
475 DatagramChannel ch = ( DatagramChannel ) key.channel();
476 try
477 {
478 ch.close();
479 }
480 catch( IOException e )
481 {
482 exceptionMonitor.exceptionCaught( this, e );
483 }
484 key.cancel();
485 selector.wakeup();
486 }
487 }
488 }
489
490 public IoFilterChain getFilterChain()
491 {
492 return filters;
493 }
494
495 private static class RegistrationRequest
496 {
497 private final DatagramChannel channel;
498
499 private final IoHandler handler;
500
501 private boolean done;
502
503 private DatagramSession session;
504
505 private Throwable exception;
506
507 private RegistrationRequest( DatagramChannel channel,
508 IoHandler handler )
509 {
510 this.channel = channel;
511 this.handler = handler;
512 }
513 }
514 }