View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.codec;
21  
22  import java.net.SocketAddress;
23  import java.util.Queue;
24  
25  import org.apache.mina.core.buffer.IoBuffer;
26  import org.apache.mina.core.file.FileRegion;
27  import org.apache.mina.core.filterchain.IoFilter;
28  import org.apache.mina.core.filterchain.IoFilterAdapter;
29  import org.apache.mina.core.filterchain.IoFilterChain;
30  import org.apache.mina.core.future.DefaultWriteFuture;
31  import org.apache.mina.core.future.WriteFuture;
32  import org.apache.mina.core.session.AttributeKey;
33  import org.apache.mina.core.session.IoSession;
34  import org.apache.mina.core.write.DefaultWriteRequest;
35  import org.apache.mina.core.write.NothingWrittenException;
36  import org.apache.mina.core.write.WriteRequest;
37  import org.apache.mina.core.write.WriteRequestWrapper;
38  import org.slf4j.Logger;
39  import org.slf4j.LoggerFactory;
40  
41  /**
42   * An {@link IoFilter} which translates binary or protocol specific data into
43   * message object and vice versa using {@link ProtocolCodecFactory},
44   * {@link ProtocolEncoder}, or {@link ProtocolDecoder}.
45   *
46   * @author The Apache MINA Project (dev@mina.apache.org)
47   * @org.apache.xbean.XBean
48   */
49  public class ProtocolCodecFilter extends IoFilterAdapter {
50      /** A logger for this class */
51      private final static Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class);
52  
53      private static final Class<?>[] EMPTY_PARAMS = new Class[0];
54      private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
55  
56      private final AttributeKey ENCODER = new AttributeKey(getClass(), "encoder");
57      private final AttributeKey DECODER = new AttributeKey(getClass(), "decoder");
58      private final AttributeKey DECODER_OUT = new AttributeKey(getClass(), "decoderOut");
59      private final AttributeKey ENCODER_OUT = new AttributeKey(getClass(), "encoderOut");
60      
61      /** The factory responsible for creating the encoder and decoder */
62      private final ProtocolCodecFactory factory;
63  
64      /**
65       * 
66       * Creates a new instance of ProtocolCodecFilter, associating a factory
67       * for the creation of the encoder and decoder.
68       *
69       * @param factory The associated factory
70       */
71      public ProtocolCodecFilter(ProtocolCodecFactory factory) {
72          if (factory == null) {
73              throw new NullPointerException("factory");
74          }
75          this.factory = factory;
76      }
77  
78      
79      /**
80       * Creates a new instance of ProtocolCodecFilter, without any factory.
81       * The encoder/decoder factory will be created as an inner class, using
82       * the two parameters (encoder and decoder). 
83       * 
84       * @param encoder The class responsible for encoding the message
85       * @param decoder The class responsible for decoding the message
86       */
87      public ProtocolCodecFilter(final ProtocolEncoder encoder,
88              final ProtocolDecoder decoder) {
89          if (encoder == null) {
90              throw new NullPointerException("encoder");
91          }
92          if (decoder == null) {
93              throw new NullPointerException("decoder");
94          }
95  
96          // Create the inner Factory based on the two parameters
97          this.factory = new ProtocolCodecFactory() {
98              public ProtocolEncoder getEncoder(IoSession session) {
99                  return encoder;
100             }
101 
102             public ProtocolDecoder getDecoder(IoSession session) {
103                 return decoder;
104             }
105         };
106     }
107 
108     /**
109      * Creates a new instance of ProtocolCodecFilter, without any factory.
110      * The encoder/decoder factory will be created as an inner class, using
111      * the two parameters (encoder and decoder), which are class names. Instances
112      * for those classes will be created in this constructor.
113      * 
114      * @param encoder The class responsible for encoding the message
115      * @param decoder The class responsible for decoding the message
116      */
117     public ProtocolCodecFilter(
118             final Class<? extends ProtocolEncoder> encoderClass,
119             final Class<? extends ProtocolDecoder> decoderClass) {
120         if (encoderClass == null) {
121             throw new NullPointerException("encoderClass");
122         }
123         if (decoderClass == null) {
124             throw new NullPointerException("decoderClass");
125         }
126         if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
127             throw new IllegalArgumentException("encoderClass: "
128                     + encoderClass.getName());
129         }
130         if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
131             throw new IllegalArgumentException("decoderClass: "
132                     + decoderClass.getName());
133         }
134         try {
135             encoderClass.getConstructor(EMPTY_PARAMS);
136         } catch (NoSuchMethodException e) {
137             throw new IllegalArgumentException(
138                     "encoderClass doesn't have a public default constructor.");
139         }
140         try {
141             decoderClass.getConstructor(EMPTY_PARAMS);
142         } catch (NoSuchMethodException e) {
143             throw new IllegalArgumentException(
144                     "decoderClass doesn't have a public default constructor.");
145         }
146 
147         // Create the inner Factory based on the two parameters. We instanciate
148         // the encoder and decoder locally.
149         this.factory = new ProtocolCodecFactory() {
150             public ProtocolEncoder getEncoder(IoSession session) throws Exception {
151                 return encoderClass.newInstance();
152             }
153 
154             public ProtocolDecoder getDecoder(IoSession session) throws Exception {
155                 return decoderClass.newInstance();
156             }
157         };
158     }
159 
160     
161     /**
162      * Get the encoder instance from a given session.
163      *
164      * @param session The associated session we will get the encoder from
165      * @return The encoder instance, if any
166      */
167     public ProtocolEncoder getEncoder(IoSession session) {
168         return (ProtocolEncoder) session.getAttribute(ENCODER);
169     }
170 
171     @Override
172     public void onPreAdd(IoFilterChain parent, String name,
173             NextFilter nextFilter) throws Exception {
174         if (parent.contains(this)) {
175             throw new IllegalArgumentException(
176                     "You can't add the same filter instance more than once.  Create another instance and add it.");
177         }
178         
179         // Initialize the encoder and decoder
180         initCodec(parent.getSession());
181     }
182 
183     @Override
184     public void onPostRemove(IoFilterChain parent, String name,
185             NextFilter nextFilter) throws Exception {
186         // Clean everything
187         disposeCodec(parent.getSession());
188     }
189 
190     /**
191      * Process the incoming message, calling the session decoder. As the incoming
192      * buffer might contains more than one messages, we have to loop until the decoder
193      * throws an exception.
194      * 
195      *  while ( buffer not empty )
196      *    try 
197      *      decode ( buffer )
198      *    catch
199      *      break;
200      *    
201      */
202     @Override
203     public void messageReceived(NextFilter nextFilter, IoSession session,
204             Object message) throws Exception {
205         LOGGER.debug( "Processing a MESSAGE_RECEIVED for session {}", session.getId() );
206         
207         if (!(message instanceof IoBuffer)) {
208             nextFilter.messageReceived(session, message);
209             return;
210         }
211 
212         IoBuffer in = (IoBuffer) message;
213         ProtocolDecoder decoder = getDecoder(session);
214         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
215         
216         // Loop until we don't have anymore byte in the buffer,
217         // or until the decoder throws an unrecoverable exception or 
218         // can't decoder a message, because there are not enough 
219         // data in the buffer
220         while (in.hasRemaining()) {
221             int oldPos = in.position();
222             try {
223                 synchronized (decoderOut) {
224                     // Call the decoder with the read bytes
225                     decoder.decode(session, in, decoderOut);
226                 }
227                 
228                 // Finish decoding if no exception was thrown.
229                 decoderOut.flush(nextFilter, session);
230             } catch (Throwable t) {
231                 ProtocolDecoderException pde;
232                 if (t instanceof ProtocolDecoderException) {
233                     pde = (ProtocolDecoderException) t;
234                 } else {
235                     pde = new ProtocolDecoderException(t);
236                 }
237                 
238                 if (pde.getHexdump() == null) {
239                     // Generate a message hex dump
240                     int curPos = in.position();
241                     in.position(oldPos);
242                     pde.setHexdump(in.getHexDump());
243                     in.position(curPos);
244                 }
245 
246                 // Fire the exceptionCaught event.
247                 decoderOut.flush(nextFilter, session);
248                 nextFilter.exceptionCaught(session, pde);
249 
250                 // Retry only if the type of the caught exception is
251                 // recoverable and the buffer position has changed.
252                 // We check buffer position additionally to prevent an
253                 // infinite loop.
254                 if (!(t instanceof RecoverableProtocolDecoderException) ||
255                         (in.position() == oldPos)) {
256                     break;
257                 }
258             }
259         }
260     }
261 
262     @Override
263     public void messageSent(NextFilter nextFilter, IoSession session,
264             WriteRequest writeRequest) throws Exception {
265         if (writeRequest instanceof EncodedWriteRequest) {
266             return;
267         }
268 
269         if (!(writeRequest instanceof MessageWriteRequest)) {
270             nextFilter.messageSent(session, writeRequest);
271             return;
272         }
273 
274         MessageWriteRequest wrappedRequest = (MessageWriteRequest) writeRequest;
275         nextFilter.messageSent(session, wrappedRequest.getParentRequest());
276     }
277 
278     @Override
279     public void filterWrite(NextFilter nextFilter, IoSession session,
280             WriteRequest writeRequest) throws Exception {
281         Object message = writeRequest.getMessage();
282         
283         // Bypass the encoding if the message is contained in a ByteBuffer,
284         // as it has already been encoded before
285         if (message instanceof IoBuffer || message instanceof FileRegion) {
286             nextFilter.filterWrite(session, writeRequest);
287             return;
288         }
289 
290         // Get the encoder in the session
291         ProtocolEncoder encoder = getEncoder(session);
292 
293         ProtocolEncoderOutput encoderOut = getEncoderOut(session,
294                 nextFilter, writeRequest);
295 
296         try {
297             // Now we can try to encode the response
298             encoder.encode(session, message, encoderOut);
299             
300             // Send it directly
301             ((ProtocolEncoderOutputImpl)encoderOut).flushWithoutFuture();
302             
303             // Call the next filter
304             nextFilter.filterWrite(session, new MessageWriteRequest(
305                     writeRequest));
306         } catch (Throwable t) {
307             ProtocolEncoderException pee;
308             
309             // Generate the correct exception
310             if (t instanceof ProtocolEncoderException) {
311                 pee = (ProtocolEncoderException) t;
312             } else {
313                 pee = new ProtocolEncoderException(t);
314             }
315             
316             throw pee;
317         }
318     }
319     
320 
321     @Override
322     public void sessionClosed(NextFilter nextFilter, IoSession session)
323             throws Exception {
324         // Call finishDecode() first when a connection is closed.
325         ProtocolDecoder decoder = getDecoder(session);
326         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
327         
328         try {
329             decoder.finishDecode(session, decoderOut);
330         } catch (Throwable t) {
331             ProtocolDecoderException pde;
332             if (t instanceof ProtocolDecoderException) {
333                 pde = (ProtocolDecoderException) t;
334             } else {
335                 pde = new ProtocolDecoderException(t);
336             }
337             throw pde;
338         } finally {
339             // Dispose everything
340             disposeCodec(session);
341             decoderOut.flush(nextFilter, session);
342         }
343 
344         // Call the next filter
345         nextFilter.sessionClosed(session);
346     }
347 
348     private static class EncodedWriteRequest extends DefaultWriteRequest {
349         public EncodedWriteRequest(Object encodedMessage,
350                 WriteFuture future, SocketAddress destination) {
351             super(encodedMessage, future, destination);
352         }
353     }
354 
355     private static class MessageWriteRequest extends WriteRequestWrapper {
356         public MessageWriteRequest(WriteRequest writeRequest) {
357             super(writeRequest);
358         }
359 
360         @Override
361         public Object getMessage() {
362             return EMPTY_BUFFER;
363         }
364     }
365 
366     private static class ProtocolDecoderOutputImpl extends
367             AbstractProtocolDecoderOutput {
368         public ProtocolDecoderOutputImpl() {
369             // Do nothing
370         }
371 
372         public void flush(NextFilter nextFilter, IoSession session) {
373             Queue<Object> messageQueue = getMessageQueue();
374             while (!messageQueue.isEmpty()) {
375                 nextFilter.messageReceived(session, messageQueue.poll());
376             }
377         }
378     }
379 
380     private static class ProtocolEncoderOutputImpl extends
381             AbstractProtocolEncoderOutput {
382         private final IoSession session;
383 
384         private final NextFilter nextFilter;
385 
386         private final WriteRequest writeRequest;
387 
388         public ProtocolEncoderOutputImpl(IoSession session,
389                 NextFilter nextFilter, WriteRequest writeRequest) {
390             this.session = session;
391             this.nextFilter = nextFilter;
392             this.writeRequest = writeRequest;
393         }
394 
395         public WriteFuture flush() {
396             Queue<Object> bufferQueue = getMessageQueue();
397             WriteFuture future = null;
398             for (;;) {
399                 Object encodedMessage = bufferQueue.poll();
400                 if (encodedMessage == null) {
401                     break;
402                 }
403 
404                 // Flush only when the buffer has remaining.
405                 if (!(encodedMessage instanceof IoBuffer) ||
406                         ((IoBuffer) encodedMessage).hasRemaining()) {
407                     future = new DefaultWriteFuture(session);
408                     nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage,
409                             future, writeRequest.getDestination()));
410                 }
411             }
412 
413             if (future == null) {
414                 future = DefaultWriteFuture.newNotWrittenFuture(
415                         session, new NothingWrittenException(writeRequest));
416             }
417 
418             return future;
419         }
420         
421         public void flushWithoutFuture() {
422             Queue<Object> bufferQueue = getMessageQueue();
423             for (;;) {
424                 Object encodedMessage = bufferQueue.poll();
425                 if (encodedMessage == null) {
426                     break;
427                 }
428 
429                 // Flush only when the buffer has remaining.
430                 if (!(encodedMessage instanceof IoBuffer) ||
431                         ((IoBuffer) encodedMessage).hasRemaining()) {
432                     SocketAddress destination = writeRequest.getDestination();
433                     WriteRequest writeRequest = new EncodedWriteRequest(
434                         encodedMessage, null, destination); 
435                     nextFilter.filterWrite(session, writeRequest);
436                 }
437             }
438         }
439     }
440     
441     //----------- Helper methods ---------------------------------------------
442     /**
443      * Initialize the encoder and the decoder, storing them in the 
444      * session attributes.
445      */
446     private void initCodec(IoSession session) throws Exception {
447         // Creates the decoder and stores it into the newly created session 
448         ProtocolDecoder decoder = factory.getDecoder(session);
449         session.setAttribute(DECODER, decoder);
450 
451         // Creates the encoder and stores it into the newly created session 
452         ProtocolEncoder encoder = factory.getEncoder(session);
453         session.setAttribute(ENCODER, encoder);
454     }
455     
456     /**
457      * Dispose the encoder, decoder, and the callback for the decoded
458      * messages.
459      */
460     private void disposeCodec(IoSession session) {
461         // We just remove the two instances of encoder/decoder to release resources
462         // from the session
463         disposeEncoder(session);
464         disposeDecoder(session);
465         
466         // We also remove the callback  
467         disposeDecoderOut(session);
468     }
469     
470     /**
471      * dispose the encoder, removing its instance from the
472      * session's attributes, and calling the associated
473      * dispose method.
474      */
475     private void disposeEncoder(IoSession session) {
476         ProtocolEncoder encoder = (ProtocolEncoder) session
477                 .removeAttribute(ENCODER);
478         if (encoder == null) {
479             return;
480         }
481 
482         try {
483             encoder.dispose(session);
484         } catch (Throwable t) {
485             LOGGER.warn(
486                     "Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')');
487         }
488     }
489 
490     /**
491      * Get the decoder instance from a given session.
492      *
493      * @param session The associated session we will get the decoder from
494      * @return The decoder instance
495      */
496     private ProtocolDecoder getDecoder(IoSession session) {
497         return (ProtocolDecoder) session.getAttribute(DECODER);
498     }
499 
500     /**
501      * dispose the decoder, removing its instance from the
502      * session's attributes, and calling the associated
503      * dispose method.
504      */
505     private void disposeDecoder(IoSession session) {
506         ProtocolDecoder decoder = (ProtocolDecoder) session
507                 .removeAttribute(DECODER);
508         if (decoder == null) {
509             return;
510         }
511 
512         try {
513             decoder.dispose(session);
514         } catch (Throwable t) {
515             LOGGER.warn(
516                     "Falied to dispose: " + decoder.getClass().getName() + " (" + decoder + ')');
517         }
518     }
519 
520     /**
521      * Return a reference to the decoder callback. If it's not already created
522      * and stored into the session, we create a new instance.
523      */
524     private ProtocolDecoderOutput getDecoderOut(IoSession session,
525             NextFilter nextFilter) {
526         ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
527         
528         if (out == null) {
529             // Create a new instance, and stores it into the session
530             out = new ProtocolDecoderOutputImpl();
531             session.setAttribute(DECODER_OUT, out);
532         }
533         
534         return out;
535     }
536 
537     private ProtocolEncoderOutput getEncoderOut(IoSession session,
538         NextFilter nextFilter, WriteRequest writeRequest) {
539         ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT);
540         
541         if (out == null) {
542             // Create a new instance, and stores it into the session
543             out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
544             session.setAttribute(ENCODER_OUT, out);
545         }
546         
547         return out;
548     }
549 
550     /**
551      * Remove the decoder callback from the session's attributes.
552      */
553     private void disposeDecoderOut(IoSession session) {
554         session.removeAttribute(DECODER_OUT);
555     }
556 }