1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.codec;
21
22 import java.net.SocketAddress;
23 import java.util.Queue;
24
25 import org.apache.mina.core.buffer.IoBuffer;
26 import org.apache.mina.core.file.FileRegion;
27 import org.apache.mina.core.filterchain.IoFilter;
28 import org.apache.mina.core.filterchain.IoFilterAdapter;
29 import org.apache.mina.core.filterchain.IoFilterChain;
30 import org.apache.mina.core.future.DefaultWriteFuture;
31 import org.apache.mina.core.future.WriteFuture;
32 import org.apache.mina.core.session.AbstractIoSession;
33 import org.apache.mina.core.session.AttributeKey;
34 import org.apache.mina.core.session.IoSession;
35 import org.apache.mina.core.write.DefaultWriteRequest;
36 import org.apache.mina.core.write.NothingWrittenException;
37 import org.apache.mina.core.write.WriteRequest;
38 import org.apache.mina.core.write.WriteRequestWrapper;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42
43
44
45
46
47
48
49
50 public class ProtocolCodecFilter extends IoFilterAdapter {
51
52 private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class);
53
54 private static final Class<?>[] EMPTY_PARAMS = new Class[0];
55
56 private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
57
58 private static final AttributeKey ENCODER = new AttributeKey(ProtocolCodecFilter.class, "encoder");
59
60 private static final AttributeKey DECODER = new AttributeKey(ProtocolCodecFilter.class, "decoder");
61
62 private static final AttributeKey DECODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "decoderOut");
63
64 private static final AttributeKey ENCODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "encoderOut");
65
66
67 private final ProtocolCodecFactory factory;
68
69
70
71
72
73
74
75 public ProtocolCodecFilter(ProtocolCodecFactory factory) {
76 if (factory == null) {
77 throw new IllegalArgumentException("factory");
78 }
79
80 this.factory = factory;
81 }
82
83
84
85
86
87
88
89
90
91 public ProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) {
92 if (encoder == null) {
93 throw new IllegalArgumentException("encoder");
94 }
95 if (decoder == null) {
96 throw new IllegalArgumentException("decoder");
97 }
98
99
100 this.factory = new ProtocolCodecFactory() {
101
102
103
104 @Override
105 public ProtocolEncoder getEncoder(IoSession session) {
106 return encoder;
107 }
108
109
110
111
112 @Override
113 public ProtocolDecoder getDecoder(IoSession session) {
114 return decoder;
115 }
116 };
117 }
118
119
120
121
122
123
124
125
126
127
128 public ProtocolCodecFilter(final Class<? extends ProtocolEncoder> encoderClass,
129 final Class<? extends ProtocolDecoder> decoderClass) {
130 if (encoderClass == null) {
131 throw new IllegalArgumentException("encoderClass");
132 }
133 if (decoderClass == null) {
134 throw new IllegalArgumentException("decoderClass");
135 }
136 if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
137 throw new IllegalArgumentException("encoderClass: " + encoderClass.getName());
138 }
139 if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
140 throw new IllegalArgumentException("decoderClass: " + decoderClass.getName());
141 }
142 try {
143 encoderClass.getConstructor(EMPTY_PARAMS);
144 } catch (NoSuchMethodException e) {
145 throw new IllegalArgumentException("encoderClass doesn't have a public default constructor.");
146 }
147 try {
148 decoderClass.getConstructor(EMPTY_PARAMS);
149 } catch (NoSuchMethodException e) {
150 throw new IllegalArgumentException("decoderClass doesn't have a public default constructor.");
151 }
152
153 final ProtocolEncoder encoder;
154
155 try {
156 encoder = encoderClass.newInstance();
157 } catch (Exception e) {
158 throw new IllegalArgumentException("encoderClass cannot be initialized");
159 }
160
161 final ProtocolDecoder decoder;
162
163 try {
164 decoder = decoderClass.newInstance();
165 } catch (Exception e) {
166 throw new IllegalArgumentException("decoderClass cannot be initialized");
167 }
168
169
170 this.factory = new ProtocolCodecFactory() {
171
172
173
174 @Override
175 public ProtocolEncoder getEncoder(IoSession session) throws Exception {
176 return encoder;
177 }
178
179
180
181
182 @Override
183 public ProtocolDecoder getDecoder(IoSession session) throws Exception {
184 return decoder;
185 }
186 };
187 }
188
189
190
191
192
193
194
195 public ProtocolEncoder getEncoder(IoSession session) {
196 return (ProtocolEncoder) session.getAttribute(ENCODER);
197 }
198
199
200
201
202 @Override
203 public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
204 if (parent.contains(this)) {
205 throw new IllegalArgumentException(
206 "You can't add the same filter instance more than once. Create another instance and add it.");
207 }
208 }
209
210
211
212
213 @Override
214 public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
215
216 disposeCodec(parent.getSession());
217 }
218
219
220
221
222
223
224
225
226
227
228
229
230
231 @Override
232 public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
233 LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session.getId());
234
235 if (!(message instanceof IoBuffer)) {
236 nextFilter.messageReceived(session, message);
237 return;
238 }
239
240 IoBuffer in = (IoBuffer) message;
241 ProtocolDecoder decoder = factory.getDecoder(session);
242 ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
243
244
245
246
247
248 while (in.hasRemaining()) {
249 int oldPos = in.position();
250 try {
251 synchronized (session) {
252
253 decoder.decode(session, in, decoderOut);
254 }
255
256 decoderOut.flush(nextFilter, session);
257 } catch (Exception e) {
258 ProtocolDecoderException pde;
259 if (e instanceof ProtocolDecoderException) {
260 pde = (ProtocolDecoderException) e;
261 } else {
262 pde = new ProtocolDecoderException(e);
263 }
264 if (pde.getHexdump() == null) {
265
266 int curPos = in.position();
267 in.position(oldPos);
268 pde.setHexdump(in.getHexDump());
269 in.position(curPos);
270 }
271
272 decoderOut.flush(nextFilter, session);
273 nextFilter.exceptionCaught(session, pde);
274
275
276
277
278 if (!(e instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) {
279 break;
280 }
281 }
282 }
283 }
284
285
286
287
288 @Override
289 public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
290 if (writeRequest instanceof EncodedWriteRequest) {
291 return;
292 }
293
294 if (writeRequest instanceof MessageWriteRequest) {
295 MessageWriteRequest wrappedRequest = (MessageWriteRequest) writeRequest;
296 nextFilter.messageSent(session, wrappedRequest.getParentRequest());
297 } else {
298 nextFilter.messageSent(session, writeRequest);
299 }
300 }
301
302
303
304
305 @Override
306 public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
307 Object message = writeRequest.getMessage();
308
309
310
311 if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
312 nextFilter.filterWrite(session, writeRequest);
313 return;
314 }
315
316
317 ProtocolEncoder encoder = factory.getEncoder(session);
318
319 ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, writeRequest);
320
321 if (encoder == null) {
322 throw new ProtocolEncoderException("The encoder is null for the session " + session);
323 }
324
325 try {
326
327 encoder.encode(session, message, encoderOut);
328
329
330 Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput) encoderOut).getMessageQueue();
331
332
333 while (!bufferQueue.isEmpty()) {
334 Object encodedMessage = bufferQueue.poll();
335
336 if (encodedMessage == null) {
337 break;
338 }
339
340
341 if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
342 SocketAddress destination = writeRequest.getDestination();
343 WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination);
344
345 nextFilter.filterWrite(session, encodedWriteRequest);
346 }
347 }
348
349
350 nextFilter.filterWrite(session, new MessageWriteRequest(writeRequest));
351 } catch (Exception e) {
352 ProtocolEncoderException pee;
353
354
355 if (e instanceof ProtocolEncoderException) {
356 pee = (ProtocolEncoderException) e;
357 } else {
358 pee = new ProtocolEncoderException(e);
359 }
360
361 throw pee;
362 }
363 }
364
365
366
367
368 @Override
369 public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
370
371 ProtocolDecoder decoder = factory.getDecoder(session);
372 ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
373
374 try {
375 decoder.finishDecode(session, decoderOut);
376 } catch (Exception e) {
377 ProtocolDecoderException pde;
378 if (e instanceof ProtocolDecoderException) {
379 pde = (ProtocolDecoderException) e;
380 } else {
381 pde = new ProtocolDecoderException(e);
382 }
383 throw pde;
384 } finally {
385
386 disposeCodec(session);
387 decoderOut.flush(nextFilter, session);
388 }
389
390
391 nextFilter.sessionClosed(session);
392 }
393
394 private static class EncodedWriteRequest extends DefaultWriteRequest {
395 public EncodedWriteRequest(Object encodedMessage, WriteFuture future, SocketAddress destination) {
396 super(encodedMessage, future, destination);
397 }
398
399
400
401
402 @Override
403 public boolean isEncoded() {
404 return true;
405 }
406 }
407
408 private static class MessageWriteRequest extends WriteRequestWrapper {
409 public MessageWriteRequest(WriteRequest writeRequest) {
410 super(writeRequest);
411 }
412
413 @Override
414 public Object getMessage() {
415 return EMPTY_BUFFER;
416 }
417
418 @Override
419 public String toString() {
420 return "MessageWriteRequest, parent : " + super.toString();
421 }
422 }
423
424 private static class ProtocolDecoderOutputImpl extends AbstractProtocolDecoderOutput {
425 public ProtocolDecoderOutputImpl() {
426
427 }
428
429
430
431
432 @Override
433 public void flush(NextFilter nextFilter, IoSession session) {
434 Queue<Object> messageQueue = getMessageQueue();
435
436 while (!messageQueue.isEmpty()) {
437 nextFilter.messageReceived(session, messageQueue.poll());
438 }
439 }
440 }
441
442 private static class ProtocolEncoderOutputImpl extends AbstractProtocolEncoderOutput {
443 private final IoSession session;
444
445 private final NextFilter nextFilter;
446
447
448 private final SocketAddress destination;
449
450 public ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
451 this.session = session;
452 this.nextFilter = nextFilter;
453
454
455 destination = writeRequest.getDestination();
456 }
457
458
459
460
461 @Override
462 public WriteFuture flush() {
463 Queue<Object> bufferQueue = getMessageQueue();
464 WriteFuture future = null;
465
466 while (!bufferQueue.isEmpty()) {
467 Object encodedMessage = bufferQueue.poll();
468
469 if (encodedMessage == null) {
470 break;
471 }
472
473
474 if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
475 future = new DefaultWriteFuture(session);
476 nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage, future, destination));
477 }
478 }
479
480 if (future == null) {
481
482 future = DefaultWriteFuture.newNotWrittenFuture(session, new NothingWrittenException(AbstractIoSession.MESSAGE_SENT_REQUEST));
483 }
484
485 return future;
486 }
487 }
488
489
490
491
492
493
494 private void disposeCodec(IoSession session) {
495
496
497 disposeEncoder(session);
498 disposeDecoder(session);
499
500
501 disposeDecoderOut(session);
502 }
503
504
505
506
507
508
509 private void disposeEncoder(IoSession session) {
510 ProtocolEncoder encoder = (ProtocolEncoder) session.removeAttribute(ENCODER);
511 if (encoder == null) {
512 return;
513 }
514
515 try {
516 encoder.dispose(session);
517 } catch (Exception e) {
518 LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')');
519 }
520 }
521
522
523
524
525
526
527 private void disposeDecoder(IoSession session) {
528 ProtocolDecoder decoder = (ProtocolDecoder) session.removeAttribute(DECODER);
529 if (decoder == null) {
530 return;
531 }
532
533 try {
534 decoder.dispose(session);
535 } catch (Exception e) {
536 LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')');
537 }
538 }
539
540
541
542
543
544 private ProtocolDecoderOutput getDecoderOut(IoSession session, NextFilter nextFilter) {
545 ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
546
547 if (out == null) {
548
549 out = new ProtocolDecoderOutputImpl();
550 session.setAttribute(DECODER_OUT, out);
551 }
552
553 return out;
554 }
555
556 private ProtocolEncoderOutput getEncoderOut(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
557 ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT);
558
559 if (out == null) {
560
561 out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
562 session.setAttribute(ENCODER_OUT, out);
563 }
564
565 return out;
566 }
567
568
569
570
571 private void disposeDecoderOut(IoSession session) {
572 session.removeAttribute(DECODER_OUT);
573 }
574 }