1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.codec.statemachine;
21
22 import java.util.Queue;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24
25 import org.apache.mina.core.buffer.IoBuffer;
26 import org.apache.mina.core.session.IoSession;
27 import org.apache.mina.filter.codec.ProtocolDecoder;
28 import org.apache.mina.filter.codec.ProtocolDecoderOutput;
29
30
31
32
33
34
35
36
37
38
39
40 public class DecodingStateProtocolDecoder implements ProtocolDecoder {
41 private final DecodingState state;
42
43 private final Queue<IoBuffer> undecodedBuffers = new ConcurrentLinkedQueue<>();
44
45 private IoSession session;
46
47
48
49
50
51
52
53
54 public DecodingStateProtocolDecoder(DecodingState state) {
55 if (state == null) {
56 throw new IllegalArgumentException("state");
57 }
58 this.state = state;
59 }
60
61
62
63
64 @Override
65 public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
66 if (this.session == null) {
67 this.session = session;
68 } else if (this.session != session) {
69 throw new IllegalStateException(getClass().getSimpleName() + " is a stateful decoder. "
70 + "You have to create one per session.");
71 }
72
73 undecodedBuffers.offer(in);
74
75 for (;;) {
76 IoBuffer b = undecodedBuffers.peek();
77 if (b == null) {
78 break;
79 }
80
81 int oldRemaining = b.remaining();
82 state.decode(b, out);
83 int newRemaining = b.remaining();
84
85 if (newRemaining != 0) {
86 if (oldRemaining == newRemaining) {
87 throw new IllegalStateException(DecodingState.class.getSimpleName() + " must "
88 + "consume at least one byte per decode().");
89 }
90 } else {
91 undecodedBuffers.poll();
92 }
93 }
94 }
95
96
97
98
99 @Override
100 public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
101 state.finishDecode(out);
102 }
103
104
105
106
107 @Override
108 public void dispose(IoSession session) throws Exception {
109
110 }
111 }