1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.stream;
21
22 import java.io.IOException;
23 import java.util.Queue;
24
25 import org.apache.mina.core.buffer.IoBuffer;
26 import org.apache.mina.core.filterchain.IoFilterAdapter;
27 import org.apache.mina.core.filterchain.IoFilterChain;
28 import org.apache.mina.core.session.AttributeKey;
29 import org.apache.mina.core.session.IoSession;
30 import org.apache.mina.core.write.DefaultWriteRequest;
31 import org.apache.mina.core.write.WriteRequest;
32 import org.apache.mina.util.CircularQueue;
33
34
35
36
37
38
39 public abstract class AbstractStreamWriteFilter<T> extends IoFilterAdapter {
40
41
42
43 public static final int DEFAULT_STREAM_BUFFER_SIZE = 4096;
44
45
46
47
48 protected final AttributeKey CURRENT_STREAM = new AttributeKey(getClass(), "stream");
49
50 protected final AttributeKey WRITE_REQUEST_QUEUE = new AttributeKey(getClass(), "queue");
51 protected final AttributeKey CURRENT_WRITE_REQUEST = new AttributeKey(getClass(), "writeRequest");
52
53 private int writeBufferSize = DEFAULT_STREAM_BUFFER_SIZE;
54
55
56 @Override
57 public void onPreAdd(IoFilterChain parent, String name,
58 NextFilter nextFilter) throws Exception {
59 Class<? extends IoFilterAdapter> clazz = getClass();
60 if (parent.contains(clazz)) {
61 throw new IllegalStateException(
62 "Only one " + clazz.getName() + " is permitted.");
63 }
64 }
65
66 @Override
67 public void filterWrite(NextFilter nextFilter, IoSession session,
68 WriteRequest writeRequest) throws Exception {
69
70 if (session.getAttribute(CURRENT_STREAM) != null) {
71 Queue<WriteRequest> queue = getWriteRequestQueue(session);
72 if (queue == null) {
73 queue = new CircularQueue<WriteRequest>();
74 session.setAttribute(WRITE_REQUEST_QUEUE, queue);
75 }
76 queue.add(writeRequest);
77 return;
78 }
79
80 Object message = writeRequest.getMessage();
81
82 if (getMessageClass().isInstance(message)) {
83
84 T stream = getMessageClass().cast(message);
85
86 IoBuffer buffer = getNextBuffer(stream);
87 if (buffer == null) {
88
89 writeRequest.getFuture().setWritten();
90 nextFilter.messageSent(session, writeRequest);
91 } else {
92 session.setAttribute(CURRENT_STREAM, message);
93 session.setAttribute(CURRENT_WRITE_REQUEST, writeRequest);
94
95 nextFilter.filterWrite(session, new DefaultWriteRequest(
96 buffer));
97 }
98
99 } else {
100 nextFilter.filterWrite(session, writeRequest);
101 }
102 }
103
104 abstract protected Class<T> getMessageClass();
105
106 @SuppressWarnings("unchecked")
107 private Queue<WriteRequest> getWriteRequestQueue(IoSession session) {
108 return (Queue<WriteRequest>) session.getAttribute(WRITE_REQUEST_QUEUE);
109 }
110
111 @SuppressWarnings("unchecked")
112 private Queue<WriteRequest> removeWriteRequestQueue(IoSession session) {
113 return (Queue<WriteRequest>) session.removeAttribute(WRITE_REQUEST_QUEUE);
114 }
115
116 @Override
117 public void messageSent(NextFilter nextFilter, IoSession session,
118 WriteRequest writeRequest) throws Exception {
119 T stream = getMessageClass().cast(session.getAttribute(CURRENT_STREAM));
120
121 if (stream == null) {
122 nextFilter.messageSent(session, writeRequest);
123 } else {
124 IoBuffer buffer = getNextBuffer(stream);
125
126 if (buffer == null) {
127
128 session.removeAttribute(CURRENT_STREAM);
129 WriteRequest currentWriteRequest = (WriteRequest) session
130 .removeAttribute(CURRENT_WRITE_REQUEST);
131
132
133 Queue<WriteRequest> queue = removeWriteRequestQueue(session);
134 if (queue != null) {
135 WriteRequest wr = queue.poll();
136 while (wr != null) {
137 filterWrite(nextFilter, session, wr);
138 wr = queue.poll();
139 }
140 }
141
142 currentWriteRequest.getFuture().setWritten();
143 nextFilter.messageSent(session, currentWriteRequest);
144 } else {
145 nextFilter.filterWrite(session, new DefaultWriteRequest(
146 buffer));
147 }
148 }
149 }
150
151
152
153
154
155
156
157 public int getWriteBufferSize() {
158 return writeBufferSize;
159 }
160
161
162
163
164
165
166
167 public void setWriteBufferSize(int writeBufferSize) {
168 if (writeBufferSize < 1) {
169 throw new IllegalArgumentException(
170 "writeBufferSize must be at least 1");
171 }
172 this.writeBufferSize = writeBufferSize;
173 }
174
175 abstract protected IoBuffer getNextBuffer(T message) throws IOException;
176 }