1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.codec;
21
22 import java.net.SocketAddress;
23 import java.util.Queue;
24
25 import org.apache.mina.core.buffer.IoBuffer;
26 import org.apache.mina.core.file.FileRegion;
27 import org.apache.mina.core.filterchain.IoFilter;
28 import org.apache.mina.core.filterchain.IoFilterAdapter;
29 import org.apache.mina.core.filterchain.IoFilterChain;
30 import org.apache.mina.core.future.DefaultWriteFuture;
31 import org.apache.mina.core.future.WriteFuture;
32 import org.apache.mina.core.session.AttributeKey;
33 import org.apache.mina.core.session.IoSession;
34 import org.apache.mina.core.write.DefaultWriteRequest;
35 import org.apache.mina.core.write.NothingWrittenException;
36 import org.apache.mina.core.write.WriteRequest;
37 import org.apache.mina.core.write.WriteRequestWrapper;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41
42
43
44
45
46
47
48
49 public class ProtocolCodecFilter extends IoFilterAdapter {
50
51 private static final Class<?>[] EMPTY_PARAMS = new Class[0];
52 private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
53
54 private final AttributeKey ENCODER = new AttributeKey(getClass(), "encoder");
55 private final AttributeKey DECODER = new AttributeKey(getClass(), "decoder");
56 private final AttributeKey DECODER_OUT = new AttributeKey(getClass(), "decoderOut");
57 private final ProtocolCodecFactory factory;
58
59 private final Logger logger = LoggerFactory.getLogger(getClass());
60
/**
 * Creates a filter that obtains its encoder and decoder from the given
 * factory.
 *
 * @param factory the codec factory to use; must not be {@code null}
 * @throws NullPointerException if {@code factory} is {@code null}
 */
public ProtocolCodecFilter(ProtocolCodecFactory factory) {
    if (null == factory) {
        throw new NullPointerException("factory");
    }

    this.factory = factory;
}
67
/**
 * Creates a filter whose internal factory always returns the given encoder
 * and decoder instances, regardless of the session asking for them.
 *
 * @param encoder the encoder to hand out; must not be {@code null}
 * @param decoder the decoder to hand out; must not be {@code null}
 * @throws NullPointerException if either argument is {@code null}
 *         (the message names the first offending argument)
 */
public ProtocolCodecFilter(final ProtocolEncoder encoder,
        final ProtocolDecoder decoder) {
    if (encoder == null || decoder == null) {
        // The encoder is reported first when both arguments are missing.
        throw new NullPointerException(encoder == null ? "encoder" : "decoder");
    }

    // Factory that ignores the session and returns the fixed instances.
    final ProtocolCodecFactory fixedInstances = new ProtocolCodecFactory() {
        public ProtocolDecoder getDecoder(IoSession session) {
            return decoder;
        }

        public ProtocolEncoder getEncoder(IoSession session) {
            return encoder;
        }
    };
    this.factory = fixedInstances;
}
87
/**
 * Creates a filter whose internal factory instantiates the given encoder
 * and decoder classes through their public no-argument constructors.
 * Each call to the factory creates a new instance.
 *
 * @param encoderClass the encoder implementation class; must not be
 *        {@code null}
 * @param decoderClass the decoder implementation class; must not be
 *        {@code null}
 * @throws NullPointerException if either argument is {@code null}
 * @throws IllegalArgumentException if a class is not assignable to its
 *         codec interface, or has no public no-argument constructor (the
 *         underlying {@link NoSuchMethodException} is attached as the cause)
 */
public ProtocolCodecFilter(
        final Class<? extends ProtocolEncoder> encoderClass,
        final Class<? extends ProtocolDecoder> decoderClass) {
    if (encoderClass == null) {
        throw new NullPointerException("encoderClass");
    }
    if (decoderClass == null) {
        throw new NullPointerException("decoderClass");
    }

    // Runtime type checks guard against callers that bypass the generic
    // bounds (e.g. via raw types).
    if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
        throw new IllegalArgumentException("encoderClass: "
                + encoderClass.getName());
    }
    if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
        throw new IllegalArgumentException("decoderClass: "
                + decoderClass.getName());
    }

    // Fail fast if the classes cannot be instantiated later; keep the
    // original lookup failure as the cause for easier diagnosis.
    try {
        encoderClass.getConstructor(EMPTY_PARAMS);
    } catch (NoSuchMethodException e) {
        throw new IllegalArgumentException(
                "encoderClass doesn't have a public default constructor.", e);
    }
    try {
        decoderClass.getConstructor(EMPTY_PARAMS);
    } catch (NoSuchMethodException e) {
        throw new IllegalArgumentException(
                "decoderClass doesn't have a public default constructor.", e);
    }

    this.factory = new ProtocolCodecFactory() {
        public ProtocolEncoder getEncoder(IoSession session) throws Exception {
            return encoderClass.newInstance();
        }

        public ProtocolDecoder getDecoder(IoSession session) throws Exception {
            return decoderClass.newInstance();
        }
    };
}
128
129 public ProtocolEncoder getEncoder(IoSession session) {
130 return (ProtocolEncoder) session.getAttribute(ENCODER);
131 }
132
133 public ProtocolDecoder getDecoder(IoSession session) {
134 return (ProtocolDecoder) session.getAttribute(DECODER);
135 }
136
137 @Override
138 public void onPreAdd(IoFilterChain parent, String name,
139 NextFilter nextFilter) throws Exception {
140 if (parent.contains(this)) {
141 throw new IllegalArgumentException(
142 "You can't add the same filter instance more than once. Create another instance and add it.");
143 }
144 }
145
146 @Override
147 public void onPostRemove(IoFilterChain parent, String name,
148 NextFilter nextFilter) throws Exception {
149 disposeEncoder(parent.getSession());
150 disposeDecoder(parent.getSession());
151 disposeDecoderOut(parent.getSession());
152 }
153
154 @Override
155 public void messageReceived(NextFilter nextFilter, IoSession session,
156 Object message) throws Exception {
157 if (!(message instanceof IoBuffer)) {
158 nextFilter.messageReceived(session, message);
159 return;
160 }
161
162 IoBuffer in = (IoBuffer) message;
163 ProtocolDecoder decoder = getDecoder0(session);
164 ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
165
166 while (in.hasRemaining()) {
167 int oldPos = in.position();
168 try {
169 synchronized (decoderOut) {
170 decoder.decode(session, in, decoderOut);
171 }
172
173 decoderOut.flush();
174 break;
175 } catch (Throwable t) {
176 ProtocolDecoderException pde;
177 if (t instanceof ProtocolDecoderException) {
178 pde = (ProtocolDecoderException) t;
179 } else {
180 pde = new ProtocolDecoderException(t);
181 }
182
183 if (pde.getHexdump() == null) {
184 int curPos = in.position();
185 in.position(oldPos);
186 pde.setHexdump(in.getHexDump());
187 in.position(curPos);
188 }
189
190
191 decoderOut.flush();
192 nextFilter.exceptionCaught(session, pde);
193
194
195
196
197
198 if (!(t instanceof RecoverableProtocolDecoderException) ||
199 in.position() == oldPos) {
200 break;
201 }
202 }
203 }
204 }
205
206 @Override
207 public void messageSent(NextFilter nextFilter, IoSession session,
208 WriteRequest writeRequest) throws Exception {
209 if (writeRequest instanceof EncodedWriteRequest) {
210 return;
211 }
212
213 if (!(writeRequest instanceof MessageWriteRequest)) {
214 nextFilter.messageSent(session, writeRequest);
215 return;
216 }
217
218 MessageWriteRequest wrappedRequest = (MessageWriteRequest) writeRequest;
219 nextFilter.messageSent(session, wrappedRequest.getParentRequest());
220 }
221
222 @Override
223 public void filterWrite(NextFilter nextFilter, IoSession session,
224 WriteRequest writeRequest) throws Exception {
225 Object message = writeRequest.getMessage();
226 if (message instanceof IoBuffer || message instanceof FileRegion) {
227 nextFilter.filterWrite(session, writeRequest);
228 return;
229 }
230
231 ProtocolEncoder encoder = getEncoder0(session);
232 ProtocolEncoderOutputImpl encoderOut = getEncoderOut(session,
233 nextFilter, writeRequest);
234
235 try {
236 encoder.encode(session, message, encoderOut);
237 encoderOut.flushWithoutFuture();
238 nextFilter.filterWrite(session, new MessageWriteRequest(
239 writeRequest));
240 } catch (Throwable t) {
241 ProtocolEncoderException pee;
242 if (t instanceof ProtocolEncoderException) {
243 pee = (ProtocolEncoderException) t;
244 } else {
245 pee = new ProtocolEncoderException(t);
246 }
247 throw pee;
248 }
249 }
250
251 @Override
252 public void sessionClosed(NextFilter nextFilter, IoSession session)
253 throws Exception {
254
255 ProtocolDecoder decoder = getDecoder0(session);
256 ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
257 try {
258 decoder.finishDecode(session, decoderOut);
259 } catch (Throwable t) {
260 ProtocolDecoderException pde;
261 if (t instanceof ProtocolDecoderException) {
262 pde = (ProtocolDecoderException) t;
263 } else {
264 pde = new ProtocolDecoderException(t);
265 }
266 throw pde;
267 } finally {
268
269 disposeEncoder(session);
270 disposeDecoder(session);
271 disposeDecoderOut(session);
272 decoderOut.flush();
273 }
274
275 nextFilter.sessionClosed(session);
276 }
277
278 private ProtocolEncoder getEncoder0(IoSession session) throws Exception {
279 ProtocolEncoder encoder = (ProtocolEncoder) session
280 .getAttribute(ENCODER);
281 if (encoder == null) {
282 encoder = factory.getEncoder(session);
283 session.setAttribute(ENCODER, encoder);
284 }
285 return encoder;
286 }
287
288 private ProtocolEncoderOutputImpl getEncoderOut(IoSession session,
289 NextFilter nextFilter, WriteRequest writeRequest) {
290 return new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
291 }
292
293 private ProtocolDecoder getDecoder0(IoSession session) throws Exception {
294 ProtocolDecoder decoder = (ProtocolDecoder) session
295 .getAttribute(DECODER);
296 if (decoder == null) {
297 decoder = factory.getDecoder(session);
298 session.setAttribute(DECODER, decoder);
299 }
300 return decoder;
301 }
302
303 private ProtocolDecoderOutput getDecoderOut(IoSession session,
304 NextFilter nextFilter) {
305 ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
306 if (out == null) {
307 out = new ProtocolDecoderOutputImpl(session, nextFilter);
308 session.setAttribute(DECODER_OUT, out);
309 }
310 return out;
311 }
312
313 private void disposeEncoder(IoSession session) {
314 ProtocolEncoder encoder = (ProtocolEncoder) session
315 .removeAttribute(ENCODER);
316 if (encoder == null) {
317 return;
318 }
319
320 try {
321 encoder.dispose(session);
322 } catch (Throwable t) {
323 logger.warn(
324 "Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')');
325 }
326 }
327
328 private void disposeDecoder(IoSession session) {
329 ProtocolDecoder decoder = (ProtocolDecoder) session
330 .removeAttribute(DECODER);
331 if (decoder == null) {
332 return;
333 }
334
335 try {
336 decoder.dispose(session);
337 } catch (Throwable t) {
338 logger.warn(
339 "Falied to dispose: " + decoder.getClass().getName() + " (" + decoder + ')');
340 }
341 }
342
343 private void disposeDecoderOut(IoSession session) {
344 session.removeAttribute(DECODER_OUT);
345 }
346
347 private static class EncodedWriteRequest extends DefaultWriteRequest {
348 private EncodedWriteRequest(Object encodedMessage,
349 WriteFuture future, SocketAddress destination) {
350 super(encodedMessage, future, destination);
351 }
352 }
353
354 private static class MessageWriteRequest extends WriteRequestWrapper {
355 private MessageWriteRequest(WriteRequest writeRequest) {
356 super(writeRequest);
357 }
358
359 @Override
360 public Object getMessage() {
361 return EMPTY_BUFFER;
362 }
363 }
364
365 private static class ProtocolDecoderOutputImpl extends
366 AbstractProtocolDecoderOutput {
367 private final IoSession session;
368 private final NextFilter nextFilter;
369
370 public ProtocolDecoderOutputImpl(
371 IoSession session, NextFilter nextFilter) {
372 this.session = session;
373 this.nextFilter = nextFilter;
374 }
375
376 public void flush() {
377 Queue<Object> messageQueue = getMessageQueue();
378 while (!messageQueue.isEmpty()) {
379 nextFilter.messageReceived(session, messageQueue.poll());
380 }
381 }
382 }
383
384 private static class ProtocolEncoderOutputImpl extends
385 AbstractProtocolEncoderOutput {
386 private final IoSession session;
387
388 private final NextFilter nextFilter;
389
390 private final WriteRequest writeRequest;
391
392 public ProtocolEncoderOutputImpl(IoSession session,
393 NextFilter nextFilter, WriteRequest writeRequest) {
394 this.session = session;
395 this.nextFilter = nextFilter;
396 this.writeRequest = writeRequest;
397 }
398
399 public WriteFuture flush() {
400 Queue<Object> bufferQueue = getMessageQueue();
401 WriteFuture future = null;
402 for (;;) {
403 Object encodedMessage = bufferQueue.poll();
404 if (encodedMessage == null) {
405 break;
406 }
407
408
409 if (!(encodedMessage instanceof IoBuffer) ||
410 ((IoBuffer) encodedMessage).hasRemaining()) {
411 future = new DefaultWriteFuture(session);
412 nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage,
413 future, writeRequest.getDestination()));
414 }
415 }
416
417 if (future == null) {
418 future = DefaultWriteFuture.newNotWrittenFuture(
419 session, new NothingWrittenException(writeRequest));
420 }
421
422 return future;
423 }
424
425 public void flushWithoutFuture() {
426 Queue<Object> bufferQueue = getMessageQueue();
427 for (;;) {
428 Object encodedMessage = bufferQueue.poll();
429 if (encodedMessage == null) {
430 break;
431 }
432
433
434 if (!(encodedMessage instanceof IoBuffer) ||
435 ((IoBuffer) encodedMessage).hasRemaining()) {
436 nextFilter.filterWrite(
437 session, new EncodedWriteRequest(
438 encodedMessage, null, writeRequest.getDestination()));
439 }
440 }
441 }
442 }
443 }