View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.codec;
21  
22  import java.net.SocketAddress;
23  import java.util.Queue;
24  
25  import org.apache.mina.core.buffer.IoBuffer;
26  import org.apache.mina.core.file.FileRegion;
27  import org.apache.mina.core.filterchain.IoFilter;
28  import org.apache.mina.core.filterchain.IoFilterAdapter;
29  import org.apache.mina.core.filterchain.IoFilterChain;
30  import org.apache.mina.core.future.DefaultWriteFuture;
31  import org.apache.mina.core.future.WriteFuture;
32  import org.apache.mina.core.session.AttributeKey;
33  import org.apache.mina.core.session.IoSession;
34  import org.apache.mina.core.write.DefaultWriteRequest;
35  import org.apache.mina.core.write.NothingWrittenException;
36  import org.apache.mina.core.write.WriteRequest;
37  import org.apache.mina.core.write.WriteRequestWrapper;
38  import org.slf4j.Logger;
39  import org.slf4j.LoggerFactory;
40  
41  /**
42   * An {@link IoFilter} which translates binary or protocol specific data into
43   * message object and vice versa using {@link ProtocolCodecFactory},
44   * {@link ProtocolEncoder}, or {@link ProtocolDecoder}.
45   *
46   * @author The Apache MINA Project (dev@mina.apache.org)
47   * @version $Rev: 671827 $, $Date: 2008-06-26 10:49:48 +0200 (jeu, 26 jun 2008) $
48   */
49  public class ProtocolCodecFilter extends IoFilterAdapter {
50  
51      private static final Class<?>[] EMPTY_PARAMS = new Class[0];
52      private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
53  
54      private final AttributeKey ENCODER = new AttributeKey(getClass(), "encoder");
55      private final AttributeKey DECODER = new AttributeKey(getClass(), "decoder");
56      private final AttributeKey DECODER_OUT = new AttributeKey(getClass(), "decoderOut");
57      private final ProtocolCodecFactory factory;
58  
59      private final Logger logger = LoggerFactory.getLogger(getClass());
60  
61      public ProtocolCodecFilter(ProtocolCodecFactory factory) {
62          if (factory == null) {
63              throw new NullPointerException("factory");
64          }
65          this.factory = factory;
66      }
67  
68      public ProtocolCodecFilter(final ProtocolEncoder encoder,
69              final ProtocolDecoder decoder) {
70          if (encoder == null) {
71              throw new NullPointerException("encoder");
72          }
73          if (decoder == null) {
74              throw new NullPointerException("decoder");
75          }
76  
77          this.factory = new ProtocolCodecFactory() {
78              public ProtocolEncoder getEncoder(IoSession session) {
79                  return encoder;
80              }
81  
82              public ProtocolDecoder getDecoder(IoSession session) {
83                  return decoder;
84              }
85          };
86      }
87  
88      public ProtocolCodecFilter(
89              final Class<? extends ProtocolEncoder> encoderClass,
90              final Class<? extends ProtocolDecoder> decoderClass) {
91          if (encoderClass == null) {
92              throw new NullPointerException("encoderClass");
93          }
94          if (decoderClass == null) {
95              throw new NullPointerException("decoderClass");
96          }
97          if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
98              throw new IllegalArgumentException("encoderClass: "
99                      + encoderClass.getName());
100         }
101         if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
102             throw new IllegalArgumentException("decoderClass: "
103                     + decoderClass.getName());
104         }
105         try {
106             encoderClass.getConstructor(EMPTY_PARAMS);
107         } catch (NoSuchMethodException e) {
108             throw new IllegalArgumentException(
109                     "encoderClass doesn't have a public default constructor.");
110         }
111         try {
112             decoderClass.getConstructor(EMPTY_PARAMS);
113         } catch (NoSuchMethodException e) {
114             throw new IllegalArgumentException(
115                     "decoderClass doesn't have a public default constructor.");
116         }
117 
118         this.factory = new ProtocolCodecFactory() {
119             public ProtocolEncoder getEncoder(IoSession session) throws Exception {
120                 return encoderClass.newInstance();
121             }
122 
123             public ProtocolDecoder getDecoder(IoSession session) throws Exception {
124                 return decoderClass.newInstance();
125             }
126         };
127     }
128 
129     public ProtocolEncoder getEncoder(IoSession session) {
130         return (ProtocolEncoder) session.getAttribute(ENCODER);
131     }
132 
133     public ProtocolDecoder getDecoder(IoSession session) {
134         return (ProtocolDecoder) session.getAttribute(DECODER);
135     }
136 
137     @Override
138     public void onPreAdd(IoFilterChain parent, String name,
139             NextFilter nextFilter) throws Exception {
140         if (parent.contains(this)) {
141             throw new IllegalArgumentException(
142                     "You can't add the same filter instance more than once.  Create another instance and add it.");
143         }
144     }
145 
146     @Override
147     public void onPostRemove(IoFilterChain parent, String name,
148             NextFilter nextFilter) throws Exception {
149         disposeEncoder(parent.getSession());
150         disposeDecoder(parent.getSession());
151         disposeDecoderOut(parent.getSession());
152     }
153 
154     @Override
155     public void messageReceived(NextFilter nextFilter, IoSession session,
156             Object message) throws Exception {
157         if (!(message instanceof IoBuffer)) {
158             nextFilter.messageReceived(session, message);
159             return;
160         }
161 
162         IoBuffer in = (IoBuffer) message;
163         ProtocolDecoder decoder = getDecoder0(session);
164         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
165 
166         while (in.hasRemaining()) {
167             int oldPos = in.position();
168             try {
169                 synchronized (decoderOut) {
170                     decoder.decode(session, in, decoderOut);
171                 }
172                 // Finish decoding if no exception was thrown.
173                 decoderOut.flush();
174                 break;
175             } catch (Throwable t) {
176                 ProtocolDecoderException pde;
177                 if (t instanceof ProtocolDecoderException) {
178                     pde = (ProtocolDecoderException) t;
179                 } else {
180                     pde = new ProtocolDecoderException(t);
181                 }
182                 
183                 if (pde.getHexdump() == null) {
184                     int curPos = in.position();
185                     in.position(oldPos);
186                     pde.setHexdump(in.getHexDump());
187                     in.position(curPos);
188                 }
189 
190                 // Fire the exceptionCaught event.
191                 decoderOut.flush();
192                 nextFilter.exceptionCaught(session, pde);
193 
194                 // Retry only if the type of the caught exception is
195                 // recoverable and the buffer position has changed.
196                 // We check buffer position additionally to prevent an
197                 // infinite loop.
198                 if (!(t instanceof RecoverableProtocolDecoderException) ||
199                         in.position() == oldPos) {
200                     break;
201                 }
202             }
203         }
204     }
205 
206     @Override
207     public void messageSent(NextFilter nextFilter, IoSession session,
208             WriteRequest writeRequest) throws Exception {
209         if (writeRequest instanceof EncodedWriteRequest) {
210             return;
211         }
212 
213         if (!(writeRequest instanceof MessageWriteRequest)) {
214             nextFilter.messageSent(session, writeRequest);
215             return;
216         }
217 
218         MessageWriteRequest wrappedRequest = (MessageWriteRequest) writeRequest;
219         nextFilter.messageSent(session, wrappedRequest.getParentRequest());
220     }
221 
222     @Override
223     public void filterWrite(NextFilter nextFilter, IoSession session,
224             WriteRequest writeRequest) throws Exception {
225         Object message = writeRequest.getMessage();
226         if (message instanceof IoBuffer || message instanceof FileRegion) {
227             nextFilter.filterWrite(session, writeRequest);
228             return;
229         }
230 
231         ProtocolEncoder encoder = getEncoder0(session);
232         ProtocolEncoderOutputImpl encoderOut = getEncoderOut(session,
233                 nextFilter, writeRequest);
234 
235         try {
236             encoder.encode(session, message, encoderOut);
237             encoderOut.flushWithoutFuture();
238             nextFilter.filterWrite(session, new MessageWriteRequest(
239                     writeRequest));
240         } catch (Throwable t) {
241             ProtocolEncoderException pee;
242             if (t instanceof ProtocolEncoderException) {
243                 pee = (ProtocolEncoderException) t;
244             } else {
245                 pee = new ProtocolEncoderException(t);
246             }
247             throw pee;
248         }
249     }
250 
251     @Override
252     public void sessionClosed(NextFilter nextFilter, IoSession session)
253             throws Exception {
254         // Call finishDecode() first when a connection is closed.
255         ProtocolDecoder decoder = getDecoder0(session);
256         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
257         try {
258             decoder.finishDecode(session, decoderOut);
259         } catch (Throwable t) {
260             ProtocolDecoderException pde;
261             if (t instanceof ProtocolDecoderException) {
262                 pde = (ProtocolDecoderException) t;
263             } else {
264                 pde = new ProtocolDecoderException(t);
265             }
266             throw pde;
267         } finally {
268             // Dispose all.
269             disposeEncoder(session);
270             disposeDecoder(session);
271             disposeDecoderOut(session);
272             decoderOut.flush();
273         }
274 
275         nextFilter.sessionClosed(session);
276     }
277 
278     private ProtocolEncoder getEncoder0(IoSession session) throws Exception {
279         ProtocolEncoder encoder = (ProtocolEncoder) session
280                 .getAttribute(ENCODER);
281         if (encoder == null) {
282             encoder = factory.getEncoder(session);
283             session.setAttribute(ENCODER, encoder);
284         }
285         return encoder;
286     }
287 
288     private ProtocolEncoderOutputImpl getEncoderOut(IoSession session,
289             NextFilter nextFilter, WriteRequest writeRequest) {
290         return new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
291     }
292 
293     private ProtocolDecoder getDecoder0(IoSession session) throws Exception {
294         ProtocolDecoder decoder = (ProtocolDecoder) session
295                 .getAttribute(DECODER);
296         if (decoder == null) {
297             decoder = factory.getDecoder(session);
298             session.setAttribute(DECODER, decoder);
299         }
300         return decoder;
301     }
302 
303     private ProtocolDecoderOutput getDecoderOut(IoSession session,
304             NextFilter nextFilter) {
305         ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
306         if (out == null) {
307             out = new ProtocolDecoderOutputImpl(session, nextFilter);
308             session.setAttribute(DECODER_OUT, out);
309         }
310         return out;
311     }
312 
313     private void disposeEncoder(IoSession session) {
314         ProtocolEncoder encoder = (ProtocolEncoder) session
315                 .removeAttribute(ENCODER);
316         if (encoder == null) {
317             return;
318         }
319 
320         try {
321             encoder.dispose(session);
322         } catch (Throwable t) {
323             logger.warn(
324                     "Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')');
325         }
326     }
327 
328     private void disposeDecoder(IoSession session) {
329         ProtocolDecoder decoder = (ProtocolDecoder) session
330                 .removeAttribute(DECODER);
331         if (decoder == null) {
332             return;
333         }
334 
335         try {
336             decoder.dispose(session);
337         } catch (Throwable t) {
338             logger.warn(
339                     "Falied to dispose: " + decoder.getClass().getName() + " (" + decoder + ')');
340         }
341     }
342 
343     private void disposeDecoderOut(IoSession session) {
344         session.removeAttribute(DECODER_OUT);
345     }
346 
347     private static class EncodedWriteRequest extends DefaultWriteRequest {
348         private EncodedWriteRequest(Object encodedMessage,
349                 WriteFuture future, SocketAddress destination) {
350             super(encodedMessage, future, destination);
351         }
352     }
353 
354     private static class MessageWriteRequest extends WriteRequestWrapper {
355         private MessageWriteRequest(WriteRequest writeRequest) {
356             super(writeRequest);
357         }
358 
359         @Override
360         public Object getMessage() {
361             return EMPTY_BUFFER;
362         }
363     }
364 
365     private static class ProtocolDecoderOutputImpl extends
366             AbstractProtocolDecoderOutput {
367         private final IoSession session;
368         private final NextFilter nextFilter;
369 
370         public ProtocolDecoderOutputImpl(
371                 IoSession session, NextFilter nextFilter) {
372             this.session = session;
373             this.nextFilter = nextFilter;
374         }
375 
376         public void flush() {
377             Queue<Object> messageQueue = getMessageQueue();
378             while (!messageQueue.isEmpty()) {
379                 nextFilter.messageReceived(session, messageQueue.poll());
380             }
381         }
382     }
383 
384     private static class ProtocolEncoderOutputImpl extends
385             AbstractProtocolEncoderOutput {
386         private final IoSession session;
387 
388         private final NextFilter nextFilter;
389 
390         private final WriteRequest writeRequest;
391 
392         public ProtocolEncoderOutputImpl(IoSession session,
393                 NextFilter nextFilter, WriteRequest writeRequest) {
394             this.session = session;
395             this.nextFilter = nextFilter;
396             this.writeRequest = writeRequest;
397         }
398 
399         public WriteFuture flush() {
400             Queue<Object> bufferQueue = getMessageQueue();
401             WriteFuture future = null;
402             for (;;) {
403                 Object encodedMessage = bufferQueue.poll();
404                 if (encodedMessage == null) {
405                     break;
406                 }
407 
408                 // Flush only when the buffer has remaining.
409                 if (!(encodedMessage instanceof IoBuffer) ||
410                         ((IoBuffer) encodedMessage).hasRemaining()) {
411                     future = new DefaultWriteFuture(session);
412                     nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage,
413                             future, writeRequest.getDestination()));
414                 }
415             }
416 
417             if (future == null) {
418                 future = DefaultWriteFuture.newNotWrittenFuture(
419                         session, new NothingWrittenException(writeRequest));
420             }
421 
422             return future;
423         }
424         
425         public void flushWithoutFuture() {
426             Queue<Object> bufferQueue = getMessageQueue();
427             for (;;) {
428                 Object encodedMessage = bufferQueue.poll();
429                 if (encodedMessage == null) {
430                     break;
431                 }
432 
433                 // Flush only when the buffer has remaining.
434                 if (!(encodedMessage instanceof IoBuffer) ||
435                         ((IoBuffer) encodedMessage).hasRemaining()) {
436                     nextFilter.filterWrite(
437                             session, new EncodedWriteRequest(
438                                     encodedMessage, null, writeRequest.getDestination()));
439                 }
440             }
441         }
442     }
443 }