View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.codec;
21  
22  import org.apache.mina.core.buffer.IoBuffer;
23  import org.apache.mina.core.service.TransportMetadata;
24  import org.apache.mina.core.session.AttributeKey;
25  import org.apache.mina.core.session.IoSession;
26  
27  /**
28   * A {@link ProtocolDecoder} that cumulates the content of received
29   * buffers to a <em>cumulative buffer</em> to help users implement decoders.
30   * <p>
31   * If the received {@link IoBuffer} is only a part of a message,
32   * decoders should cumulate received buffers to make a message complete or
33   * to postpone decoding until more buffers arrive.
34   * <p>
35   * Here is an example decoder that decodes CRLF terminated lines into
36   * <code>Command</code> objects:
37   * <pre>
38   * public class CrLfTerminatedCommandLineDecoder
39   *         extends CumulativeProtocolDecoder {
40   *
41   *     private Command parseCommand(IoBuffer in) {
42   *         // Convert the bytes in the specified buffer to a
43   *         // Command object.
44   *         ...
45   *     }
46   *
47   *     protected boolean doDecode(
48   *             IoSession session, IoBuffer in, ProtocolDecoderOutput out)
49   *             throws Exception {
50   *
51   *         // Remember the initial position.
52   *         int start = in.position();
53   *
54   *         // Now find the first CRLF in the buffer.
55   *         byte previous = 0;
56   *         while (in.hasRemaining()) {
57   *             byte current = in.get();
58   *
59   *             if (previous == '\r' && current == '\n') {
60   *                 // Remember the current position and limit.
61   *                 int position = in.position();
62   *                 int limit = in.limit();
63   *                 try {
64   *                     in.position(start);
65   *                     in.limit(position);
66   *                     // The bytes between in.position() and in.limit()
67   *                     // now contain a full CRLF terminated line.
68   *                     out.write(parseCommand(in.slice()));
69   *                 } finally {
70   *                     // Set the position to point right after the
71   *                     // detected line and set the limit to the old
72   *                     // one.
73   *                     in.position(position);
74   *                     in.limit(limit);
75   *                 }
76   *                 // Decoded one line; CumulativeProtocolDecoder will
77   *                 // call me again until I return false. So just
78   *                 // return true until there are no more lines in the
79   *                 // buffer.
80   *                 return true;
81   *             }
82   *
83   *             previous = current;
84   *         }
85   *
86   *         // Could not find CRLF in the buffer. Reset the initial
87   *         // position to the one we recorded above.
88   *         in.position(start);
89   *
90   *         return false;
91   *     }
92   * }
93   * </pre>
94   * <p>
95   * Please note that this decoder forwards the received buffer directly to
96   * {@link #doDecode(IoSession, IoBuffer, ProtocolDecoderOutput)}, without
97   * cumulating it, if the underlying transport has no packet fragmentation.
98   * Whether the transport has fragmentation or not is determined by querying
99   * {@link TransportMetadata}.
100  *
101  * @author The Apache MINA Project (dev@mina.apache.org)
102  * @version $Rev: 680990 $, $Date: 2008-07-30 13:58:19 +0200 (mer, 30 jui 2008) $
103  */
104 public abstract class CumulativeProtocolDecoder extends ProtocolDecoderAdapter {
105 
106     private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");
107 
108     /**
109      * Creates a new instance.
110      */
111     protected CumulativeProtocolDecoder() {
112     }
113 
114     /**
115      * Cumulates content of <tt>in</tt> into internal buffer and forwards
116      * decoding request to {@link #doDecode(IoSession, IoBuffer, ProtocolDecoderOutput)}.
117      * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
118      * and the cumulative buffer is compacted after decoding ends.
119      *
120      * @throws IllegalStateException if your <tt>doDecode()</tt> returned
121      *                               <tt>true</tt> not consuming the cumulative buffer.
122      */
123     public void decode(IoSession session, IoBuffer in,
124             ProtocolDecoderOutput out) throws Exception {
125         if (!session.getTransportMetadata().hasFragmentation()) {
126             doDecode(session, in, out);
127             return;
128         }
129 
130         boolean usingSessionBuffer = true;
131         IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);
132         // If we have a session buffer, append data to that; otherwise
133         // use the buffer read from the network directly.
134         if (buf != null) {
135             boolean appended = false;
136             // Make sure that the buffer is auto-expanded.
137             if (buf.isAutoExpand()) {
138                 try {
139                     buf.put(in);
140                     appended = true;
141                 } catch (IllegalStateException e) {
142                     // A user called derivation method (e.g. slice()),
143                     // which disables auto-expansion of the parent buffer.
144                 } catch (IndexOutOfBoundsException e) {
145                     // A user disabled auto-expansion.
146                 }
147             }
148 
149             if (appended) {
150                 buf.flip();
151             } else {
152                 // Reallocate the buffer if append operation failed due to
153                 // derivation or disabled auto-expansion.
154                 buf.flip();
155                 IoBuffer newBuf = IoBuffer.allocate(
156                         buf.remaining() + in.remaining()).setAutoExpand(true);
157                 newBuf.order(buf.order());
158                 newBuf.put(buf);
159                 newBuf.put(in);
160                 newBuf.flip();
161                 buf = newBuf;
162 
163                 // Update the session attribute.
164                 session.setAttribute(BUFFER, buf);
165             }
166         } else {
167             buf = in;
168             usingSessionBuffer = false;
169         }
170 
171         for (;;) {
172             int oldPos = buf.position();
173             boolean decoded = doDecode(session, buf, out);
174             if (decoded) {
175                 if (buf.position() == oldPos) {
176                     throw new IllegalStateException(
177                             "doDecode() can't return true when buffer is not consumed.");
178                 }
179 
180                 if (!buf.hasRemaining()) {
181                     break;
182                 }
183             } else {
184                 break;
185             }
186         }
187 
188         // if there is any data left that cannot be decoded, we store
189         // it in a buffer in the session and next time this decoder is
190         // invoked the session buffer gets appended to
191         if (buf.hasRemaining()) {
192             if (usingSessionBuffer && buf.isAutoExpand()) {
193                 buf.compact();
194             } else {
195                 storeRemainingInSession(buf, session);
196             }
197         } else {
198             if (usingSessionBuffer) {
199                 removeSessionBuffer(session);
200             }
201         }
202     }
203 
204     /**
205      * Implement this method to consume the specified cumulative buffer and
206      * decode its content into message(s).
207      *
208      * @param in the cumulative buffer
209      * @return <tt>true</tt> if and only if there's more to decode in the buffer
210      *         and you want to have <tt>doDecode</tt> method invoked again.
211      *         Return <tt>false</tt> if remaining data is not enough to decode,
212      *         then this method will be invoked again when more data is cumulated.
213      * @throws Exception if cannot decode <tt>in</tt>.
214      */
215     protected abstract boolean doDecode(IoSession session, IoBuffer in,
216             ProtocolDecoderOutput out) throws Exception;
217 
218     /**
219      * Releases the cumulative buffer used by the specified <tt>session</tt>.
220      * Please don't forget to call <tt>super.dispose( session )</tt> when
221      * you override this method.
222      */
223     @Override
224     public void dispose(IoSession session) throws Exception {
225         removeSessionBuffer(session);
226     }
227 
228     private void removeSessionBuffer(IoSession session) {
229         session.removeAttribute(BUFFER);
230     }
231 
232     private void storeRemainingInSession(IoBuffer buf, IoSession session) {
233         final IoBuffer remainingBuf = IoBuffer.allocate(buf.capacity()).setAutoExpand(true);
234 
235         remainingBuf.order(buf.order());
236         remainingBuf.put(buf);
237 
238         session.setAttribute(BUFFER, remainingBuf);
239     }
240 }