View Javadoc

1   package org.apache.mina.io.handler;
2   
3   import java.io.IOException;
4   import java.io.InputStream;
5   import java.io.OutputStream;
6   import java.io.PipedOutputStream;
7   import java.net.SocketTimeoutException;
8   
9   import org.apache.mina.common.ByteBuffer;
10  import org.apache.mina.common.IdleStatus;
11  import org.apache.mina.io.IoHandler;
12  import org.apache.mina.io.IoHandlerAdapter;
13  import org.apache.mina.io.IoSession;
14  
15  /***
16   * A {@link IoHandler} that adapts asynchronous MINA events to stream I/O.
17   * <p>
18   * Please extend this class and implement
19   * {@link #processStreamIo(IoSession, InputStream, OutputStream)} to
20   * execute your stream I/O logic; <b>please note that you must forward
21   * the process request to other thread or thread pool.</b>
22   * 
23   * @author The Apache Directory Project (dev@directory.apache.org)
24   * @author Trustin Lee (trustin@apache.org)
25   * @version $Rev: 264677 $, $Date: 2005-08-30 11:44:35 +0900 $
26   */
27  public abstract class StreamIoHandler extends IoHandlerAdapter
28  {
29      private static final String KEY_IN = "BlockingIoHandler.in";
30      private static final String KEY_OUT = "BlockingIoHandler.out";
31      private static final String KEY_STARTED = "BlockingIoHandler.started";
32      
33      private int readTimeout;
34      
35      private int writeTimeout;
36      
37      protected StreamIoHandler()
38      {
39      }
40      
41      /***
42       * Implement this method to execute your stream I/O logic;
43       * <b>please note that you must forward the process request to other
44       * thread or thread pool.</b>
45       */
46      protected abstract void processStreamIo( IoSession session,
47                                               InputStream in, OutputStream out );
48      
49      /***
50       * Returns read timeout in seconds.
51       * The default value is <tt>0</tt> (disabled).
52       */
53      public int getReadTimeout()
54      {
55          return readTimeout;
56      }
57      
58      /***
59       * Sets read timeout in seconds.
60       * The default value is <tt>0</tt> (disabled).
61       */
62      public void setReadTimeout( int readTimeout )
63      {
64          this.readTimeout = readTimeout;
65      }
66      
67      /***
68       * Returns write timeout in seconds.
69       * The default value is <tt>0</tt> (disabled).
70       */
71      public int getWriteTimeout()
72      {
73          return writeTimeout;
74      }
75      
76      /***
77       * Sets write timeout in seconds.
78       * The default value is <tt>0</tt> (disabled).
79       */
80      public void setWriteTimeout( int writeTimeout )
81      {
82          this.writeTimeout = writeTimeout;
83      }
84  
85      /***
86       * Initializes streams and timeout settings.
87       */
88      public void sessionOpened( IoSession session )
89      {
90          // Set timeouts
91          session.getConfig().setWriteTimeout( writeTimeout );
92          session.getConfig().setIdleTime( IdleStatus.READER_IDLE, readTimeout );
93  
94          // Create streams
95          PipedOutputStream out = new PipedOutputStream();
96          session.setAttribute( KEY_OUT, out );
97          try
98          {
99              session.setAttribute( KEY_IN, new PipedInputStream( out ) );
100         }
101         catch( IOException e )
102         {
103             throw new StreamIoException( e );
104         }
105     }
106     
107     /***
108      * Closes input stream.
109      */
110     public void sessionClosed( IoSession session )
111     {
112         final PipedOutputStream out = ( PipedOutputStream ) session.getAttribute( KEY_OUT );
113         try {
114             out.close();
115         }
116         catch( IOException e )
117         {
118             throw new StreamIoException( e );
119         }
120     }
121 
122     /***
123      * Forwards read data to input stream.
124      */
125     public void dataRead( IoSession session, ByteBuffer buf )
126     {
127         final PipedInputStream in = ( PipedInputStream ) session.getAttribute( KEY_IN );
128         final PipedOutputStream out = ( PipedOutputStream ) session.getAttribute( KEY_OUT );
129         
130         java.nio.ByteBuffer nioBuf = buf.buf();
131         int offset = nioBuf.position();
132         int length = nioBuf.limit() - offset;
133         if( !nioBuf.hasArray() )
134         {
135             ByteBuffer heapBuf = ByteBuffer.allocate( length, false );
136             heapBuf.put( buf );
137             heapBuf.flip();
138             nioBuf = heapBuf.buf();
139             offset = 0;
140         }
141         
142         try
143         {
144             out.write( nioBuf.array(), offset, length );
145         }
146         catch( IOException e )
147         {
148             throw new StreamIoException( e );
149         }
150         finally
151         {
152             beginService( session, in );
153         }
154     }
155 
156     /***
157      * Forwards caught exceptions to input stream.
158      */
159     public void exceptionCaught( IoSession session, Throwable cause )
160     {
161         final PipedInputStream in = ( PipedInputStream ) session.getAttribute( KEY_IN );
162         
163         IOException e = null;
164         if( cause instanceof StreamIoException )
165         {
166             e = ( IOException ) cause.getCause();
167         }
168         else if( cause instanceof IOException )
169         {
170             e = ( IOException ) cause;
171         }
172         
173         if( e != null && in != null )
174         {
175             in.setException( e );
176             beginService( session, in );
177         }
178         else
179         {
180             cause.printStackTrace();
181             session.close();
182         }
183     }
184 
185     /***
186      * Handles read timeout.
187      */
188     public void sessionIdle( IoSession session, IdleStatus status )
189     {
190         if( status == IdleStatus.READER_IDLE )
191         {
192             throw new StreamIoException(
193                     new SocketTimeoutException( "Read timeout" ) );
194         }
195     }
196 
197     private void beginService( IoSession session, PipedInputStream in )
198     {
199         if( session.getAttribute( KEY_STARTED ) == null )
200         {
201             session.setAttribute( KEY_STARTED, Boolean.TRUE );
202             processStreamIo( session, in, new ServiceOutputStream( session ) );
203         }
204     }
205 
206     private static class PipedInputStream extends java.io.PipedInputStream
207     {
208         private IOException exception;
209 
210         public PipedInputStream(PipedOutputStream src) throws IOException
211         {
212             super( src );
213         }
214         
215         public void setException( IOException e )
216         {
217             this.exception = e;
218         }
219 
220         public synchronized int read() throws IOException
221         {
222             throwException();
223             return super.read();
224         }
225 
226         public synchronized int read( byte[] b, int off, int len ) throws IOException
227         {
228             throwException();
229             return super.read( b, off, len );
230         }
231         
232         private void throwException() throws IOException
233         {
234             if( exception != null )
235             {
236                 throw exception;
237             }
238         }
239     }
240 
241     private static class ServiceOutputStream extends OutputStream
242     {
243         private final IoSession session;
244         
245         public ServiceOutputStream( IoSession session )
246         {
247             this.session = session;
248         }
249 
250         public void close()
251         {
252             session.close( true );
253         }
254 
255         public void flush()
256         {
257         }
258 
259         public void write( byte[] b, int off, int len )
260         {
261             ByteBuffer buf = ByteBuffer.wrap( b, off, len );
262             buf.acquire(); // prevent from being pooled.
263             session.write( buf, null );
264         }
265 
266         public void write( byte[] b )
267         {
268             ByteBuffer buf = ByteBuffer.wrap( b );
269             buf.acquire(); // prevent from being pooled.
270             session.write( buf, null );
271         }
272 
273         public void write( int b )
274         {
275             ByteBuffer buf = ByteBuffer.allocate( 1 );
276             buf.put( ( byte ) b );
277             buf.flip();
278             session.write( buf, null );
279         }
280     }
281     
282     private static class StreamIoException extends RuntimeException
283     {
284         private static final long serialVersionUID = 3976736960742503222L;
285 
286         public StreamIoException( IOException cause )
287         {
288             super(cause);
289         }
290     }
291 }