View Javadoc

1   /*
2    *   @(#) $Id: IoAdapter.java 210062 2005-07-11 03:52:38Z trustin $
3    *
4    *   Copyright 2004 The Apache Software Foundation
5    *
6    *   Licensed under the Apache License, Version 2.0 (the "License");
7    *   you may not use this file except in compliance with the License.
8    *   You may obtain a copy of the License at
9    *
10   *       http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing, software
13   *   distributed under the License is distributed on an "AS IS" BASIS,
14   *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   *   See the License for the specific language governing permissions and
16   *   limitations under the License.
17   *
18   */
19  package org.apache.mina.protocol.io;
20  
21  
22  import org.apache.mina.common.ByteBuffer;
23  import org.apache.mina.common.IdleStatus;
24  import org.apache.mina.io.IoHandler;
25  import org.apache.mina.io.IoSession;
26  import org.apache.mina.protocol.ProtocolCodecFactory;
27  import org.apache.mina.protocol.ProtocolDecoder;
28  import org.apache.mina.protocol.ProtocolEncoder;
29  import org.apache.mina.protocol.ProtocolHandler;
30  import org.apache.mina.protocol.ProtocolFilterChain;
31  import org.apache.mina.protocol.ProtocolProvider;
32  import org.apache.mina.protocol.ProtocolSession;
33  import org.apache.mina.protocol.ProtocolViolationException;
34  import org.apache.mina.util.Queue;
35  
36  /***
37   * Adapts the specified {@link ProtocolProvider} to {@link IoHandler}.
38   * This class is used by {@link IoProtocolAcceptor} and {@link IoProtocolConnector}
39   * internally.
40   * <p>
41   * It is a bridge between I/O layer and Protocol layer.  Protocol layer itself
42   * cannot do any real I/O, but it translates I/O events to more higher level
43   * ones and vice versa.
44   * 
45   * @author Trustin Lee (trustin@apache.org)
46   * @version $Rev: 210062 $, $Date: 2005-07-11 12:52:38 +0900 $
47   */
48  class IoAdapter
49  {
50      private static final String KEY = "IoAdapter.ProtocolSession";
51  
52      private final IoProtocolSessionManagerFilterChain managerFilterChain;
53  
54      IoAdapter( IoProtocolSessionManagerFilterChain filters )
55      {
56          this.managerFilterChain = filters;
57      }
58      
59      public ProtocolFilterChain getFilterChain()
60      {
61          return managerFilterChain;
62      }
63  
64      /***
65       * Converts the specified <code>protocolProvider</code> to {@link IoAdapter}
66       * to use for actual I/O.
67       * 
68       * @return a new I/O handler for the specified <code>protocolProvider</code>
69       */
70      public IoHandler adapt( ProtocolProvider protocolProvider )
71      {
72          return new SessionHandlerAdapter( protocolProvider );
73      }
74  
75      /***
76       * Returns {@link ProtocolSession} of the specified {@link IoSession}.
77       */
78      public ProtocolSession toProtocolSession( IoSession session )
79      {
80          IoHandler handler = session.getHandler();
81          if( handler instanceof SessionHandlerAdapter )
82          {
83              SessionHandlerAdapter sha = ( SessionHandlerAdapter ) handler;
84              return sha.getProtocolSession( session );
85          }
86          else
87          {
88              throw new IllegalArgumentException( "Not adapted from IoAdapter." );
89          }
90      }
91  
92      class SessionHandlerAdapter implements IoHandler
93      {
94          final ProtocolCodecFactory codecFactory;
95          final ProtocolHandler handler;
96  
97          public SessionHandlerAdapter( ProtocolProvider protocolProvider )
98          {
99              codecFactory = protocolProvider.getCodecFactory();
100             this.handler = protocolProvider.getHandler();
101         }
102         
103         public void sessionCreated( IoSession session ) throws Exception
104         {
105             handler.sessionCreated( getProtocolSession( session ) );
106         }
107 
108         public void sessionOpened( IoSession session )
109         {
110             managerFilterChain.sessionOpened( getProtocolSession( session ) );
111         }
112 
113         public void sessionClosed( IoSession session )
114         {
115             managerFilterChain.sessionClosed( getProtocolSession( session ) );
116         }
117 
118         public void sessionIdle( IoSession session, IdleStatus status )
119         {
120             managerFilterChain.sessionIdle( getProtocolSession( session ), status );
121         }
122 
123         public void exceptionCaught( IoSession session, Throwable cause )
124         {
125             managerFilterChain.exceptionCaught( getProtocolSession( session ), cause );
126         }
127 
128         public void dataRead( IoSession session, ByteBuffer in )
129         {
130             IoProtocolSession psession = getProtocolSession( session );
131             ProtocolDecoder decoder = psession.decoder;
132             try
133             {
134                 synchronized( decoder )
135                 {
136                     decoder.decode( psession, in, psession.decOut );
137                 }
138 
139                 Queue queue = psession.decOut.getMessageQueue();
140                 synchronized( queue )
141                 {
142                     if( !queue.isEmpty() )
143                     {
144                         do
145                         {
146                             managerFilterChain.messageReceived( psession, queue.pop() );
147                         }
148                         while( !queue.isEmpty() );
149                     }
150                 }
151             }
152             catch( ProtocolViolationException pve )
153             {
154                 pve.setBuffer( in );
155                 managerFilterChain.exceptionCaught( psession, pve );
156             }
157             catch( Throwable t )
158             {
159                 managerFilterChain.exceptionCaught( psession, t );
160             }
161         }
162 
163         public void dataWritten( IoSession session, Object marker )
164         {
165             if( marker == null )
166                 return;
167             managerFilterChain.messageSent( getProtocolSession( session ),
168                                  marker );
169         }
170 
171         void doWrite( IoSession session )
172         {
173             IoProtocolSession psession = getProtocolSession( session );
174             ProtocolEncoder encoder = psession.encoder;
175             Queue writeQueue = psession.writeQueue;
176 
177             if( writeQueue.isEmpty() )
178             {
179                 return;
180             }
181 
182             try
183             {
184                 while( !writeQueue.isEmpty() )
185                 {
186                     synchronized( writeQueue )
187                     {
188                         Object message = writeQueue.pop();
189                         if( message == null )
190                             break;
191 
192                         Queue queue = psession.encOut.getBufferQueue();
193                         encoder.encode( psession, message, psession.encOut );
194                         for( ;; )
195                         {
196                             ByteBuffer buf = ( ByteBuffer ) queue.pop();
197                             if( buf == null )
198                                 break;
199                             // use marker only if it is the last ByteBuffer
200                             Object marker = queue.isEmpty() ? message : null;
201                             session.write( buf, marker );
202                         }
203                     }
204                 }
205             }
206             catch( Throwable t )
207             {
208                 managerFilterChain.exceptionCaught( psession, t );
209             }
210         }
211 
212         private IoProtocolSession getProtocolSession( IoSession session )
213         {
214             IoProtocolSession psession =
215                 ( IoProtocolSession ) session.getAttribute( KEY );
216             if( psession == null )
217             {
218                 synchronized( session )
219                 {
220                     psession =
221                         ( IoProtocolSession ) session.getAttribute( KEY );
222                     if( psession == null )
223                     {
224                         psession = new IoProtocolSession(
225                                 IoAdapter.this.managerFilterChain, session, this );
226                         session.setAttribute( KEY, psession );
227                     }
228                 }
229             }
230 
231             return psession;
232         }
233     }
234 }