View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.codec;
21  
22  import java.net.SocketAddress;
23  import java.util.Queue;
24  
25  import org.apache.mina.core.buffer.IoBuffer;
26  import org.apache.mina.core.file.FileRegion;
27  import org.apache.mina.core.filterchain.IoFilter;
28  import org.apache.mina.core.filterchain.IoFilterAdapter;
29  import org.apache.mina.core.filterchain.IoFilterChain;
30  import org.apache.mina.core.future.DefaultWriteFuture;
31  import org.apache.mina.core.future.WriteFuture;
32  import org.apache.mina.core.session.AttributeKey;
33  import org.apache.mina.core.session.IoSession;
34  import org.apache.mina.core.write.DefaultWriteRequest;
35  import org.apache.mina.core.write.NothingWrittenException;
36  import org.apache.mina.core.write.WriteRequest;
37  import org.apache.mina.core.write.WriteRequestWrapper;
38  import org.slf4j.Logger;
39  import org.slf4j.LoggerFactory;
40  
41  /**
42   * An {@link IoFilter} which translates binary or protocol specific data into
43   * message objects and vice versa using {@link ProtocolCodecFactory},
44   * {@link ProtocolEncoder}, or {@link ProtocolDecoder}.
45   *
46   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
47   * @org.apache.xbean.XBean
48   */
49  public class ProtocolCodecFilter extends IoFilterAdapter {
50      /** A logger for this class */
51      private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class);
52  
53      private static final Class<?>[] EMPTY_PARAMS = new Class[0];
54      private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
55  
56      private final AttributeKey ENCODER = new AttributeKey(ProtocolCodecFilter.class, "encoder");
57      private final AttributeKey DECODER = new AttributeKey(ProtocolCodecFilter.class, "decoder");
58      private final AttributeKey DECODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "decoderOut");
59      private final AttributeKey ENCODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "encoderOut");
60      
61      /** The factory responsible for creating the encoder and decoder */
62      private final ProtocolCodecFactory factory;
63  
64      /**
65       * 
66       * Creates a new instance of ProtocolCodecFilter, associating a factory
67       * for the creation of the encoder and decoder.
68       *
69       * @param factory The associated factory
70       */
71      public ProtocolCodecFilter(ProtocolCodecFactory factory) {
72          if (factory == null) {
73              throw new IllegalArgumentException("factory");
74          }
75          
76          this.factory = factory;
77      }
78  
79      
80      /**
81       * Creates a new instance of ProtocolCodecFilter, without any factory.
82       * The encoder/decoder factory will be created as an inner class, using
83       * the two parameters (encoder and decoder). 
84       * 
85       * @param encoder The class responsible for encoding the message
86       * @param decoder The class responsible for decoding the message
87       */
88      public ProtocolCodecFilter(final ProtocolEncoder encoder,
89              final ProtocolDecoder decoder) {
90          if (encoder == null) {
91              throw new IllegalArgumentException("encoder");
92          }
93          if (decoder == null) {
94              throw new IllegalArgumentException("decoder");
95          }
96  
97          // Create the inner Factory based on the two parameters
98          this.factory = new ProtocolCodecFactory() {
99              public ProtocolEncoder getEncoder(IoSession session) {
100                 return encoder;
101             }
102 
103             public ProtocolDecoder getDecoder(IoSession session) {
104                 return decoder;
105             }
106         };
107     }
108 
109     /**
110      * Creates a new instance of ProtocolCodecFilter, without any factory.
111      * The encoder/decoder factory will be created as an inner class, using
112      * the two parameters (encoder and decoder), which are class names. Instances
113      * for those classes will be created in this constructor.
114      * 
115      * @param encoder The class responsible for encoding the message
116      * @param decoder The class responsible for decoding the message
117      */
118     public ProtocolCodecFilter(
119             final Class<? extends ProtocolEncoder> encoderClass,
120             final Class<? extends ProtocolDecoder> decoderClass) {
121         if (encoderClass == null) {
122             throw new IllegalArgumentException("encoderClass");
123         }
124         if (decoderClass == null) {
125             throw new IllegalArgumentException("decoderClass");
126         }
127         if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
128             throw new IllegalArgumentException("encoderClass: "
129                     + encoderClass.getName());
130         }
131         if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
132             throw new IllegalArgumentException("decoderClass: "
133                     + decoderClass.getName());
134         }
135         try {
136             encoderClass.getConstructor(EMPTY_PARAMS);
137         } catch (NoSuchMethodException e) {
138             throw new IllegalArgumentException(
139                     "encoderClass doesn't have a public default constructor.");
140         }
141         try {
142             decoderClass.getConstructor(EMPTY_PARAMS);
143         } catch (NoSuchMethodException e) {
144             throw new IllegalArgumentException(
145                     "decoderClass doesn't have a public default constructor.");
146         }
147 
148         final ProtocolEncoder encoder;
149         
150         try {
151             encoder = encoderClass.newInstance();
152         } catch (Exception e) {
153             throw new IllegalArgumentException(
154                 "encoderClass cannot be initialized");
155         }
156 
157         final ProtocolDecoder decoder;
158         
159         try {
160             decoder = decoderClass.newInstance();
161         } catch (Exception e) {
162             throw new IllegalArgumentException(
163                 "decoderClass cannot be initialized");
164         }
165     
166         // Create the inner factory based on the two parameters.
167         this.factory = new ProtocolCodecFactory() {
168             public ProtocolEncoder getEncoder(IoSession session) throws Exception {
169                 return encoder;
170             }
171 
172             public ProtocolDecoder getDecoder(IoSession session) throws Exception {
173                 return decoder;
174             }
175         };
176     }
177 
178     
179     /**
180      * Get the encoder instance from a given session.
181      *
182      * @param session The associated session we will get the encoder from
183      * @return The encoder instance, if any
184      */
185     public ProtocolEncoder getEncoder(IoSession session) {
186         return (ProtocolEncoder) session.getAttribute(ENCODER);
187     }
188 
189     @Override
190     public void onPreAdd(IoFilterChain parent, String name,
191             NextFilter nextFilter) throws Exception {
192         if (parent.contains(this)) {
193             throw new IllegalArgumentException(
194                     "You can't add the same filter instance more than once.  Create another instance and add it.");
195         }
196     }
197 
198     @Override
199     public void onPostRemove(IoFilterChain parent, String name,
200             NextFilter nextFilter) throws Exception {
201         // Clean everything
202         disposeCodec(parent.getSession());
203     }
204 
205     /**
206      * Process the incoming message, calling the session decoder. As the incoming
207      * buffer might contains more than one messages, we have to loop until the decoder
208      * throws an exception.
209      * 
210      *  while ( buffer not empty )
211      *    try 
212      *      decode ( buffer )
213      *    catch
214      *      break;
215      *    
216      */
217     @Override
218     public void messageReceived(NextFilter nextFilter, IoSession session,
219             Object message) throws Exception {
220         LOGGER.debug( "Processing a MESSAGE_RECEIVED for session {}", session.getId() );
221         
222         if (!(message instanceof IoBuffer)) {
223             nextFilter.messageReceived(session, message);
224             return;
225         }
226 
227         IoBuffer in = (IoBuffer) message;
228         ProtocolDecoder decoder = factory.getDecoder(session);
229         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
230         
231         // Loop until we don't have anymore byte in the buffer,
232         // or until the decoder throws an unrecoverable exception or 
233         // can't decoder a message, because there are not enough 
234         // data in the buffer
235         while (in.hasRemaining()) {
236             int oldPos = in.position();
237             
238             try {
239                 synchronized (decoderOut) {
240                     // Call the decoder with the read bytes
241                     decoder.decode(session, in, decoderOut);
242                 }
243                 
244                 // Finish decoding if no exception was thrown.
245                 decoderOut.flush(nextFilter, session);
246             } catch (Throwable t) {
247                 ProtocolDecoderException pde;
248                 if (t instanceof ProtocolDecoderException) {
249                     pde = (ProtocolDecoderException) t;
250                 } else {
251                     pde = new ProtocolDecoderException(t);
252                 }
253                 
254                 if (pde.getHexdump() == null) {
255                     // Generate a message hex dump
256                     int curPos = in.position();
257                     in.position(oldPos);
258                     pde.setHexdump(in.getHexDump());
259                     in.position(curPos);
260                 }
261 
262                 // Fire the exceptionCaught event.
263                 decoderOut.flush(nextFilter, session);
264                 nextFilter.exceptionCaught(session, pde);
265 
266                 // Retry only if the type of the caught exception is
267                 // recoverable and the buffer position has changed.
268                 // We check buffer position additionally to prevent an
269                 // infinite loop.
270                 if (!(t instanceof RecoverableProtocolDecoderException) ||
271                         (in.position() == oldPos)) {
272                     break;
273                 }
274             }
275         }
276     }
277 
278     @Override
279     public void messageSent(NextFilter nextFilter, IoSession session,
280             WriteRequest writeRequest) throws Exception {
281         if (writeRequest instanceof EncodedWriteRequest) {
282             return;
283         }
284 
285         if (writeRequest instanceof MessageWriteRequest) {
286             MessageWriteRequest wrappedRequest = (MessageWriteRequest) writeRequest;
287             nextFilter.messageSent(session, wrappedRequest.getParentRequest());            
288         }
289         else {
290             nextFilter.messageSent(session, writeRequest);
291         }        
292     }
293 
294     @Override
295     public void filterWrite(NextFilter nextFilter, IoSession session,
296             WriteRequest writeRequest) throws Exception {
297         Object message = writeRequest.getMessage();
298         
299         // Bypass the encoding if the message is contained in a IoBuffer,
300         // as it has already been encoded before
301         if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
302             nextFilter.filterWrite(session, writeRequest);
303             return;
304         }
305 
306         // Get the encoder in the session
307         ProtocolEncoder encoder = factory.getEncoder(session);
308 
309         ProtocolEncoderOutput encoderOut = getEncoderOut(session,
310                 nextFilter, writeRequest);
311         
312         if (encoder == null) {
313             throw new ProtocolEncoderException("The encoder is null for the session " + session);
314         }
315         
316         if (encoderOut == null) {
317             throw new ProtocolEncoderException("The encoderOut is null for the session " + session);
318         }
319         
320         try {
321             // Now we can try to encode the response
322             encoder.encode(session, message, encoderOut);
323             
324             // Send it directly
325             Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput)encoderOut).getMessageQueue();
326             
327             // Write all the encoded messages now
328             while (!bufferQueue.isEmpty()) {
329                 Object encodedMessage = bufferQueue.poll();
330                 
331                 if (encodedMessage == null) {
332                     break;
333                 }
334 
335                 // Flush only when the buffer has remaining.
336                 if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
337                     SocketAddress destination = writeRequest.getDestination();
338                     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); 
339 
340                     nextFilter.filterWrite(session, encodedWriteRequest);
341                 }
342             }
343 
344             
345             // Call the next filter
346             nextFilter.filterWrite(session, new MessageWriteRequest(
347                     writeRequest));
348         } catch (Throwable t) {
349             ProtocolEncoderException pee;
350             
351             // Generate the correct exception
352             if (t instanceof ProtocolEncoderException) {
353                 pee = (ProtocolEncoderException) t;
354             } else {
355                 pee = new ProtocolEncoderException(t);
356             }
357             
358             throw pee;
359         }
360     }
361     
362 
363     @Override
364     public void sessionClosed(NextFilter nextFilter, IoSession session)
365             throws Exception {
366         // Call finishDecode() first when a connection is closed.
367         ProtocolDecoder decoder = factory.getDecoder(session);
368         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
369         
370         try {
371             decoder.finishDecode(session, decoderOut);
372         } catch (Throwable t) {
373             ProtocolDecoderException pde;
374             if (t instanceof ProtocolDecoderException) {
375                 pde = (ProtocolDecoderException) t;
376             } else {
377                 pde = new ProtocolDecoderException(t);
378             }
379             throw pde;
380         } finally {
381             // Dispose everything
382             disposeCodec(session);
383             decoderOut.flush(nextFilter, session);
384         }
385 
386         // Call the next filter
387         nextFilter.sessionClosed(session);
388     }
389 
390     private static class EncodedWriteRequest extends DefaultWriteRequest {
391         public EncodedWriteRequest(Object encodedMessage,
392                 WriteFuture future, SocketAddress destination) {
393             super(encodedMessage, future, destination);
394         }
395         
396         public boolean isEncoded() {
397             return true;
398         }
399     }
400 
401     private static class MessageWriteRequest extends WriteRequestWrapper {
402         public MessageWriteRequest(WriteRequest writeRequest) {
403             super(writeRequest);
404         }
405 
406         @Override
407         public Object getMessage() {
408             return EMPTY_BUFFER;
409         }
410         
411         @Override
412         public String toString() {
413             return "MessageWriteRequest, parent : " + super.toString();
414         }
415     }
416 
417     private static class ProtocolDecoderOutputImpl extends
418             AbstractProtocolDecoderOutput {
419         public ProtocolDecoderOutputImpl() {
420             // Do nothing
421         }
422 
423         public void flush(NextFilter nextFilter, IoSession session) {
424             Queue<Object> messageQueue = getMessageQueue();
425             
426             while (!messageQueue.isEmpty()) {
427                 nextFilter.messageReceived(session, messageQueue.poll());
428             }
429         }
430     }
431 
432     private static class ProtocolEncoderOutputImpl extends
433             AbstractProtocolEncoderOutput {
434         private final IoSession session;
435 
436         private final NextFilter nextFilter;
437 
438         private final WriteRequest writeRequest;
439 
440         public ProtocolEncoderOutputImpl(IoSession session,
441                 NextFilter nextFilter, WriteRequest writeRequest) {
442             this.session = session;
443             this.nextFilter = nextFilter;
444             this.writeRequest = writeRequest;
445         }
446 
447         public WriteFuture flush() {
448             Queue<Object> bufferQueue = getMessageQueue();
449             WriteFuture future = null;
450             
451             while (!bufferQueue.isEmpty()) {
452                 Object encodedMessage = bufferQueue.poll();
453 
454                 if (encodedMessage == null) {
455                     break;
456                 }
457 
458                 // Flush only when the buffer has remaining.
459                 if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
460                     future = new DefaultWriteFuture(session);
461                     nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage,
462                             future, writeRequest.getDestination()));
463                 }
464             }
465 
466             if (future == null) {
467                 future = DefaultWriteFuture.newNotWrittenFuture(
468                         session, new NothingWrittenException(writeRequest));
469             }
470 
471             return future;
472         }
473     }
474     
475     //----------- Helper methods ---------------------------------------------
476     /**
477      * Dispose the encoder, decoder, and the callback for the decoded
478      * messages.
479      */
480     private void disposeCodec(IoSession session) {
481         // We just remove the two instances of encoder/decoder to release resources
482         // from the session
483         disposeEncoder(session);
484         disposeDecoder(session);
485         
486         // We also remove the callback  
487         disposeDecoderOut(session);
488     }
489     
490     /**
491      * Dispose the encoder, removing its instance from the
492      * session's attributes, and calling the associated
493      * dispose method.
494      */
495     private void disposeEncoder(IoSession session) {
496         ProtocolEncoder encoder = (ProtocolEncoder) session
497                 .removeAttribute(ENCODER);
498         if (encoder == null) {
499             return;
500         }
501 
502         try {
503             encoder.dispose(session);
504         } catch (Throwable t) {
505             LOGGER.warn(
506                     "Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')');
507         }
508     }
509 
510     /**
511      * Dispose the decoder, removing its instance from the
512      * session's attributes, and calling the associated
513      * dispose method.
514      */
515     private void disposeDecoder(IoSession session) {
516         ProtocolDecoder decoder = (ProtocolDecoder) session
517                 .removeAttribute(DECODER);
518         if (decoder == null) {
519             return;
520         }
521 
522         try {
523             decoder.dispose(session);
524         } catch (Throwable t) {
525             LOGGER.warn(
526                     "Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')');
527         }
528     }
529 
530     /**
531      * Return a reference to the decoder callback. If it's not already created
532      * and stored into the session, we create a new instance.
533      */
534     private ProtocolDecoderOutput getDecoderOut(IoSession session,
535             NextFilter nextFilter) {
536         ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
537         
538         if (out == null) {
539             // Create a new instance, and stores it into the session
540             out = new ProtocolDecoderOutputImpl();
541             session.setAttribute(DECODER_OUT, out);
542         }
543         
544         return out;
545     }
546 
547     private ProtocolEncoderOutput getEncoderOut(IoSession session,
548         NextFilter nextFilter, WriteRequest writeRequest) {
549         ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT);
550         
551         if (out == null) {
552             // Create a new instance, and stores it into the session
553             out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
554             session.setAttribute(ENCODER_OUT, out);
555         }
556         
557         return out;
558     }
559 
560     /**
561      * Remove the decoder callback from the session's attributes.
562      */
563     private void disposeDecoderOut(IoSession session) {
564         session.removeAttribute(DECODER_OUT);
565     }
566 }