1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.stream;
21
22 import java.io.IOException;
23 import java.util.Queue;
24 import java.util.concurrent.ConcurrentLinkedQueue;
25
26 import org.apache.mina.core.buffer.IoBuffer;
27 import org.apache.mina.core.filterchain.IoFilterAdapter;
28 import org.apache.mina.core.filterchain.IoFilterChain;
29 import org.apache.mina.core.session.AttributeKey;
30 import org.apache.mina.core.session.IoSession;
31 import org.apache.mina.core.write.DefaultWriteRequest;
32 import org.apache.mina.core.write.WriteRequest;
33
34
35
36
37
38
39
40
41
42 public abstract class AbstractStreamWriteFilter<T> extends IoFilterAdapter {
43
44
45
46 public static final int DEFAULT_STREAM_BUFFER_SIZE = 4096;
47
48
49
50
51 protected static final AttributeKeyey.html#AttributeKey">AttributeKey CURRENT_STREAM = new AttributeKey(AbstractStreamWriteFilter.class, "stream");
52
53 protected static final AttributeKeyml#AttributeKey">AttributeKey WRITE_REQUEST_QUEUE = new AttributeKey(AbstractStreamWriteFilter.class, "queue");
54
55 protected static final AttributeKey#AttributeKey">AttributeKey CURRENT_WRITE_REQUEST = new AttributeKey(AbstractStreamWriteFilter.class, "writeRequest");
56
57 private int writeBufferSize = DEFAULT_STREAM_BUFFER_SIZE;
58
59
60
61
62 @Override
63 public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
64 Class<? extends IoFilterAdapter> clazz = getClass();
65
66 if (parent.contains(clazz)) {
67 throw new IllegalStateException("Only one " + clazz.getName() + " is permitted.");
68 }
69 }
70
71
72
73
74 @Override
75 public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
76
77 if (session.getAttribute(CURRENT_STREAM) != null) {
78 Queue<WriteRequest> queue = getWriteRequestQueue(session);
79
80 if (queue == null) {
81 queue = new ConcurrentLinkedQueue<>();
82 session.setAttribute(WRITE_REQUEST_QUEUE, queue);
83 }
84
85 queue.add(writeRequest);
86
87 return;
88 }
89
90 Object message = writeRequest.getMessage();
91
92 if (getMessageClass().isInstance(message)) {
93
94 T stream = getMessageClass().cast(message);
95
96 IoBuffer buffer = getNextBuffer(stream);
97
98 if (buffer == null) {
99
100 writeRequest.getFuture().setWritten();
101 nextFilter.messageSent(session, writeRequest);
102 } else {
103 session.setAttribute(CURRENT_STREAM, message);
104 session.setAttribute(CURRENT_WRITE_REQUEST, writeRequest);
105
106 nextFilter.filterWrite(session, new DefaultWriteRequest(buffer));
107 }
108 } else {
109 nextFilter.filterWrite(session, writeRequest);
110 }
111 }
112
113 protected abstract Class<T> getMessageClass();
114
115 @SuppressWarnings("unchecked")
116 private Queue<WriteRequest> getWriteRequestQueue(IoSession session) {
117 return (Queue<WriteRequest>) session.getAttribute(WRITE_REQUEST_QUEUE);
118 }
119
120 @SuppressWarnings("unchecked")
121 private Queue<WriteRequest> removeWriteRequestQueue(IoSession session) {
122 return (Queue<WriteRequest>) session.removeAttribute(WRITE_REQUEST_QUEUE);
123 }
124
125
126
127
128 @Override
129 public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
130 T stream = getMessageClass().cast(session.getAttribute(CURRENT_STREAM));
131
132 if (stream == null) {
133 nextFilter.messageSent(session, writeRequest);
134 } else {
135 IoBuffer buffer = getNextBuffer(stream);
136
137 if (buffer == null) {
138
139 session.removeAttribute(CURRENT_STREAM);
140 WriteRequestapache/mina/core/write/WriteRequest.html#WriteRequest">WriteRequest currentWriteRequest = (WriteRequest) session.removeAttribute(CURRENT_WRITE_REQUEST);
141
142
143 Queue<WriteRequest> queue = removeWriteRequestQueue(session);
144
145 if (queue != null) {
146 WriteRequest wr = queue.poll();
147
148 while (wr != null) {
149 filterWrite(nextFilter, session, wr);
150 wr = queue.poll();
151 }
152 }
153
154 currentWriteRequest.getFuture().setWritten();
155 nextFilter.messageSent(session, currentWriteRequest);
156 } else {
157 nextFilter.filterWrite(session, new DefaultWriteRequest(buffer));
158 }
159 }
160 }
161
162
163
164
165
166 public int getWriteBufferSize() {
167 return writeBufferSize;
168 }
169
170
171
172
173
174
175
176
177 public void setWriteBufferSize(int writeBufferSize) {
178 if (writeBufferSize < 1) {
179 throw new IllegalArgumentException("writeBufferSize must be at least 1");
180 }
181
182 this.writeBufferSize = writeBufferSize;
183 }
184
185 protected abstract IoBuffer getNextBuffer(T message) throws IOException;
186 }