View Javadoc

1   /*
2    *   @(#) $Id: SocketIoProcessor.java 210062 2005-07-11 03:52:38Z trustin $
3    *
4    *   Copyright 2004 The Apache Software Foundation
5    *
6    *   Licensed under the Apache License, Version 2.0 (the "License");
7    *   you may not use this file except in compliance with the License.
8    *   You may obtain a copy of the License at
9    *
10   *       http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing, software
13   *   distributed under the License is distributed on an "AS IS" BASIS,
14   *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   *   See the License for the specific language governing permissions and
16   *   limitations under the License.
17   *
18   */
19  package org.apache.mina.io.socket;
20  
21  import java.io.IOException;
22  import java.nio.channels.SelectionKey;
23  import java.nio.channels.Selector;
24  import java.nio.channels.SocketChannel;
25  import java.util.Iterator;
26  import java.util.Set;
27  
28  import org.apache.mina.common.ByteBuffer;
29  import org.apache.mina.common.IdleStatus;
30  import org.apache.mina.common.SessionConfig;
31  import org.apache.mina.io.WriteTimeoutException;
32  import org.apache.mina.util.Queue;
33  
34  /***
35   * Performs all I/O operations for sockets which is connected or bound.
36   * This class is used by MINA internally.
37   * 
38   * @author Trustin Lee (trustin@apache.org)
39   * @version $Rev: 210062 $, $Date: 2005-07-11 12:52:38 +0900 $,
40   */
41  class SocketIoProcessor
42  {
43      private static final SocketIoProcessor instance;
44  
45      static
46      {
47          SocketIoProcessor tmp;
48  
49          try
50          {
51              tmp = new SocketIoProcessor();
52          }
53          catch( IOException e )
54          {
55              InternalError error = new InternalError(
56                                                       "Failed to open selector." );
57              error.initCause( e );
58              throw error;
59          }
60  
61          instance = tmp;
62      }
63  
64      private final Selector selector;
65  
66      private final Queue newSessions = new Queue();
67  
68      private final Queue removingSessions = new Queue();
69  
70      private final Queue flushingSessions = new Queue();
71  
72      private final Queue readableSessions = new Queue();
73  
74      private Worker worker;
75  
76      private long lastIdleCheckTime = System.currentTimeMillis();
77  
78      private SocketIoProcessor() throws IOException
79      {
80          selector = Selector.open();
81      }
82  
83      static SocketIoProcessor getInstance()
84      {
85          return instance;
86      }
87  
88      void addSession( SocketSession session )
89      {
90          synchronized( this )
91          {
92              synchronized( newSessions )
93              {
94                  newSessions.push( session );
95              }
96              startupWorker();
97          }
98  
99          selector.wakeup();
100     }
101 
102     void removeSession( SocketSession session )
103     {
104         scheduleRemove( session );
105         startupWorker();
106         selector.wakeup();
107     }
108 
109     private synchronized void startupWorker()
110     {
111         if( worker == null )
112         {
113             worker = new Worker();
114             worker.start();
115         }
116     }
117 
118     void flushSession( SocketSession session )
119     {
120         scheduleFlush( session );
121         selector.wakeup();
122     }
123 
124     void addReadableSession( SocketSession session )
125     {
126         synchronized( readableSessions )
127         {
128             readableSessions.push( session );
129         }
130         selector.wakeup();
131     }
132 
133     private void addSessions()
134     {
135         if( newSessions.isEmpty() )
136             return;
137 
138         SocketSession session;
139 
140         for( ;; )
141         {
142             synchronized( newSessions )
143             {
144                 session = ( SocketSession ) newSessions.pop();
145             }
146 
147             if( session == null )
148                 break;
149 
150             SocketChannel ch = session.getChannel();
151             boolean registered;
152 
153             try
154             {
155                 ch.configureBlocking( false );
156                 session.setSelectionKey( ch.register( selector,
157                                                       SelectionKey.OP_READ,
158                                                       session ) );
159                 registered = true;
160             }
161             catch( IOException e )
162             {
163                 registered = false;
164                 session.getManagerFilterChain().exceptionCaught( session, e );
165             }
166 
167             if( registered )
168             {
169                 session.getManagerFilterChain().sessionOpened( session );
170             }
171         }
172     }
173 
174     private void removeSessions()
175     {
176         if( removingSessions.isEmpty() )
177             return;
178 
179         for( ;; )
180         {
181             SocketSession session;
182 
183             synchronized( removingSessions )
184             {
185                 session = ( SocketSession ) removingSessions.pop();
186             }
187 
188             if( session == null )
189                 break;
190 
191             SocketChannel ch = session.getChannel();
192             SelectionKey key = session.getSelectionKey();
193             // Retry later if session is not yet fully initialized.
194             // (In case that Session.close() is called before addSession() is processed)
195             if( key == null )
196             {
197                 scheduleRemove( session );
198                 break;
199             }
200 
201             // skip if channel is already closed
202             if( !key.isValid() )
203             {
204                 continue;
205             }
206 
207             try
208             {
209                 key.cancel();
210                 ch.close();
211             }
212             catch( IOException e )
213             {
214                 session.getManagerFilterChain().exceptionCaught( session, e );
215             }
216             finally
217             {
218                 releaseWriteBuffers( session );
219 
220                 session.getManagerFilterChain().sessionClosed( session );
221                 session.notifyClose();
222             }
223         }
224     }
225 
226     private void processSessions( Set selectedKeys )
227     {
228         Iterator it = selectedKeys.iterator();
229 
230         while( it.hasNext() )
231         {
232             SelectionKey key = ( SelectionKey ) it.next();
233             SocketSession session = ( SocketSession ) key.attachment();
234 
235             if( key.isReadable() )
236             {
237                 read( session );
238             }
239 
240             if( key.isWritable() )
241             {
242                 scheduleFlush( session );
243             }
244         }
245 
246         selectedKeys.clear();
247     }
248 
249     private void read( SocketSession session )
250     {
251         ByteBuffer buf = ByteBuffer.allocate(
252                 (( SocketSessionConfig ) session.getConfig()).getSessionReceiveBufferSize() ); 
253         SocketChannel ch = session.getChannel();
254 
255         try
256         {
257             int readBytes = 0;
258             int ret;
259 
260             buf.clear();
261 
262             try
263             {
264                 while( ( ret = ch.read( buf.buf() ) ) > 0 )
265                 {
266                     readBytes += ret;
267                 }
268             }
269             finally
270             {
271                 buf.flip();
272             }
273 
274             session.increaseReadBytes( readBytes );
275             session.setIdle( IdleStatus.BOTH_IDLE, false );
276             session.setIdle( IdleStatus.READER_IDLE, false );
277 
278             if( readBytes > 0 )
279             {
280                 ByteBuffer newBuf = ByteBuffer.allocate( readBytes );
281                 newBuf.put( buf );
282                 newBuf.flip();
283                 session.getManagerFilterChain().dataRead( session, newBuf );
284             }
285             if( ret < 0 )
286             {
287                 scheduleRemove( session );
288             }
289         }
290         catch( Throwable e )
291         {
292             if( e instanceof IOException )
293                 scheduleRemove( session );
294             session.getManagerFilterChain().exceptionCaught( session, e );
295         }
296         finally
297         {
298             buf.release();
299         }
300     }
301 
302     private void scheduleRemove( SocketSession session )
303     {
304         synchronized( removingSessions )
305         {
306             removingSessions.push( session );
307         }
308     }
309 
310     private void scheduleFlush( SocketSession session )
311     {
312         synchronized( flushingSessions )
313         {
314             flushingSessions.push( session );
315         }
316     }
317 
318     private void notifyIdleSessions()
319     {
320         Set keys = selector.keys();
321         Iterator it;
322         SocketSession session;
323 
324         // process idle sessions
325         long currentTime = System.currentTimeMillis();
326 
327         if( ( keys != null ) && ( ( currentTime - lastIdleCheckTime ) >= 1000 ) )
328         {
329             lastIdleCheckTime = currentTime;
330             it = keys.iterator();
331 
332             while( it.hasNext() )
333             {
334                 SelectionKey key = ( SelectionKey ) it.next();
335                 session = ( SocketSession ) key.attachment();
336 
337                 notifyIdleSession( session, currentTime );
338             }
339         }
340     }
341 
342     private void notifyIdleSession( SocketSession session, long currentTime )
343     {
344         SessionConfig config = session.getConfig();
345 
346         notifyIdleSession0( session, currentTime, config
347                 .getIdleTimeInMillis( IdleStatus.BOTH_IDLE ),
348                             IdleStatus.BOTH_IDLE, session.getLastIoTime() );
349         notifyIdleSession0( session, currentTime, config
350                 .getIdleTimeInMillis( IdleStatus.READER_IDLE ),
351                             IdleStatus.READER_IDLE, session.getLastReadTime() );
352         notifyIdleSession0( session, currentTime, config
353                 .getIdleTimeInMillis( IdleStatus.WRITER_IDLE ),
354                             IdleStatus.WRITER_IDLE, session.getLastWriteTime() );
355 
356         notifyWriteTimeoutSession( session, currentTime, config
357                 .getWriteTimeoutInMillis(), session.getLastWriteTime() );
358     }
359 
360     private void notifyIdleSession0( SocketSession session, long currentTime,
361                                     long idleTime, IdleStatus status,
362                                     long lastIoTime )
363     {
364         if( idleTime > 0 && !session.isIdle( status ) && lastIoTime != 0
365             && ( currentTime - lastIoTime ) >= idleTime )
366         {
367             session.setIdle( status, true );
368             session.getManagerFilterChain().sessionIdle( session, status );
369         }
370     }
371 
372     private void notifyWriteTimeoutSession( SocketSession session,
373                                            long currentTime,
374                                            long writeTimeout, long lastIoTime )
375     {
376         if( writeTimeout > 0
377             && ( currentTime - lastIoTime ) >= writeTimeout
378             && session.getSelectionKey() != null
379             && ( session.getSelectionKey().interestOps() & SelectionKey.OP_WRITE ) != 0 )
380         {
381             session
382                     .getManagerFilterChain()
383                     .exceptionCaught( session, new WriteTimeoutException() );
384         }
385     }
386 
387     private void flushSessions()
388     {
389         if( flushingSessions.size() == 0 )
390             return;
391 
392         for( ;; )
393         {
394             SocketSession session;
395 
396             synchronized( flushingSessions )
397             {
398                 session = ( SocketSession ) flushingSessions.pop();
399             }
400 
401             if( session == null )
402                 break;
403 
404             if( !session.isConnected() )
405             {
406                 releaseWriteBuffers( session );
407                 continue;
408             }
409 
410             // If encountered write request before session is initialized, 
411             // (In case that Session.write() is called before addSession() is processed)
412             if( session.getSelectionKey() == null )
413             {
414                 // Reschedule for later write
415                 scheduleFlush( session );
416                 break;
417             }
418             else
419             {
420                 try
421                 {
422                     flush( session );
423                 }
424                 catch( IOException e )
425                 {
426                     scheduleRemove( session );
427                     session.getManagerFilterChain().exceptionCaught( session, e );
428                 }
429             }
430         }
431     }
432     
433     private void releaseWriteBuffers( SocketSession session )
434     {
435         Queue writeBufferQueue = session.getWriteBufferQueue();
436         session.getWriteMarkerQueue().clear();
437         ByteBuffer buf;
438         
439         while( ( buf = (ByteBuffer) writeBufferQueue.pop() ) != null )
440         {
441             try
442             {
443                 buf.release();
444             }
445             catch( IllegalStateException e )
446             {
447                 session.getManagerFilterChain().exceptionCaught( session, e );
448             }
449         }
450     }
451 
452     private void flush( SocketSession session ) throws IOException
453     {
454         SocketChannel ch = session.getChannel();
455 
456         Queue writeBufferQueue = session.getWriteBufferQueue();
457         Queue writeMarkerQueue = session.getWriteMarkerQueue();
458 
459         ByteBuffer buf;
460         Object marker;
461         for( ;; )
462         {
463             synchronized( writeBufferQueue )
464             {
465                 buf = ( ByteBuffer ) writeBufferQueue.first();
466                 marker = writeMarkerQueue.first();
467             }
468 
469             if( buf == null )
470                 break;
471 
472             if( buf.remaining() == 0 )
473             {
474                 synchronized( writeBufferQueue )
475                 {
476                     writeBufferQueue.pop();
477                     writeMarkerQueue.pop();
478                 }
479                 try
480                 {
481                     buf.release();
482                 }
483                 catch( IllegalStateException e )
484                 {
485                     session.getManagerFilterChain().exceptionCaught( session, e );
486                 }
487 
488                 session.increaseWrittenWriteRequests();
489                 session.getManagerFilterChain().dataWritten( session, marker );
490                 continue;
491             }
492 
493             int writtenBytes = 0;
494             try
495             {
496                 writtenBytes = ch.write( buf.buf() );
497             }
498             finally
499             {
500                 if( writtenBytes > 0 )
501                 {
502                     session.increaseWrittenBytes( writtenBytes );
503                     session.setIdle( IdleStatus.BOTH_IDLE, false );
504                     session.setIdle( IdleStatus.WRITER_IDLE, false );
505                 }
506 
507                 SelectionKey key = session.getSelectionKey();
508                 if( buf.hasRemaining() )
509                 {
510                     // Kernel buffer is full
511                     key
512                             .interestOps( key.interestOps()
513                                           | SelectionKey.OP_WRITE );
514                     break;
515                 }
516                 else
517                 {
518                     key.interestOps( key.interestOps()
519                                      & ( ~SelectionKey.OP_WRITE ) );
520                 }
521             }
522         }
523     }
524 
525     private class Worker extends Thread
526     {
527         public Worker()
528         {
529             super( "SocketIoProcessor" );
530         }
531 
532         public void run()
533         {
534             for( ;; )
535             {
536                 try
537                 {
538                     int nKeys = selector.select( 1000 );
539                     addSessions();
540 
541                     if( nKeys > 0 )
542                     {
543                         processSessions( selector.selectedKeys() );
544                     }
545 
546                     flushSessions();
547                     removeSessions();
548                     notifyIdleSessions();
549 
550                     if( selector.keys().isEmpty() )
551                     {
552                         synchronized( SocketIoProcessor.this )
553                         {
554                             if( selector.keys().isEmpty() &&
555                                 newSessions.isEmpty() )
556                             {
557                                 worker = null;
558                                 break;
559                             }
560                         }
561                     }
562                 }
563                 catch( IOException e )
564                 {
565                     e.printStackTrace();
566 
567                     try
568                     {
569                         Thread.sleep( 1000 );
570                     }
571                     catch( InterruptedException e1 )
572                     {
573                     }
574                 }
575             }
576         }
577     }
578 }