001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.filter.codec.statemachine;
021
022import java.util.ArrayList;
023import java.util.List;
024
025import org.apache.mina.core.buffer.IoBuffer;
026import org.apache.mina.core.filterchain.IoFilter.NextFilter;
027import org.apache.mina.core.session.IoSession;
028import org.apache.mina.filter.codec.ProtocolCodecFilter;
029import org.apache.mina.filter.codec.ProtocolDecoderOutput;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * Abstract base class for decoder state machines. Calls {@link #init()} to
035 * get the start {@link DecodingState} of the state machine. Calls 
036 * {@link #destroy()} when the state machine has reached its end state or when
037 * the session is closed.
038 * <p>
039 * NOTE: The {@link ProtocolDecoderOutput} used by this class when calling 
040 * {@link DecodingState#decode(IoBuffer, ProtocolDecoderOutput)} buffers decoded
041 * messages in a {@link List}. Once the state machine has reached its end state
042 * this class will call {@link #finishDecode(List, ProtocolDecoderOutput)}. The 
043 * implementation will have to take care of writing the decoded messages to the 
044 * real {@link ProtocolDecoderOutput} used by the configured 
045 * {@link ProtocolCodecFilter}.
046 * </p>
047 * 
048 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
049 */
050public abstract class DecodingStateMachine implements DecodingState {
051    private final Logger log = LoggerFactory.getLogger(DecodingStateMachine.class);
052
053    private final List<Object> childProducts = new ArrayList<Object>();
054
055    private final ProtocolDecoderOutput childOutput = new ProtocolDecoderOutput() {
056        public void flush(NextFilter nextFilter, IoSession session) {
057            // Do nothing
058        }
059
060        public void write(Object message) {
061            childProducts.add(message);
062        }
063    };
064
065    private DecodingState currentState;
066
067    private boolean initialized;
068
069    /**
070     * Invoked to initialize this state machine.
071     * 
072     * @return the start {@link DecodingState}.
073     * @throws Exception if the initialization failed
074     */
075    protected abstract DecodingState init() throws Exception;
076
077    /**
078     * Called once the state machine has reached its end.
079     * 
080     * @param childProducts contains the messages generated by each of the 
081     *        {@link DecodingState}s which were exposed to the received data 
082     *        during the life time of this state machine.
083     * @param out the real {@link ProtocolDecoderOutput} used by the 
084     *        {@link ProtocolCodecFilter}.
085     * @return the next state if the state machine should resume.
086     * @throws Exception if the decoding end failed
087     */
088    protected abstract DecodingState finishDecode(List<Object> childProducts, ProtocolDecoderOutput out)
089            throws Exception;
090
091    /**
092     * Invoked to destroy this state machine once the end state has been reached
093     * or the session has been closed.
094     * 
095     * @throws Exception if the destruction failed
096     */
097    protected abstract void destroy() throws Exception;
098
099    /**
100     * {@inheritDoc}
101     */
102    public DecodingState decode(IoBuffer in, ProtocolDecoderOutput out) throws Exception {
103        DecodingState state = getCurrentState();
104
105        final int limit = in.limit();
106        int pos = in.position();
107
108        try {
109            for (;;) {
110                // Wait for more data if all data is consumed.
111                if (pos == limit) {
112                    break;
113                }
114
115                DecodingState oldState = state;
116                state = state.decode(in, childOutput);
117
118                // If finished, call finishDecode
119                if (state == null) {
120                    return finishDecode(childProducts, out);
121                }
122
123                int newPos = in.position();
124
125                // Wait for more data if nothing is consumed and state didn't change.
126                if (newPos == pos && oldState == state) {
127                    break;
128                }
129                pos = newPos;
130            }
131
132            return this;
133        } catch (Exception e) {
134            state = null;
135            throw e;
136        } finally {
137            this.currentState = state;
138
139            // Destroy if decoding is finished or failed.
140            if (state == null) {
141                cleanup();
142            }
143        }
144    }
145
146    /**
147     * {@inheritDoc}
148     */
149    public DecodingState finishDecode(ProtocolDecoderOutput out) throws Exception {
150        DecodingState nextState;
151        DecodingState state = getCurrentState();
152        try {
153            for (;;) {
154                DecodingState oldState = state;
155                state = state.finishDecode(childOutput);
156                if (state == null) {
157                    // Finished
158                    break;
159                }
160
161                // Exit if state didn't change.
162                if (oldState == state) {
163                    break;
164                }
165            }
166        } catch (Exception e) {
167            state = null;
168            log.debug("Ignoring the exception caused by a closed session.", e);
169        } finally {
170            this.currentState = state;
171            nextState = finishDecode(childProducts, out);
172            if (state == null) {
173                cleanup();
174            }
175        }
176        return nextState;
177    }
178
179    private void cleanup() {
180        if (!initialized) {
181            throw new IllegalStateException();
182        }
183
184        initialized = false;
185        childProducts.clear();
186        try {
187            destroy();
188        } catch (Exception e2) {
189            log.warn("Failed to destroy a decoding state machine.", e2);
190        }
191    }
192
193    private DecodingState getCurrentState() throws Exception {
194        DecodingState state = this.currentState;
195        if (state == null) {
196            state = init();
197            initialized = true;
198        }
199        return state;
200    }
201}