001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.filter.buffer;
021
022import java.io.BufferedOutputStream;
023import java.util.concurrent.ConcurrentHashMap;
024
025import org.apache.mina.core.buffer.IoBuffer;
026import org.apache.mina.core.filterchain.IoFilter;
027import org.apache.mina.core.filterchain.IoFilterAdapter;
028import org.apache.mina.core.session.IoSession;
029import org.apache.mina.core.write.DefaultWriteRequest;
030import org.apache.mina.core.write.WriteRequest;
031import org.apache.mina.filter.codec.ProtocolCodecFilter;
032import org.apache.mina.util.LazyInitializedCacheMap;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * An {@link IoFilter} implementation used to buffer outgoing {@link WriteRequest} almost
038 * like what {@link BufferedOutputStream} does. Using this filter allows to be less dependent
039 * from network latency. It is also useful when a session is generating very small messages
040 * too frequently and consequently generating unnecessary traffic overhead.
041 * 
042 * Please note that it should always be placed before the {@link ProtocolCodecFilter}
043 * as it only handles {@link WriteRequest}'s carrying {@link IoBuffer} objects.
044 * 
045 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
046 * @since MINA 2.0.0-M2
047 * @org.apache.xbean.XBean
048 */
049public final class BufferedWriteFilter extends IoFilterAdapter {
050    private final Logger logger = LoggerFactory.getLogger(BufferedWriteFilter.class);
051
052    /**
053     * Default buffer size value in bytes.
054     */
055    public final static int DEFAULT_BUFFER_SIZE = 8192;
056
057    /**
058     * The buffer size allocated for each new session's buffer.
059     */
060    private int bufferSize = DEFAULT_BUFFER_SIZE;
061
062    /**
063     * The map that matches an {@link IoSession} and it's {@link IoBuffer}
064     * buffer.
065     */
066    private final LazyInitializedCacheMap<IoSession, IoBuffer> buffersMap;
067
068    /**
069     * Default constructor. Sets buffer size to {@link #DEFAULT_BUFFER_SIZE}
070     * bytes. Uses a default instance of {@link ConcurrentHashMap}.
071     */
072    public BufferedWriteFilter() {
073        this(DEFAULT_BUFFER_SIZE, null);
074    }
075
076    /**
077     * Constructor which sets buffer size to <code>bufferSize</code>.Uses a default
078     * instance of {@link ConcurrentHashMap}.
079     * 
080     * @param bufferSize the new buffer size
081     */
082    public BufferedWriteFilter(int bufferSize) {
083        this(bufferSize, null);
084    }
085
086    /**
087     * Constructor which sets buffer size to <code>bufferSize</code>. If
088     * <code>buffersMap</code> is null then a default instance of {@link ConcurrentHashMap}
089     * is created else the provided instance is used.
090     * 
091     * @param bufferSize the new buffer size
092     * @param buffersMap the map to use for storing each session buffer
093     */
094    public BufferedWriteFilter(int bufferSize, LazyInitializedCacheMap<IoSession, IoBuffer> buffersMap) {
095        super();
096        this.bufferSize = bufferSize;
097        if (buffersMap == null) {
098            this.buffersMap = new LazyInitializedCacheMap<IoSession, IoBuffer>();
099        } else {
100            this.buffersMap = buffersMap;
101        }
102    }
103
104    /**
105     * @return The buffer size.
106     */
107    public int getBufferSize() {
108        return bufferSize;
109    }
110
111    /**
112     * Sets the buffer size but only for the newly created buffers.
113     * 
114     * @param bufferSize the new buffer size
115     */
116    public void setBufferSize(int bufferSize) {
117        this.bufferSize = bufferSize;
118    }
119
120    /**
121     * {@inheritDoc}
122     * 
123     * @throws Exception if <code>writeRequest.message</code> isn't an
124     *                   {@link IoBuffer} instance.
125     */
126    @Override
127    public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
128
129        Object data = writeRequest.getMessage();
130
131        if (data instanceof IoBuffer) {
132            write(session, (IoBuffer) data);
133        } else {
134            throw new IllegalArgumentException("This filter should only buffer IoBuffer objects");
135        }
136    }
137
138    /**
139     * Writes an {@link IoBuffer} to the session's buffer.
140     * 
141     * @param session the session to which a write is requested
142     * @param data the data to buffer
143     */
144    private void write(IoSession session, IoBuffer data) {
145        IoBuffer dest = buffersMap.putIfAbsent(session, new IoBufferLazyInitializer(bufferSize));
146
147        write(session, data, dest);
148    }
149
150    /**
151     * Writes <code>data</code> {@link IoBuffer} to the <code>buf</code>
152     * {@link IoBuffer} which buffers write requests for the
153     * <code>session</code> {@ link IoSession} until buffer is full
154     * or manually flushed.
155     * 
156     * @param session the session where buffer will be written
157     * @param data the data to buffer
158     * @param buf the buffer where data will be temporarily written
159     */
160    private void write(IoSession session, IoBuffer data, IoBuffer buf) {
161        try {
162            int len = data.remaining();
163            if (len >= buf.capacity()) {
164                /*
165                 * If the request length exceeds the size of the output buffer,
166                 * flush the output buffer and then write the data directly.
167                 */
168                NextFilter nextFilter = session.getFilterChain().getNextFilter(this);
169                internalFlush(nextFilter, session, buf);
170                nextFilter.filterWrite(session, new DefaultWriteRequest(data));
171                return;
172            }
173            if (len > (buf.limit() - buf.position())) {
174                internalFlush(session.getFilterChain().getNextFilter(this), session, buf);
175            }
176            synchronized (buf) {
177                buf.put(data);
178            }
179        } catch (Exception e) {
180            session.getFilterChain().fireExceptionCaught(e);
181        }
182    }
183
184    /**
185     * Internal method that actually flushes the buffered data.
186     * 
187     * @param nextFilter the {@link NextFilter} of this filter
188     * @param session the session where buffer will be written
189     * @param buf the data to write
190     * @throws Exception if a write operation fails
191     */
192    private void internalFlush(NextFilter nextFilter, IoSession session, IoBuffer buf) throws Exception {
193        IoBuffer tmp = null;
194        synchronized (buf) {
195            buf.flip();
196            tmp = buf.duplicate();
197            buf.clear();
198        }
199        logger.debug("Flushing buffer: {}", tmp);
200        nextFilter.filterWrite(session, new DefaultWriteRequest(tmp));
201    }
202
203    /**
204     * Flushes the buffered data.
205     * 
206     * @param session the session where buffer will be written
207     */
208    public void flush(IoSession session) {
209        try {
210            internalFlush(session.getFilterChain().getNextFilter(this), session, buffersMap.get(session));
211        } catch (Exception e) {
212            session.getFilterChain().fireExceptionCaught(e);
213        }
214    }
215
216    /**
217     * Internal method that actually frees the {@link IoBuffer} that contains
218     * the buffered data that has not been flushed.
219     * 
220     * @param session the session we operate on
221     */
222    private void free(IoSession session) {
223        IoBuffer buf = buffersMap.remove(session);
224        if (buf != null) {
225            buf.free();
226        }
227    }
228
229    /**
230     * {@inheritDoc}
231     */
232    @Override
233    public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) throws Exception {
234        free(session);
235        nextFilter.exceptionCaught(session, cause);
236    }
237
238    /**
239     * {@inheritDoc}
240     */
241    @Override
242    public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
243        free(session);
244        nextFilter.sessionClosed(session);
245    }
246}