View Javadoc
/*
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 *
 */
20  package org.apache.mina.filter.codec;
21  
22  import java.net.SocketAddress;
23  import java.util.Queue;
24  
25  import org.apache.mina.core.buffer.IoBuffer;
26  import org.apache.mina.core.file.FileRegion;
27  import org.apache.mina.core.filterchain.IoFilter;
28  import org.apache.mina.core.filterchain.IoFilterAdapter;
29  import org.apache.mina.core.filterchain.IoFilterChain;
30  import org.apache.mina.core.future.DefaultWriteFuture;
31  import org.apache.mina.core.future.WriteFuture;
32  import org.apache.mina.core.session.AbstractIoSession;
33  import org.apache.mina.core.session.AttributeKey;
34  import org.apache.mina.core.session.IoSession;
35  import org.apache.mina.core.write.DefaultWriteRequest;
36  import org.apache.mina.core.write.NothingWrittenException;
37  import org.apache.mina.core.write.WriteRequest;
38  import org.slf4j.Logger;
39  import org.slf4j.LoggerFactory;
40  
41  /**
42   * An {@link IoFilter} which translates binary or protocol specific data into
43   * message objects and vice versa using {@link ProtocolCodecFactory},
44   * {@link ProtocolEncoder}, or {@link ProtocolDecoder}.
45   *
46   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
47   * @org.apache.xbean.XBean
48   */
49  public class ProtocolCodecFilter extends IoFilterAdapter {
50      /** A logger for this class */
51      private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class);
52  
53      private static final Class<?>[] EMPTY_PARAMS = new Class[0];
54  
55      private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
56  
57      private static final AttributeKeyributeKey.html#AttributeKey">AttributeKey ENCODER = new AttributeKey(ProtocolCodecFilter.class, "encoder");
58  
59      private static final AttributeKeyributeKey.html#AttributeKey">AttributeKey DECODER = new AttributeKey(ProtocolCodecFilter.class, "decoder");
60  
61      private static final AttributeKeyteKey.html#AttributeKey">AttributeKey DECODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "decoderOut");
62  
63      private static final AttributeKeyteKey.html#AttributeKey">AttributeKey ENCODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "encoderOut");
64  
65      /** The factory responsible for creating the encoder and decoder */
66      private final ProtocolCodecFactory factory;
67  
68      /**
69       * Creates a new instance of ProtocolCodecFilter, associating a factory
70       * for the creation of the encoder and decoder.
71       *
72       * @param factory The associated factory
73       */
74      public ProtocolCodecFilter(ProtocolCodecFactory factory) {
75          if (factory == null) {
76              throw new IllegalArgumentException("factory");
77          }
78  
79          this.factory = factory;
80      }
81  
82      /**
83       * Creates a new instance of ProtocolCodecFilter, without any factory.
84       * The encoder/decoder factory will be created as an inner class, using
85       * the two parameters (encoder and decoder).
86       * 
87       * @param encoder The class responsible for encoding the message
88       * @param decoder The class responsible for decoding the message
89       */
90      public ProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) {
91          if (encoder == null) {
92              throw new IllegalArgumentException("encoder");
93          }
94          if (decoder == null) {
95              throw new IllegalArgumentException("decoder");
96          }
97  
98          // Create the inner Factory based on the two parameters
99          this.factory = new ProtocolCodecFactory() {
100             /**
101              * {@inheritDoc}
102              */
103             @Override
104             public ProtocolEncoder getEncoder(IoSession session) {
105                 return encoder;
106             }
107 
108             /**
109              * {@inheritDoc}
110              */
111             @Override
112             public ProtocolDecoder getDecoder(IoSession session) {
113                 return decoder;
114             }
115         };
116     }
117 
118     /**
119      * Creates a new instance of ProtocolCodecFilter, without any factory.
120      * The encoder/decoder factory will be created as an inner class, using
121      * the two parameters (encoder and decoder), which are class names. Instances
122      * for those classes will be created in this constructor.
123      * 
124      * @param encoderClass The class responsible for encoding the message
125      * @param decoderClass The class responsible for decoding the message
126      */
127     public ProtocolCodecFilter(final Class<? extends ProtocolEncoder> encoderClass,
128             final Class<? extends ProtocolDecoder> decoderClass) {
129         if (encoderClass == null) {
130             throw new IllegalArgumentException("encoderClass");
131         }
132         if (decoderClass == null) {
133             throw new IllegalArgumentException("decoderClass");
134         }
135         if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
136             throw new IllegalArgumentException("encoderClass: " + encoderClass.getName());
137         }
138         if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
139             throw new IllegalArgumentException("decoderClass: " + decoderClass.getName());
140         }
141         try {
142             encoderClass.getConstructor(EMPTY_PARAMS);
143         } catch (NoSuchMethodException e) {
144             throw new IllegalArgumentException("encoderClass doesn't have a public default constructor.");
145         }
146         try {
147             decoderClass.getConstructor(EMPTY_PARAMS);
148         } catch (NoSuchMethodException e) {
149             throw new IllegalArgumentException("decoderClass doesn't have a public default constructor.");
150         }
151 
152         final ProtocolEncoder encoder;
153 
154         try {
155             encoder = encoderClass.newInstance();
156         } catch (Exception e) {
157             throw new IllegalArgumentException("encoderClass cannot be initialized");
158         }
159 
160         final ProtocolDecoder decoder;
161 
162         try {
163             decoder = decoderClass.newInstance();
164         } catch (Exception e) {
165             throw new IllegalArgumentException("decoderClass cannot be initialized");
166         }
167 
168         // Create the inner factory based on the two parameters.
169         this.factory = new ProtocolCodecFactory() {
170             /**
171              * {@inheritDoc}
172              */
173             @Override
174             public ProtocolEncoder getEncoder(IoSession session) throws Exception {
175                 return encoder;
176             }
177 
178             /**
179              * {@inheritDoc}
180              */
181             @Override
182             public ProtocolDecoder getDecoder(IoSession session) throws Exception {
183                 return decoder;
184             }
185         };
186     }
187 
188     /**
189      * Get the encoder instance from a given session.
190      *
191      * @param session The associated session we will get the encoder from
192      * @return The encoder instance, if any
193      */
194     public ProtocolEncoder getEncoder(IoSession session) {
195         return (ProtocolEncoder) session.getAttribute(ENCODER);
196     }
197 
198     /**
199      * {@inheritDoc}
200      */
201     @Override
202     public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
203         if (parent.contains(this)) {
204             throw new IllegalArgumentException(
205                     "You can't add the same filter instance more than once.  Create another instance and add it.");
206         }
207     }
208 
209     /**
210      * {@inheritDoc}
211      */
212     @Override
213     public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
214         // Clean everything
215         disposeCodec(parent.getSession());
216     }
217 
218     /**
219      * Process the incoming message, calling the session decoder. As the incoming
220      * buffer might contains more than one messages, we have to loop until the decoder
221      * throws an exception.
222      * 
223      *  while ( buffer not empty )
224      *    try
225      *      decode ( buffer )
226      *    catch
227      *      break;
228      * 
229      */
230     @Override
231     public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
232         if (LOGGER.isDebugEnabled()) {
233             LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session.getId());
234         }
235 
236         if (!(message instanceof IoBuffer)) {
237             nextFilter.messageReceived(session, message);
238             return;
239         }
240 
241         IoBuffer="../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer in = (IoBuffer) message;
242         ProtocolDecoder decoder = factory.getDecoder(session);
243         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
244 
245         // Loop until we don't have anymore byte in the buffer,
246         // or until the decoder throws an unrecoverable exception or
247         // can't decoder a message, because there are not enough
248         // data in the buffer
249         while (in.hasRemaining()) {
250             int oldPos = in.position();
251             try {
252                 synchronized (session) {
253                     // Call the decoder with the read bytes
254                     decoder.decode(session, in, decoderOut);
255                 }
256                 // Finish decoding if no exception was thrown.
257                 decoderOut.flush(nextFilter, session);
258             } catch (Exception e) {
259                 ProtocolDecoderException pde;
260                 if (e instanceof ProtocolDecoderException) {
261                     pde = (ProtocolDecoderException) e;
262                 } else {
263                     pde = new ProtocolDecoderException(e);
264                 }
265                 if (pde.getHexdump() == null) {
266                     // Generate a message hex dump
267                     int curPos = in.position();
268                     in.position(oldPos);
269                     pde.setHexdump(in.getHexDump());
270                     in.position(curPos);
271                 }
272                 // Fire the exceptionCaught event.
273                 decoderOut.flush(nextFilter, session);
274                 nextFilter.exceptionCaught(session, pde);
275                 // Retry only if the type of the caught exception is
276                 // recoverable and the buffer position has changed.
277                 // We check buffer position additionally to prevent an
278                 // infinite loop.
279                 if (!(e instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) {
280                     break;
281                 }
282             }
283         }
284     }
285 
286     /**
287      * {@inheritDoc}
288      */
289     @Override
290     public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
291         if (writeRequest instanceof EncodedWriteRequest) {
292             return;
293         }
294 
295         nextFilter.messageSent(session, writeRequest);
296     }
297 
298     /**
299      * {@inheritDoc}
300      */
301     @Override
302     public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
303         Object message = writeRequest.getMessage();
304 
305         // Bypass the encoding if the message is contained in a IoBuffer,
306         // as it has already been encoded before
307         if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
308             nextFilter.filterWrite(session, writeRequest);
309             return;
310         }
311 
312         // Get the encoder in the session
313         ProtocolEncoder encoder = factory.getEncoder(session);
314 
315         ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, writeRequest);
316 
317         if (encoder == null) {
318             throw new ProtocolEncoderException("The encoder is null for the session " + session);
319         }
320 
321         try {
322             // Now we can try to encode the response
323             encoder.encode(session, message, encoderOut);
324 
325             // Send it directly
326             Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput) encoderOut).getMessageQueue();
327 
328             // Write all the encoded messages now
329             while (!bufferQueue.isEmpty()) {
330                 Object encodedMessage = bufferQueue.poll();
331 
332                 if (encodedMessage == null) {
333                     break;
334                 }
335 
336 		// Flush only when the buffer has remaining.
337 		if (!(encodedMessage instanceof IoBuffer="../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
338 		    if (bufferQueue.isEmpty()) {
339 			writeRequest.setMessage(encodedMessage);
340 			nextFilter.filterWrite(session, writeRequest);
341 		    } else {
342 			SocketAddress destination = writeRequest.getDestination();
343 			WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination);
344 			nextFilter.filterWrite(session, encodedWriteRequest);
345 		    }
346 		}
347 	    }
348         } catch (Exception e) {
349             ProtocolEncoderException pee;
350 
351             // Generate the correct exception
352             if (e instanceof ProtocolEncoderException) {
353                 pee = (ProtocolEncoderException) e;
354             } else {
355                 pee = new ProtocolEncoderException(e);
356             }
357 
358             throw pee;
359         }
360     }
361 
362     /**
363      * {@inheritDoc}
364      */
365     @Override
366     public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
367         // Call finishDecode() first when a connection is closed.
368         ProtocolDecoder decoder = factory.getDecoder(session);
369         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
370 
371         try {
372             decoder.finishDecode(session, decoderOut);
373         } catch (Exception e) {
374             ProtocolDecoderException pde;
375             if (e instanceof ProtocolDecoderException) {
376                 pde = (ProtocolDecoderException) e;
377             } else {
378                 pde = new ProtocolDecoderException(e);
379             }
380             throw pde;
381         } finally {
382             // Dispose everything
383             disposeCodec(session);
384             decoderOut.flush(nextFilter, session);
385         }
386 
387         // Call the next filter
388         nextFilter.sessionClosed(session);
389     }
390 
391     private static class EncodedWriteRequest extends DefaultWriteRequest {
392         public EncodedWriteRequest(Object encodedMessage, WriteFuture future, SocketAddress destination) {
393             super(encodedMessage, future, destination);
394         }
395 
396         /**
397          * {@inheritDoc}
398          */
399         @Override
400         public boolean isEncoded() {
401             return true;
402         }
403     }
404 
405     private static class ProtocolDecoderOutputImpl extends AbstractProtocolDecoderOutput {
406         public ProtocolDecoderOutputImpl() {
407             // Do nothing
408         }
409 
410         /**
411          * {@inheritDoc}
412          */
413         @Override
414         public void flush(NextFilter nextFilter, IoSession session) {
415             Queue<Object> messageQueue = getMessageQueue();
416 
417             while (!messageQueue.isEmpty()) {
418                 nextFilter.messageReceived(session, messageQueue.poll());
419             }
420         }
421     }
422 
423     private static class ProtocolEncoderOutputImpl extends AbstractProtocolEncoderOutput {
424         private final IoSession session;
425 
426         private final NextFilter nextFilter;
427 
428         /** The WriteRequest destination */
429         private final SocketAddress destination;
430 
431         public ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
432             this.session = session;
433             this.nextFilter = nextFilter;
434 
435             // Only store the destination, not the full WriteRequest.
436             destination = writeRequest.getDestination();
437         }
438 
439         /**
440          * {@inheritDoc}
441          */
442         @Override
443         public WriteFuture flush() {
444             Queue<Object> bufferQueue = getMessageQueue();
445             WriteFuture future = null;
446 
447             while (!bufferQueue.isEmpty()) {
448                 Object encodedMessage = bufferQueue.poll();
449 
450                 if (encodedMessage == null) {
451                     break;
452                 }
453 
454                 // Flush only when the buffer has remaining.
455                 if (!(encodedMessage instanceof IoBuffer="../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
456                     future = new DefaultWriteFuture(session);
457                     nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage, future, destination));
458                 }
459             }
460 
461             if (future == null) {
462                 // Creates an empty writeRequest containing the destination
463                 future = DefaultWriteFuture.newNotWrittenFuture(session, new NothingWrittenException(AbstractIoSession.MESSAGE_SENT_REQUEST));
464             }
465 
466             return future;
467         }
468     }
469 
470     //----------- Helper methods ---------------------------------------------
471     /**
472      * Dispose the encoder, decoder, and the callback for the decoded
473      * messages.
474      */
475     private void disposeCodec(IoSession session) {
476         // We just remove the two instances of encoder/decoder to release resources
477         // from the session
478         disposeEncoder(session);
479         disposeDecoder(session);
480 
481         // We also remove the callback
482         disposeDecoderOut(session);
483     }
484 
485     /**
486      * Dispose the encoder, removing its instance from the
487      * session's attributes, and calling the associated
488      * dispose method.
489      */
490     private void disposeEncoder(IoSession session) {
491         ProtocolEncoder./../org/apache/mina/filter/codec/ProtocolEncoder.html#ProtocolEncoder">ProtocolEncoder encoder = (ProtocolEncoder) session.removeAttribute(ENCODER);
492         if (encoder == null) {
493             return;
494         }
495 
496         try {
497             encoder.dispose(session);
498         } catch (Exception e) {
499             LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')');
500         }
501     }
502 
503     /**
504      * Dispose the decoder, removing its instance from the
505      * session's attributes, and calling the associated
506      * dispose method.
507      */
508     private void disposeDecoder(IoSession session) {
509         ProtocolDecoder./../org/apache/mina/filter/codec/ProtocolDecoder.html#ProtocolDecoder">ProtocolDecoder decoder = (ProtocolDecoder) session.removeAttribute(DECODER);
510         if (decoder == null) {
511             return;
512         }
513 
514         try {
515             decoder.dispose(session);
516         } catch (Exception e) {
517             LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')');
518         }
519     }
520 
521     /**
522      * Return a reference to the decoder callback. If it's not already created
523      * and stored into the session, we create a new instance.
524      */
525     private ProtocolDecoderOutput getDecoderOut(IoSession session, NextFilter nextFilter) {
526         ProtocolDecoderOutput../org/apache/mina/filter/codec/ProtocolDecoderOutput.html#ProtocolDecoderOutput">ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
527 
528         if (out == null) {
529             // Create a new instance, and stores it into the session
530             out = new ProtocolDecoderOutputImpl();
531             session.setAttribute(DECODER_OUT, out);
532         }
533 
534         return out;
535     }
536 
537     private ProtocolEncoderOutput getEncoderOut(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
538         ProtocolEncoderOutput../org/apache/mina/filter/codec/ProtocolEncoderOutput.html#ProtocolEncoderOutput">ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT);
539 
540         if (out == null) {
541             // Create a new instance, and stores it into the session
542             out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
543             session.setAttribute(ENCODER_OUT, out);
544         }
545 
546         return out;
547     }
548 
549     /**
550      * Remove the decoder callback from the session's attributes.
551      */
552     private void disposeDecoderOut(IoSession session) {
553         session.removeAttribute(DECODER_OUT);
554     }
555 }