001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.filter.compression;
021
022import java.io.IOException;
023
024import org.apache.mina.core.buffer.IoBuffer;
025import org.apache.mina.core.filterchain.IoFilter;
026import org.apache.mina.core.filterchain.IoFilterChain;
027import org.apache.mina.core.session.AttributeKey;
028import org.apache.mina.core.session.IoSession;
029import org.apache.mina.core.write.WriteRequest;
030import org.apache.mina.filter.util.WriteRequestFilter;
031
032/**
033 * An {@link IoFilter} which compresses all data using
034 * <a href="http://www.jcraft.com/jzlib/">JZlib</a>.
035 * Support for the LZW (DLCZ) algorithm is also planned.
036 * <p>
037 * This filter only supports compression using the <tt>PARTIAL FLUSH</tt> method,
038 * since that is the only method useful when doing stream level compression.
039 * <p>
040 * This filter supports compression/decompression of the input and output
041 * channels selectively.  It can also be enabled/disabled on the fly.
042 * <p>
043 * This filter does not discard the zlib objects, keeping them around for the
044 * entire life of the filter.  This is because the zlib dictionary needs to
045 * be built up over time, which is used during compression and decompression.
046 * Over time, as repetitive data is sent over the wire, the compression efficiency
047 * steadily increases.
048 * <p>
049 * Note that the zlib header is written only once. It is not necessary that
050 * the data received after processing by this filter may not be complete due
051 * to packet fragmentation.
052 * <p>
053 * It goes without saying that the other end of this stream should also have a
054 * compatible compressor/decompressor using the same algorithm.
055 *
056 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
057 */
058public class CompressionFilter extends WriteRequestFilter {
059    /**
060     * Max compression level.  Will give the highest compression ratio, but
061     * will also take more cpu time and is the slowest.
062     */
063    public static final int COMPRESSION_MAX = Zlib.COMPRESSION_MAX;
064
065    /**
066     * Provides the best speed at the price of a low compression ratio.
067     */
068    public static final int COMPRESSION_MIN = Zlib.COMPRESSION_MIN;
069
070    /**
071     * No compression done on the data.
072     */
073    public static final int COMPRESSION_NONE = Zlib.COMPRESSION_NONE;
074
075    /**
076     * The default compression level used. Provides the best balance
077     * between speed and compression
078     */
079    public static final int COMPRESSION_DEFAULT = Zlib.COMPRESSION_DEFAULT;
080
081    /**
082     * A session attribute that stores the {@link Zlib} object used for compression.
083     */
084    private final AttributeKey DEFLATER = new AttributeKey(getClass(), "deflater");
085
086    /**
087     * A session attribute that stores the {@link Zlib} object used for decompression.
088     */
089    private final AttributeKey INFLATER = new AttributeKey(getClass(), "inflater");
090
091    /**
092     * A flag that allows you to disable compression once.
093     */
094    public static final AttributeKey DISABLE_COMPRESSION_ONCE = new AttributeKey(CompressionFilter.class, "disableOnce");
095
096    private boolean compressInbound = true;
097
098    private boolean compressOutbound = true;
099
100    private int compressionLevel;
101
102    /**
103     * Creates a new instance which compresses outboud data and decompresses
104     * inbound data with default compression level.
105     */
106    public CompressionFilter() {
107        this(true, true, COMPRESSION_DEFAULT);
108    }
109
110    /**
111     * Creates a new instance which compresses outboud data and decompresses
112     * inbound data with the specified <tt>compressionLevel</tt>.
113     *
114     * @param compressionLevel the level of compression to be used. Must
115     *                         be one of {@link #COMPRESSION_DEFAULT},
116     *                         {@link #COMPRESSION_MAX},
117     *                         {@link #COMPRESSION_MIN}, and
118     *                         {@link #COMPRESSION_NONE}.
119     */
120    public CompressionFilter(final int compressionLevel) {
121        this(true, true, compressionLevel);
122    }
123
124    /**
125     * Creates a new instance.
126     *
127     * @param compressInbound <tt>true</tt> if data read is to be decompressed
128     * @param compressOutbound <tt>true</tt> if data written is to be compressed
129     * @param compressionLevel the level of compression to be used. Must
130     *                         be one of {@link #COMPRESSION_DEFAULT},
131     *                         {@link #COMPRESSION_MAX},
132     *                         {@link #COMPRESSION_MIN}, and
133     *                         {@link #COMPRESSION_NONE}.
134     */
135    public CompressionFilter(final boolean compressInbound, final boolean compressOutbound, final int compressionLevel) {
136        this.compressionLevel = compressionLevel;
137        this.compressInbound = compressInbound;
138        this.compressOutbound = compressOutbound;
139    }
140
141    @Override
142    public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
143        if (!compressInbound || !(message instanceof IoBuffer)) {
144            nextFilter.messageReceived(session, message);
145            return;
146        }
147
148        Zlib inflater = (Zlib) session.getAttribute(INFLATER);
149        if (inflater == null) {
150            throw new IllegalStateException();
151        }
152
153        IoBuffer inBuffer = (IoBuffer) message;
154        IoBuffer outBuffer = inflater.inflate(inBuffer);
155        nextFilter.messageReceived(session, outBuffer);
156    }
157
158    /*
159     * @see org.apache.mina.core.IoFilter#filterWrite(org.apache.mina.core.IoFilter.NextFilter, org.apache.mina.core.IoSession, org.apache.mina.core.IoFilter.WriteRequest)
160     */
161    @Override
162    protected Object doFilterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest)
163            throws IOException {
164        if (!compressOutbound) {
165            return null;
166        }
167
168        if (session.containsAttribute(DISABLE_COMPRESSION_ONCE)) {
169            // Remove the marker attribute because it is temporary.
170            session.removeAttribute(DISABLE_COMPRESSION_ONCE);
171            return null;
172        }
173
174        Zlib deflater = (Zlib) session.getAttribute(DEFLATER);
175        if (deflater == null) {
176            throw new IllegalStateException();
177        }
178
179        IoBuffer inBuffer = (IoBuffer) writeRequest.getMessage();
180        if (!inBuffer.hasRemaining()) {
181            // Ignore empty buffers
182            return null;
183        } else {
184            return deflater.deflate(inBuffer);
185        }
186    }
187
188    @Override
189    public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
190        if (parent.contains(CompressionFilter.class)) {
191            throw new IllegalStateException("Only one " + CompressionFilter.class + " is permitted.");
192        }
193
194        Zlib deflater = new Zlib(compressionLevel, Zlib.MODE_DEFLATER);
195        Zlib inflater = new Zlib(compressionLevel, Zlib.MODE_INFLATER);
196
197        IoSession session = parent.getSession();
198
199        session.setAttribute(DEFLATER, deflater);
200        session.setAttribute(INFLATER, inflater);
201    }
202
203    /**
204     * @return <tt>true</tt> if incoming data is being compressed.
205     */
206    public boolean isCompressInbound() {
207        return compressInbound;
208    }
209
210    /**
211     * Sets if incoming data has to be compressed.
212     * 
213     * @param compressInbound <tt>true</tt> if the incoming data has to be compressed
214     */
215    public void setCompressInbound(boolean compressInbound) {
216        this.compressInbound = compressInbound;
217    }
218
219    /**
220     * @return <tt>true</tt> if the filter is compressing data being written.
221     */
222    public boolean isCompressOutbound() {
223        return compressOutbound;
224    }
225
226    /**
227     * Set if outgoing data has to be compressed.
228     * 
229     * @param compressOutbound <tt>true</tt> if the outgoing data has to be compressed
230     */
231    public void setCompressOutbound(boolean compressOutbound) {
232        this.compressOutbound = compressOutbound;
233    }
234
235    @Override
236    public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
237        super.onPostRemove(parent, name, nextFilter);
238        IoSession session = parent.getSession();
239        if (session == null) {
240            return;
241        }
242
243        Zlib inflater = (Zlib) session.getAttribute(INFLATER);
244        Zlib deflater = (Zlib) session.getAttribute(DEFLATER);
245        if (deflater != null) {
246            deflater.cleanUp();
247        }
248
249        if (inflater != null) {
250            inflater.cleanUp();
251        }
252    }
253}