View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.codec;
21  
22  import java.util.Queue;
23  
24  import org.apache.mina.core.buffer.IoBuffer;
25  import org.apache.mina.util.CircularQueue;
26  
27  /**
28   * A {@link ProtocolEncoderOutput} based on queue.
29   *
30   * @author The Apache MINA Project (dev@mina.apache.org)
31   * @version $Rev: 671827 $, $Date: 2008-06-26 10:49:48 +0200 (jeu, 26 jun 2008) $
32   */
33  public abstract class AbstractProtocolEncoderOutput implements
34          ProtocolEncoderOutput {
35      private final Queue<Object> messageQueue = new CircularQueue<Object>();
36      private boolean buffersOnly = true;
37  
38      public AbstractProtocolEncoderOutput() {
39      }
40  
41      public Queue<Object> getMessageQueue() {
42          return messageQueue;
43      }
44  
45      public void write(Object encodedMessage) {
46          if (encodedMessage instanceof IoBuffer) {
47              IoBuffer buf = (IoBuffer) encodedMessage;
48              if (buf.hasRemaining()) {
49                  messageQueue.offer(buf);
50              } else {
51                  throw new IllegalArgumentException(
52                          "buf is empty. Forgot to call flip()?");
53              }
54          } else {
55              messageQueue.offer(encodedMessage);
56              buffersOnly = false;
57          }
58      }
59  
60      public void mergeAll() {
61          if (!buffersOnly) {
62              throw new IllegalStateException(
63                      "the encoded message list contains a non-buffer.");
64          }
65          
66          final int size = messageQueue.size();
67  
68          if (size < 2) {
69              // no need to merge!
70              return;
71          }
72  
73          // Get the size of merged BB
74          int sum = 0;
75          for (Object b : messageQueue) {
76              sum += ((IoBuffer) b).remaining();
77          }
78  
79          // Allocate a new BB that will contain all fragments
80          IoBuffer newBuf = IoBuffer.allocate(sum);
81  
82          // and merge all.
83          for (; ;) {
84              IoBuffer buf = (IoBuffer) messageQueue.poll();
85              if (buf == null) {
86                  break;
87              }
88  
89              newBuf.put(buf);
90          }
91  
92          // Push the new buffer finally.
93          newBuf.flip();
94          messageQueue.add(newBuf);
95      }
96  }