1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.codec;
21
22 import java.util.Queue;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24
25 import org.apache.mina.core.buffer.IoBuffer;
26
27
28
29
30
31
32 public abstract class AbstractProtocolEncoderOutput implements
33 ProtocolEncoderOutput {
34 private final Queue<Object> messageQueue = new ConcurrentLinkedQueue<Object>();
35
36 private boolean buffersOnly = true;
37
38 public AbstractProtocolEncoderOutput() {
39
40 }
41
42 public Queue<Object> getMessageQueue() {
43 return messageQueue;
44 }
45
46 public void write(Object encodedMessage) {
47 if (encodedMessage instanceof IoBuffer) {
48 IoBuffer buf = (IoBuffer) encodedMessage;
49 if (buf.hasRemaining()) {
50 messageQueue.offer(buf);
51 } else {
52 throw new IllegalArgumentException(
53 "buf is empty. Forgot to call flip()?");
54 }
55 } else {
56 messageQueue.offer(encodedMessage);
57 buffersOnly = false;
58 }
59 }
60
61 public void mergeAll() {
62 if (!buffersOnly) {
63 throw new IllegalStateException(
64 "the encoded message list contains a non-buffer.");
65 }
66
67 final int size = messageQueue.size();
68
69 if (size < 2) {
70
71 return;
72 }
73
74
75 int sum = 0;
76 for (Object b : messageQueue) {
77 sum += ((IoBuffer) b).remaining();
78 }
79
80
81 IoBuffer newBuf = IoBuffer.allocate(sum);
82
83
84 for (; ;) {
85 IoBuffer buf = (IoBuffer) messageQueue.poll();
86 if (buf == null) {
87 break;
88 }
89
90 newBuf.put(buf);
91 }
92
93
94 newBuf.flip();
95 messageQueue.add(newBuf);
96 }
97 }