001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.filter.codec;
021
022import org.apache.mina.core.buffer.IoBuffer;
023import org.apache.mina.core.service.TransportMetadata;
024import org.apache.mina.core.session.AttributeKey;
025import org.apache.mina.core.session.IoSession;
026
027/**
028 * A {@link ProtocolDecoder} that cumulates the content of received buffers to a
029 * <em>cumulative buffer</em> to help users implement decoders.
030 * <p>
031 * If the received {@link IoBuffer} is only a part of a message. decoders should
032 * cumulate received buffers to make a message complete or to postpone decoding
033 * until more buffers arrive.
034 * <p>
035 * Here is an example decoder that decodes CRLF terminated lines into
036 * <code>Command</code> objects:
037 * 
038 * <pre>
039 * public class CrLfTerminatedCommandLineDecoder extends CumulativeProtocolDecoder {
040 * 
041 *     private Command parseCommand(IoBuffer in) {
042 *         // Convert the bytes in the specified buffer to a
043 *         // Command object.
044 *         ...
045 *     }
046 * 
047 *     protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
048 * 
049 *         // Remember the initial position.
050 *         int start = in.position();
051 * 
052 *         // Now find the first CRLF in the buffer.
053 *         byte previous = 0;
054 *         while (in.hasRemaining()) {
055 *             byte current = in.get();
056 * 
057 *             if (previous == '\r' &amp;&amp; current == '\n') {
058 *                 // Remember the current position and limit.
059 *                 int position = in.position();
060 *                 int limit = in.limit();
061 *                 try {
062 *                     in.position(start);
063 *                     in.limit(position);
064 *                     // The bytes between in.position() and in.limit()
065 *                     // now contain a full CRLF terminated line.
066 *                     out.write(parseCommand(in.slice()));
067 *                 } finally {
068 *                     // Set the position to point right after the
069 *                     // detected line and set the limit to the old
070 *                     // one.
071 *                     in.position(position);
072 *                     in.limit(limit);
073 *                 }
074 *                 // Decoded one line; CumulativeProtocolDecoder will
075 *                 // call me again until I return false. So just
076 *                 // return true until there are no more lines in the
077 *                 // buffer.
078 *                 return true;
079 *             }
080 * 
081 *             previous = current;
082 *         }
083 * 
084 *         // Could not find CRLF in the buffer. Reset the initial
085 *         // position to the one we recorded above.
086 *         in.position(start);
087 * 
088 *         return false;
089 *     }
090 * }
091 * </pre>
092 * <p>
093 * Please note that this decoder simply forward the call to
094 * {@link #doDecode(IoSession, IoBuffer, ProtocolDecoderOutput)} if the
095 * underlying transport doesn't have a packet fragmentation. Whether the
096 * transport has fragmentation or not is determined by querying
097 * {@link TransportMetadata}.
098 *
099 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
100 */
101public abstract class CumulativeProtocolDecoder extends ProtocolDecoderAdapter {
102
103    private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");
104
105    /**
106     * Creates a new instance.
107     */
108    protected CumulativeProtocolDecoder() {
109        // Do nothing
110    }
111
112    /**
113     * Cumulates content of <tt>in</tt> into internal buffer and forwards
114     * decoding request to
115     * {@link #doDecode(IoSession, IoBuffer, ProtocolDecoderOutput)}.
116     * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
117     * and the cumulative buffer is compacted after decoding ends.
118     *
119     * @throws IllegalStateException
120     *             if your <tt>doDecode()</tt> returned <tt>true</tt> not
121     *             consuming the cumulative buffer.
122     */
123    public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
124        if (!session.getTransportMetadata().hasFragmentation()) {
125            while (in.hasRemaining()) {
126                if (!doDecode(session, in, out)) {
127                    break;
128                }
129            }
130
131            return;
132        }
133
134        boolean usingSessionBuffer = true;
135        IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);
136        // If we have a session buffer, append data to that; otherwise
137        // use the buffer read from the network directly.
138        if (buf != null) {
139            boolean appended = false;
140            // Make sure that the buffer is auto-expanded.
141            if (buf.isAutoExpand()) {
142                try {
143                    buf.put(in);
144                    appended = true;
145                } catch (IllegalStateException e) {
146                    // A user called derivation method (e.g. slice()),
147                    // which disables auto-expansion of the parent buffer.
148                } catch (IndexOutOfBoundsException e) {
149                    // A user disabled auto-expansion.
150                }
151            }
152
153            if (appended) {
154                buf.flip();
155            } else {
156                // Reallocate the buffer if append operation failed due to
157                // derivation or disabled auto-expansion.
158                buf.flip();
159                IoBuffer newBuf = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true);
160                newBuf.order(buf.order());
161                newBuf.put(buf);
162                newBuf.put(in);
163                newBuf.flip();
164                buf = newBuf;
165
166                // Update the session attribute.
167                session.setAttribute(BUFFER, buf);
168            }
169        } else {
170            buf = in;
171            usingSessionBuffer = false;
172        }
173
174        for (;;) {
175            int oldPos = buf.position();
176            boolean decoded = doDecode(session, buf, out);
177            if (decoded) {
178                if (buf.position() == oldPos) {
179                    throw new IllegalStateException("doDecode() can't return true when buffer is not consumed.");
180                }
181
182                if (!buf.hasRemaining()) {
183                    break;
184                }
185            } else {
186                break;
187            }
188        }
189
190        // if there is any data left that cannot be decoded, we store
191        // it in a buffer in the session and next time this decoder is
192        // invoked the session buffer gets appended to
193        if (buf.hasRemaining()) {
194            if (usingSessionBuffer && buf.isAutoExpand()) {
195                buf.compact();
196            } else {
197                storeRemainingInSession(buf, session);
198            }
199        } else {
200            if (usingSessionBuffer) {
201                removeSessionBuffer(session);
202            }
203        }
204    }
205
206    /**
207     * Implement this method to consume the specified cumulative buffer and
208     * decode its content into message(s).
209     *
210     * @param session The current Session
211     * @param in the cumulative buffer
212     * @param out The {@link ProtocolDecoderOutput} that will receive the decoded message
213     * @return <tt>true</tt> if and only if there's more to decode in the buffer
214     *         and you want to have <tt>doDecode</tt> method invoked again.
215     *         Return <tt>false</tt> if remaining data is not enough to decode,
216     *         then this method will be invoked again when more data is
217     *         cumulated.
218     * @throws Exception if cannot decode <tt>in</tt>.
219     */
220    protected abstract boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception;
221
222    /**
223     * Releases the cumulative buffer used by the specified <tt>session</tt>.
224     * Please don't forget to call <tt>super.dispose( session )</tt> when you
225     * override this method.
226     */
227    @Override
228    public void dispose(IoSession session) throws Exception {
229        removeSessionBuffer(session);
230    }
231
232    private void removeSessionBuffer(IoSession session) {
233        session.removeAttribute(BUFFER);
234    }
235
236    private void storeRemainingInSession(IoBuffer buf, IoSession session) {
237        final IoBuffer remainingBuf = IoBuffer.allocate(buf.capacity()).setAutoExpand(true);
238
239        remainingBuf.order(buf.order());
240        remainingBuf.put(buf);
241
242        session.setAttribute(BUFFER, remainingBuf);
243    }
244}