/*
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 *
 */
package org.apache.mina.handler.stream;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketTimeoutException;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.AttributeKey;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link IoHandler} that adapts asynchronous MINA events to stream I/O.
 * <p>
 * Please extend this class and implement
 * {@link #processStreamIo(IoSession, InputStream, OutputStream)} to
 * execute your stream I/O logic; <b>please note that you must forward
 * the process request to other thread or thread pool.</b>
 *
 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
 */
public abstract class StreamIoHandler extends IoHandlerAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamIoHandler.class);

    /** Session attribute under which the per-session input stream is stored. */
    private static final AttributeKey KEY_IN = new AttributeKey(StreamIoHandler.class, "in");

    /** Session attribute under which the per-session output stream is stored. */
    private static final AttributeKey KEY_OUT = new AttributeKey(StreamIoHandler.class, "out");

    /** Read timeout in seconds; applied as the reader-idle time when a session opens. */
    private int readTimeout;

    /** Write timeout in seconds; applied to the session configuration when a session opens. */
    private int writeTimeout;

    /**
     * Creates a handler with both timeouts left at {@code 0}.
     */
    protected StreamIoHandler() {
        // Do nothing
    }

    /**
     * Implement this method to execute your stream I/O logic;
     * <b>please note that you must forward the process request to other
     * thread or thread pool.</b>
     * <p>
     * This method is invoked directly from {@link #sessionOpened(IoSession)},
     * on whichever thread delivers that event.
     *
     * @param session The current session
     * @param in The input stream
     * @param out The output stream
     */
    protected abstract void processStreamIo(IoSession session, InputStream in, OutputStream out);

    /**
     * @return read timeout in seconds.
     * The default value is {@code 0} (disabled).
     */
    public int getReadTimeout() {
        return readTimeout;
    }

    /**
     * Sets read timeout in seconds.
     * The default value is {@code 0} (disabled).
     * <p>
     * The value is only read in {@link #sessionOpened(IoSession)}.
     *
     * @param readTimeout The Read timeout
     */
    public void setReadTimeout(int readTimeout) {
        this.readTimeout = readTimeout;
    }

    /**
     * @return write timeout in seconds.
     * The default value is {@code 0} (disabled).
     */
    public int getWriteTimeout() {
        return writeTimeout;
    }

    /**
     * Sets write timeout in seconds.
     * The default value is {@code 0} (disabled).
     * <p>
     * The value is only read in {@link #sessionOpened(IoSession)}.
     *
     * @param writeTimeout The Write timeout
     */
    public void setWriteTimeout(int writeTimeout) {
        this.writeTimeout = writeTimeout;
    }

    /**
     * Initializes streams and timeout settings, then hands the streams to
     * {@link #processStreamIo(IoSession, InputStream, OutputStream)}.
     *
     * @param session The session that has just been opened
     */
    @Override
    public void sessionOpened(IoSession session) {
        // Set timeouts
        session.getConfig().setWriteTimeout(writeTimeout);
        session.getConfig().setIdleTime(IdleStatus.READER_IDLE, readTimeout);

        // Create streams and publish them on the session so that the other
        // event callbacks can find them again.
        InputStream in = new IoSessionInputStream();
        OutputStream out = new IoSessionOutputStream(session);
        session.setAttribute(KEY_IN, in);
        session.setAttribute(KEY_OUT, out);
        processStreamIo(session, in, out);
    }

    /**
     * Closes the streams attached to the session, if any.
     * <p>
     * The output stream is closed even when closing the input stream fails.
     * A stream attribute may be absent when {@link #sessionOpened(IoSession)}
     * failed before storing it; such a missing stream is skipped instead of
     * raising a {@link NullPointerException}.
     *
     * @param session The session that has been closed
     * @throws Exception If closing one of the streams fails
     */
    @Override
    public void sessionClosed(IoSession session) throws Exception {
        final InputStream in = (InputStream) session.getAttribute(KEY_IN);
        final OutputStream out = (OutputStream) session.getAttribute(KEY_OUT);
        try {
            if (in != null) {
                in.close();
            }
        } finally {
            if (out != null) {
                out.close();
            }
        }
    }

    /**
     * Forwards read data to input stream.
     *
     * @param session The session the data was received on
     * @param buf The received message; cast to {@link IoBuffer}
     */
    @Override
    public void messageReceived(IoSession session, Object buf) {
        final IoSessionInputStream in = (IoSessionInputStream) session.getAttribute(KEY_IN);
        in.write((IoBuffer) buf);
    }

    /**
     * Forwards caught exceptions to input stream.
     * <p>
     * An {@link IOException} (either raised directly or wrapped in the
     * internal timeout wrapper thrown by
     * {@link #sessionIdle(IoSession, IdleStatus)}) is passed to the session's
     * input stream. Anything else, or an I/O error on a session that has no
     * input stream, is logged and the session is closed immediately.
     *
     * @param session The session on which the exception occurred
     * @param cause The caught exception
     */
    @Override
    public void exceptionCaught(IoSession session, Throwable cause) {
        final IoSessionInputStream in = (IoSessionInputStream) session.getAttribute(KEY_IN);

        IOException e = null;
        if (cause instanceof StreamIoException) {
            // StreamIoException can only be built around an IOException,
            // so this cast cannot fail.
            e = (IOException) cause.getCause();
        } else if (cause instanceof IOException) {
            e = (IOException) cause;
        }

        if (e != null && in != null) {
            in.throwException(e);
        } else {
            LOGGER.warn("Unexpected exception.", cause);
            session.close(true);
        }
    }

    /**
     * Handles read timeout.
     * <p>
     * When the session becomes reader-idle, a {@link SocketTimeoutException}
     * wrapped in an unchecked exception is thrown;
     * {@link #exceptionCaught(IoSession, Throwable)} unwraps that wrapper.
     * Other idle states are ignored.
     *
     * @param session The idle session
     * @param status The kind of idleness that was detected
     */
    @Override
    public void sessionIdle(IoSession session, IdleStatus status) {
        if (status == IdleStatus.READER_IDLE) {
            throw new StreamIoException(new SocketTimeoutException("Read timeout"));
        }
    }

    /**
     * Unchecked wrapper used to carry an {@link IOException} out of
     * {@link #sessionIdle(IoSession, IdleStatus)}, which declares no checked
     * exception.
     */
    private static class StreamIoException extends RuntimeException {
        private static final long serialVersionUID = 3976736960742503222L;

        /**
         * @param cause The I/O error to carry
         */
        public StreamIoException(IOException cause) {
            super(cause);
        }
    }
}