View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.codec.statemachine;
21  
22  import java.util.ArrayList;
23  import java.util.List;
24  
25  import org.apache.mina.core.buffer.IoBuffer;
26  import org.apache.mina.core.filterchain.IoFilter.NextFilter;
27  import org.apache.mina.core.session.IoSession;
28  import org.apache.mina.filter.codec.ProtocolCodecFilter;
29  import org.apache.mina.filter.codec.ProtocolDecoderOutput;
30  import org.slf4j.Logger;
31  import org.slf4j.LoggerFactory;
32  
33  /**
34   * Abstract base class for decoder state machines. Calls {@link #init()} to
35   * get the start {@link DecodingState} of the state machine. Calls 
36   * {@link #destroy()} when the state machine has reached its end state or when
37   * the session is closed.
38   * <p>
39   * NOTE: The {@link ProtocolDecoderOutput} used by this class when calling 
40   * {@link DecodingState#decode(IoBuffer, ProtocolDecoderOutput)} buffers decoded
41   * messages in a {@link List}. Once the state machine has reached its end state
42   * this class will call {@link #finishDecode(List, ProtocolDecoderOutput)}. The 
43   * implementation will have to take care of writing the decoded messages to the 
44   * real {@link ProtocolDecoderOutput} used by the configured 
45   * {@link ProtocolCodecFilter}.
46   * </p>
47   * 
48   * @author The Apache MINA Project (dev@mina.apache.org)
49   */
50  public abstract class DecodingStateMachine implements DecodingState {
51      private final Logger log = LoggerFactory
52              .getLogger(DecodingStateMachine.class);
53  
54      private final List<Object> childProducts = new ArrayList<Object>();
55  
56      private final ProtocolDecoderOutput childOutput = new ProtocolDecoderOutput() {
57          public void flush(NextFilter nextFilter, IoSession session) {
58              // Do nothing
59          }
60  
61          public void write(Object message) {
62              childProducts.add(message);
63          }
64      };
65  
66      private DecodingState currentState;
67      private boolean initialized;
68  
69      /**
70       * Invoked to initialize this state machine.
71       * 
72       * @return the start {@link DecodingState}.
73       */
74      protected abstract DecodingState init() throws Exception;
75  
76      /**
77       * Called once the state machine has reached its end.
78       * 
79       * @param childProducts contains the messages generated by each of the 
80       *        {@link DecodingState}s which were exposed to the received data 
81       *        during the life time of this state machine.
82       * @param out the real {@link ProtocolDecoderOutput} used by the 
83       *        {@link ProtocolCodecFilter}.
84       * @return the next state if the state machine should resume.
85       */
86      protected abstract DecodingState finishDecode(List<Object> childProducts,
87              ProtocolDecoderOutput out) throws Exception;
88  
89      /**
90       * Invoked to destroy this state machine once the end state has been reached
91       * or the session has been closed.
92       */
93      protected abstract void destroy() throws Exception;
94  
95      /**
96       * {@inheritDoc}
97       */
98      public DecodingState decode(IoBuffer in, ProtocolDecoderOutput out)
99              throws Exception {
100         DecodingState state = getCurrentState();
101 
102         final int limit = in.limit();
103         int pos = in.position();
104 
105         try {
106             for (;;) {
107                 // Wait for more data if all data is consumed.
108                 if (pos == limit) {
109                     break;
110                 }
111 
112                 DecodingState oldState = state;
113                 state = state.decode(in, childOutput);
114 
115                 // If finished, call finishDecode
116                 if (state == null) {
117                     return finishDecode(childProducts, out);
118                 }
119 
120                 int newPos = in.position();
121 
122                 // Wait for more data if nothing is consumed and state didn't change.
123                 if (newPos == pos && oldState == state) {
124                     break;
125                 }
126                 pos = newPos;
127             }
128 
129             return this;
130         } catch (Exception e) {
131             state = null;
132             throw e;
133         } finally {
134             this.currentState = state;
135 
136             // Destroy if decoding is finished or failed.
137             if (state == null) {
138                 cleanup();
139             }
140         }
141     }
142 
143     /**
144      * {@inheritDoc}
145      */
146     public DecodingState finishDecode(ProtocolDecoderOutput out)
147             throws Exception {
148         DecodingState nextState;
149         DecodingState state = getCurrentState();
150         try {
151             for (;;) {
152                 DecodingState oldState = state;
153                 state = state.finishDecode(childOutput);
154                 if (state == null) {
155                     // Finished
156                     break;
157                 }
158     
159                 // Exit if state didn't change.
160                 if (oldState == state) {
161                     break;
162                 }
163             }
164         } catch (Exception e) {
165             state = null;
166             log.debug(
167                     "Ignoring the exception caused by a closed session.", e);
168         } finally {
169             this.currentState = state;
170             nextState = finishDecode(childProducts, out);
171             if (state == null) {
172                 cleanup();
173             }
174         }
175         return nextState;
176     }
177 
178     private void cleanup() {
179         if (!initialized) {
180             throw new IllegalStateException();
181         }
182         
183         initialized = false;
184         childProducts.clear();
185         try {
186             destroy();
187         } catch (Exception e2) {
188             log.warn("Failed to destroy a decoding state machine.", e2);
189         }
190     }
191 
192     private DecodingState getCurrentState() throws Exception {
193         DecodingState state = this.currentState;
194         if (state == null) {
195             state = init();
196             initialized = true;
197         }
198         return state;
199     }
200 }