001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.mina.filter.buffer; 021 022import java.io.BufferedOutputStream; 023import java.util.concurrent.ConcurrentHashMap; 024 025import org.apache.mina.core.buffer.IoBuffer; 026import org.apache.mina.core.filterchain.IoFilter; 027import org.apache.mina.core.filterchain.IoFilterAdapter; 028import org.apache.mina.core.session.IoSession; 029import org.apache.mina.core.write.DefaultWriteRequest; 030import org.apache.mina.core.write.WriteRequest; 031import org.apache.mina.filter.codec.ProtocolCodecFilter; 032import org.apache.mina.util.LazyInitializedCacheMap; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036/** 037 * An {@link IoFilter} implementation used to buffer outgoing {@link WriteRequest} almost 038 * like what {@link BufferedOutputStream} does. Using this filter allows to be less dependent 039 * from network latency. It is also useful when a session is generating very small messages 040 * too frequently and consequently generating unnecessary traffic overhead. 041 * 042 * Please note that it should always be placed before the {@link ProtocolCodecFilter} 043 * as it only handles {@link WriteRequest}'s carrying {@link IoBuffer} objects. 044 * 045 * @author <a href="http://mina.apache.org">Apache MINA Project</a> 046 * @since MINA 2.0.0-M2 047 * @org.apache.xbean.XBean 048 */ 049public final class BufferedWriteFilter extends IoFilterAdapter { 050 private final Logger logger = LoggerFactory.getLogger(BufferedWriteFilter.class); 051 052 /** 053 * Default buffer size value in bytes. 054 */ 055 public final static int DEFAULT_BUFFER_SIZE = 8192; 056 057 /** 058 * The buffer size allocated for each new session's buffer. 059 */ 060 private int bufferSize = DEFAULT_BUFFER_SIZE; 061 062 /** 063 * The map that matches an {@link IoSession} and it's {@link IoBuffer} 064 * buffer. 065 */ 066 private final LazyInitializedCacheMap<IoSession, IoBuffer> buffersMap; 067 068 /** 069 * Default constructor. Sets buffer size to {@link #DEFAULT_BUFFER_SIZE} 070 * bytes. Uses a default instance of {@link ConcurrentHashMap}. 071 */ 072 public BufferedWriteFilter() { 073 this(DEFAULT_BUFFER_SIZE, null); 074 } 075 076 /** 077 * Constructor which sets buffer size to <code>bufferSize</code>.Uses a default 078 * instance of {@link ConcurrentHashMap}. 079 * 080 * @param bufferSize the new buffer size 081 */ 082 public BufferedWriteFilter(int bufferSize) { 083 this(bufferSize, null); 084 } 085 086 /** 087 * Constructor which sets buffer size to <code>bufferSize</code>. If 088 * <code>buffersMap</code> is null then a default instance of {@link ConcurrentHashMap} 089 * is created else the provided instance is used. 090 * 091 * @param bufferSize the new buffer size 092 * @param buffersMap the map to use for storing each session buffer 093 */ 094 public BufferedWriteFilter(int bufferSize, LazyInitializedCacheMap<IoSession, IoBuffer> buffersMap) { 095 super(); 096 this.bufferSize = bufferSize; 097 if (buffersMap == null) { 098 this.buffersMap = new LazyInitializedCacheMap<IoSession, IoBuffer>(); 099 } else { 100 this.buffersMap = buffersMap; 101 } 102 } 103 104 /** 105 * @return The buffer size. 106 */ 107 public int getBufferSize() { 108 return bufferSize; 109 } 110 111 /** 112 * Sets the buffer size but only for the newly created buffers. 113 * 114 * @param bufferSize the new buffer size 115 */ 116 public void setBufferSize(int bufferSize) { 117 this.bufferSize = bufferSize; 118 } 119 120 /** 121 * {@inheritDoc} 122 * 123 * @throws Exception if <code>writeRequest.message</code> isn't an 124 * {@link IoBuffer} instance. 125 */ 126 @Override 127 public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { 128 129 Object data = writeRequest.getMessage(); 130 131 if (data instanceof IoBuffer) { 132 write(session, (IoBuffer) data); 133 } else { 134 throw new IllegalArgumentException("This filter should only buffer IoBuffer objects"); 135 } 136 } 137 138 /** 139 * Writes an {@link IoBuffer} to the session's buffer. 140 * 141 * @param session the session to which a write is requested 142 * @param data the data to buffer 143 */ 144 private void write(IoSession session, IoBuffer data) { 145 IoBuffer dest = buffersMap.putIfAbsent(session, new IoBufferLazyInitializer(bufferSize)); 146 147 write(session, data, dest); 148 } 149 150 /** 151 * Writes <code>data</code> {@link IoBuffer} to the <code>buf</code> 152 * {@link IoBuffer} which buffers write requests for the 153 * <code>session</code> {@ link IoSession} until buffer is full 154 * or manually flushed. 155 * 156 * @param session the session where buffer will be written 157 * @param data the data to buffer 158 * @param buf the buffer where data will be temporarily written 159 */ 160 private void write(IoSession session, IoBuffer data, IoBuffer buf) { 161 try { 162 int len = data.remaining(); 163 if (len >= buf.capacity()) { 164 /* 165 * If the request length exceeds the size of the output buffer, 166 * flush the output buffer and then write the data directly. 167 */ 168 NextFilter nextFilter = session.getFilterChain().getNextFilter(this); 169 internalFlush(nextFilter, session, buf); 170 nextFilter.filterWrite(session, new DefaultWriteRequest(data)); 171 return; 172 } 173 if (len > (buf.limit() - buf.position())) { 174 internalFlush(session.getFilterChain().getNextFilter(this), session, buf); 175 } 176 synchronized (buf) { 177 buf.put(data); 178 } 179 } catch (Exception e) { 180 session.getFilterChain().fireExceptionCaught(e); 181 } 182 } 183 184 /** 185 * Internal method that actually flushes the buffered data. 186 * 187 * @param nextFilter the {@link NextFilter} of this filter 188 * @param session the session where buffer will be written 189 * @param buf the data to write 190 * @throws Exception if a write operation fails 191 */ 192 private void internalFlush(NextFilter nextFilter, IoSession session, IoBuffer buf) throws Exception { 193 IoBuffer tmp = null; 194 synchronized (buf) { 195 buf.flip(); 196 tmp = buf.duplicate(); 197 buf.clear(); 198 } 199 logger.debug("Flushing buffer: {}", tmp); 200 nextFilter.filterWrite(session, new DefaultWriteRequest(tmp)); 201 } 202 203 /** 204 * Flushes the buffered data. 205 * 206 * @param session the session where buffer will be written 207 */ 208 public void flush(IoSession session) { 209 try { 210 internalFlush(session.getFilterChain().getNextFilter(this), session, buffersMap.get(session)); 211 } catch (Exception e) { 212 session.getFilterChain().fireExceptionCaught(e); 213 } 214 } 215 216 /** 217 * Internal method that actually frees the {@link IoBuffer} that contains 218 * the buffered data that has not been flushed. 219 * 220 * @param session the session we operate on 221 */ 222 private void free(IoSession session) { 223 IoBuffer buf = buffersMap.remove(session); 224 if (buf != null) { 225 buf.free(); 226 } 227 } 228 229 /** 230 * {@inheritDoc} 231 */ 232 @Override 233 public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) throws Exception { 234 free(session); 235 nextFilter.exceptionCaught(session, cause); 236 } 237 238 /** 239 * {@inheritDoc} 240 */ 241 @Override 242 public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception { 243 free(session); 244 nextFilter.sessionClosed(session); 245 } 246}