View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.codec.statemachine;
21  
22  import java.util.ArrayList;
23  import java.util.List;
24  
25  import org.apache.mina.core.buffer.IoBuffer;
26  import org.apache.mina.core.filterchain.IoFilter.NextFilter;
27  import org.apache.mina.core.session.IoSession;
28  import org.apache.mina.filter.codec.ProtocolCodecFilter;
29  import org.apache.mina.filter.codec.ProtocolDecoderOutput;
30  import org.slf4j.Logger;
31  import org.slf4j.LoggerFactory;
32  
33  /**
34   * Abstract base class for decoder state machines. Calls init() to
35   * get the start {@link DecodingState} of the state machine. Calls 
36   * destroy() when the state machine has reached its end state or when
37   * the session is closed.
38   * <p>
39   * NOTE: The {@link ProtocolDecoderOutput} used by this class when calling 
40   * {@link DecodingState#decode(IoBuffer, ProtocolDecoderOutput)} buffers decoded
41   * messages in a {@link List}. Once the state machine has reached its end state
42   * this class will call finishDecode(List, ProtocolDecoderOutput). The 
43   * implementation will have to take care of writing the decoded messages to the 
44   * real {@link ProtocolDecoderOutput} used by the configured 
45   * {@link ProtocolCodecFilter}.
46   * </p>
47   * 
48   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
49   */
50  public abstract class DecodingStateMachine implements DecodingState {
51      private static final Logger LOGGER = LoggerFactory.getLogger(DecodingStateMachine.class);
52  
53      private final List<Object> childProducts = new ArrayList<>();
54  
55      private final ProtocolDecoderOutputerOutput.html#ProtocolDecoderOutput">ProtocolDecoderOutput childOutput = new ProtocolDecoderOutput() {
56          /**
57           * {@inheritDoc}
58           */
59          @Override
60          public void flush(NextFilter nextFilter, IoSession session) {
61              // Do nothing
62          }
63  
64          /**
65           * {@inheritDoc}
66           */
67          @Override
68          public void write(Object message) {
69              childProducts.add(message);
70          }
71      };
72  
73      private DecodingState currentState;
74  
75      private boolean initialized;
76  
77      /**
78       * Invoked to initialize this state machine.
79       * 
80       * @return the start {@link DecodingState}.
81       * @throws Exception if the initialization failed
82       */
83      protected abstract DecodingState init() throws Exception;
84  
85      /**
86       * Called once the state machine has reached its end.
87       * 
88       * @param childProducts contains the messages generated by each of the 
89       *        {@link DecodingState}s which were exposed to the received data 
90       *        during the life time of this state machine.
91       * @param out the real {@link ProtocolDecoderOutput} used by the 
92       *        {@link ProtocolCodecFilter}.
93       * @return the next state if the state machine should resume.
94       * @throws Exception if the decoding end failed
95       */
96      protected abstract DecodingState finishDecode(List<Object> childProducts, ProtocolDecoderOutput out)
97              throws Exception;
98  
99      /**
100      * Invoked to destroy this state machine once the end state has been reached
101      * or the session has been closed.
102      * 
103      * @throws Exception if the destruction failed
104      */
105     protected abstract void destroy() throws Exception;
106 
107     /**
108      * {@inheritDoc}
109      */
110     @Override
111     public DecodingState decode(IoBuffer in, ProtocolDecoderOutput out) throws Exception {
112         DecodingState state = getCurrentState();
113 
114         final int limit = in.limit();
115         int pos = in.position();
116 
117         try {
118             for (;;) {
119                 // Wait for more data if all data is consumed.
120                 if (pos == limit) {
121                     break;
122                 }
123 
124                 DecodingState oldState = state;
125                 state = state.decode(in, childOutput);
126 
127                 // If finished, call finishDecode
128                 if (state == null) {
129                     return finishDecode(childProducts, out);
130                 }
131 
132                 int newPos = in.position();
133 
134                 // Wait for more data if nothing is consumed and state didn't change.
135                 if (newPos == pos && oldState == state) {
136                     break;
137                 }
138                 pos = newPos;
139             }
140 
141             return this;
142         } catch (Exception e) {
143             state = null;
144             throw e;
145         } finally {
146             this.currentState = state;
147 
148             // Destroy if decoding is finished or failed.
149             if (state == null) {
150                 cleanup();
151             }
152         }
153     }
154 
155     /**
156      * {@inheritDoc}
157      */
158     @Override
159     public DecodingState finishDecode(ProtocolDecoderOutput out) throws Exception {
160         DecodingState nextState;
161         DecodingState state = getCurrentState();
162         try {
163             for (;;) {
164                 DecodingState oldState = state;
165                 state = state.finishDecode(childOutput);
166                 if (state == null) {
167                     // Finished
168                     break;
169                 }
170 
171                 // Exit if state didn't change.
172                 if (oldState == state) {
173                     break;
174                 }
175             }
176         } catch (Exception e) {
177             state = null;
178             
179             if (LOGGER.isDebugEnabled()) {
180                 LOGGER.debug("Ignoring the exception caused by a closed session.", e);
181             }
182         } finally {
183             this.currentState = state;
184             nextState = finishDecode(childProducts, out);
185             if (state == null) {
186                 cleanup();
187             }
188         }
189         return nextState;
190     }
191 
192     private void cleanup() {
193         if (!initialized) {
194             throw new IllegalStateException();
195         }
196 
197         initialized = false;
198         childProducts.clear();
199         try {
200             destroy();
201         } catch (Exception e2) {
202             if (LOGGER.isDebugEnabled()) {
203                 LOGGER.warn("Failed to destroy a decoding state machine.", e2);
204             }
205         }
206     }
207 
208     private DecodingState getCurrentState() throws Exception {
209         DecodingState state = this.currentState;
210         if (state == null) {
211             state = init();
212             initialized = true;
213         }
214         return state;
215     }
216 }