1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.mina.io.datagram;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.SocketAddress;
24 import java.nio.channels.DatagramChannel;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.util.Iterator;
28 import java.util.Set;
29
30 import org.apache.mina.common.ByteBuffer;
31 import org.apache.mina.io.IoConnector;
32 import org.apache.mina.io.IoFilterChain;
33 import org.apache.mina.io.IoHandler;
34 import org.apache.mina.io.IoSession;
35 import org.apache.mina.io.IoSessionManagerFilterChain;
36 import org.apache.mina.util.ExceptionUtil;
37 import org.apache.mina.util.Queue;
38
39 /***
40 * {@link IoConnector} for datagram transport (UDP/IP).
41 *
42 * @author Trustin Lee (trustin@apache.org)
43 * @version $Rev: 210062 $, $Date: 2005-07-11 12:52:38 +0900 $
44 */
45 public class DatagramConnector extends DatagramSessionManager implements IoConnector
46 {
47 private static volatile int nextId = 0;
48
49 private final IoSessionManagerFilterChain filters =
50 new DatagramSessionManagerFilterChain( this );
51
52 private final int id = nextId ++ ;
53
54 private Selector selector;
55
56 private final Queue registerQueue = new Queue();
57
58 private final Queue cancelQueue = new Queue();
59
60 private final Queue flushingSessions = new Queue();
61
62 private Worker worker;
63
64 /***
65 * Creates a new instance.
66 */
67 public DatagramConnector()
68 {
69 }
70
71 public IoSession connect( SocketAddress address, IoHandler handler ) throws IOException
72 {
73 return connect( address, null, handler);
74 }
75
76 public IoSession connect( SocketAddress address, int timeout, IoHandler handler ) throws IOException
77 {
78 return connect( address, null, handler );
79 }
80
81 public IoSession connect( SocketAddress address, SocketAddress localAddress, int timeout, IoHandler handler ) throws IOException
82 {
83 return connect( address, localAddress, handler );
84 }
85
86 public IoSession connect( SocketAddress address, SocketAddress localAddress,
87 IoHandler handler ) throws IOException
88 {
89 if( address == null )
90 throw new NullPointerException( "address" );
91 if( handler == null )
92 throw new NullPointerException( "handler" );
93
94 if( !( address instanceof InetSocketAddress ) )
95 throw new IllegalArgumentException( "Unexpected address type: "
96 + address.getClass() );
97
98 if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
99 {
100 throw new IllegalArgumentException( "Unexpected local address type: "
101 + localAddress.getClass() );
102 }
103
104 DatagramChannel ch = DatagramChannel.open();
105 boolean initialized = false;
106 try
107 {
108 ch.socket().setReuseAddress( true );
109 if( localAddress != null )
110 {
111 ch.socket().bind( localAddress );
112 }
113 ch.connect( address );
114 ch.configureBlocking( false );
115 initialized = true;
116 }
117 finally
118 {
119 if( !initialized )
120 {
121 ch.close();
122 }
123 }
124
125 RegistrationRequest request = new RegistrationRequest( ch, handler );
126 synchronized( this )
127 {
128 synchronized( registerQueue )
129 {
130 registerQueue.push( request );
131 }
132 startupWorker();
133 }
134
135 selector.wakeup();
136
137 synchronized( request )
138 {
139 while( !request.done )
140 {
141 try
142 {
143 request.wait();
144 }
145 catch( InterruptedException e )
146 {
147 }
148 }
149 }
150
151 if( request.exception != null )
152 {
153 ExceptionUtil.throwException( request.exception );
154 }
155
156 return request.session;
157 }
158
159 private synchronized void startupWorker() throws IOException
160 {
161 if( worker == null )
162 {
163 selector = Selector.open();
164 worker = new Worker();
165 worker.start();
166 }
167 }
168
169 void closeSession( DatagramSession session )
170 {
171 synchronized( this )
172 {
173 try
174 {
175 startupWorker();
176 }
177 catch( IOException e )
178 {
179
180
181
182
183
184 return;
185 }
186
187 synchronized( cancelQueue )
188 {
189 cancelQueue.push( session );
190 }
191 }
192
193 selector.wakeup();
194 }
195
196 void flushSession( DatagramSession session )
197 {
198 scheduleFlush( session );
199 selector.wakeup();
200 }
201
202 private void scheduleFlush( DatagramSession session )
203 {
204 synchronized( flushingSessions )
205 {
206 flushingSessions.push( session );
207 }
208 }
209
210 private class Worker extends Thread
211 {
212 public Worker()
213 {
214 super( "DatagramAcceptor-" + id );
215 }
216
217 public void run()
218 {
219 for( ;; )
220 {
221 try
222 {
223 int nKeys = selector.select();
224
225 registerNew();
226
227 if( nKeys > 0 )
228 {
229 processReadySessions( selector.selectedKeys() );
230 }
231
232 flushSessions();
233 cancelKeys();
234
235 if( selector.keys().isEmpty() )
236 {
237 synchronized( DatagramConnector.this )
238 {
239 if( selector.keys().isEmpty() &&
240 registerQueue.isEmpty() &&
241 cancelQueue.isEmpty() )
242 {
243 worker = null;
244 try
245 {
246 selector.close();
247 }
248 catch( IOException e )
249 {
250 exceptionMonitor.exceptionCaught( DatagramConnector.this, e );
251 }
252 finally
253 {
254 selector = null;
255 }
256 break;
257 }
258 }
259 }
260 }
261 catch( IOException e )
262 {
263 exceptionMonitor.exceptionCaught( DatagramConnector.this,
264 e );
265
266 try
267 {
268 Thread.sleep( 1000 );
269 }
270 catch( InterruptedException e1 )
271 {
272 }
273 }
274 }
275 }
276 }
277
278 private void processReadySessions( Set keys )
279 {
280 Iterator it = keys.iterator();
281 while( it.hasNext() )
282 {
283 SelectionKey key = ( SelectionKey ) it.next();
284 it.remove();
285
286 DatagramSession session = ( DatagramSession ) key.attachment();
287
288 if( key.isReadable() )
289 {
290 readSession( session );
291 }
292
293 if( key.isWritable() )
294 {
295 scheduleFlush( session );
296 }
297 }
298 }
299
300 private void readSession( DatagramSession session )
301 {
302
303 ByteBuffer readBuf = ByteBuffer.allocate( 2048 );
304 try
305 {
306 int readBytes = session.getChannel().read( readBuf.buf() );
307 if( readBytes > 0 )
308 {
309 readBuf.flip();
310 ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
311 newBuf.put( readBuf );
312 newBuf.flip();
313
314 session.increaseReadBytes( readBytes );
315 filters.dataRead( session, newBuf );
316 }
317 }
318 catch( IOException e )
319 {
320 filters.exceptionCaught( session, e );
321 }
322 finally
323 {
324 readBuf.release();
325 }
326 }
327
328 private void flushSessions()
329 {
330 if( flushingSessions.size() == 0 )
331 return;
332
333 for( ;; )
334 {
335 DatagramSession session;
336
337 synchronized( flushingSessions )
338 {
339 session = ( DatagramSession ) flushingSessions.pop();
340 }
341
342 if( session == null )
343 break;
344
345 try
346 {
347 flush( session );
348 }
349 catch( IOException e )
350 {
351 session.getManagerFilterChain().exceptionCaught( session, e );
352 }
353 }
354 }
355
356 private void flush( DatagramSession session ) throws IOException
357 {
358 DatagramChannel ch = session.getChannel();
359
360 Queue writeBufferQueue = session.getWriteBufferQueue();
361 Queue writeMarkerQueue = session.getWriteMarkerQueue();
362
363 ByteBuffer buf;
364 Object marker;
365 for( ;; )
366 {
367 synchronized( writeBufferQueue )
368 {
369 buf = ( ByteBuffer ) writeBufferQueue.first();
370 marker = writeMarkerQueue.first();
371 }
372
373 if( buf == null )
374 break;
375
376 if( buf.remaining() == 0 )
377 {
378
379 synchronized( writeBufferQueue )
380 {
381 writeBufferQueue.pop();
382 writeMarkerQueue.pop();
383 }
384
385 try
386 {
387 buf.release();
388 }
389 catch( IllegalStateException e )
390 {
391 session.getManagerFilterChain().exceptionCaught( session, e );
392 }
393
394 session.increaseWrittenWriteRequests();
395 session.getManagerFilterChain().dataWritten( session, marker );
396 continue;
397 }
398
399 int writtenBytes = ch.write( buf.buf() );
400
401 SelectionKey key = session.getSelectionKey();
402 if( writtenBytes == 0 )
403 {
404
405 key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
406 }
407 else if( writtenBytes > 0 )
408 {
409 key.interestOps( key.interestOps()
410 & ( ~SelectionKey.OP_WRITE ) );
411
412
413 synchronized( writeBufferQueue )
414 {
415 writeBufferQueue.pop();
416 writeMarkerQueue.pop();
417 }
418
419 session.increaseWrittenBytes( writtenBytes );
420 session.increaseWrittenWriteRequests();
421 session.getManagerFilterChain().dataWritten( session, marker );
422 }
423 }
424 }
425
426 private void registerNew()
427 {
428 if( registerQueue.isEmpty() )
429 return;
430
431 for( ;; )
432 {
433 RegistrationRequest req;
434 synchronized( registerQueue )
435 {
436 req = ( RegistrationRequest ) registerQueue.pop();
437 }
438
439 if( req == null )
440 break;
441
442 DatagramSession session = new DatagramSession(
443 filters, req.channel, req.handler );
444
445 try
446 {
447 req.handler.sessionCreated( session );
448
449 SelectionKey key = req.channel.register( selector,
450 SelectionKey.OP_READ, session );
451
452 session.setSelectionKey( key );
453 }
454 catch( Throwable t )
455 {
456 req.exception = t;
457 }
458 finally
459 {
460 synchronized( req )
461 {
462 req.done = true;
463 req.session = session;
464 req.notify();
465 }
466
467 if( req.exception != null )
468 {
469 try
470 {
471 req.channel.close();
472 }
473 catch (IOException e)
474 {
475 exceptionMonitor.exceptionCaught( this, e );
476 }
477 }
478 }
479 }
480 }
481
482 private void cancelKeys()
483 {
484 if( cancelQueue.isEmpty() )
485 return;
486
487 for( ;; )
488 {
489 DatagramSession session;
490 synchronized( cancelQueue )
491 {
492 session = ( DatagramSession ) cancelQueue.pop();
493 }
494 if( session == null )
495 break;
496 else
497 {
498 SelectionKey key = session.getSelectionKey();
499 DatagramChannel ch = ( DatagramChannel ) key.channel();
500 try
501 {
502 ch.close();
503 }
504 catch( IOException e )
505 {
506 exceptionMonitor.exceptionCaught( this, e );
507 }
508 session.notifyClose();
509 key.cancel();
510 selector.wakeup();
511 }
512 }
513 }
514
515 public IoFilterChain getFilterChain()
516 {
517 return filters;
518 }
519
520 private static class RegistrationRequest
521 {
522 private final DatagramChannel channel;
523
524 private final IoHandler handler;
525
526 private boolean done;
527
528 private DatagramSession session;
529
530 private Throwable exception;
531
532 private RegistrationRequest( DatagramChannel channel,
533 IoHandler handler )
534 {
535 this.channel = channel;
536 this.handler = handler;
537 }
538 }
539 }