1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.buffer;
21
22 import java.io.BufferedOutputStream;
23 import java.util.concurrent.ConcurrentHashMap;
24
25 import org.apache.mina.core.buffer.IoBuffer;
26 import org.apache.mina.core.filterchain.IoFilter;
27 import org.apache.mina.core.filterchain.IoFilterAdapter;
28 import org.apache.mina.core.session.IoSession;
29 import org.apache.mina.core.write.DefaultWriteRequest;
30 import org.apache.mina.core.write.WriteRequest;
31 import org.apache.mina.filter.codec.ProtocolCodecFilter;
32 import org.apache.mina.util.LazyInitializedCacheMap;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49 public final class BufferedWriteFilter extends IoFilterAdapter {
50 private static final Logger LOGGER = LoggerFactory.getLogger(BufferedWriteFilter.class);
51
52
53
54
55 public final static int DEFAULT_BUFFER_SIZE = 8192;
56
57
58
59
60 private int bufferSize = DEFAULT_BUFFER_SIZE;
61
62
63
64
65
66 private final LazyInitializedCacheMap<IoSession, IoBuffer> buffersMap;
67
68
69
70
71
72 public BufferedWriteFilter() {
73 this(DEFAULT_BUFFER_SIZE, null);
74 }
75
76
77
78
79
80
81
82 public BufferedWriteFilter(int bufferSize) {
83 this(bufferSize, null);
84 }
85
86
87
88
89
90
91
92
93
94 public BufferedWriteFilter(int bufferSize, LazyInitializedCacheMap<IoSession, IoBuffer> buffersMap) {
95 super();
96 this.bufferSize = bufferSize;
97 if (buffersMap == null) {
98 this.buffersMap = new LazyInitializedCacheMap<IoSession, IoBuffer>();
99 } else {
100 this.buffersMap = buffersMap;
101 }
102 }
103
104
105
106
107 public int getBufferSize() {
108 return bufferSize;
109 }
110
111
112
113
114
115
116 public void setBufferSize(int bufferSize) {
117 this.bufferSize = bufferSize;
118 }
119
120
121
122
123
124
125
126 @Override
127 public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
128
129 Object data = writeRequest.getMessage();
130
131 if (data instanceof IoBuffer) {
132 write(session, (IoBuffer) data);
133 } else {
134 throw new IllegalArgumentException("This filter should only buffer IoBuffer objects");
135 }
136 }
137
138
139
140
141
142
143
144 private void write(IoSession session, IoBuffer data) {
145 IoBuffer dest = buffersMap.putIfAbsent(session, new IoBufferLazyInitializer(bufferSize));
146
147 write(session, data, dest);
148 }
149
150
151
152
153
154
155
156
157
158
159
160 private void write(IoSession session, IoBuffer="../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer data, IoBuffer buf) {
161 try {
162 int len = data.remaining();
163 if (len >= buf.capacity()) {
164
165
166
167
168 NextFilter nextFilter = session.getFilterChain().getNextFilter(this);
169 internalFlush(nextFilter, session, buf);
170 nextFilter.filterWrite(session, new DefaultWriteRequest(data));
171 return;
172 }
173 if (len > (buf.limit() - buf.position())) {
174 internalFlush(session.getFilterChain().getNextFilter(this), session, buf);
175 }
176 synchronized (buf) {
177 buf.put(data);
178 }
179 } catch (Exception e) {
180 session.getFilterChain().fireExceptionCaught(e);
181 }
182 }
183
184
185
186
187
188
189
190
191
192 private void internalFlush(NextFilter nextFilter, IoSession session, IoBuffer buf) throws Exception {
193 IoBuffer tmp = null;
194 synchronized (buf) {
195 buf.flip();
196 tmp = buf.duplicate();
197 buf.clear();
198 }
199
200 if (LOGGER.isDebugEnabled()) {
201 LOGGER.debug("Flushing buffer: {}", tmp);
202 }
203
204 nextFilter.filterWrite(session, new DefaultWriteRequest(tmp));
205 }
206
207
208
209
210
211
212 public void flush(IoSession session) {
213 try {
214 internalFlush(session.getFilterChain().getNextFilter(this), session, buffersMap.get(session));
215 } catch (Exception e) {
216 session.getFilterChain().fireExceptionCaught(e);
217 }
218 }
219
220
221
222
223
224
225
226 private void free(IoSession session) {
227 IoBuffer buf = buffersMap.remove(session);
228 if (buf != null) {
229 buf.free();
230 }
231 }
232
233
234
235
236 @Override
237 public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) throws Exception {
238 free(session);
239 nextFilter.exceptionCaught(session, cause);
240 }
241
242
243
244
245 @Override
246 public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
247 free(session);
248 nextFilter.sessionClosed(session);
249 }
250 }