View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.handler.stream;
21  
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.OutputStream;
25  import java.net.SocketTimeoutException;
26  
27  import org.apache.mina.core.buffer.IoBuffer;
28  import org.apache.mina.core.service.IoHandler;
29  import org.apache.mina.core.service.IoHandlerAdapter;
30  import org.apache.mina.core.session.AttributeKey;
31  import org.apache.mina.core.session.IdleStatus;
32  import org.apache.mina.core.session.IoSession;
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  
36  /**
37   * A {@link IoHandler} that adapts asynchronous MINA events to stream I/O.
38   * <p>
39   * Please extend this class and implement
40   * {@link #processStreamIo(IoSession, InputStream, OutputStream)} to
41   * execute your stream I/O logic; <b>please note that you must forward
42   * the process request to other thread or thread pool.</b>
43   *
44   * @author The Apache MINA Project (dev@mina.apache.org)
45   */
46  public abstract class StreamIoHandler extends IoHandlerAdapter {
47      private final static Logger LOGGER = LoggerFactory.getLogger(StreamIoHandler.class);
48      
49      private static final AttributeKey KEY_IN = new AttributeKey(StreamIoHandler.class, "in");
50      private static final AttributeKey KEY_OUT = new AttributeKey(StreamIoHandler.class, "out");
51  
52      private int readTimeout;
53  
54      private int writeTimeout;
55  
56      protected StreamIoHandler() {
57          // Do nothing
58      }
59  
60      /**
61       * Implement this method to execute your stream I/O logic;
62       * <b>please note that you must forward the process request to other
63       * thread or thread pool.</b>
64       */
65      protected abstract void processStreamIo(IoSession session, InputStream in,
66              OutputStream out);
67  
68      /**
69       * Returns read timeout in seconds.
70       * The default value is <tt>0</tt> (disabled).
71       */
72      public int getReadTimeout() {
73          return readTimeout;
74      }
75  
76      /**
77       * Sets read timeout in seconds.
78       * The default value is <tt>0</tt> (disabled).
79       */
80      public void setReadTimeout(int readTimeout) {
81          this.readTimeout = readTimeout;
82      }
83  
84      /**
85       * Returns write timeout in seconds.
86       * The default value is <tt>0</tt> (disabled).
87       */
88      public int getWriteTimeout() {
89          return writeTimeout;
90      }
91  
92      /**
93       * Sets write timeout in seconds.
94       * The default value is <tt>0</tt> (disabled).
95       */
96      public void setWriteTimeout(int writeTimeout) {
97          this.writeTimeout = writeTimeout;
98      }
99  
100     /**
101      * Initializes streams and timeout settings.
102      */
103     @Override
104     public void sessionOpened(IoSession session) {
105         // Set timeouts
106         session.getConfig().setWriteTimeout(writeTimeout);
107         session.getConfig().setIdleTime(IdleStatus.READER_IDLE, readTimeout);
108 
109         // Create streams
110         InputStream in = new IoSessionInputStream();
111         OutputStream out = new IoSessionOutputStream(session);
112         session.setAttribute(KEY_IN, in);
113         session.setAttribute(KEY_OUT, out);
114         processStreamIo(session, in, out);
115     }
116 
117     /**
118      * Closes streams
119      */
120     @Override
121     public void sessionClosed(IoSession session) throws Exception {
122         final InputStream in = (InputStream) session.getAttribute(KEY_IN);
123         final OutputStream out = (OutputStream) session.getAttribute(KEY_OUT);
124         try {
125             in.close();
126         } finally {
127             out.close();
128         }
129     }
130 
131     /**
132      * Forwards read data to input stream.
133      */
134     @Override
135     public void messageReceived(IoSession session, Object buf) {
136         final IoSessionInputStream in = (IoSessionInputStream) session
137                 .getAttribute(KEY_IN);
138         in.write((IoBuffer) buf);
139     }
140 
141     /**
142      * Forwards caught exceptions to input stream.
143      */
144     @Override
145     public void exceptionCaught(IoSession session, Throwable cause) {
146         final IoSessionInputStream in = (IoSessionInputStream) session
147                 .getAttribute(KEY_IN);
148 
149         IOException e = null;
150         if (cause instanceof StreamIoException) {
151             e = (IOException) cause.getCause();
152         } else if (cause instanceof IOException) {
153             e = (IOException) cause;
154         }
155 
156         if (e != null && in != null) {
157             in.throwException(e);
158         } else {
159             LOGGER.warn("Unexpected exception.", cause);
160             session.close(true);
161         }
162     }
163 
164     /**
165      * Handles read timeout.
166      */
167     @Override
168     public void sessionIdle(IoSession session, IdleStatus status) {
169         if (status == IdleStatus.READER_IDLE) {
170             throw new StreamIoException(new SocketTimeoutException(
171                     "Read timeout"));
172         }
173     }
174 
175     private static class StreamIoException extends RuntimeException {
176         private static final long serialVersionUID = 3976736960742503222L;
177 
178         public StreamIoException(IOException cause) {
179             super(cause);
180         }
181     }
182 }