001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.filter.codec;
021
022import java.util.Queue;
023import java.util.concurrent.ConcurrentLinkedQueue;
024
025import org.apache.mina.core.buffer.IoBuffer;
026
027/**
028 * A {@link ProtocolEncoderOutput} based on queue.
029 *
030 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
031 */
032public abstract class AbstractProtocolEncoderOutput implements ProtocolEncoderOutput {
033    private final Queue<Object> messageQueue = new ConcurrentLinkedQueue<Object>();
034
035    private boolean buffersOnly = true;
036
037    public AbstractProtocolEncoderOutput() {
038        // Do nothing
039    }
040
041    public Queue<Object> getMessageQueue() {
042        return messageQueue;
043    }
044
045    public void write(Object encodedMessage) {
046        if (encodedMessage instanceof IoBuffer) {
047            IoBuffer buf = (IoBuffer) encodedMessage;
048            if (buf.hasRemaining()) {
049                messageQueue.offer(buf);
050            } else {
051                throw new IllegalArgumentException("buf is empty. Forgot to call flip()?");
052            }
053        } else {
054            messageQueue.offer(encodedMessage);
055            buffersOnly = false;
056        }
057    }
058
059    public void mergeAll() {
060        if (!buffersOnly) {
061            throw new IllegalStateException("the encoded message list contains a non-buffer.");
062        }
063
064        final int size = messageQueue.size();
065
066        if (size < 2) {
067            // no need to merge!
068            return;
069        }
070
071        // Get the size of merged BB
072        int sum = 0;
073        for (Object b : messageQueue) {
074            sum += ((IoBuffer) b).remaining();
075        }
076
077        // Allocate a new BB that will contain all fragments
078        IoBuffer newBuf = IoBuffer.allocate(sum);
079
080        // and merge all.
081        for (;;) {
082            IoBuffer buf = (IoBuffer) messageQueue.poll();
083            if (buf == null) {
084                break;
085            }
086
087            newBuf.put(buf);
088        }
089
090        // Push the new buffer finally.
091        newBuf.flip();
092        messageQueue.add(newBuf);
093    }
094}