1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.buffer;
21
22 import java.io.BufferedOutputStream;
23 import java.util.concurrent.ConcurrentHashMap;
24
25 import org.apache.mina.core.buffer.IoBuffer;
26 import org.apache.mina.core.filterchain.IoFilter;
27 import org.apache.mina.core.filterchain.IoFilterAdapter;
28 import org.apache.mina.core.session.IoSession;
29 import org.apache.mina.core.write.DefaultWriteRequest;
30 import org.apache.mina.core.write.WriteRequest;
31 import org.apache.mina.filter.codec.ProtocolCodecFilter;
32 import org.apache.mina.util.LazyInitializedCacheMap;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 public final class BufferedWriteFilter extends IoFilterAdapter {
50 private final Logger logger = LoggerFactory
51 .getLogger(BufferedWriteFilter.class);
52
53
54
55
56 public final static int DEFAULT_BUFFER_SIZE = 8192;
57
58
59
60
61 private int bufferSize = DEFAULT_BUFFER_SIZE;
62
63
64
65
66
67 private final LazyInitializedCacheMap<IoSession, IoBuffer> buffersMap;
68
69
70
71
72
73 public BufferedWriteFilter() {
74 this(DEFAULT_BUFFER_SIZE, null);
75 }
76
77
78
79
80
81
82
83 public BufferedWriteFilter(int bufferSize) {
84 this(bufferSize, null);
85 }
86
87
88
89
90
91
92
93
94
95 public BufferedWriteFilter(int bufferSize,
96 LazyInitializedCacheMap<IoSession, IoBuffer> buffersMap) {
97 super();
98 this.bufferSize = bufferSize;
99 if (buffersMap == null) {
100 this.buffersMap = new LazyInitializedCacheMap<IoSession, IoBuffer>();
101 } else {
102 this.buffersMap = buffersMap;
103 }
104 }
105
106
107
108
109 public int getBufferSize() {
110 return bufferSize;
111 }
112
113
114
115
116
117
118 public void setBufferSize(int bufferSize) {
119 this.bufferSize = bufferSize;
120 }
121
122
123
124
125
126
127
128 @Override
129 public void filterWrite(NextFilter nextFilter, IoSession session,
130 WriteRequest writeRequest) throws Exception {
131
132 Object data = writeRequest.getMessage();
133
134 if (data instanceof IoBuffer) {
135 write(session, (IoBuffer) data);
136 } else {
137 throw new IllegalArgumentException(
138 "This filter should only buffer IoBuffer objects");
139 }
140 }
141
142
143
144
145
146
147
148 private void write(IoSession session, IoBuffer data) {
149 IoBuffer dest = buffersMap.putIfAbsent(session,
150 new IoBufferLazyInitializer(bufferSize));
151
152 write(session, data, dest);
153 }
154
155
156
157
158
159
160
161
162
163
164
165 private void write(IoSession session, IoBuffer data, IoBuffer buf) {
166 try {
167 int len = data.remaining();
168 if (len >= buf.capacity()) {
169
170
171
172
173 NextFilter nextFilter = session.getFilterChain().getNextFilter(
174 this);
175 internalFlush(nextFilter, session, buf);
176 nextFilter.filterWrite(session, new DefaultWriteRequest(data));
177 return;
178 }
179 if (len > (buf.limit() - buf.position())) {
180 internalFlush(session.getFilterChain().getNextFilter(this),
181 session, buf);
182 }
183 synchronized (buf) {
184 buf.put(data);
185 }
186 } catch (Throwable e) {
187 session.getFilterChain().fireExceptionCaught(e);
188 }
189 }
190
191
192
193
194
195
196
197
198
199 private void internalFlush(NextFilter nextFilter, IoSession session,
200 IoBuffer buf) throws Exception {
201 IoBuffer tmp = null;
202 synchronized (buf) {
203 buf.flip();
204 tmp = buf.duplicate();
205 buf.clear();
206 }
207 logger.debug("Flushing buffer: {}", tmp);
208 nextFilter.filterWrite(session, new DefaultWriteRequest(tmp));
209 }
210
211
212
213
214
215
216 public void flush(IoSession session) {
217 try {
218 internalFlush(session.getFilterChain().getNextFilter(this),
219 session, buffersMap.get(session));
220 } catch (Throwable e) {
221 session.getFilterChain().fireExceptionCaught(e);
222 }
223 }
224
225
226
227
228
229
230
231 private void free(IoSession session) {
232 IoBuffer buf = buffersMap.remove(session);
233 if (buf != null) {
234 buf.free();
235 }
236 }
237
238
239
240
241 @Override
242 public void exceptionCaught(NextFilter nextFilter, IoSession session,
243 Throwable cause) throws Exception {
244 free(session);
245 }
246
247
248
249
250 @Override
251 public void sessionClosed(NextFilter nextFilter, IoSession session)
252 throws Exception {
253 free(session);
254 }
255 }