View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.buffer;
21  
22  import java.io.BufferedOutputStream;
23  import java.util.concurrent.ConcurrentHashMap;
24  
25  import org.apache.mina.core.buffer.IoBuffer;
26  import org.apache.mina.core.filterchain.IoFilter;
27  import org.apache.mina.core.filterchain.IoFilterAdapter;
28  import org.apache.mina.core.session.IoSession;
29  import org.apache.mina.core.write.DefaultWriteRequest;
30  import org.apache.mina.core.write.WriteRequest;
31  import org.apache.mina.filter.codec.ProtocolCodecFilter;
32  import org.apache.mina.util.LazyInitializedCacheMap;
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  
36  /**
37   * An {@link IoFilter} implementation used to buffer outgoing {@link WriteRequest} almost 
38   * like what {@link BufferedOutputStream} does. Using this filter allows to be less dependent 
39   * from network latency. It is also useful when a session is generating very small messages 
40   * too frequently and consequently generating unnecessary traffic overhead.
41   * 
42   * Please note that it should always be placed before the {@link ProtocolCodecFilter} 
43   * as it only handles {@link WriteRequest}'s carrying {@link IoBuffer} objects.
44   * 
45   * @author The Apache MINA Project (dev@mina.apache.org)
46   * @version $Rev$, $Date$
47   * @since MINA 2.0.0-M2
48   */
49  public final class BufferedWriteFilter extends IoFilterAdapter {
50      private final Logger logger = LoggerFactory
51              .getLogger(BufferedWriteFilter.class);
52  
53      /**
54       * Default buffer size value in bytes.
55       */
56      public final static int DEFAULT_BUFFER_SIZE = 8192;
57  
58      /**
59       * The buffer size allocated for each new session's buffer.
60       */
61      private int bufferSize = DEFAULT_BUFFER_SIZE;
62  
63      /**
64       * The map that matches an {@link IoSession} and it's {@link IoBuffer}
65       * buffer.
66       */
67      private final LazyInitializedCacheMap<IoSession, IoBuffer> buffersMap;
68  
69      /**
70       * Default constructor. Sets buffer size to {@link #DEFAULT_BUFFER_SIZE}
71       * bytes. Uses a default instance of {@link ConcurrentHashMap}.
72       */
73      public BufferedWriteFilter() {
74          this(DEFAULT_BUFFER_SIZE, null);
75      }
76  
77      /**
78       * Constructor which sets buffer size to <code>bufferSize</code>.Uses a default 
79       * instance of {@link ConcurrentHashMap}.
80       * 
81       * @param bufferSize the new buffer size
82       */
83      public BufferedWriteFilter(int bufferSize) {
84          this(bufferSize, null);
85      }
86  
87      /**
88       * Constructor which sets buffer size to <code>bufferSize</code>. If 
89       * <code>buffersMap</code> is null then a default instance of {@link ConcurrentHashMap} 
90       * is created else the provided instance is used.
91       * 
92       * @param bufferSize the new buffer size
93       * @param buffersMap the map to use for storing each session buffer 
94       */
95      public BufferedWriteFilter(int bufferSize,
96              LazyInitializedCacheMap<IoSession, IoBuffer> buffersMap) {
97          super();
98          this.bufferSize = bufferSize;
99          if (buffersMap == null) {
100             this.buffersMap = new LazyInitializedCacheMap<IoSession, IoBuffer>();
101         } else {
102             this.buffersMap = buffersMap;
103         }
104     }
105 
106     /**
107      * Returns buffer size.
108      */
109     public int getBufferSize() {
110         return bufferSize;
111     }
112 
113     /**
114      * Sets the buffer size but only for the newly created buffers.
115      * 
116      * @param bufferSize the new buffer size
117      */
118     public void setBufferSize(int bufferSize) {
119         this.bufferSize = bufferSize;
120     }
121 
122     /**
123      * {@inheritDoc}
124      * 
125      * @throws Exception if <code>writeRequest.message</code> isn't an
126      *                   {@link IoBuffer} instance.
127      */
128     @Override
129     public void filterWrite(NextFilter nextFilter, IoSession session,
130             WriteRequest writeRequest) throws Exception {
131 
132         Object data = writeRequest.getMessage();
133 
134         if (data instanceof IoBuffer) {
135             write(session, (IoBuffer) data);
136         } else {
137             throw new IllegalArgumentException(
138                     "This filter should only buffer IoBuffer objects");
139         }
140     }
141 
142     /**
143      * Writes an {@link IoBuffer} to the session's buffer.
144      * 
145      * @param session the session to which a write is requested
146      * @param data the data to buffer
147      */
148     private void write(IoSession session, IoBuffer data) {
149         IoBuffer dest = buffersMap.putIfAbsent(session,
150                 new IoBufferLazyInitializer(bufferSize));
151 
152         write(session, data, dest);
153     }
154 
155     /**
156      * Writes <code>data</code> {@link IoBuffer} to the <code>buf</code>
157      * {@link IoBuffer} which buffers write requests for the
158      * <code>session</code> {@ link IoSession} until buffer is full 
159      * or manually flushed.
160      * 
161      * @param session the session where buffer will be written
162      * @param data the data to buffer
163      * @param buf the buffer where data will be temporarily written 
164      */
165     private void write(IoSession session, IoBuffer data, IoBuffer buf) {
166         try {
167             int len = data.remaining();
168             if (len >= buf.capacity()) {
169                 /*
170                  * If the request length exceeds the size of the output buffer,
171                  * flush the output buffer and then write the data directly.
172                  */
173                 NextFilter nextFilter = session.getFilterChain().getNextFilter(
174                         this);
175                 internalFlush(nextFilter, session, buf);
176                 nextFilter.filterWrite(session, new DefaultWriteRequest(data));
177                 return;
178             }
179             if (len > (buf.limit() - buf.position())) {
180                 internalFlush(session.getFilterChain().getNextFilter(this),
181                         session, buf);
182             }
183             synchronized (buf) {
184                 buf.put(data);
185             }
186         } catch (Throwable e) {
187             session.getFilterChain().fireExceptionCaught(e);
188         }
189     }
190 
191     /**
192      * Internal method that actually flushes the buffered data.
193      * 
194      * @param nextFilter the {@link NextFilter} of this filter
195      * @param session the session where buffer will be written
196      * @param buf the data to write
197      * @throws Exception if a write operation fails
198      */
199     private void internalFlush(NextFilter nextFilter, IoSession session,
200             IoBuffer buf) throws Exception {
201         IoBuffer tmp = null;
202         synchronized (buf) {
203             buf.flip();
204             tmp = buf.duplicate();
205             buf.clear();
206         }
207         logger.debug("Flushing buffer: {}", tmp);
208         nextFilter.filterWrite(session, new DefaultWriteRequest(tmp));
209     }
210 
211     /**
212      * Flushes the buffered data.
213      * 
214      * @param session the session where buffer will be written
215      */
216     public void flush(IoSession session) {
217         try {
218             internalFlush(session.getFilterChain().getNextFilter(this),
219                     session, buffersMap.get(session));
220         } catch (Throwable e) {
221             session.getFilterChain().fireExceptionCaught(e);
222         }
223     }
224 
225     /**
226      * Internal method that actually frees the {@link IoBuffer} that contains
227      * the buffered data that has not been flushed.
228      * 
229      * @param session the session we operate on
230      */
231     private void free(IoSession session) {
232         IoBuffer buf = buffersMap.remove(session);
233         if (buf != null) {
234             buf.free();
235         }
236     }
237 
238     /**
239      * {@inheritDoc}
240      */
241     @Override
242     public void exceptionCaught(NextFilter nextFilter, IoSession session,
243             Throwable cause) throws Exception {
244         free(session);
245     }
246 
247     /**
248      * {@inheritDoc}
249      */
250     @Override
251     public void sessionClosed(NextFilter nextFilter, IoSession session)
252             throws Exception {
253         free(session);
254     }
255 }