View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.codec;
21  
22  import java.net.SocketAddress;
23  import java.util.Queue;
24  
25  import org.apache.mina.core.buffer.IoBuffer;
26  import org.apache.mina.core.file.FileRegion;
27  import org.apache.mina.core.filterchain.IoFilter;
28  import org.apache.mina.core.filterchain.IoFilterAdapter;
29  import org.apache.mina.core.filterchain.IoFilterChain;
30  import org.apache.mina.core.future.DefaultWriteFuture;
31  import org.apache.mina.core.future.WriteFuture;
32  import org.apache.mina.core.session.AttributeKey;
33  import org.apache.mina.core.session.IoSession;
34  import org.apache.mina.core.write.DefaultWriteRequest;
35  import org.apache.mina.core.write.NothingWrittenException;
36  import org.apache.mina.core.write.WriteRequest;
37  import org.apache.mina.core.write.WriteRequestWrapper;
38  import org.slf4j.Logger;
39  import org.slf4j.LoggerFactory;
40  
41  /**
42   * An {@link IoFilter} which translates binary or protocol specific data into
43   * message object and vice versa using {@link ProtocolCodecFactory},
44   * {@link ProtocolEncoder}, or {@link ProtocolDecoder}.
45   *
46   * @author The Apache MINA Project (dev@mina.apache.org)
47   * @version $Rev: 756234 $, $Date: 2009-03-19 23:07:05 +0100 (Thu, 19 Mar 2009) $
48   * @org.apache.xbean.XBean
49   */
50  public class ProtocolCodecFilter extends IoFilterAdapter {
51  
52      private static final Class<?>[] EMPTY_PARAMS = new Class[0];
53      private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
54  
55      private final AttributeKey ENCODER = new AttributeKey(getClass(), "encoder");
56      private final AttributeKey DECODER = new AttributeKey(getClass(), "decoder");
57      private final AttributeKey DECODER_OUT = new AttributeKey(getClass(), "decoderOut");
58      private final AttributeKey ENCODER_OUT = new AttributeKey(getClass(), "encoderOut");
59      
60      /** The factory responsible for creating the encoder and decoder */
61      private final ProtocolCodecFactory factory;
62  
63      private final Logger logger = LoggerFactory.getLogger(getClass());
64  
65      /**
66       * 
67       * Creates a new instance of ProtocolCodecFilter, associating a factory
68       * for the creation of the encoder and decoder.
69       *
70       * @param factory The associated factory
71       */
72      public ProtocolCodecFilter(ProtocolCodecFactory factory) {
73          if (factory == null) {
74              throw new NullPointerException("factory");
75          }
76          this.factory = factory;
77      }
78  
79      
80      /**
81       * Creates a new instance of ProtocolCodecFilter, without any factory.
82       * The encoder/decoder factory will be created as an inner class, using
83       * the two parameters (encoder and decoder). 
84       * 
85       * @param encoder The class responsible for encoding the message
86       * @param decoder The class responsible for decoding the message
87       */
88      public ProtocolCodecFilter(final ProtocolEncoder encoder,
89              final ProtocolDecoder decoder) {
90          if (encoder == null) {
91              throw new NullPointerException("encoder");
92          }
93          if (decoder == null) {
94              throw new NullPointerException("decoder");
95          }
96  
97          // Create the inner Factory based on the two parameters
98          this.factory = new ProtocolCodecFactory() {
99              public ProtocolEncoder getEncoder(IoSession session) {
100                 return encoder;
101             }
102 
103             public ProtocolDecoder getDecoder(IoSession session) {
104                 return decoder;
105             }
106         };
107     }
108 
109     /**
110      * Creates a new instance of ProtocolCodecFilter, without any factory.
111      * The encoder/decoder factory will be created as an inner class, using
112      * the two parameters (encoder and decoder), which are class names. Instances
113      * for those classes will be created in this constructor.
114      * 
115      * @param encoder The class responsible for encoding the message
116      * @param decoder The class responsible for decoding the message
117      */
118     public ProtocolCodecFilter(
119             final Class<? extends ProtocolEncoder> encoderClass,
120             final Class<? extends ProtocolDecoder> decoderClass) {
121         if (encoderClass == null) {
122             throw new NullPointerException("encoderClass");
123         }
124         if (decoderClass == null) {
125             throw new NullPointerException("decoderClass");
126         }
127         if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
128             throw new IllegalArgumentException("encoderClass: "
129                     + encoderClass.getName());
130         }
131         if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
132             throw new IllegalArgumentException("decoderClass: "
133                     + decoderClass.getName());
134         }
135         try {
136             encoderClass.getConstructor(EMPTY_PARAMS);
137         } catch (NoSuchMethodException e) {
138             throw new IllegalArgumentException(
139                     "encoderClass doesn't have a public default constructor.");
140         }
141         try {
142             decoderClass.getConstructor(EMPTY_PARAMS);
143         } catch (NoSuchMethodException e) {
144             throw new IllegalArgumentException(
145                     "decoderClass doesn't have a public default constructor.");
146         }
147 
148         // Create the inner Factory based on the two parameters. We instanciate
149         // the encoder and decoder locally.
150         this.factory = new ProtocolCodecFactory() {
151             public ProtocolEncoder getEncoder(IoSession session) throws Exception {
152                 return encoderClass.newInstance();
153             }
154 
155             public ProtocolDecoder getDecoder(IoSession session) throws Exception {
156                 return decoderClass.newInstance();
157             }
158         };
159     }
160 
161     
162     /**
163      * Get the encoder instance from a given session.
164      *
165      * @param session The associated session we will get the encoder from
166      * @return The encoder instance, if any
167      */
168     public ProtocolEncoder getEncoder(IoSession session) {
169         return (ProtocolEncoder) session.getAttribute(ENCODER);
170     }
171 
172     @Override
173     public void onPreAdd(IoFilterChain parent, String name,
174             NextFilter nextFilter) throws Exception {
175         if (parent.contains(this)) {
176             throw new IllegalArgumentException(
177                     "You can't add the same filter instance more than once.  Create another instance and add it.");
178         }
179         
180         // Initialize the encoder and decoder
181         initCodec(parent.getSession());
182     }
183 
184     @Override
185     public void onPostRemove(IoFilterChain parent, String name,
186             NextFilter nextFilter) throws Exception {
187         // Clean everything
188         disposeCodec(parent.getSession());
189     }
190 
191     /**
192      * Process the incoming message, calling the session decoder. As the incoming
193      * buffer might contains more than one messages, we have to loop until the decoder
194      * throws an exception.
195      * 
196      *  while ( buffer not empty )
197      *    try 
198      *      decode ( buffer )
199      *    catch
200      *      break;
201      *    
202      */
203     @Override
204     public void messageReceived(NextFilter nextFilter, IoSession session,
205             Object message) throws Exception {
206         if (!(message instanceof IoBuffer)) {
207             nextFilter.messageReceived(session, message);
208             return;
209         }
210 
211         IoBuffer in = (IoBuffer) message;
212         ProtocolDecoder decoder = getDecoder(session);
213         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
214         
215         // Loop until we don't have anymore byte in the buffer,
216         // or until the decoder throws an unrecoverable exception or 
217         // can't decoder a message, because there are not enough 
218         // data in the buffer
219         while (in.hasRemaining()) {
220             int oldPos = in.position();
221             try {
222                 synchronized (decoderOut) {
223                     // Call the decoder with the read bytes
224                     decoder.decode(session, in, decoderOut);
225                 }
226                 
227                 // Finish decoding if no exception was thrown.
228                 decoderOut.flush(nextFilter, session);
229             } catch (Throwable t) {
230                 ProtocolDecoderException pde;
231                 if (t instanceof ProtocolDecoderException) {
232                     pde = (ProtocolDecoderException) t;
233                 } else {
234                     pde = new ProtocolDecoderException(t);
235                 }
236                 
237                 if (pde.getHexdump() == null) {
238                     // Generate a message hex dump
239                     int curPos = in.position();
240                     in.position(oldPos);
241                     pde.setHexdump(in.getHexDump());
242                     in.position(curPos);
243                 }
244 
245                 // Fire the exceptionCaught event.
246                 decoderOut.flush(nextFilter, session);
247                 nextFilter.exceptionCaught(session, pde);
248 
249                 // Retry only if the type of the caught exception is
250                 // recoverable and the buffer position has changed.
251                 // We check buffer position additionally to prevent an
252                 // infinite loop.
253                 if (!(t instanceof RecoverableProtocolDecoderException) ||
254                         (in.position() == oldPos)) {
255                     break;
256                 }
257             }
258         }
259     }
260 
261     @Override
262     public void messageSent(NextFilter nextFilter, IoSession session,
263             WriteRequest writeRequest) throws Exception {
264         if (writeRequest instanceof EncodedWriteRequest) {
265             return;
266         }
267 
268         if (!(writeRequest instanceof MessageWriteRequest)) {
269             nextFilter.messageSent(session, writeRequest);
270             return;
271         }
272 
273         MessageWriteRequest wrappedRequest = (MessageWriteRequest) writeRequest;
274         nextFilter.messageSent(session, wrappedRequest.getParentRequest());
275     }
276 
277     @Override
278     public void filterWrite(NextFilter nextFilter, IoSession session,
279             WriteRequest writeRequest) throws Exception {
280         Object message = writeRequest.getMessage();
281         
282         // Bypass the encoding if the message is contained in a ByteBuffer,
283         // as it has already been encoded before
284         if (message instanceof IoBuffer || message instanceof FileRegion) {
285             nextFilter.filterWrite(session, writeRequest);
286             return;
287         }
288 
289         // Get the encoder in the session
290         ProtocolEncoder encoder = getEncoder(session);
291 
292         ProtocolEncoderOutput encoderOut = getEncoderOut(session,
293                 nextFilter, writeRequest);
294 
295         try {
296             // Now we can try to encode the response
297             encoder.encode(session, message, encoderOut);
298             
299             // Send it directly
300             ((ProtocolEncoderOutputImpl)encoderOut).flushWithoutFuture();
301             
302             // Call the next filter
303             nextFilter.filterWrite(session, new MessageWriteRequest(
304                     writeRequest));
305         } catch (Throwable t) {
306             ProtocolEncoderException pee;
307             
308             // Generate the correct exception
309             if (t instanceof ProtocolEncoderException) {
310                 pee = (ProtocolEncoderException) t;
311             } else {
312                 pee = new ProtocolEncoderException(t);
313             }
314             
315             throw pee;
316         }
317     }
318     
319 
320     @Override
321     public void sessionClosed(NextFilter nextFilter, IoSession session)
322             throws Exception {
323         // Call finishDecode() first when a connection is closed.
324         ProtocolDecoder decoder = getDecoder(session);
325         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
326         
327         try {
328             decoder.finishDecode(session, decoderOut);
329         } catch (Throwable t) {
330             ProtocolDecoderException pde;
331             if (t instanceof ProtocolDecoderException) {
332                 pde = (ProtocolDecoderException) t;
333             } else {
334                 pde = new ProtocolDecoderException(t);
335             }
336             throw pde;
337         } finally {
338             // Dispose everything
339             disposeCodec(session);
340             decoderOut.flush(nextFilter, session);
341         }
342 
343         // Call the next filter
344         nextFilter.sessionClosed(session);
345     }
346 
347     private static class EncodedWriteRequest extends DefaultWriteRequest {
348         private EncodedWriteRequest(Object encodedMessage,
349                 WriteFuture future, SocketAddress destination) {
350             super(encodedMessage, future, destination);
351         }
352     }
353 
354     private static class MessageWriteRequest extends WriteRequestWrapper {
355         private MessageWriteRequest(WriteRequest writeRequest) {
356             super(writeRequest);
357         }
358 
359         @Override
360         public Object getMessage() {
361             return EMPTY_BUFFER;
362         }
363     }
364 
365     private static class ProtocolDecoderOutputImpl extends
366             AbstractProtocolDecoderOutput {
367         public ProtocolDecoderOutputImpl() {
368         }
369 
370         public void flush(NextFilter nextFilter, IoSession session) {
371             Queue<Object> messageQueue = getMessageQueue();
372             while (!messageQueue.isEmpty()) {
373                 nextFilter.messageReceived(session, messageQueue.poll());
374             }
375         }
376     }
377 
378     private static class ProtocolEncoderOutputImpl extends
379             AbstractProtocolEncoderOutput {
380         private final IoSession session;
381 
382         private final NextFilter nextFilter;
383 
384         private final WriteRequest writeRequest;
385 
386         public ProtocolEncoderOutputImpl(IoSession session,
387                 NextFilter nextFilter, WriteRequest writeRequest) {
388             this.session = session;
389             this.nextFilter = nextFilter;
390             this.writeRequest = writeRequest;
391         }
392 
393         public WriteFuture flush() {
394             Queue<Object> bufferQueue = getMessageQueue();
395             WriteFuture future = null;
396             for (;;) {
397                 Object encodedMessage = bufferQueue.poll();
398                 if (encodedMessage == null) {
399                     break;
400                 }
401 
402                 // Flush only when the buffer has remaining.
403                 if (!(encodedMessage instanceof IoBuffer) ||
404                         ((IoBuffer) encodedMessage).hasRemaining()) {
405                     future = new DefaultWriteFuture(session);
406                     nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage,
407                             future, writeRequest.getDestination()));
408                 }
409             }
410 
411             if (future == null) {
412                 future = DefaultWriteFuture.newNotWrittenFuture(
413                         session, new NothingWrittenException(writeRequest));
414             }
415 
416             return future;
417         }
418         
419         public void flushWithoutFuture() {
420             Queue<Object> bufferQueue = getMessageQueue();
421             for (;;) {
422                 Object encodedMessage = bufferQueue.poll();
423                 if (encodedMessage == null) {
424                     break;
425                 }
426 
427                 // Flush only when the buffer has remaining.
428                 if (!(encodedMessage instanceof IoBuffer) ||
429                         ((IoBuffer) encodedMessage).hasRemaining()) {
430                     SocketAddress destination = writeRequest.getDestination();
431                     WriteRequest writeRequest = new EncodedWriteRequest(
432                         encodedMessage, null, destination); 
433                     nextFilter.filterWrite(session, writeRequest);
434                 }
435             }
436         }
437     }
438     
439     //----------- Helper methods ---------------------------------------------
440     /**
441      * Initialize the encoder and the decoder, storing them in the 
442      * session attributes.
443      */
444     private void initCodec(IoSession session) throws Exception {
445         // Creates the decoder and stores it into the newly created session 
446         ProtocolDecoder decoder = factory.getDecoder(session);
447         session.setAttribute(DECODER, decoder);
448 
449         // Creates the encoder and stores it into the newly created session 
450         ProtocolEncoder encoder = factory.getEncoder(session);
451         session.setAttribute(ENCODER, encoder);
452     }
453     
454     /**
455      * Dispose the encoder, decoder, and the callback for the decoded
456      * messages.
457      */
458     private void disposeCodec(IoSession session) {
459         // We just remove the two instances of encoder/decoder to release resources
460         // from the session
461         disposeEncoder(session);
462         disposeDecoder(session);
463         
464         // We also remove the callback  
465         disposeDecoderOut(session);
466     }
467     
468     /**
469      * dispose the encoder, removing its instance from the
470      * session's attributes, and calling the associated
471      * dispose method.
472      */
473     private void disposeEncoder(IoSession session) {
474         ProtocolEncoder encoder = (ProtocolEncoder) session
475                 .removeAttribute(ENCODER);
476         if (encoder == null) {
477             return;
478         }
479 
480         try {
481             encoder.dispose(session);
482         } catch (Throwable t) {
483             logger.warn(
484                     "Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')');
485         }
486     }
487 
488     /**
489      * Get the decoder instance from a given session.
490      *
491      * @param session The associated session we will get the decoder from
492      * @return The decoder instance
493      */
494     private ProtocolDecoder getDecoder(IoSession session) {
495         return (ProtocolDecoder) session.getAttribute(DECODER);
496     }
497 
498     /**
499      * dispose the decoder, removing its instance from the
500      * session's attributes, and calling the associated
501      * dispose method.
502      */
503     private void disposeDecoder(IoSession session) {
504         ProtocolDecoder decoder = (ProtocolDecoder) session
505                 .removeAttribute(DECODER);
506         if (decoder == null) {
507             return;
508         }
509 
510         try {
511             decoder.dispose(session);
512         } catch (Throwable t) {
513             logger.warn(
514                     "Falied to dispose: " + decoder.getClass().getName() + " (" + decoder + ')');
515         }
516     }
517 
518     /**
519      * Return a reference to the decoder callback. If it's not already created
520      * and stored into the session, we create a new instance.
521      */
522     private ProtocolDecoderOutput getDecoderOut(IoSession session,
523             NextFilter nextFilter) {
524         ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
525         
526         if (out == null) {
527             // Create a new instance, and stores it into the session
528             out = new ProtocolDecoderOutputImpl();
529             session.setAttribute(DECODER_OUT, out);
530         }
531         
532         return out;
533     }
534 
535     private ProtocolEncoderOutput getEncoderOut(IoSession session,
536         NextFilter nextFilter, WriteRequest writeRequest) {
537         ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT);
538         
539         if (out == null) {
540             // Create a new instance, and stores it into the session
541             out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
542             session.setAttribute(ENCODER_OUT, out);
543         }
544         
545         return out;
546     }
547 
548     /**
549      * Remove the decoder callback from the session's attributes.
550      */
551     private void disposeDecoderOut(IoSession session) {
552         session.removeAttribute(DECODER_OUT);
553     }
554 }