1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.stream;
21
22 import java.io.IOException;
23 import java.util.Queue;
24
25 import org.apache.mina.core.buffer.IoBuffer;
26 import org.apache.mina.core.filterchain.IoFilterAdapter;
27 import org.apache.mina.core.filterchain.IoFilterChain;
28 import org.apache.mina.core.session.AttributeKey;
29 import org.apache.mina.core.session.IoSession;
30 import org.apache.mina.core.write.DefaultWriteRequest;
31 import org.apache.mina.core.write.WriteRequest;
32 import org.apache.mina.util.CircularQueue;
33
34
35
36
37
38
39
40 public abstract class AbstractStreamWriteFilter<T> extends IoFilterAdapter {
41
42
43
44 public static final int DEFAULT_STREAM_BUFFER_SIZE = 4096;
45
46
47
48
49 protected final AttributeKey CURRENT_STREAM = new AttributeKey(getClass(), "stream");
50
51 protected final AttributeKey WRITE_REQUEST_QUEUE = new AttributeKey(getClass(), "queue");
52 protected final AttributeKey CURRENT_WRITE_REQUEST = new AttributeKey(getClass(), "writeRequest");
53
54 private int writeBufferSize = DEFAULT_STREAM_BUFFER_SIZE;
55
56
57 @Override
58 public void onPreAdd(IoFilterChain parent, String name,
59 NextFilter nextFilter) throws Exception {
60 Class<? extends IoFilterAdapter> clazz = getClass();
61 if (parent.contains(clazz)) {
62 throw new IllegalStateException(
63 "Only one " + clazz.getName() + " is permitted.");
64 }
65 }
66
67 @Override
68 public void filterWrite(NextFilter nextFilter, IoSession session,
69 WriteRequest writeRequest) throws Exception {
70
71 if (session.getAttribute(CURRENT_STREAM) != null) {
72 Queue<WriteRequest> queue = getWriteRequestQueue(session);
73 if (queue == null) {
74 queue = new CircularQueue<WriteRequest>();
75 session.setAttribute(WRITE_REQUEST_QUEUE, queue);
76 }
77 queue.add(writeRequest);
78 return;
79 }
80
81 Object message = writeRequest.getMessage();
82
83 if (getMessageClass().isInstance(message)) {
84
85 T stream = getMessageClass().cast(message);
86
87 IoBuffer buffer = getNextBuffer(stream);
88 if (buffer == null) {
89
90 writeRequest.getFuture().setWritten();
91 nextFilter.messageSent(session, writeRequest);
92 } else {
93 session.setAttribute(CURRENT_STREAM, message);
94 session.setAttribute(CURRENT_WRITE_REQUEST, writeRequest);
95
96 nextFilter.filterWrite(session, new DefaultWriteRequest(
97 buffer));
98 }
99
100 } else {
101 nextFilter.filterWrite(session, writeRequest);
102 }
103 }
104
105 abstract protected Class<T> getMessageClass();
106
107 @SuppressWarnings("unchecked")
108 private Queue<WriteRequest> getWriteRequestQueue(IoSession session) {
109 return (Queue<WriteRequest>) session.getAttribute(WRITE_REQUEST_QUEUE);
110 }
111
112 @SuppressWarnings("unchecked")
113 private Queue<WriteRequest> removeWriteRequestQueue(IoSession session) {
114 return (Queue<WriteRequest>) session.removeAttribute(WRITE_REQUEST_QUEUE);
115 }
116
117 @Override
118 public void messageSent(NextFilter nextFilter, IoSession session,
119 WriteRequest writeRequest) throws Exception {
120 T stream = getMessageClass().cast(session.getAttribute(CURRENT_STREAM));
121
122 if (stream == null) {
123 nextFilter.messageSent(session, writeRequest);
124 } else {
125 IoBuffer buffer = getNextBuffer(stream);
126
127 if (buffer == null) {
128
129 session.removeAttribute(CURRENT_STREAM);
130 WriteRequest currentWriteRequest = (WriteRequest) session
131 .removeAttribute(CURRENT_WRITE_REQUEST);
132
133
134 Queue<WriteRequest> queue = removeWriteRequestQueue(session);
135 if (queue != null) {
136 WriteRequest wr = queue.poll();
137 while (wr != null) {
138 filterWrite(nextFilter, session, wr);
139 wr = queue.poll();
140 }
141 }
142
143 currentWriteRequest.getFuture().setWritten();
144 nextFilter.messageSent(session, currentWriteRequest);
145 } else {
146 nextFilter.filterWrite(session, new DefaultWriteRequest(
147 buffer));
148 }
149 }
150 }
151
152
153
154
155
156
157
158 public int getWriteBufferSize() {
159 return writeBufferSize;
160 }
161
162
163
164
165
166
167
168 public void setWriteBufferSize(int writeBufferSize) {
169 if (writeBufferSize < 1) {
170 throw new IllegalArgumentException(
171 "writeBufferSize must be at least 1");
172 }
173 this.writeBufferSize = writeBufferSize;
174 }
175
176 abstract protected IoBuffer getNextBuffer(T message) throws IOException;
177 }