1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.mina.io.socket;
20
21 import java.io.IOException;
22 import java.nio.channels.SelectionKey;
23 import java.nio.channels.Selector;
24 import java.nio.channels.SocketChannel;
25 import java.util.Iterator;
26 import java.util.Set;
27
28 import org.apache.mina.common.ByteBuffer;
29 import org.apache.mina.common.IdleStatus;
30 import org.apache.mina.common.SessionConfig;
31 import org.apache.mina.io.WriteTimeoutException;
32 import org.apache.mina.util.Queue;
33
34 /***
35 * Performs all I/O operations for sockets which is connected or bound.
36 * This class is used by MINA internally.
37 *
38 * @author Trustin Lee (trustin@apache.org)
39 * @version $Rev: 210062 $, $Date: 2005-07-11 12:52:38 +0900 $,
40 */
41 class SocketIoProcessor
42 {
43 private static final SocketIoProcessor instance;
44
45 static
46 {
47 SocketIoProcessor tmp;
48
49 try
50 {
51 tmp = new SocketIoProcessor();
52 }
53 catch( IOException e )
54 {
55 InternalError error = new InternalError(
56 "Failed to open selector." );
57 error.initCause( e );
58 throw error;
59 }
60
61 instance = tmp;
62 }
63
64 private final Selector selector;
65
66 private final Queue newSessions = new Queue();
67
68 private final Queue removingSessions = new Queue();
69
70 private final Queue flushingSessions = new Queue();
71
72 private final Queue readableSessions = new Queue();
73
74 private Worker worker;
75
76 private long lastIdleCheckTime = System.currentTimeMillis();
77
78 private SocketIoProcessor() throws IOException
79 {
80 selector = Selector.open();
81 }
82
83 static SocketIoProcessor getInstance()
84 {
85 return instance;
86 }
87
88 void addSession( SocketSession session )
89 {
90 synchronized( this )
91 {
92 synchronized( newSessions )
93 {
94 newSessions.push( session );
95 }
96 startupWorker();
97 }
98
99 selector.wakeup();
100 }
101
102 void removeSession( SocketSession session )
103 {
104 scheduleRemove( session );
105 startupWorker();
106 selector.wakeup();
107 }
108
109 private synchronized void startupWorker()
110 {
111 if( worker == null )
112 {
113 worker = new Worker();
114 worker.start();
115 }
116 }
117
118 void flushSession( SocketSession session )
119 {
120 scheduleFlush( session );
121 selector.wakeup();
122 }
123
124 void addReadableSession( SocketSession session )
125 {
126 synchronized( readableSessions )
127 {
128 readableSessions.push( session );
129 }
130 selector.wakeup();
131 }
132
133 private void addSessions()
134 {
135 if( newSessions.isEmpty() )
136 return;
137
138 SocketSession session;
139
140 for( ;; )
141 {
142 synchronized( newSessions )
143 {
144 session = ( SocketSession ) newSessions.pop();
145 }
146
147 if( session == null )
148 break;
149
150 SocketChannel ch = session.getChannel();
151 boolean registered;
152
153 try
154 {
155 ch.configureBlocking( false );
156 session.setSelectionKey( ch.register( selector,
157 SelectionKey.OP_READ,
158 session ) );
159 registered = true;
160 }
161 catch( IOException e )
162 {
163 registered = false;
164 session.getManagerFilterChain().exceptionCaught( session, e );
165 }
166
167 if( registered )
168 {
169 session.getManagerFilterChain().sessionOpened( session );
170 }
171 }
172 }
173
174 private void removeSessions()
175 {
176 if( removingSessions.isEmpty() )
177 return;
178
179 for( ;; )
180 {
181 SocketSession session;
182
183 synchronized( removingSessions )
184 {
185 session = ( SocketSession ) removingSessions.pop();
186 }
187
188 if( session == null )
189 break;
190
191 SocketChannel ch = session.getChannel();
192 SelectionKey key = session.getSelectionKey();
193
194
195 if( key == null )
196 {
197 scheduleRemove( session );
198 break;
199 }
200
201
202 if( !key.isValid() )
203 {
204 continue;
205 }
206
207 try
208 {
209 key.cancel();
210 ch.close();
211 }
212 catch( IOException e )
213 {
214 session.getManagerFilterChain().exceptionCaught( session, e );
215 }
216 finally
217 {
218 releaseWriteBuffers( session );
219
220 session.getManagerFilterChain().sessionClosed( session );
221 session.notifyClose();
222 }
223 }
224 }
225
226 private void processSessions( Set selectedKeys )
227 {
228 Iterator it = selectedKeys.iterator();
229
230 while( it.hasNext() )
231 {
232 SelectionKey key = ( SelectionKey ) it.next();
233 SocketSession session = ( SocketSession ) key.attachment();
234
235 if( key.isReadable() )
236 {
237 read( session );
238 }
239
240 if( key.isWritable() )
241 {
242 scheduleFlush( session );
243 }
244 }
245
246 selectedKeys.clear();
247 }
248
249 private void read( SocketSession session )
250 {
251 ByteBuffer buf = ByteBuffer.allocate(
252 (( SocketSessionConfig ) session.getConfig()).getSessionReceiveBufferSize() );
253 SocketChannel ch = session.getChannel();
254
255 try
256 {
257 int readBytes = 0;
258 int ret;
259
260 buf.clear();
261
262 try
263 {
264 while( ( ret = ch.read( buf.buf() ) ) > 0 )
265 {
266 readBytes += ret;
267 }
268 }
269 finally
270 {
271 buf.flip();
272 }
273
274 session.increaseReadBytes( readBytes );
275 session.setIdle( IdleStatus.BOTH_IDLE, false );
276 session.setIdle( IdleStatus.READER_IDLE, false );
277
278 if( readBytes > 0 )
279 {
280 ByteBuffer newBuf = ByteBuffer.allocate( readBytes );
281 newBuf.put( buf );
282 newBuf.flip();
283 session.getManagerFilterChain().dataRead( session, newBuf );
284 }
285 if( ret < 0 )
286 {
287 scheduleRemove( session );
288 }
289 }
290 catch( Throwable e )
291 {
292 if( e instanceof IOException )
293 scheduleRemove( session );
294 session.getManagerFilterChain().exceptionCaught( session, e );
295 }
296 finally
297 {
298 buf.release();
299 }
300 }
301
302 private void scheduleRemove( SocketSession session )
303 {
304 synchronized( removingSessions )
305 {
306 removingSessions.push( session );
307 }
308 }
309
310 private void scheduleFlush( SocketSession session )
311 {
312 synchronized( flushingSessions )
313 {
314 flushingSessions.push( session );
315 }
316 }
317
318 private void notifyIdleSessions()
319 {
320 Set keys = selector.keys();
321 Iterator it;
322 SocketSession session;
323
324
325 long currentTime = System.currentTimeMillis();
326
327 if( ( keys != null ) && ( ( currentTime - lastIdleCheckTime ) >= 1000 ) )
328 {
329 lastIdleCheckTime = currentTime;
330 it = keys.iterator();
331
332 while( it.hasNext() )
333 {
334 SelectionKey key = ( SelectionKey ) it.next();
335 session = ( SocketSession ) key.attachment();
336
337 notifyIdleSession( session, currentTime );
338 }
339 }
340 }
341
342 private void notifyIdleSession( SocketSession session, long currentTime )
343 {
344 SessionConfig config = session.getConfig();
345
346 notifyIdleSession0( session, currentTime, config
347 .getIdleTimeInMillis( IdleStatus.BOTH_IDLE ),
348 IdleStatus.BOTH_IDLE, session.getLastIoTime() );
349 notifyIdleSession0( session, currentTime, config
350 .getIdleTimeInMillis( IdleStatus.READER_IDLE ),
351 IdleStatus.READER_IDLE, session.getLastReadTime() );
352 notifyIdleSession0( session, currentTime, config
353 .getIdleTimeInMillis( IdleStatus.WRITER_IDLE ),
354 IdleStatus.WRITER_IDLE, session.getLastWriteTime() );
355
356 notifyWriteTimeoutSession( session, currentTime, config
357 .getWriteTimeoutInMillis(), session.getLastWriteTime() );
358 }
359
360 private void notifyIdleSession0( SocketSession session, long currentTime,
361 long idleTime, IdleStatus status,
362 long lastIoTime )
363 {
364 if( idleTime > 0 && !session.isIdle( status ) && lastIoTime != 0
365 && ( currentTime - lastIoTime ) >= idleTime )
366 {
367 session.setIdle( status, true );
368 session.getManagerFilterChain().sessionIdle( session, status );
369 }
370 }
371
372 private void notifyWriteTimeoutSession( SocketSession session,
373 long currentTime,
374 long writeTimeout, long lastIoTime )
375 {
376 if( writeTimeout > 0
377 && ( currentTime - lastIoTime ) >= writeTimeout
378 && session.getSelectionKey() != null
379 && ( session.getSelectionKey().interestOps() & SelectionKey.OP_WRITE ) != 0 )
380 {
381 session
382 .getManagerFilterChain()
383 .exceptionCaught( session, new WriteTimeoutException() );
384 }
385 }
386
387 private void flushSessions()
388 {
389 if( flushingSessions.size() == 0 )
390 return;
391
392 for( ;; )
393 {
394 SocketSession session;
395
396 synchronized( flushingSessions )
397 {
398 session = ( SocketSession ) flushingSessions.pop();
399 }
400
401 if( session == null )
402 break;
403
404 if( !session.isConnected() )
405 {
406 releaseWriteBuffers( session );
407 continue;
408 }
409
410
411
412 if( session.getSelectionKey() == null )
413 {
414
415 scheduleFlush( session );
416 break;
417 }
418 else
419 {
420 try
421 {
422 flush( session );
423 }
424 catch( IOException e )
425 {
426 scheduleRemove( session );
427 session.getManagerFilterChain().exceptionCaught( session, e );
428 }
429 }
430 }
431 }
432
433 private void releaseWriteBuffers( SocketSession session )
434 {
435 Queue writeBufferQueue = session.getWriteBufferQueue();
436 session.getWriteMarkerQueue().clear();
437 ByteBuffer buf;
438
439 while( ( buf = (ByteBuffer) writeBufferQueue.pop() ) != null )
440 {
441 try
442 {
443 buf.release();
444 }
445 catch( IllegalStateException e )
446 {
447 session.getManagerFilterChain().exceptionCaught( session, e );
448 }
449 }
450 }
451
452 private void flush( SocketSession session ) throws IOException
453 {
454 SocketChannel ch = session.getChannel();
455
456 Queue writeBufferQueue = session.getWriteBufferQueue();
457 Queue writeMarkerQueue = session.getWriteMarkerQueue();
458
459 ByteBuffer buf;
460 Object marker;
461 for( ;; )
462 {
463 synchronized( writeBufferQueue )
464 {
465 buf = ( ByteBuffer ) writeBufferQueue.first();
466 marker = writeMarkerQueue.first();
467 }
468
469 if( buf == null )
470 break;
471
472 if( buf.remaining() == 0 )
473 {
474 synchronized( writeBufferQueue )
475 {
476 writeBufferQueue.pop();
477 writeMarkerQueue.pop();
478 }
479 try
480 {
481 buf.release();
482 }
483 catch( IllegalStateException e )
484 {
485 session.getManagerFilterChain().exceptionCaught( session, e );
486 }
487
488 session.increaseWrittenWriteRequests();
489 session.getManagerFilterChain().dataWritten( session, marker );
490 continue;
491 }
492
493 int writtenBytes = 0;
494 try
495 {
496 writtenBytes = ch.write( buf.buf() );
497 }
498 finally
499 {
500 if( writtenBytes > 0 )
501 {
502 session.increaseWrittenBytes( writtenBytes );
503 session.setIdle( IdleStatus.BOTH_IDLE, false );
504 session.setIdle( IdleStatus.WRITER_IDLE, false );
505 }
506
507 SelectionKey key = session.getSelectionKey();
508 if( buf.hasRemaining() )
509 {
510
511 key
512 .interestOps( key.interestOps()
513 | SelectionKey.OP_WRITE );
514 break;
515 }
516 else
517 {
518 key.interestOps( key.interestOps()
519 & ( ~SelectionKey.OP_WRITE ) );
520 }
521 }
522 }
523 }
524
525 private class Worker extends Thread
526 {
527 public Worker()
528 {
529 super( "SocketIoProcessor" );
530 }
531
532 public void run()
533 {
534 for( ;; )
535 {
536 try
537 {
538 int nKeys = selector.select( 1000 );
539 addSessions();
540
541 if( nKeys > 0 )
542 {
543 processSessions( selector.selectedKeys() );
544 }
545
546 flushSessions();
547 removeSessions();
548 notifyIdleSessions();
549
550 if( selector.keys().isEmpty() )
551 {
552 synchronized( SocketIoProcessor.this )
553 {
554 if( selector.keys().isEmpty() &&
555 newSessions.isEmpty() )
556 {
557 worker = null;
558 break;
559 }
560 }
561 }
562 }
563 catch( IOException e )
564 {
565 e.printStackTrace();
566
567 try
568 {
569 Thread.sleep( 1000 );
570 }
571 catch( InterruptedException e1 )
572 {
573 }
574 }
575 }
576 }
577 }
578 }