1 package org.apache.mina.io.handler;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.io.OutputStream;
6 import java.io.PipedOutputStream;
7 import java.net.SocketTimeoutException;
8
9 import org.apache.mina.common.ByteBuffer;
10 import org.apache.mina.common.IdleStatus;
11 import org.apache.mina.io.IoHandler;
12 import org.apache.mina.io.IoHandlerAdapter;
13 import org.apache.mina.io.IoSession;
14
/**
 * An {@link IoHandler} that adapts asynchronous MINA events to stream I/O.
 * <p>
 * Extend this class and implement
 * {@link #processStreamIo(IoSession, InputStream, OutputStream)} to
 * execute your stream I/O logic. <b>Please note that your implementation
 * must forward the processing request to another thread or thread pool.</b>
 *
 * @author The Apache Directory Project (dev@directory.apache.org)
 * @author Trustin Lee (trustin@apache.org)
 * @version $Rev: 264677 $, $Date: 2005-08-30 11:44:35 +0900 $
 */
27 public abstract class StreamIoHandler extends IoHandlerAdapter
28 {
29 private static final String KEY_IN = "BlockingIoHandler.in";
30 private static final String KEY_OUT = "BlockingIoHandler.out";
31 private static final String KEY_STARTED = "BlockingIoHandler.started";
32
33 private int readTimeout;
34
35 private int writeTimeout;
36
/**
 * Creates a handler whose read and write timeouts keep their initial
 * value of <tt>0</tt>. The constructor is protected, so it is reached
 * through subclasses only.
 */
protected StreamIoHandler()
{
    super();
}
40
/**
 * Implement this method to execute your stream I/O logic.
 * <b>Please note that the implementation must forward the processing
 * request to another thread or thread pool.</b>
 * <p>
 * This class calls the method at most once per session: the first time
 * data is read or an I/O exception is forwarded, after which a session
 * attribute marks the service as started.
 *
 * @param session the session the two streams are bound to
 * @param in      the stream that receives the data read from the session
 * @param out     the stream whose written bytes are sent through the session
 */
protected abstract void processStreamIo( IoSession session,
                                         InputStream in, OutputStream out );
48
49 /***
50 * Returns read timeout in seconds.
51 * The default value is <tt>0</tt> (disabled).
52 */
53 public int getReadTimeout()
54 {
55 return readTimeout;
56 }
57
/**
 * Sets the read timeout in seconds.
 * The default value is <tt>0</tt> (disabled).
 * <p>
 * Within this class the value is read only when a session is opened,
 * where it is installed as the session's reader idle time.
 *
 * @param readTimeout the read timeout in seconds
 */
public void setReadTimeout( int readTimeout )
{
    this.readTimeout = readTimeout;
}
66
67 /***
68 * Returns write timeout in seconds.
69 * The default value is <tt>0</tt> (disabled).
70 */
71 public int getWriteTimeout()
72 {
73 return writeTimeout;
74 }
75
/**
 * Sets the write timeout in seconds.
 * The default value is <tt>0</tt> (disabled).
 * <p>
 * Within this class the value is read only when a session is opened,
 * where it is passed to the session configuration.
 *
 * @param writeTimeout the write timeout in seconds
 */
public void setWriteTimeout( int writeTimeout )
{
    this.writeTimeout = writeTimeout;
}
84
85 /***
86 * Initializes streams and timeout settings.
87 */
88 public void sessionOpened( IoSession session )
89 {
90
91 session.getConfig().setWriteTimeout( writeTimeout );
92 session.getConfig().setIdleTime( IdleStatus.READER_IDLE, readTimeout );
93
94
95 PipedOutputStream out = new PipedOutputStream();
96 session.setAttribute( KEY_OUT, out );
97 try
98 {
99 session.setAttribute( KEY_IN, new PipedInputStream( out ) );
100 }
101 catch( IOException e )
102 {
103 throw new StreamIoException( e );
104 }
105 }
106
107 /***
108 * Closes input stream.
109 */
110 public void sessionClosed( IoSession session )
111 {
112 final PipedOutputStream out = ( PipedOutputStream ) session.getAttribute( KEY_OUT );
113 try {
114 out.close();
115 }
116 catch( IOException e )
117 {
118 throw new StreamIoException( e );
119 }
120 }
121
122 /***
123 * Forwards read data to input stream.
124 */
125 public void dataRead( IoSession session, ByteBuffer buf )
126 {
127 final PipedInputStream in = ( PipedInputStream ) session.getAttribute( KEY_IN );
128 final PipedOutputStream out = ( PipedOutputStream ) session.getAttribute( KEY_OUT );
129
130 java.nio.ByteBuffer nioBuf = buf.buf();
131 int offset = nioBuf.position();
132 int length = nioBuf.limit() - offset;
133 if( !nioBuf.hasArray() )
134 {
135 ByteBuffer heapBuf = ByteBuffer.allocate( length, false );
136 heapBuf.put( buf );
137 heapBuf.flip();
138 nioBuf = heapBuf.buf();
139 offset = 0;
140 }
141
142 try
143 {
144 out.write( nioBuf.array(), offset, length );
145 }
146 catch( IOException e )
147 {
148 throw new StreamIoException( e );
149 }
150 finally
151 {
152 beginService( session, in );
153 }
154 }
155
156 /***
157 * Forwards caught exceptions to input stream.
158 */
159 public void exceptionCaught( IoSession session, Throwable cause )
160 {
161 final PipedInputStream in = ( PipedInputStream ) session.getAttribute( KEY_IN );
162
163 IOException e = null;
164 if( cause instanceof StreamIoException )
165 {
166 e = ( IOException ) cause.getCause();
167 }
168 else if( cause instanceof IOException )
169 {
170 e = ( IOException ) cause;
171 }
172
173 if( e != null && in != null )
174 {
175 in.setException( e );
176 beginService( session, in );
177 }
178 else
179 {
180 cause.printStackTrace();
181 session.close();
182 }
183 }
184
185 /***
186 * Handles read timeout.
187 */
188 public void sessionIdle( IoSession session, IdleStatus status )
189 {
190 if( status == IdleStatus.READER_IDLE )
191 {
192 throw new StreamIoException(
193 new SocketTimeoutException( "Read timeout" ) );
194 }
195 }
196
197 private void beginService( IoSession session, PipedInputStream in )
198 {
199 if( session.getAttribute( KEY_STARTED ) == null )
200 {
201 session.setAttribute( KEY_STARTED, Boolean.TRUE );
202 processStreamIo( session, in, new ServiceOutputStream( session ) );
203 }
204 }
205
206 private static class PipedInputStream extends java.io.PipedInputStream
207 {
208 private IOException exception;
209
210 public PipedInputStream(PipedOutputStream src) throws IOException
211 {
212 super( src );
213 }
214
215 public void setException( IOException e )
216 {
217 this.exception = e;
218 }
219
220 public synchronized int read() throws IOException
221 {
222 throwException();
223 return super.read();
224 }
225
226 public synchronized int read( byte[] b, int off, int len ) throws IOException
227 {
228 throwException();
229 return super.read( b, off, len );
230 }
231
232 private void throwException() throws IOException
233 {
234 if( exception != null )
235 {
236 throw exception;
237 }
238 }
239 }
240
241 private static class ServiceOutputStream extends OutputStream
242 {
243 private final IoSession session;
244
245 public ServiceOutputStream( IoSession session )
246 {
247 this.session = session;
248 }
249
250 public void close()
251 {
252 session.close( true );
253 }
254
255 public void flush()
256 {
257 }
258
259 public void write( byte[] b, int off, int len )
260 {
261 ByteBuffer buf = ByteBuffer.wrap( b, off, len );
262 buf.acquire();
263 session.write( buf, null );
264 }
265
266 public void write( byte[] b )
267 {
268 ByteBuffer buf = ByteBuffer.wrap( b );
269 buf.acquire();
270 session.write( buf, null );
271 }
272
273 public void write( int b )
274 {
275 ByteBuffer buf = ByteBuffer.allocate( 1 );
276 buf.put( ( byte ) b );
277 buf.flip();
278 session.write( buf, null );
279 }
280 }
281
/**
 * Unchecked wrapper that carries an {@link IOException} out of handler
 * methods that declare no checked exceptions. The wrapped exception is
 * available through <tt>getCause()</tt>, which is where
 * <tt>exceptionCaught</tt> retrieves it.
 */
private static class StreamIoException extends RuntimeException
{
    private static final long serialVersionUID = 3976736960742503222L;

    /**
     * @param cause the I/O failure to wrap
     */
    public StreamIoException( IOException cause )
    {
        super(cause);
    }
}
291 }