View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.codec;
21  
22  import java.net.SocketAddress;
23  import java.util.Queue;
24  
25  import org.apache.mina.core.buffer.IoBuffer;
26  import org.apache.mina.core.file.FileRegion;
27  import org.apache.mina.core.filterchain.IoFilter;
28  import org.apache.mina.core.filterchain.IoFilterAdapter;
29  import org.apache.mina.core.filterchain.IoFilterChain;
30  import org.apache.mina.core.future.DefaultWriteFuture;
31  import org.apache.mina.core.future.WriteFuture;
32  import org.apache.mina.core.session.AbstractIoSession;
33  import org.apache.mina.core.session.AttributeKey;
34  import org.apache.mina.core.session.IoSession;
35  import org.apache.mina.core.write.DefaultWriteRequest;
36  import org.apache.mina.core.write.NothingWrittenException;
37  import org.apache.mina.core.write.WriteRequest;
38  import org.apache.mina.core.write.WriteRequestWrapper;
39  import org.slf4j.Logger;
40  import org.slf4j.LoggerFactory;
41  
42  /**
43   * An {@link IoFilter} which translates binary or protocol specific data into
44   * message objects and vice versa using {@link ProtocolCodecFactory},
45   * {@link ProtocolEncoder}, or {@link ProtocolDecoder}.
46   *
47   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
48   * @org.apache.xbean.XBean
49   */
50  public class ProtocolCodecFilter extends IoFilterAdapter {
51      /** A logger for this class */
52      private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class);
53  
54      private static final Class<?>[] EMPTY_PARAMS = new Class[0];
55  
56      private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
57  
58      private static final AttributeKey ENCODER = new AttributeKey(ProtocolCodecFilter.class, "encoder");
59  
60      private static final AttributeKey DECODER = new AttributeKey(ProtocolCodecFilter.class, "decoder");
61  
62      private static final AttributeKey DECODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "decoderOut");
63  
64      private static final AttributeKey ENCODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "encoderOut");
65  
66      /** The factory responsible for creating the encoder and decoder */
67      private final ProtocolCodecFactory factory;
68  
69      /**
70       * Creates a new instance of ProtocolCodecFilter, associating a factory
71       * for the creation of the encoder and decoder.
72       *
73       * @param factory The associated factory
74       */
75      public ProtocolCodecFilter(ProtocolCodecFactory factory) {
76          if (factory == null) {
77              throw new IllegalArgumentException("factory");
78          }
79  
80          this.factory = factory;
81      }
82  
83      /**
84       * Creates a new instance of ProtocolCodecFilter, without any factory.
85       * The encoder/decoder factory will be created as an inner class, using
86       * the two parameters (encoder and decoder).
87       * 
88       * @param encoder The class responsible for encoding the message
89       * @param decoder The class responsible for decoding the message
90       */
91      public ProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) {
92          if (encoder == null) {
93              throw new IllegalArgumentException("encoder");
94          }
95          if (decoder == null) {
96              throw new IllegalArgumentException("decoder");
97          }
98  
99          // Create the inner Factory based on the two parameters
100         this.factory = new ProtocolCodecFactory() {
101             /**
102              * {@inheritDoc}
103              */
104             @Override
105             public ProtocolEncoder getEncoder(IoSession session) {
106                 return encoder;
107             }
108 
109             /**
110              * {@inheritDoc}
111              */
112             @Override
113             public ProtocolDecoder getDecoder(IoSession session) {
114                 return decoder;
115             }
116         };
117     }
118 
119     /**
120      * Creates a new instance of ProtocolCodecFilter, without any factory.
121      * The encoder/decoder factory will be created as an inner class, using
122      * the two parameters (encoder and decoder), which are class names. Instances
123      * for those classes will be created in this constructor.
124      * 
125      * @param encoderClass The class responsible for encoding the message
126      * @param decoderClass The class responsible for decoding the message
127      */
128     public ProtocolCodecFilter(final Class<? extends ProtocolEncoder> encoderClass,
129             final Class<? extends ProtocolDecoder> decoderClass) {
130         if (encoderClass == null) {
131             throw new IllegalArgumentException("encoderClass");
132         }
133         if (decoderClass == null) {
134             throw new IllegalArgumentException("decoderClass");
135         }
136         if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
137             throw new IllegalArgumentException("encoderClass: " + encoderClass.getName());
138         }
139         if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
140             throw new IllegalArgumentException("decoderClass: " + decoderClass.getName());
141         }
142         try {
143             encoderClass.getConstructor(EMPTY_PARAMS);
144         } catch (NoSuchMethodException e) {
145             throw new IllegalArgumentException("encoderClass doesn't have a public default constructor.");
146         }
147         try {
148             decoderClass.getConstructor(EMPTY_PARAMS);
149         } catch (NoSuchMethodException e) {
150             throw new IllegalArgumentException("decoderClass doesn't have a public default constructor.");
151         }
152 
153         final ProtocolEncoder encoder;
154 
155         try {
156             encoder = encoderClass.newInstance();
157         } catch (Exception e) {
158             throw new IllegalArgumentException("encoderClass cannot be initialized");
159         }
160 
161         final ProtocolDecoder decoder;
162 
163         try {
164             decoder = decoderClass.newInstance();
165         } catch (Exception e) {
166             throw new IllegalArgumentException("decoderClass cannot be initialized");
167         }
168 
169         // Create the inner factory based on the two parameters.
170         this.factory = new ProtocolCodecFactory() {
171             /**
172              * {@inheritDoc}
173              */
174             @Override
175             public ProtocolEncoder getEncoder(IoSession session) throws Exception {
176                 return encoder;
177             }
178 
179             /**
180              * {@inheritDoc}
181              */
182             @Override
183             public ProtocolDecoder getDecoder(IoSession session) throws Exception {
184                 return decoder;
185             }
186         };
187     }
188 
189     /**
190      * Get the encoder instance from a given session.
191      *
192      * @param session The associated session we will get the encoder from
193      * @return The encoder instance, if any
194      */
195     public ProtocolEncoder getEncoder(IoSession session) {
196         return (ProtocolEncoder) session.getAttribute(ENCODER);
197     }
198 
199     /**
200      * {@inheritDoc}
201      */
202     @Override
203     public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
204         if (parent.contains(this)) {
205             throw new IllegalArgumentException(
206                     "You can't add the same filter instance more than once.  Create another instance and add it.");
207         }
208     }
209 
210     /**
211      * {@inheritDoc}
212      */
213     @Override
214     public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
215         // Clean everything
216         disposeCodec(parent.getSession());
217     }
218 
219     /**
220      * Process the incoming message, calling the session decoder. As the incoming
221      * buffer might contains more than one messages, we have to loop until the decoder
222      * throws an exception.
223      * 
224      *  while ( buffer not empty )
225      *    try
226      *      decode ( buffer )
227      *    catch
228      *      break;
229      * 
230      */
231     @Override
232     public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
233         LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session.getId());
234 
235         if (!(message instanceof IoBuffer)) {
236             nextFilter.messageReceived(session, message);
237             return;
238         }
239 
240         IoBuffer in = (IoBuffer) message;
241         ProtocolDecoder decoder = factory.getDecoder(session);
242         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
243 
244         // Loop until we don't have anymore byte in the buffer,
245         // or until the decoder throws an unrecoverable exception or
246         // can't decoder a message, because there are not enough
247         // data in the buffer
248         while (in.hasRemaining()) {
249             int oldPos = in.position();
250             try {
251                 synchronized (session) {
252                     // Call the decoder with the read bytes
253                     decoder.decode(session, in, decoderOut);
254                 }
255                 // Finish decoding if no exception was thrown.
256                 decoderOut.flush(nextFilter, session);
257             } catch (Exception e) {
258                 ProtocolDecoderException pde;
259                 if (e instanceof ProtocolDecoderException) {
260                     pde = (ProtocolDecoderException) e;
261                 } else {
262                     pde = new ProtocolDecoderException(e);
263                 }
264                 if (pde.getHexdump() == null) {
265                     // Generate a message hex dump
266                     int curPos = in.position();
267                     in.position(oldPos);
268                     pde.setHexdump(in.getHexDump());
269                     in.position(curPos);
270                 }
271                 // Fire the exceptionCaught event.
272                 decoderOut.flush(nextFilter, session);
273                 nextFilter.exceptionCaught(session, pde);
274                 // Retry only if the type of the caught exception is
275                 // recoverable and the buffer position has changed.
276                 // We check buffer position additionally to prevent an
277                 // infinite loop.
278                 if (!(e instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) {
279                     break;
280                 }
281             }
282         }
283     }
284 
285     /**
286      * {@inheritDoc}
287      */
288     @Override
289     public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
290         if (writeRequest instanceof EncodedWriteRequest) {
291             return;
292         }
293 
294         if (writeRequest instanceof MessageWriteRequest) {
295             MessageWriteRequest wrappedRequest = (MessageWriteRequest) writeRequest;
296             nextFilter.messageSent(session, wrappedRequest.getParentRequest());
297         } else {
298             nextFilter.messageSent(session, writeRequest);
299         }
300     }
301 
302     /**
303      * {@inheritDoc}
304      */
305     @Override
306     public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
307         Object message = writeRequest.getMessage();
308 
309         // Bypass the encoding if the message is contained in a IoBuffer,
310         // as it has already been encoded before
311         if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
312             nextFilter.filterWrite(session, writeRequest);
313             return;
314         }
315 
316         // Get the encoder in the session
317         ProtocolEncoder encoder = factory.getEncoder(session);
318 
319         ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, writeRequest);
320 
321         if (encoder == null) {
322             throw new ProtocolEncoderException("The encoder is null for the session " + session);
323         }
324 
325         try {
326             // Now we can try to encode the response
327             encoder.encode(session, message, encoderOut);
328 
329             // Send it directly
330             Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput) encoderOut).getMessageQueue();
331 
332             // Write all the encoded messages now
333             while (!bufferQueue.isEmpty()) {
334                 Object encodedMessage = bufferQueue.poll();
335 
336                 if (encodedMessage == null) {
337                     break;
338                 }
339 
340                 // Flush only when the buffer has remaining.
341                 if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
342                     SocketAddress destination = writeRequest.getDestination();
343                     WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination);
344 
345                     nextFilter.filterWrite(session, encodedWriteRequest);
346                 }
347             }
348 
349             // Call the next filter
350             nextFilter.filterWrite(session, new MessageWriteRequest(writeRequest));
351         } catch (Exception e) {
352             ProtocolEncoderException pee;
353 
354             // Generate the correct exception
355             if (e instanceof ProtocolEncoderException) {
356                 pee = (ProtocolEncoderException) e;
357             } else {
358                 pee = new ProtocolEncoderException(e);
359             }
360 
361             throw pee;
362         }
363     }
364 
365     /**
366      * {@inheritDoc}
367      */
368     @Override
369     public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
370         // Call finishDecode() first when a connection is closed.
371         ProtocolDecoder decoder = factory.getDecoder(session);
372         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
373 
374         try {
375             decoder.finishDecode(session, decoderOut);
376         } catch (Exception e) {
377             ProtocolDecoderException pde;
378             if (e instanceof ProtocolDecoderException) {
379                 pde = (ProtocolDecoderException) e;
380             } else {
381                 pde = new ProtocolDecoderException(e);
382             }
383             throw pde;
384         } finally {
385             // Dispose everything
386             disposeCodec(session);
387             decoderOut.flush(nextFilter, session);
388         }
389 
390         // Call the next filter
391         nextFilter.sessionClosed(session);
392     }
393 
394     private static class EncodedWriteRequest extends DefaultWriteRequest {
395         public EncodedWriteRequest(Object encodedMessage, WriteFuture future, SocketAddress destination) {
396             super(encodedMessage, future, destination);
397         }
398 
399         /**
400          * {@inheritDoc}
401          */
402         @Override
403         public boolean isEncoded() {
404             return true;
405         }
406     }
407 
408     private static class MessageWriteRequest extends WriteRequestWrapper {
409         public MessageWriteRequest(WriteRequest writeRequest) {
410             super(writeRequest);
411         }
412 
413         @Override
414         public Object getMessage() {
415             return EMPTY_BUFFER;
416         }
417 
418         @Override
419         public String toString() {
420             return "MessageWriteRequest, parent : " + super.toString();
421         }
422     }
423 
424     private static class ProtocolDecoderOutputImpl extends AbstractProtocolDecoderOutput {
425         public ProtocolDecoderOutputImpl() {
426             // Do nothing
427         }
428 
429         /**
430          * {@inheritDoc}
431          */
432         @Override
433         public void flush(NextFilter nextFilter, IoSession session) {
434             Queue<Object> messageQueue = getMessageQueue();
435 
436             while (!messageQueue.isEmpty()) {
437                 nextFilter.messageReceived(session, messageQueue.poll());
438             }
439         }
440     }
441 
442     private static class ProtocolEncoderOutputImpl extends AbstractProtocolEncoderOutput {
443         private final IoSession session;
444 
445         private final NextFilter nextFilter;
446 
447         /** The WriteRequest destination */
448         private final SocketAddress destination;
449 
450         public ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
451             this.session = session;
452             this.nextFilter = nextFilter;
453 
454             // Only store the destination, not the full WriteRequest.
455             destination = writeRequest.getDestination();
456         }
457 
458         /**
459          * {@inheritDoc}
460          */
461         @Override
462         public WriteFuture flush() {
463             Queue<Object> bufferQueue = getMessageQueue();
464             WriteFuture future = null;
465 
466             while (!bufferQueue.isEmpty()) {
467                 Object encodedMessage = bufferQueue.poll();
468 
469                 if (encodedMessage == null) {
470                     break;
471                 }
472 
473                 // Flush only when the buffer has remaining.
474                 if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
475                     future = new DefaultWriteFuture(session);
476                     nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage, future, destination));
477                 }
478             }
479 
480             if (future == null) {
481                 // Creates an empty writeRequest containing the destination
482                 future = DefaultWriteFuture.newNotWrittenFuture(session, new NothingWrittenException(AbstractIoSession.MESSAGE_SENT_REQUEST));
483             }
484 
485             return future;
486         }
487     }
488 
489     //----------- Helper methods ---------------------------------------------
490     /**
491      * Dispose the encoder, decoder, and the callback for the decoded
492      * messages.
493      */
494     private void disposeCodec(IoSession session) {
495         // We just remove the two instances of encoder/decoder to release resources
496         // from the session
497         disposeEncoder(session);
498         disposeDecoder(session);
499 
500         // We also remove the callback
501         disposeDecoderOut(session);
502     }
503 
504     /**
505      * Dispose the encoder, removing its instance from the
506      * session's attributes, and calling the associated
507      * dispose method.
508      */
509     private void disposeEncoder(IoSession session) {
510         ProtocolEncoder encoder = (ProtocolEncoder) session.removeAttribute(ENCODER);
511         if (encoder == null) {
512             return;
513         }
514 
515         try {
516             encoder.dispose(session);
517         } catch (Exception e) {
518             LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')');
519         }
520     }
521 
522     /**
523      * Dispose the decoder, removing its instance from the
524      * session's attributes, and calling the associated
525      * dispose method.
526      */
527     private void disposeDecoder(IoSession session) {
528         ProtocolDecoder decoder = (ProtocolDecoder) session.removeAttribute(DECODER);
529         if (decoder == null) {
530             return;
531         }
532 
533         try {
534             decoder.dispose(session);
535         } catch (Exception e) {
536             LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')');
537         }
538     }
539 
540     /**
541      * Return a reference to the decoder callback. If it's not already created
542      * and stored into the session, we create a new instance.
543      */
544     private ProtocolDecoderOutput getDecoderOut(IoSession session, NextFilter nextFilter) {
545         ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
546 
547         if (out == null) {
548             // Create a new instance, and stores it into the session
549             out = new ProtocolDecoderOutputImpl();
550             session.setAttribute(DECODER_OUT, out);
551         }
552 
553         return out;
554     }
555 
556     private ProtocolEncoderOutput getEncoderOut(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
557         ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT);
558 
559         if (out == null) {
560             // Create a new instance, and stores it into the session
561             out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
562             session.setAttribute(ENCODER_OUT, out);
563         }
564 
565         return out;
566     }
567 
568     /**
569      * Remove the decoder callback from the session's attributes.
570      */
571     private void disposeDecoderOut(IoSession session) {
572         session.removeAttribute(DECODER_OUT);
573     }
574 }