1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.codec;
21
22 import java.net.SocketAddress;
23 import java.util.Queue;
24
25 import org.apache.mina.core.buffer.IoBuffer;
26 import org.apache.mina.core.file.FileRegion;
27 import org.apache.mina.core.filterchain.IoFilter;
28 import org.apache.mina.core.filterchain.IoFilterAdapter;
29 import org.apache.mina.core.filterchain.IoFilterChain;
30 import org.apache.mina.core.future.DefaultWriteFuture;
31 import org.apache.mina.core.future.WriteFuture;
32 import org.apache.mina.core.session.AbstractIoSession;
33 import org.apache.mina.core.session.AttributeKey;
34 import org.apache.mina.core.session.IoSession;
35 import org.apache.mina.core.write.DefaultWriteRequest;
36 import org.apache.mina.core.write.NothingWrittenException;
37 import org.apache.mina.core.write.WriteRequest;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41
42
43
44
45
46
47
48
49 public class ProtocolCodecFilter extends IoFilterAdapter {
50
51 private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolCodecFilter.class);
52
53 private static final Class<?>[] EMPTY_PARAMS = new Class[0];
54
55 private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
56
57 private static final AttributeKeyributeKey.html#AttributeKey">AttributeKey ENCODER = new AttributeKey(ProtocolCodecFilter.class, "encoder");
58
59 private static final AttributeKeyributeKey.html#AttributeKey">AttributeKey DECODER = new AttributeKey(ProtocolCodecFilter.class, "decoder");
60
61 private static final AttributeKeyteKey.html#AttributeKey">AttributeKey DECODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "decoderOut");
62
63 private static final AttributeKeyteKey.html#AttributeKey">AttributeKey ENCODER_OUT = new AttributeKey(ProtocolCodecFilter.class, "encoderOut");
64
65
66 private final ProtocolCodecFactory factory;
67
68
69
70
71
72
73
74 public ProtocolCodecFilter(ProtocolCodecFactory factory) {
75 if (factory == null) {
76 throw new IllegalArgumentException("factory");
77 }
78
79 this.factory = factory;
80 }
81
82
83
84
85
86
87
88
89
90 public ProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) {
91 if (encoder == null) {
92 throw new IllegalArgumentException("encoder");
93 }
94 if (decoder == null) {
95 throw new IllegalArgumentException("decoder");
96 }
97
98
99 this.factory = new ProtocolCodecFactory() {
100
101
102
103 @Override
104 public ProtocolEncoder getEncoder(IoSession session) {
105 return encoder;
106 }
107
108
109
110
111 @Override
112 public ProtocolDecoder getDecoder(IoSession session) {
113 return decoder;
114 }
115 };
116 }
117
118
119
120
121
122
123
124
125
126
127 public ProtocolCodecFilter(final Class<? extends ProtocolEncoder> encoderClass,
128 final Class<? extends ProtocolDecoder> decoderClass) {
129 if (encoderClass == null) {
130 throw new IllegalArgumentException("encoderClass");
131 }
132 if (decoderClass == null) {
133 throw new IllegalArgumentException("decoderClass");
134 }
135 if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
136 throw new IllegalArgumentException("encoderClass: " + encoderClass.getName());
137 }
138 if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
139 throw new IllegalArgumentException("decoderClass: " + decoderClass.getName());
140 }
141 try {
142 encoderClass.getConstructor(EMPTY_PARAMS);
143 } catch (NoSuchMethodException e) {
144 throw new IllegalArgumentException("encoderClass doesn't have a public default constructor.");
145 }
146 try {
147 decoderClass.getConstructor(EMPTY_PARAMS);
148 } catch (NoSuchMethodException e) {
149 throw new IllegalArgumentException("decoderClass doesn't have a public default constructor.");
150 }
151
152 final ProtocolEncoder encoder;
153
154 try {
155 encoder = encoderClass.newInstance();
156 } catch (Exception e) {
157 throw new IllegalArgumentException("encoderClass cannot be initialized");
158 }
159
160 final ProtocolDecoder decoder;
161
162 try {
163 decoder = decoderClass.newInstance();
164 } catch (Exception e) {
165 throw new IllegalArgumentException("decoderClass cannot be initialized");
166 }
167
168
169 this.factory = new ProtocolCodecFactory() {
170
171
172
173 @Override
174 public ProtocolEncoder getEncoder(IoSession session) throws Exception {
175 return encoder;
176 }
177
178
179
180
181 @Override
182 public ProtocolDecoder getDecoder(IoSession session) throws Exception {
183 return decoder;
184 }
185 };
186 }
187
188
189
190
191
192
193
194 public ProtocolEncoder getEncoder(IoSession session) {
195 return (ProtocolEncoder) session.getAttribute(ENCODER);
196 }
197
198
199
200
201 @Override
202 public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
203 if (parent.contains(this)) {
204 throw new IllegalArgumentException(
205 "You can't add the same filter instance more than once. Create another instance and add it.");
206 }
207 }
208
209
210
211
212 @Override
213 public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
214
215 disposeCodec(parent.getSession());
216 }
217
218
219
220
221
222
223
224
225
226
227
228
229
230 @Override
231 public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
232 if (LOGGER.isDebugEnabled()) {
233 LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session.getId());
234 }
235
236 if (!(message instanceof IoBuffer)) {
237 nextFilter.messageReceived(session, message);
238 return;
239 }
240
241 IoBuffer="../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer in = (IoBuffer) message;
242 ProtocolDecoder decoder = factory.getDecoder(session);
243 ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
244
245
246
247
248
249 while (in.hasRemaining()) {
250 int oldPos = in.position();
251 try {
252 synchronized (session) {
253
254 decoder.decode(session, in, decoderOut);
255 }
256
257 decoderOut.flush(nextFilter, session);
258 } catch (Exception e) {
259 ProtocolDecoderException pde;
260 if (e instanceof ProtocolDecoderException) {
261 pde = (ProtocolDecoderException) e;
262 } else {
263 pde = new ProtocolDecoderException(e);
264 }
265 if (pde.getHexdump() == null) {
266
267 int curPos = in.position();
268 in.position(oldPos);
269 pde.setHexdump(in.getHexDump());
270 in.position(curPos);
271 }
272
273 decoderOut.flush(nextFilter, session);
274 nextFilter.exceptionCaught(session, pde);
275
276
277
278
279 if (!(e instanceof RecoverableProtocolDecoderException) || (in.position() == oldPos)) {
280 break;
281 }
282 }
283 }
284 }
285
286
287
288
289 @Override
290 public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
291 if (writeRequest instanceof EncodedWriteRequest) {
292 return;
293 }
294
295 nextFilter.messageSent(session, writeRequest);
296 }
297
298
299
300
301 @Override
302 public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
303 Object message = writeRequest.getMessage();
304
305
306
307 if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
308 nextFilter.filterWrite(session, writeRequest);
309 return;
310 }
311
312
313 ProtocolEncoder encoder = factory.getEncoder(session);
314
315 ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, writeRequest);
316
317 if (encoder == null) {
318 throw new ProtocolEncoderException("The encoder is null for the session " + session);
319 }
320
321 try {
322
323 encoder.encode(session, message, encoderOut);
324
325
326 Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput) encoderOut).getMessageQueue();
327
328
329 while (!bufferQueue.isEmpty()) {
330 Object encodedMessage = bufferQueue.poll();
331
332 if (encodedMessage == null) {
333 break;
334 }
335
336
337 if (!(encodedMessage instanceof IoBuffer="../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
338 if (bufferQueue.isEmpty()) {
339 writeRequest.setMessage(encodedMessage);
340 nextFilter.filterWrite(session, writeRequest);
341 } else {
342 SocketAddress destination = writeRequest.getDestination();
343 WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination);
344 nextFilter.filterWrite(session, encodedWriteRequest);
345 }
346 }
347 }
348 } catch (Exception e) {
349 ProtocolEncoderException pee;
350
351
352 if (e instanceof ProtocolEncoderException) {
353 pee = (ProtocolEncoderException) e;
354 } else {
355 pee = new ProtocolEncoderException(e);
356 }
357
358 throw pee;
359 }
360 }
361
362
363
364
365 @Override
366 public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
367
368 ProtocolDecoder decoder = factory.getDecoder(session);
369 ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
370
371 try {
372 decoder.finishDecode(session, decoderOut);
373 } catch (Exception e) {
374 ProtocolDecoderException pde;
375 if (e instanceof ProtocolDecoderException) {
376 pde = (ProtocolDecoderException) e;
377 } else {
378 pde = new ProtocolDecoderException(e);
379 }
380 throw pde;
381 } finally {
382
383 disposeCodec(session);
384 decoderOut.flush(nextFilter, session);
385 }
386
387
388 nextFilter.sessionClosed(session);
389 }
390
391 private static class EncodedWriteRequest extends DefaultWriteRequest {
392 public EncodedWriteRequest(Object encodedMessage, WriteFuture future, SocketAddress destination) {
393 super(encodedMessage, future, destination);
394 }
395
396
397
398
399 @Override
400 public boolean isEncoded() {
401 return true;
402 }
403 }
404
405 private static class ProtocolDecoderOutputImpl extends AbstractProtocolDecoderOutput {
406 public ProtocolDecoderOutputImpl() {
407
408 }
409
410
411
412
413 @Override
414 public void flush(NextFilter nextFilter, IoSession session) {
415 Queue<Object> messageQueue = getMessageQueue();
416
417 while (!messageQueue.isEmpty()) {
418 nextFilter.messageReceived(session, messageQueue.poll());
419 }
420 }
421 }
422
423 private static class ProtocolEncoderOutputImpl extends AbstractProtocolEncoderOutput {
424 private final IoSession session;
425
426 private final NextFilter nextFilter;
427
428
429 private final SocketAddress destination;
430
431 public ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
432 this.session = session;
433 this.nextFilter = nextFilter;
434
435
436 destination = writeRequest.getDestination();
437 }
438
439
440
441
442 @Override
443 public WriteFuture flush() {
444 Queue<Object> bufferQueue = getMessageQueue();
445 WriteFuture future = null;
446
447 while (!bufferQueue.isEmpty()) {
448 Object encodedMessage = bufferQueue.poll();
449
450 if (encodedMessage == null) {
451 break;
452 }
453
454
455 if (!(encodedMessage instanceof IoBuffer="../../../../../org/apache/mina/core/buffer/IoBuffer.html#IoBuffer">IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
456 future = new DefaultWriteFuture(session);
457 nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage, future, destination));
458 }
459 }
460
461 if (future == null) {
462
463 future = DefaultWriteFuture.newNotWrittenFuture(session, new NothingWrittenException(AbstractIoSession.MESSAGE_SENT_REQUEST));
464 }
465
466 return future;
467 }
468 }
469
470
471
472
473
474
475 private void disposeCodec(IoSession session) {
476
477
478 disposeEncoder(session);
479 disposeDecoder(session);
480
481
482 disposeDecoderOut(session);
483 }
484
485
486
487
488
489
490 private void disposeEncoder(IoSession session) {
491 ProtocolEncoder./../org/apache/mina/filter/codec/ProtocolEncoder.html#ProtocolEncoder">ProtocolEncoder encoder = (ProtocolEncoder) session.removeAttribute(ENCODER);
492 if (encoder == null) {
493 return;
494 }
495
496 try {
497 encoder.dispose(session);
498 } catch (Exception e) {
499 LOGGER.warn("Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')');
500 }
501 }
502
503
504
505
506
507
508 private void disposeDecoder(IoSession session) {
509 ProtocolDecoder./../org/apache/mina/filter/codec/ProtocolDecoder.html#ProtocolDecoder">ProtocolDecoder decoder = (ProtocolDecoder) session.removeAttribute(DECODER);
510 if (decoder == null) {
511 return;
512 }
513
514 try {
515 decoder.dispose(session);
516 } catch (Exception e) {
517 LOGGER.warn("Failed to dispose: " + decoder.getClass().getName() + " (" + decoder + ')');
518 }
519 }
520
521
522
523
524
525 private ProtocolDecoderOutput getDecoderOut(IoSession session, NextFilter nextFilter) {
526 ProtocolDecoderOutput../org/apache/mina/filter/codec/ProtocolDecoderOutput.html#ProtocolDecoderOutput">ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
527
528 if (out == null) {
529
530 out = new ProtocolDecoderOutputImpl();
531 session.setAttribute(DECODER_OUT, out);
532 }
533
534 return out;
535 }
536
537 private ProtocolEncoderOutput getEncoderOut(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) {
538 ProtocolEncoderOutput../org/apache/mina/filter/codec/ProtocolEncoderOutput.html#ProtocolEncoderOutput">ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT);
539
540 if (out == null) {
541
542 out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
543 session.setAttribute(ENCODER_OUT, out);
544 }
545
546 return out;
547 }
548
549
550
551
552 private void disposeDecoderOut(IoSession session) {
553 session.removeAttribute(DECODER_OUT);
554 }
555 }