/*
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 *
 */
020package org.apache.mina.filter.codec;
021
022import java.net.SocketAddress;
023import java.util.Queue;
024
025import org.apache.mina.core.buffer.IoBuffer;
026import org.apache.mina.core.file.FileRegion;
027import org.apache.mina.core.filterchain.IoFilter;
028import org.apache.mina.core.filterchain.IoFilterAdapter;
029import org.apache.mina.core.filterchain.IoFilterChain;
030import org.apache.mina.core.future.DefaultWriteFuture;
031import org.apache.mina.core.future.WriteFuture;
032import org.apache.mina.core.session.AttributeKey;
033import org.apache.mina.core.session.IoSession;
034import org.apache.mina.core.write.DefaultWriteRequest;
035import org.apache.mina.core.write.NothingWrittenException;
036import org.apache.mina.core.write.WriteRequest;
037import org.apache.mina.core.write.WriteRequestWrapper;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * An {@link IoFilter} which translates binary or protocol specific data into
043 * message objects and vice versa using {@link ProtocolCodecFactory},
044 * {@link ProtocolEncoder}, or {@link ProtocolDecoder}.
045 *
046 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
047 * @org.apache.xbean.XBean
048 */
049public class ProtocolCodecFilter extends IoFilterAdapter {
050    /** A logger for this class */
051    private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class);
052
053    private static final Class<?>[] EMPTY_PARAMS = new Class[0];
054
055    private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
056
057    private static final AttributeKey ENCODER = new AttributeKey(ProtocolCodecFilter.class, "encoder");
058
059    private static final AttributeKey DECODER = new AttributeKey(ProtocolCodecFilter.class, "decoder");
060
061    private static final AttributeKey DECODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "decoderOut");
062
063    private static final AttributeKey ENCODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "encoderOut");
064
065    /** The factory responsible for creating the encoder and decoder */
066    private final ProtocolCodecFactory factory;
067
068    /**
069     * Creates a new instance of ProtocolCodecFilter, associating a factory
070     * for the creation of the encoder and decoder.
071     *
072     * @param factory The associated factory
073     */
074    public ProtocolCodecFilter(ProtocolCodecFactory factory) {
075        if (factory == null) {
076            throw new IllegalArgumentException("factory");
077        }
078
079        this.factory = factory;
080    }
081
082    /**
083     * Creates a new instance of ProtocolCodecFilter, without any factory.
084     * The encoder/decoder factory will be created as an inner class, using
085     * the two parameters (encoder and decoder).
086     * 
087     * @param encoder The class responsible for encoding the message
088     * @param decoder The class responsible for decoding the message
089     */
090    public ProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) {
091        if (encoder == null) {
092            throw new IllegalArgumentException("encoder");
093        }
094        if (decoder == null) {
095            throw new IllegalArgumentException("decoder");
096        }
097
098        // Create the inner Factory based on the two parameters
099        this.factory = new ProtocolCodecFactory() {
100            public ProtocolEncoder getEncoder(IoSession session) {
101                return encoder;
102            }
103
104            public ProtocolDecoder getDecoder(IoSession session) {
105                return decoder;
106            }
107        };
108    }
109
110    /**
111     * Creates a new instance of ProtocolCodecFilter, without any factory.
112     * The encoder/decoder factory will be created as an inner class, using
113     * the two parameters (encoder and decoder), which are class names. Instances
114     * for those classes will be created in this constructor.
115     * 
116     * @param encoderClass The class responsible for encoding the message
117     * @param decoderClass The class responsible for decoding the message
118     */
119    public ProtocolCodecFilter(final Class<? extends ProtocolEncoder> encoderClass,
120            final Class<? extends ProtocolDecoder> decoderClass) {
121        if (encoderClass == null) {
122            throw new IllegalArgumentException("encoderClass");
123        }
124        if (decoderClass == null) {
125            throw new IllegalArgumentException("decoderClass");
126        }
127        if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
128            throw new IllegalArgumentException("encoderClass: " + encoderClass.getName());
129        }
130        if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
131            throw new IllegalArgumentException("decoderClass: " + decoderClass.getName());
132        }
133        try {
134            encoderClass.getConstructor(EMPTY_PARAMS);
135        } catch (NoSuchMethodException e) {
136            throw new IllegalArgumentException("encoderClass doesn't have a public default constructor.");
137        }
138        try {
139            decoderClass.getConstructor(EMPTY_PARAMS);
140        } catch (NoSuchMethodException e) {
141            throw new IllegalArgumentException("decoderClass doesn't have a public default constructor.");
142        }
143
144        final ProtocolEncoder encoder;
145
146        try {
147            encoder = encoderClass.newInstance();
148        } catch (Exception e) {
149            throw new IllegalArgumentException("encoderClass cannot be initialized");
150        }
151
152        final ProtocolDecoder decoder;
153
154        try {
155            decoder = decoderClass.newInstance();
156        } catch (Exception e) {
157            throw new IllegalArgumentException("decoderClass cannot be initialized");
158        }
159
160        // Create the inner factory based on the two parameters.
161        this.factory = new ProtocolCodecFactory() {
162            public ProtocolEncoder getEncoder(IoSession session) throws Exception {
163                return encoder;
164            }
165
166            public ProtocolDecoder getDecoder(IoSession session) throws Exception {
167                return decoder;
168            }
169        };
170    }
171
172    /**
173     * Get the encoder instance from a given session.
174     *
175     * @param session The associated session we will get the encoder from
176     * @return The encoder instance, if any
177     */
178    public ProtocolEncoder getEncoder(IoSession session) {
179        return (ProtocolEncoder) session.getAttribute(ENCODER);
180    }
181
182    @Override
183    public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
184        if (parent.contains(this)) {
185            throw new IllegalArgumentException(
186                    "You can't add the same filter instance more than once.  Create another instance and add it.");
187        }
188    }
189
190    @Override
191    public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
192        // Clean everything
193        disposeCodec(parent.getSession());
194    }
195
196    /**
197     * Process the incoming message, calling the session decoder. As the incoming
198     * buffer might contains more than one messages, we have to loop until the decoder
199     * throws an exception.
200     * 
201     *  while ( buffer not empty )
202     *    try
203     *      decode ( buffer )
204     *    catch
205     *      break;
206     * 
207     */
208    @Override
209    public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
210        LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session.getId());
211
212        if (!(message instanceof IoBuffer)) {
213            nextFilter.messageReceived(session, message);
214            return;
215        }
216
217        IoBuffer in = (IoBuffer) message;
218        ProtocolDecoder decoder = factory.getDecoder(session);
219        ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
220
221        // Loop until we don't have anymore byte in the buffer,
222        // or until the decoder throws an unrecoverable exception or
223        // can't decoder a message, because there are not enough
224        // data in the buffer
225        while (in.hasRemaining()) {
226            int oldPos = in.position();
227            try {
228                synchronized (session) {
229                    // Call the decoder with the read bytes
230                    decoder.decode(session, in, decoderOut);
231                }
232                // Finish decoding if no exception was thrown.
233                decoderOut.flush(nextFilter, session);
234            } catch (Exception e) {
235                ProtocolDecoderException pde;
236                if (e instanceof ProtocolDecoderException) {
237                    pde = (ProtocolDecoderException) e;
238                } else {
239                    pde = new ProtocolDecoderException(e);
240                }
241                if (pde.getHexdump() == null) {
242                    // Generate a message hex dump
243                    int curPos = in.position();
244                    in.position(oldPos);
245                    pde.setHexdump(in.getHexDump());
246                    in.position(curPos);
247                }
248                // Fire the exceptionCaught event.
249                decoderOut.flush(nextFilter, session);
250                nextFilter.exceptionCaught(session, pde);
251                // Retry only if the type of the caught exception is
252                // recoverable and the buffer position has changed.
253                // We check buffer position additionally to prevent an
254                // infinite loop.
255                if (!(e instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) {
256                    break;
257                }
258            }
259        }
260    }
261
262    @Override
263    public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
264        if (writeRequest instanceof EncodedWriteRequest) {
265            return;
266        }
267
268        if (writeRequest instanceof MessageWriteRequest) {
269            MessageWriteRequest wrappedRequest = (MessageWriteRequest) writeRequest;
270            nextFilter.messageSent(session, wrappedRequest.getParentRequest());
271        } else {
272            nextFilter.messageSent(session, writeRequest);
273        }
274    }
275
276    @Override
277    public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
278        Object message = writeRequest.getMessage();
279
280        // Bypass the encoding if the message is contained in a IoBuffer,
281        // as it has already been encoded before
282        if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
283            nextFilter.filterWrite(session, writeRequest);
284            return;
285        }
286
287        // Get the encoder in the session
288        ProtocolEncoder encoder = factory.getEncoder(session);
289
290        ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, writeRequest);
291
292        if (encoder == null) {
293            throw new ProtocolEncoderException("The encoder is null for the session " + session);
294        }
295
296        try {
297            // Now we can try to encode the response
298            encoder.encode(session, message, encoderOut);
299
300            // Send it directly
301            Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput) encoderOut).getMessageQueue();
302
303            // Write all the encoded messages now
304            while (!bufferQueue.isEmpty()) {
305                Object encodedMessage = bufferQueue.poll();
306
307                if (encodedMessage == null) {
308                    break;
309                }
310
311                // Flush only when the buffer has remaining.
312                if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
313                    SocketAddress destination = writeRequest.getDestination();
314                    WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination);
315
316                    nextFilter.filterWrite(session, encodedWriteRequest);
317                }
318            }
319
320            // Call the next filter
321            nextFilter.filterWrite(session, new MessageWriteRequest(writeRequest));
322        } catch (Exception e) {
323            ProtocolEncoderException pee;
324
325            // Generate the correct exception
326            if (e instanceof ProtocolEncoderException) {
327                pee = (ProtocolEncoderException) e;
328            } else {
329                pee = new ProtocolEncoderException(e);
330            }
331
332            throw pee;
333        }
334    }
335
336    @Override
337    public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
338        // Call finishDecode() first when a connection is closed.
339        ProtocolDecoder decoder = factory.getDecoder(session);
340        ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
341
342        try {
343            decoder.finishDecode(session, decoderOut);
344        } catch (Exception e) {
345            ProtocolDecoderException pde;
346            if (e instanceof ProtocolDecoderException) {
347                pde = (ProtocolDecoderException) e;
348            } else {
349                pde = new ProtocolDecoderException(e);
350            }
351            throw pde;
352        } finally {
353            // Dispose everything
354            disposeCodec(session);
355            decoderOut.flush(nextFilter, session);
356        }
357
358        // Call the next filter
359        nextFilter.sessionClosed(session);
360    }
361
362    private static class EncodedWriteRequest extends DefaultWriteRequest {
363        public EncodedWriteRequest(Object encodedMessage, WriteFuture future, SocketAddress destination) {
364            super(encodedMessage, future, destination);
365        }
366
367        public boolean isEncoded() {
368            return true;
369        }
370    }
371
372    private static class MessageWriteRequest extends WriteRequestWrapper {
373        public MessageWriteRequest(WriteRequest writeRequest) {
374            super(writeRequest);
375        }
376
377        @Override
378        public Object getMessage() {
379            return EMPTY_BUFFER;
380        }
381
382        @Override
383        public String toString() {
384            return "MessageWriteRequest, parent : " + super.toString();
385        }
386    }
387
388    private static class ProtocolDecoderOutputImpl extends AbstractProtocolDecoderOutput {
389        public ProtocolDecoderOutputImpl() {
390            // Do nothing
391        }
392
393        public void flush(NextFilter nextFilter, IoSession session) {
394            Queue<Object> messageQueue = getMessageQueue();
395
396            while (!messageQueue.isEmpty()) {
397                nextFilter.messageReceived(session, messageQueue.poll());
398            }
399        }
400    }
401
402    private static class ProtocolEncoderOutputImpl extends AbstractProtocolEncoderOutput {
403        private final IoSession session;
404
405        private final NextFilter nextFilter;
406
407        /** The WriteRequest destination */
408        private final SocketAddress destination;
409
410        public ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
411            this.session = session;
412            this.nextFilter = nextFilter;
413
414            // Only store the destination, not the full WriteRequest.
415            destination = writeRequest.getDestination();
416        }
417
418        public WriteFuture flush() {
419            Queue<Object> bufferQueue = getMessageQueue();
420            WriteFuture future = null;
421
422            while (!bufferQueue.isEmpty()) {
423                Object encodedMessage = bufferQueue.poll();
424
425                if (encodedMessage == null) {
426                    break;
427                }
428
429                // Flush only when the buffer has remaining.
430                if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
431                    future = new DefaultWriteFuture(session);
432                    nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage, future, destination));
433                }
434            }
435
436            if (future == null) {
437                // Creates an empty writeRequest containing the destination
438                WriteRequest writeRequest = new DefaultWriteRequest(
439                        DefaultWriteRequest.EMPTY_MESSAGE, null, destination);
440                future = DefaultWriteFuture.newNotWrittenFuture(session, new NothingWrittenException(writeRequest));
441            }
442
443            return future;
444        }
445    }
446
447    //----------- Helper methods ---------------------------------------------
448    /**
449     * Dispose the encoder, decoder, and the callback for the decoded
450     * messages.
451     */
452    private void disposeCodec(IoSession session) {
453        // We just remove the two instances of encoder/decoder to release resources
454        // from the session
455        disposeEncoder(session);
456        disposeDecoder(session);
457
458        // We also remove the callback
459        disposeDecoderOut(session);
460    }
461
462    /**
463     * Dispose the encoder, removing its instance from the
464     * session's attributes, and calling the associated
465     * dispose method.
466     */
467    private void disposeEncoder(IoSession session) {
468        ProtocolEncoder encoder = (ProtocolEncoder) session.removeAttribute(ENCODER);
469        if (encoder == null) {
470            return;
471        }
472
473        try {
474            encoder.dispose(session);
475        } catch (Exception e) {
476            LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')');
477        }
478    }
479
480    /**
481     * Dispose the decoder, removing its instance from the
482     * session's attributes, and calling the associated
483     * dispose method.
484     */
485    private void disposeDecoder(IoSession session) {
486        ProtocolDecoder decoder = (ProtocolDecoder) session.removeAttribute(DECODER);
487        if (decoder == null) {
488            return;
489        }
490
491        try {
492            decoder.dispose(session);
493        } catch (Exception e) {
494            LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')');
495        }
496    }
497
498    /**
499     * Return a reference to the decoder callback. If it's not already created
500     * and stored into the session, we create a new instance.
501     */
502    private ProtocolDecoderOutput getDecoderOut(IoSession session, NextFilter nextFilter) {
503        ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
504
505        if (out == null) {
506            // Create a new instance, and stores it into the session
507            out = new ProtocolDecoderOutputImpl();
508            session.setAttribute(DECODER_OUT, out);
509        }
510
511        return out;
512    }
513
514    private ProtocolEncoderOutput getEncoderOut(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
515        ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT);
516
517        if (out == null) {
518            // Create a new instance, and stores it into the session
519            out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
520            session.setAttribute(ENCODER_OUT, out);
521        }
522
523        return out;
524    }
525
526    /**
527     * Remove the decoder callback from the session's attributes.
528     */
529    private void disposeDecoderOut(IoSession session) {
530        session.removeAttribute(DECODER_OUT);
531    }
532}