001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.handler.stream;
021
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.OutputStream;
025import java.net.SocketTimeoutException;
026
027import org.apache.mina.core.buffer.IoBuffer;
028import org.apache.mina.core.service.IoHandler;
029import org.apache.mina.core.service.IoHandlerAdapter;
030import org.apache.mina.core.session.AttributeKey;
031import org.apache.mina.core.session.IdleStatus;
032import org.apache.mina.core.session.IoSession;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * A {@link IoHandler} that adapts asynchronous MINA events to stream I/O.
038 * <p>
039 * Please extend this class and implement
040 * {@link #processStreamIo(IoSession, InputStream, OutputStream)} to
041 * execute your stream I/O logic; <b>please note that you must forward
042 * the process request to other thread or thread pool.</b>
043 *
044 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
045 */
046public abstract class StreamIoHandler extends IoHandlerAdapter {
047    private final static Logger LOGGER = LoggerFactory.getLogger(StreamIoHandler.class);
048
049    private static final AttributeKey KEY_IN = new AttributeKey(StreamIoHandler.class, "in");
050
051    private static final AttributeKey KEY_OUT = new AttributeKey(StreamIoHandler.class, "out");
052
053    private int readTimeout;
054
055    private int writeTimeout;
056
057    protected StreamIoHandler() {
058        // Do nothing
059    }
060
061    /**
062     * Implement this method to execute your stream I/O logic;
063     * <b>please note that you must forward the process request to other
064     * thread or thread pool.</b>
065     * 
066     * @param session The current session
067     * @param in The input stream
068     * @param out The output stream
069     */
070    protected abstract void processStreamIo(IoSession session, InputStream in, OutputStream out);
071
072    /**
073     * @return read timeout in seconds.
074     * The default value is <tt>0</tt> (disabled).
075     */
076    public int getReadTimeout() {
077        return readTimeout;
078    }
079
080    /**
081     * Sets read timeout in seconds.
082     * The default value is <tt>0</tt> (disabled).
083     * @param readTimeout The Read timeout
084     */
085    public void setReadTimeout(int readTimeout) {
086        this.readTimeout = readTimeout;
087    }
088
089    /**
090     * @return write timeout in seconds.
091     * The default value is <tt>0</tt> (disabled).
092     */
093    public int getWriteTimeout() {
094        return writeTimeout;
095    }
096
097    /**
098     * Sets write timeout in seconds.
099     * The default value is <tt>0</tt> (disabled).
100     * 
101     * @param writeTimeout The Write timeout
102     */
103    public void setWriteTimeout(int writeTimeout) {
104        this.writeTimeout = writeTimeout;
105    }
106
107    /**
108     * Initializes streams and timeout settings.
109     */
110    @Override
111    public void sessionOpened(IoSession session) {
112        // Set timeouts
113        session.getConfig().setWriteTimeout(writeTimeout);
114        session.getConfig().setIdleTime(IdleStatus.READER_IDLE, readTimeout);
115
116        // Create streams
117        InputStream in = new IoSessionInputStream();
118        OutputStream out = new IoSessionOutputStream(session);
119        session.setAttribute(KEY_IN, in);
120        session.setAttribute(KEY_OUT, out);
121        processStreamIo(session, in, out);
122    }
123
124    /**
125     * Closes streams
126     */
127    @Override
128    public void sessionClosed(IoSession session) throws Exception {
129        final InputStream in = (InputStream) session.getAttribute(KEY_IN);
130        final OutputStream out = (OutputStream) session.getAttribute(KEY_OUT);
131        try {
132            in.close();
133        } finally {
134            out.close();
135        }
136    }
137
138    /**
139     * Forwards read data to input stream.
140     */
141    @Override
142    public void messageReceived(IoSession session, Object buf) {
143        final IoSessionInputStream in = (IoSessionInputStream) session.getAttribute(KEY_IN);
144        in.write((IoBuffer) buf);
145    }
146
147    /**
148     * Forwards caught exceptions to input stream.
149     */
150    @Override
151    public void exceptionCaught(IoSession session, Throwable cause) {
152        final IoSessionInputStream in = (IoSessionInputStream) session.getAttribute(KEY_IN);
153
154        IOException e = null;
155        if (cause instanceof StreamIoException) {
156            e = (IOException) cause.getCause();
157        } else if (cause instanceof IOException) {
158            e = (IOException) cause;
159        }
160
161        if (e != null && in != null) {
162            in.throwException(e);
163        } else {
164            LOGGER.warn("Unexpected exception.", cause);
165            session.close(true);
166        }
167    }
168
169    /**
170     * Handles read timeout.
171     */
172    @Override
173    public void sessionIdle(IoSession session, IdleStatus status) {
174        if (status == IdleStatus.READER_IDLE) {
175            throw new StreamIoException(new SocketTimeoutException("Read timeout"));
176        }
177    }
178
179    private static class StreamIoException extends RuntimeException {
180        private static final long serialVersionUID = 3976736960742503222L;
181
182        public StreamIoException(IOException cause) {
183            super(cause);
184        }
185    }
186}