/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.mina.filter.codec;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.mina.core.buffer.IoBuffer;

/**
 * A {@link ProtocolEncoderOutput} based on queue.
 * <p>
 * Encoded messages handed to {@link #write(Object)} are appended to an
 * internal queue which is exposed through {@link #getMessageQueue()}.
 * {@link #mergeAll()} collapses the queued {@link IoBuffer}s into a single
 * buffer.
 *
 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
 */
public abstract class AbstractProtocolEncoderOutput implements ProtocolEncoderOutput {
    /** The encoded messages, in the order they were written. */
    private final Queue<Object> messageQueue = new ConcurrentLinkedQueue<Object>();

    /**
     * <code>true</code> as long as no non-{@link IoBuffer} message has been
     * written. This is a plain (non-volatile) field and nothing in this class
     * ever sets it back to <code>true</code>, even after the queue has been
     * emptied.
     */
    private boolean buffersOnly = true;

    /**
     * Creates a new instance with an empty message queue.
     */
    public AbstractProtocolEncoderOutput() {
        // Do nothing
    }

    /**
     * Returns the live queue of encoded messages (not a copy).
     *
     * @return the queue holding every message written so far
     */
    public Queue<Object> getMessageQueue() {
        return messageQueue;
    }

    /**
     * Appends an encoded message to the queue.
     * <p>
     * An {@link IoBuffer} is accepted only if it still has remaining content.
     * Any other object is queued as is and marks this output as no longer
     * containing buffers only, which makes later calls to {@link #mergeAll()}
     * fail.
     *
     * @param encodedMessage the encoded message to queue
     * @throws IllegalArgumentException if the message is an {@link IoBuffer}
     *         with nothing remaining
     * @throws NullPointerException if the message is <code>null</code>
     *         (raised by the underlying {@link ConcurrentLinkedQueue})
     */
    public void write(Object encodedMessage) {
        if (encodedMessage instanceof IoBuffer) {
            IoBuffer buf = (IoBuffer) encodedMessage;
            if (buf.hasRemaining()) {
                messageQueue.offer(buf);
            } else {
                throw new IllegalArgumentException("buf is empty. Forgot to call flip()?");
            }
        } else {
            messageQueue.offer(encodedMessage);
            buffersOnly = false;
        }
    }

    /**
     * Merges all the queued buffers into one buffer.
     * <p>
     * Nothing happens when fewer than two messages are queued. Otherwise a
     * new buffer sized to the sum of the remaining bytes of the queued buffers
     * is allocated, the queue is drained into it, and the flipped result is
     * added back to the queue.
     *
     * @throws IllegalStateException if a non-{@link IoBuffer} message has
     *         been written to this output
     */
    public void mergeAll() {
        if (!buffersOnly) {
            throw new IllegalStateException("the encoded message list contains a non-buffer.");
        }

        final int size = messageQueue.size();

        if (size < 2) {
            // no need to merge!
            return;
        }

        // Get the size of merged BB
        int sum = 0;
        for (Object b : messageQueue) {
            sum += ((IoBuffer) b).remaining();
        }

        // Allocate a new BB that will contain all fragments
        IoBuffer newBuf = IoBuffer.allocate(sum);

        // and merge all: poll() returns null once the queue is drained.
        IoBuffer buf;
        while ((buf = (IoBuffer) messageQueue.poll()) != null) {
            newBuf.put(buf);
        }

        // Push the new buffer finally, flipped so it is ready to be read.
        newBuf.flip();
        messageQueue.add(newBuf);
    }
}