1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.codec;
21
22 import java.net.SocketAddress;
23 import java.util.Queue;
24
25 import org.apache.mina.core.buffer.IoBuffer;
26 import org.apache.mina.core.file.FileRegion;
27 import org.apache.mina.core.filterchain.IoFilter;
28 import org.apache.mina.core.filterchain.IoFilterAdapter;
29 import org.apache.mina.core.filterchain.IoFilterChain;
30 import org.apache.mina.core.future.DefaultWriteFuture;
31 import org.apache.mina.core.future.WriteFuture;
32 import org.apache.mina.core.session.AttributeKey;
33 import org.apache.mina.core.session.IoSession;
34 import org.apache.mina.core.write.DefaultWriteRequest;
35 import org.apache.mina.core.write.NothingWrittenException;
36 import org.apache.mina.core.write.WriteRequest;
37 import org.apache.mina.core.write.WriteRequestWrapper;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41
42
43
44
45
46
47
48
49 public class ProtocolCodecFilter extends IoFilterAdapter {
50
51 private final static Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class);
52
53 private static final Class<?>[] EMPTY_PARAMS = new Class[0];
54 private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
55
56 private final AttributeKey ENCODER = new AttributeKey(getClass(), "encoder");
57 private final AttributeKey DECODER = new AttributeKey(getClass(), "decoder");
58 private final AttributeKey DECODER_OUT = new AttributeKey(getClass(), "decoderOut");
59 private final AttributeKey ENCODER_OUT = new AttributeKey(getClass(), "encoderOut");
60
61
62 private final ProtocolCodecFactory factory;
63
64
65
66
67
68
69
70
/**
 * Creates a filter that obtains its encoder and decoder from the given factory.
 *
 * @param factory the codec factory; must not be {@code null}
 * @throws NullPointerException if {@code factory} is {@code null}
 */
public ProtocolCodecFilter(ProtocolCodecFactory factory) {
    if (null == factory) {
        throw new NullPointerException("factory");
    }

    this.factory = factory;
}
77
78
79
80
81
82
83
84
85
86
/**
 * Creates a filter that hands out the same encoder and decoder instances
 * for every session.
 *
 * @param encoder the encoder returned for every session; must not be {@code null}
 * @param decoder the decoder returned for every session; must not be {@code null}
 * @throws NullPointerException if either argument is {@code null}
 */
public ProtocolCodecFilter(final ProtocolEncoder encoder,
        final ProtocolDecoder decoder) {
    if (null == encoder) {
        throw new NullPointerException("encoder");
    }
    if (null == decoder) {
        throw new NullPointerException("decoder");
    }

    // Factory that ignores the session and always returns the supplied instances.
    final class FixedCodecFactory implements ProtocolCodecFactory {
        public ProtocolEncoder getEncoder(IoSession session) {
            return encoder;
        }

        public ProtocolDecoder getDecoder(IoSession session) {
            return decoder;
        }
    }

    this.factory = new FixedCodecFactory();
}
107
108
109
110
111
112
113
114
115
116
/**
 * Creates a filter that instantiates a new encoder and a new decoder from
 * the given classes each time the internal factory is asked for one.
 *
 * @param encoderClass encoder class; must be non-null, assignable to
 *        {@link ProtocolEncoder} and expose a public no-argument constructor
 * @param decoderClass decoder class; must be non-null, assignable to
 *        {@link ProtocolDecoder} and expose a public no-argument constructor
 * @throws NullPointerException if either class is {@code null}
 * @throws IllegalArgumentException if a class has the wrong type or lacks a
 *         public no-argument constructor
 */
public ProtocolCodecFilter(
        final Class<? extends ProtocolEncoder> encoderClass,
        final Class<? extends ProtocolDecoder> decoderClass) {
    if (encoderClass == null) {
        throw new NullPointerException("encoderClass");
    }
    if (decoderClass == null) {
        throw new NullPointerException("decoderClass");
    }

    // The generic bounds can be bypassed through raw types, so the runtime
    // type checks are kept.
    if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
        throw new IllegalArgumentException("encoderClass: "
                + encoderClass.getName());
    }
    if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
        throw new IllegalArgumentException("decoderClass: "
                + decoderClass.getName());
    }

    requirePublicDefaultConstructor(encoderClass, "encoderClass");
    requirePublicDefaultConstructor(decoderClass, "decoderClass");

    // Each request to the factory produces a brand-new codec instance.
    this.factory = new ProtocolCodecFactory() {
        public ProtocolEncoder getEncoder(IoSession session) throws Exception {
            return encoderClass.newInstance();
        }

        public ProtocolDecoder getDecoder(IoSession session) throws Exception {
            return decoderClass.newInstance();
        }
    };
}

/**
 * Verifies that {@code type} has a public no-argument constructor.
 *
 * @param type the class to inspect
 * @param argName the argument name used as the prefix of the error message
 * @throws IllegalArgumentException if no such constructor exists; the
 *         original {@link NoSuchMethodException} is kept as the cause
 */
private static void requirePublicDefaultConstructor(Class<?> type, String argName) {
    try {
        type.getConstructor(EMPTY_PARAMS);
    } catch (NoSuchMethodException e) {
        throw new IllegalArgumentException(
                argName + " doesn't have a public default constructor.", e);
    }
}
159
160
161
162
163
164
165
166
167 public ProtocolEncoder getEncoder(IoSession session) {
168 return (ProtocolEncoder) session.getAttribute(ENCODER);
169 }
170
171 @Override
172 public void onPreAdd(IoFilterChain parent, String name,
173 NextFilter nextFilter) throws Exception {
174 if (parent.contains(this)) {
175 throw new IllegalArgumentException(
176 "You can't add the same filter instance more than once. Create another instance and add it.");
177 }
178
179
180 initCodec(parent.getSession());
181 }
182
183 @Override
184 public void onPostRemove(IoFilterChain parent, String name,
185 NextFilter nextFilter) throws Exception {
186
187 disposeCodec(parent.getSession());
188 }
189
190
191
192
193
194
195
196
197
198
199
200
201
202 @Override
203 public void messageReceived(NextFilter nextFilter, IoSession session,
204 Object message) throws Exception {
205 LOGGER.debug( "Processing a MESSAGE_RECEIVED for session {}", session.getId() );
206
207 if (!(message instanceof IoBuffer)) {
208 nextFilter.messageReceived(session, message);
209 return;
210 }
211
212 IoBuffer in = (IoBuffer) message;
213 ProtocolDecoder decoder = getDecoder(session);
214 ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
215
216
217
218
219
220 while (in.hasRemaining()) {
221 int oldPos = in.position();
222 try {
223 synchronized (decoderOut) {
224
225 decoder.decode(session, in, decoderOut);
226 }
227
228
229 decoderOut.flush(nextFilter, session);
230 } catch (Throwable t) {
231 ProtocolDecoderException pde;
232 if (t instanceof ProtocolDecoderException) {
233 pde = (ProtocolDecoderException) t;
234 } else {
235 pde = new ProtocolDecoderException(t);
236 }
237
238 if (pde.getHexdump() == null) {
239
240 int curPos = in.position();
241 in.position(oldPos);
242 pde.setHexdump(in.getHexDump());
243 in.position(curPos);
244 }
245
246
247 decoderOut.flush(nextFilter, session);
248 nextFilter.exceptionCaught(session, pde);
249
250
251
252
253
254 if (!(t instanceof RecoverableProtocolDecoderException) ||
255 (in.position() == oldPos)) {
256 break;
257 }
258 }
259 }
260 }
261
262 @Override
263 public void messageSent(NextFilter nextFilter, IoSession session,
264 WriteRequest writeRequest) throws Exception {
265 if (writeRequest instanceof EncodedWriteRequest) {
266 return;
267 }
268
269 if (!(writeRequest instanceof MessageWriteRequest)) {
270 nextFilter.messageSent(session, writeRequest);
271 return;
272 }
273
274 MessageWriteRequest wrappedRequest = (MessageWriteRequest) writeRequest;
275 nextFilter.messageSent(session, wrappedRequest.getParentRequest());
276 }
277
278 @Override
279 public void filterWrite(NextFilter nextFilter, IoSession session,
280 WriteRequest writeRequest) throws Exception {
281 Object message = writeRequest.getMessage();
282
283
284
285 if (message instanceof IoBuffer || message instanceof FileRegion) {
286 nextFilter.filterWrite(session, writeRequest);
287 return;
288 }
289
290
291 ProtocolEncoder encoder = getEncoder(session);
292
293 ProtocolEncoderOutput encoderOut = getEncoderOut(session,
294 nextFilter, writeRequest);
295
296 try {
297
298 encoder.encode(session, message, encoderOut);
299
300
301 ((ProtocolEncoderOutputImpl)encoderOut).flushWithoutFuture();
302
303
304 nextFilter.filterWrite(session, new MessageWriteRequest(
305 writeRequest));
306 } catch (Throwable t) {
307 ProtocolEncoderException pee;
308
309
310 if (t instanceof ProtocolEncoderException) {
311 pee = (ProtocolEncoderException) t;
312 } else {
313 pee = new ProtocolEncoderException(t);
314 }
315
316 throw pee;
317 }
318 }
319
320
321 @Override
322 public void sessionClosed(NextFilter nextFilter, IoSession session)
323 throws Exception {
324
325 ProtocolDecoder decoder = getDecoder(session);
326 ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
327
328 try {
329 decoder.finishDecode(session, decoderOut);
330 } catch (Throwable t) {
331 ProtocolDecoderException pde;
332 if (t instanceof ProtocolDecoderException) {
333 pde = (ProtocolDecoderException) t;
334 } else {
335 pde = new ProtocolDecoderException(t);
336 }
337 throw pde;
338 } finally {
339
340 disposeCodec(session);
341 decoderOut.flush(nextFilter, session);
342 }
343
344
345 nextFilter.sessionClosed(session);
346 }
347
348 private static class EncodedWriteRequest extends DefaultWriteRequest {
349 public EncodedWriteRequest(Object encodedMessage,
350 WriteFuture future, SocketAddress destination) {
351 super(encodedMessage, future, destination);
352 }
353 }
354
355 private static class MessageWriteRequest extends WriteRequestWrapper {
356 public MessageWriteRequest(WriteRequest writeRequest) {
357 super(writeRequest);
358 }
359
360 @Override
361 public Object getMessage() {
362 return EMPTY_BUFFER;
363 }
364 }
365
366 private static class ProtocolDecoderOutputImpl extends
367 AbstractProtocolDecoderOutput {
368 public ProtocolDecoderOutputImpl() {
369
370 }
371
372 public void flush(NextFilter nextFilter, IoSession session) {
373 Queue<Object> messageQueue = getMessageQueue();
374 while (!messageQueue.isEmpty()) {
375 nextFilter.messageReceived(session, messageQueue.poll());
376 }
377 }
378 }
379
380 private static class ProtocolEncoderOutputImpl extends
381 AbstractProtocolEncoderOutput {
382 private final IoSession session;
383
384 private final NextFilter nextFilter;
385
386 private final WriteRequest writeRequest;
387
388 public ProtocolEncoderOutputImpl(IoSession session,
389 NextFilter nextFilter, WriteRequest writeRequest) {
390 this.session = session;
391 this.nextFilter = nextFilter;
392 this.writeRequest = writeRequest;
393 }
394
395 public WriteFuture flush() {
396 Queue<Object> bufferQueue = getMessageQueue();
397 WriteFuture future = null;
398 for (;;) {
399 Object encodedMessage = bufferQueue.poll();
400 if (encodedMessage == null) {
401 break;
402 }
403
404
405 if (!(encodedMessage instanceof IoBuffer) ||
406 ((IoBuffer) encodedMessage).hasRemaining()) {
407 future = new DefaultWriteFuture(session);
408 nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage,
409 future, writeRequest.getDestination()));
410 }
411 }
412
413 if (future == null) {
414 future = DefaultWriteFuture.newNotWrittenFuture(
415 session, new NothingWrittenException(writeRequest));
416 }
417
418 return future;
419 }
420
421 public void flushWithoutFuture() {
422 Queue<Object> bufferQueue = getMessageQueue();
423 for (;;) {
424 Object encodedMessage = bufferQueue.poll();
425 if (encodedMessage == null) {
426 break;
427 }
428
429
430 if (!(encodedMessage instanceof IoBuffer) ||
431 ((IoBuffer) encodedMessage).hasRemaining()) {
432 SocketAddress destination = writeRequest.getDestination();
433 WriteRequest writeRequest = new EncodedWriteRequest(
434 encodedMessage, null, destination);
435 nextFilter.filterWrite(session, writeRequest);
436 }
437 }
438 }
439 }
440
441
442
443
444
445
446 private void initCodec(IoSession session) throws Exception {
447
448 ProtocolDecoder decoder = factory.getDecoder(session);
449 session.setAttribute(DECODER, decoder);
450
451
452 ProtocolEncoder encoder = factory.getEncoder(session);
453 session.setAttribute(ENCODER, encoder);
454 }
455
456
457
458
459
460 private void disposeCodec(IoSession session) {
461
462
463 disposeEncoder(session);
464 disposeDecoder(session);
465
466
467 disposeDecoderOut(session);
468 }
469
470
471
472
473
474
475 private void disposeEncoder(IoSession session) {
476 ProtocolEncoder encoder = (ProtocolEncoder) session
477 .removeAttribute(ENCODER);
478 if (encoder == null) {
479 return;
480 }
481
482 try {
483 encoder.dispose(session);
484 } catch (Throwable t) {
485 LOGGER.warn(
486 "Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')');
487 }
488 }
489
490
491
492
493
494
495
496 private ProtocolDecoder getDecoder(IoSession session) {
497 return (ProtocolDecoder) session.getAttribute(DECODER);
498 }
499
500
501
502
503
504
505 private void disposeDecoder(IoSession session) {
506 ProtocolDecoder decoder = (ProtocolDecoder) session
507 .removeAttribute(DECODER);
508 if (decoder == null) {
509 return;
510 }
511
512 try {
513 decoder.dispose(session);
514 } catch (Throwable t) {
515 LOGGER.warn(
516 "Falied to dispose: " + decoder.getClass().getName() + " (" + decoder + ')');
517 }
518 }
519
520
521
522
523
524 private ProtocolDecoderOutput getDecoderOut(IoSession session,
525 NextFilter nextFilter) {
526 ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
527
528 if (out == null) {
529
530 out = new ProtocolDecoderOutputImpl();
531 session.setAttribute(DECODER_OUT, out);
532 }
533
534 return out;
535 }
536
537 private ProtocolEncoderOutput getEncoderOut(IoSession session,
538 NextFilter nextFilter, WriteRequest writeRequest) {
539 ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT);
540
541 if (out == null) {
542
543 out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
544 session.setAttribute(ENCODER_OUT, out);
545 }
546
547 return out;
548 }
549
550
551
552
553 private void disposeDecoderOut(IoSession session) {
554 session.removeAttribute(DECODER_OUT);
555 }
556 }