1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.mina.protocol.io;
20
21
22 import org.apache.mina.common.ByteBuffer;
23 import org.apache.mina.common.IdleStatus;
24 import org.apache.mina.io.IoHandler;
25 import org.apache.mina.io.IoSession;
26 import org.apache.mina.protocol.ProtocolCodecFactory;
27 import org.apache.mina.protocol.ProtocolDecoder;
28 import org.apache.mina.protocol.ProtocolEncoder;
29 import org.apache.mina.protocol.ProtocolHandler;
30 import org.apache.mina.protocol.ProtocolFilterChain;
31 import org.apache.mina.protocol.ProtocolProvider;
32 import org.apache.mina.protocol.ProtocolSession;
33 import org.apache.mina.protocol.ProtocolViolationException;
34 import org.apache.mina.util.Queue;
35
36 /***
37 * Adapts the specified {@link ProtocolProvider} to {@link IoHandler}.
38 * This class is used by {@link IoProtocolAcceptor} and {@link IoProtocolConnector}
39 * internally.
40 * <p>
41 * It is a bridge between I/O layer and Protocol layer. Protocol layer itself
42 * cannot do any real I/O, but it translates I/O events to more higher level
43 * ones and vice versa.
44 *
45 * @author Trustin Lee (trustin@apache.org)
46 * @version $Rev: 210062 $, $Date: 2005-07-11 12:52:38 +0900 $
47 */
48 class IoAdapter
49 {
50 private static final String KEY = "IoAdapter.ProtocolSession";
51
52 private final IoProtocolSessionManagerFilterChain managerFilterChain;
53
54 IoAdapter( IoProtocolSessionManagerFilterChain filters )
55 {
56 this.managerFilterChain = filters;
57 }
58
59 public ProtocolFilterChain getFilterChain()
60 {
61 return managerFilterChain;
62 }
63
64 /***
65 * Converts the specified <code>protocolProvider</code> to {@link IoAdapter}
66 * to use for actual I/O.
67 *
68 * @return a new I/O handler for the specified <code>protocolProvider</code>
69 */
70 public IoHandler adapt( ProtocolProvider protocolProvider )
71 {
72 return new SessionHandlerAdapter( protocolProvider );
73 }
74
75 /***
76 * Returns {@link ProtocolSession} of the specified {@link IoSession}.
77 */
78 public ProtocolSession toProtocolSession( IoSession session )
79 {
80 IoHandler handler = session.getHandler();
81 if( handler instanceof SessionHandlerAdapter )
82 {
83 SessionHandlerAdapter sha = ( SessionHandlerAdapter ) handler;
84 return sha.getProtocolSession( session );
85 }
86 else
87 {
88 throw new IllegalArgumentException( "Not adapted from IoAdapter." );
89 }
90 }
91
92 class SessionHandlerAdapter implements IoHandler
93 {
94 final ProtocolCodecFactory codecFactory;
95 final ProtocolHandler handler;
96
97 public SessionHandlerAdapter( ProtocolProvider protocolProvider )
98 {
99 codecFactory = protocolProvider.getCodecFactory();
100 this.handler = protocolProvider.getHandler();
101 }
102
103 public void sessionCreated( IoSession session ) throws Exception
104 {
105 handler.sessionCreated( getProtocolSession( session ) );
106 }
107
108 public void sessionOpened( IoSession session )
109 {
110 managerFilterChain.sessionOpened( getProtocolSession( session ) );
111 }
112
113 public void sessionClosed( IoSession session )
114 {
115 managerFilterChain.sessionClosed( getProtocolSession( session ) );
116 }
117
118 public void sessionIdle( IoSession session, IdleStatus status )
119 {
120 managerFilterChain.sessionIdle( getProtocolSession( session ), status );
121 }
122
123 public void exceptionCaught( IoSession session, Throwable cause )
124 {
125 managerFilterChain.exceptionCaught( getProtocolSession( session ), cause );
126 }
127
128 public void dataRead( IoSession session, ByteBuffer in )
129 {
130 IoProtocolSession psession = getProtocolSession( session );
131 ProtocolDecoder decoder = psession.decoder;
132 try
133 {
134 synchronized( decoder )
135 {
136 decoder.decode( psession, in, psession.decOut );
137 }
138
139 Queue queue = psession.decOut.getMessageQueue();
140 synchronized( queue )
141 {
142 if( !queue.isEmpty() )
143 {
144 do
145 {
146 managerFilterChain.messageReceived( psession, queue.pop() );
147 }
148 while( !queue.isEmpty() );
149 }
150 }
151 }
152 catch( ProtocolViolationException pve )
153 {
154 pve.setBuffer( in );
155 managerFilterChain.exceptionCaught( psession, pve );
156 }
157 catch( Throwable t )
158 {
159 managerFilterChain.exceptionCaught( psession, t );
160 }
161 }
162
163 public void dataWritten( IoSession session, Object marker )
164 {
165 if( marker == null )
166 return;
167 managerFilterChain.messageSent( getProtocolSession( session ),
168 marker );
169 }
170
171 void doWrite( IoSession session )
172 {
173 IoProtocolSession psession = getProtocolSession( session );
174 ProtocolEncoder encoder = psession.encoder;
175 Queue writeQueue = psession.writeQueue;
176
177 if( writeQueue.isEmpty() )
178 {
179 return;
180 }
181
182 try
183 {
184 while( !writeQueue.isEmpty() )
185 {
186 synchronized( writeQueue )
187 {
188 Object message = writeQueue.pop();
189 if( message == null )
190 break;
191
192 Queue queue = psession.encOut.getBufferQueue();
193 encoder.encode( psession, message, psession.encOut );
194 for( ;; )
195 {
196 ByteBuffer buf = ( ByteBuffer ) queue.pop();
197 if( buf == null )
198 break;
199
200 Object marker = queue.isEmpty() ? message : null;
201 session.write( buf, marker );
202 }
203 }
204 }
205 }
206 catch( Throwable t )
207 {
208 managerFilterChain.exceptionCaught( psession, t );
209 }
210 }
211
212 private IoProtocolSession getProtocolSession( IoSession session )
213 {
214 IoProtocolSession psession =
215 ( IoProtocolSession ) session.getAttribute( KEY );
216 if( psession == null )
217 {
218 synchronized( session )
219 {
220 psession =
221 ( IoProtocolSession ) session.getAttribute( KEY );
222 if( psession == null )
223 {
224 psession = new IoProtocolSession(
225 IoAdapter.this.managerFilterChain, session, this );
226 session.setAttribute( KEY, psession );
227 }
228 }
229 }
230
231 return psession;
232 }
233 }
234 }