1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.filterchain;
21
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.ConcurrentHashMap;
26
27 import org.apache.mina.core.buffer.IoBuffer;
28 import org.apache.mina.core.filterchain.IoFilter.NextFilter;
29 import org.apache.mina.core.future.ConnectFuture;
30 import org.apache.mina.core.future.IoFuture;
31 import org.apache.mina.core.session.AbstractIoSession;
32 import org.apache.mina.core.session.AttributeKey;
33 import org.apache.mina.core.session.IdleStatus;
34 import org.apache.mina.core.session.IoSession;
35 import org.apache.mina.core.write.WriteRequest;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39
40
41
42
43
44
45
46 public class DefaultIoFilterChain implements IoFilterChain {
47
48
49
50
51
52
53 public static final AttributeKey SESSION_CREATED_FUTURE = new AttributeKey(
54 DefaultIoFilterChain.class, "connectFuture");
55
56
57 private final AbstractIoSession session;
58
59 private final Map<String, Entry> name2entry = new ConcurrentHashMap<String, Entry>();
60
61
62 private final EntryImpl head;
63
64
65 private final EntryImpl tail;
66
67
68 private final static Logger LOGGER = LoggerFactory.getLogger(DefaultIoFilterChain.class);
69
70
71
72
73
74
75
76
77 public DefaultIoFilterChain(AbstractIoSession session) {
78 if (session == null) {
79 throw new IllegalArgumentException("session");
80 }
81
82 this.session = session;
83 head = new EntryImpl(null, null, "head", new HeadFilter());
84 tail = new EntryImpl(head, null, "tail", new TailFilter());
85 head.nextEntry = tail;
86 }
87
88 public IoSession getSession() {
89 return session;
90 }
91
92 public Entry getEntry(String name) {
93 Entry e = name2entry.get(name);
94 if (e == null) {
95 return null;
96 }
97 return e;
98 }
99
100 public Entry getEntry(IoFilter filter) {
101 EntryImpl e = head.nextEntry;
102 while (e != tail) {
103 if (e.getFilter() == filter) {
104 return e;
105 }
106 e = e.nextEntry;
107 }
108 return null;
109 }
110
111 public Entry getEntry(Class<? extends IoFilter> filterType) {
112 EntryImpl e = head.nextEntry;
113 while (e != tail) {
114 if (filterType.isAssignableFrom(e.getFilter().getClass())) {
115 return e;
116 }
117 e = e.nextEntry;
118 }
119 return null;
120 }
121
122 public IoFilter get(String name) {
123 Entry e = getEntry(name);
124 if (e == null) {
125 return null;
126 }
127
128 return e.getFilter();
129 }
130
131 public IoFilter get(Class<? extends IoFilter> filterType) {
132 Entry e = getEntry(filterType);
133 if (e == null) {
134 return null;
135 }
136
137 return e.getFilter();
138 }
139
140 public NextFilter getNextFilter(String name) {
141 Entry e = getEntry(name);
142 if (e == null) {
143 return null;
144 }
145
146 return e.getNextFilter();
147 }
148
149 public NextFilter getNextFilter(IoFilter filter) {
150 Entry e = getEntry(filter);
151 if (e == null) {
152 return null;
153 }
154
155 return e.getNextFilter();
156 }
157
158 public NextFilter getNextFilter(Class<? extends IoFilter> filterType) {
159 Entry e = getEntry(filterType);
160 if (e == null) {
161 return null;
162 }
163
164 return e.getNextFilter();
165 }
166
167 public synchronized void addFirst(String name, IoFilter filter) {
168 checkAddable(name);
169 register(head, name, filter);
170 }
171
172 public synchronized void addLast(String name, IoFilter filter) {
173 checkAddable(name);
174 register(tail.prevEntry, name, filter);
175 }
176
177 public synchronized void addBefore(String baseName, String name,
178 IoFilter filter) {
179 EntryImpl baseEntry = checkOldName(baseName);
180 checkAddable(name);
181 register(baseEntry.prevEntry, name, filter);
182 }
183
184 public synchronized void addAfter(String baseName, String name,
185 IoFilter filter) {
186 EntryImpl baseEntry = checkOldName(baseName);
187 checkAddable(name);
188 register(baseEntry, name, filter);
189 }
190
191 public synchronized IoFilter remove(String name) {
192 EntryImpl entry = checkOldName(name);
193 deregister(entry);
194 return entry.getFilter();
195 }
196
197 public synchronized void remove(IoFilter filter) {
198 EntryImpl e = head.nextEntry;
199 while (e != tail) {
200 if (e.getFilter() == filter) {
201 deregister(e);
202 return;
203 }
204 e = e.nextEntry;
205 }
206 throw new IllegalArgumentException("Filter not found: "
207 + filter.getClass().getName());
208 }
209
210 public synchronized IoFilter remove(Class<? extends IoFilter> filterType) {
211 EntryImpl e = head.nextEntry;
212 while (e != tail) {
213 if (filterType.isAssignableFrom(e.getFilter().getClass())) {
214 IoFilter oldFilter = e.getFilter();
215 deregister(e);
216 return oldFilter;
217 }
218 e = e.nextEntry;
219 }
220 throw new IllegalArgumentException("Filter not found: "
221 + filterType.getName());
222 }
223
224 public synchronized IoFilter replace(String name, IoFilter newFilter) {
225 EntryImpl entry = checkOldName(name);
226 IoFilter oldFilter = entry.getFilter();
227 entry.setFilter(newFilter);
228 return oldFilter;
229 }
230
231 public synchronized void replace(IoFilter oldFilter, IoFilter newFilter) {
232 EntryImpl e = head.nextEntry;
233 while (e != tail) {
234 if (e.getFilter() == oldFilter) {
235 e.setFilter(newFilter);
236 return;
237 }
238 e = e.nextEntry;
239 }
240 throw new IllegalArgumentException("Filter not found: "
241 + oldFilter.getClass().getName());
242 }
243
244 public synchronized IoFilter replace(
245 Class<? extends IoFilter> oldFilterType, IoFilter newFilter) {
246 EntryImpl e = head.nextEntry;
247 while (e != tail) {
248 if (oldFilterType.isAssignableFrom(e.getFilter().getClass())) {
249 IoFilter oldFilter = e.getFilter();
250 e.setFilter(newFilter);
251 return oldFilter;
252 }
253 e = e.nextEntry;
254 }
255 throw new IllegalArgumentException("Filter not found: "
256 + oldFilterType.getName());
257 }
258
259 public synchronized void clear() throws Exception {
260 List<IoFilterChain.Entry> l = new ArrayList<IoFilterChain.Entry>(
261 name2entry.values());
262 for (IoFilterChain.Entry entry : l) {
263 try {
264 deregister((EntryImpl) entry);
265 } catch (Exception e) {
266 throw new IoFilterLifeCycleException("clear(): "
267 + entry.getName() + " in " + getSession(), e);
268 }
269 }
270 }
271
272 private void register(EntryImpl prevEntry, String name, IoFilter filter) {
273 EntryImpl newEntry = new EntryImpl(prevEntry, prevEntry.nextEntry,
274 name, filter);
275
276 try {
277 filter.onPreAdd(this, name, newEntry.getNextFilter());
278 } catch (Exception e) {
279 throw new IoFilterLifeCycleException("onPreAdd(): " + name + ':'
280 + filter + " in " + getSession(), e);
281 }
282
283 prevEntry.nextEntry.prevEntry = newEntry;
284 prevEntry.nextEntry = newEntry;
285 name2entry.put(name, newEntry);
286
287 try {
288 filter.onPostAdd(this, name, newEntry.getNextFilter());
289 } catch (Exception e) {
290 deregister0(newEntry);
291 throw new IoFilterLifeCycleException("onPostAdd(): " + name + ':'
292 + filter + " in " + getSession(), e);
293 }
294 }
295
296 private void deregister(EntryImpl entry) {
297 IoFilter filter = entry.getFilter();
298
299 try {
300 filter.onPreRemove(this, entry.getName(), entry.getNextFilter());
301 } catch (Exception e) {
302 throw new IoFilterLifeCycleException("onPreRemove(): "
303 + entry.getName() + ':' + filter + " in " + getSession(), e);
304 }
305
306 deregister0(entry);
307
308 try {
309 filter.onPostRemove(this, entry.getName(), entry.getNextFilter());
310 } catch (Exception e) {
311 throw new IoFilterLifeCycleException("onPostRemove(): "
312 + entry.getName() + ':' + filter + " in " + getSession(), e);
313 }
314 }
315
316 private void deregister0(EntryImpl entry) {
317 EntryImpl prevEntry = entry.prevEntry;
318 EntryImpl nextEntry = entry.nextEntry;
319 prevEntry.nextEntry = nextEntry;
320 nextEntry.prevEntry = prevEntry;
321
322 name2entry.remove(entry.name);
323 }
324
325
326
327
328
329
330 private EntryImpl checkOldName(String baseName) {
331 EntryImpl e = (EntryImpl) name2entry.get(baseName);
332 if (e == null) {
333 throw new IllegalArgumentException("Filter not found:" + baseName);
334 }
335 return e;
336 }
337
338
339
340
341 private void checkAddable(String name) {
342 if (name2entry.containsKey(name)) {
343 throw new IllegalArgumentException(
344 "Other filter is using the same name '" + name + "'");
345 }
346 }
347
348 public void fireSessionCreated() {
349 Entry head = this.head;
350 callNextSessionCreated(head, session);
351 }
352
353 private void callNextSessionCreated(Entry entry, IoSession session) {
354 try {
355 IoFilter filter = entry.getFilter();
356 NextFilter nextFilter = entry.getNextFilter();
357 filter.sessionCreated(nextFilter, session);
358 } catch (Throwable e) {
359 fireExceptionCaught(e);
360 }
361 }
362
363 public void fireSessionOpened() {
364 Entry head = this.head;
365 callNextSessionOpened(head, session);
366 }
367
368 private void callNextSessionOpened(Entry entry, IoSession session) {
369 try {
370 IoFilter filter = entry.getFilter();
371 NextFilter nextFilter = entry.getNextFilter();
372 filter.sessionOpened(nextFilter, session);
373 } catch (Throwable e) {
374 fireExceptionCaught(e);
375 }
376 }
377
378 public void fireSessionClosed() {
379
380 try {
381 session.getCloseFuture().setClosed();
382 } catch (Throwable t) {
383 fireExceptionCaught(t);
384 }
385
386
387 Entry head = this.head;
388 callNextSessionClosed(head, session);
389 }
390
391 private void callNextSessionClosed(Entry entry, IoSession session) {
392 try {
393 IoFilter filter = entry.getFilter();
394 NextFilter nextFilter = entry.getNextFilter();
395 filter.sessionClosed(nextFilter, session);
396 } catch (Throwable e) {
397 fireExceptionCaught(e);
398 }
399 }
400
401 public void fireSessionIdle(IdleStatus status) {
402 session.increaseIdleCount(status, System.currentTimeMillis());
403 Entry head = this.head;
404 callNextSessionIdle(head, session, status);
405 }
406
407 private void callNextSessionIdle(Entry entry, IoSession session,
408 IdleStatus status) {
409 try {
410 IoFilter filter = entry.getFilter();
411 NextFilter nextFilter = entry.getNextFilter();
412 filter.sessionIdle(nextFilter, session,
413 status);
414 } catch (Throwable e) {
415 fireExceptionCaught(e);
416 }
417 }
418
419 public void fireMessageReceived(Object message) {
420 if (message instanceof IoBuffer) {
421 session.increaseReadBytes(((IoBuffer) message).remaining(), System
422 .currentTimeMillis());
423 }
424
425 Entry head = this.head;
426 callNextMessageReceived(head, session, message);
427 }
428
429 private void callNextMessageReceived(Entry entry, IoSession session,
430 Object message) {
431 try {
432 IoFilter filter = entry.getFilter();
433 NextFilter nextFilter = entry.getNextFilter();
434 filter.messageReceived(nextFilter, session,
435 message);
436 } catch (Throwable e) {
437 fireExceptionCaught(e);
438 }
439 }
440
441 public void fireMessageSent(WriteRequest request) {
442 session.increaseWrittenMessages(request, System.currentTimeMillis());
443
444 try {
445 request.getFuture().setWritten();
446 } catch (Throwable t) {
447 fireExceptionCaught(t);
448 }
449
450 Entry head = this.head;
451
452 if (!request.isEncoded()) {
453 callNextMessageSent(head, session, request);
454 }
455 }
456
457 private void callNextMessageSent(Entry entry, IoSession session,
458 WriteRequest writeRequest) {
459 try {
460 IoFilter filter = entry.getFilter();
461 NextFilter nextFilter = entry.getNextFilter();
462 filter.messageSent(nextFilter, session,
463 writeRequest);
464 } catch (Throwable e) {
465 fireExceptionCaught(e);
466 }
467 }
468
469 public void fireExceptionCaught(Throwable cause) {
470 Entry head = this.head;
471 callNextExceptionCaught(head, session, cause);
472 }
473
474 private void callNextExceptionCaught(Entry entry, IoSession session,
475 Throwable cause) {
476
477 ConnectFuture future = (ConnectFuture) session
478 .removeAttribute(SESSION_CREATED_FUTURE);
479 if (future == null) {
480 try {
481 IoFilter filter = entry.getFilter();
482 NextFilter nextFilter = entry.getNextFilter();
483 filter.exceptionCaught(nextFilter,
484 session, cause);
485 } catch (Throwable e) {
486 LOGGER
487 .warn(
488 "Unexpected exception from exceptionCaught handler.",
489 e);
490 }
491 } else {
492
493
494 session.close(true);
495 future.setException(cause);
496 }
497 }
498
499 public void fireFilterWrite(WriteRequest writeRequest) {
500 Entry tail = this.tail;
501 callPreviousFilterWrite(tail, session, writeRequest);
502 }
503
504 private void callPreviousFilterWrite(Entry entry, IoSession session,
505 WriteRequest writeRequest) {
506 try {
507 IoFilter filter = entry.getFilter();
508 NextFilter nextFilter = entry.getNextFilter();
509 filter.filterWrite(nextFilter, session, writeRequest);
510 } catch (Throwable e) {
511 writeRequest.getFuture().setException(e);
512 fireExceptionCaught(e);
513 }
514 }
515
516 public void fireFilterClose() {
517 Entry tail = this.tail;
518 callPreviousFilterClose(tail, session);
519 }
520
521 private void callPreviousFilterClose(Entry entry, IoSession session) {
522 try {
523 IoFilter filter = entry.getFilter();
524 NextFilter nextFilter = entry.getNextFilter();
525 filter.filterClose(nextFilter, session);
526 } catch (Throwable e) {
527 fireExceptionCaught(e);
528 }
529 }
530
531 public List<Entry> getAll() {
532 List<Entry> list = new ArrayList<Entry>();
533 EntryImpl e = head.nextEntry;
534 while (e != tail) {
535 list.add(e);
536 e = e.nextEntry;
537 }
538
539 return list;
540 }
541
542 public List<Entry> getAllReversed() {
543 List<Entry> list = new ArrayList<Entry>();
544 EntryImpl e = tail.prevEntry;
545 while (e != head) {
546 list.add(e);
547 e = e.prevEntry;
548 }
549 return list;
550 }
551
552 public boolean contains(String name) {
553 return getEntry(name) != null;
554 }
555
556 public boolean contains(IoFilter filter) {
557 return getEntry(filter) != null;
558 }
559
560 public boolean contains(Class<? extends IoFilter> filterType) {
561 return getEntry(filterType) != null;
562 }
563
564 @Override
565 public String toString() {
566 StringBuilder buf = new StringBuilder();
567 buf.append("{ ");
568
569 boolean empty = true;
570
571 EntryImpl e = head.nextEntry;
572 while (e != tail) {
573 if (!empty) {
574 buf.append(", ");
575 } else {
576 empty = false;
577 }
578
579 buf.append('(');
580 buf.append(e.getName());
581 buf.append(':');
582 buf.append(e.getFilter());
583 buf.append(')');
584
585 e = e.nextEntry;
586 }
587
588 if (empty) {
589 buf.append("empty");
590 }
591
592 buf.append(" }");
593
594 return buf.toString();
595 }
596
597 private class HeadFilter extends IoFilterAdapter {
598 @SuppressWarnings("unchecked")
599 @Override
600 public void filterWrite(NextFilter nextFilter, IoSession session,
601 WriteRequest writeRequest) throws Exception {
602
603 AbstractIoSession s = (AbstractIoSession) session;
604
605
606 if (writeRequest.getMessage() instanceof IoBuffer) {
607 IoBuffer buffer = (IoBuffer) writeRequest.getMessage();
608
609
610
611 buffer.mark();
612 int remaining = buffer.remaining();
613 if (remaining == 0) {
614
615
616 s.increaseScheduledWriteMessages();
617 } else {
618 s.increaseScheduledWriteBytes(remaining);
619 }
620 } else {
621 s.increaseScheduledWriteMessages();
622 }
623
624 s.getWriteRequestQueue().offer(s, writeRequest);
625 if (!s.isWriteSuspended()) {
626 s.getProcessor().flush(s);
627 }
628 }
629
630 @SuppressWarnings("unchecked")
631 @Override
632 public void filterClose(NextFilter nextFilter, IoSession session)
633 throws Exception {
634 ((AbstractIoSession) session).getProcessor().remove(((AbstractIoSession) session));
635 }
636 }
637
638 private static class TailFilter extends IoFilterAdapter {
639 @Override
640 public void sessionCreated(NextFilter nextFilter, IoSession session)
641 throws Exception {
642 try {
643 session.getHandler().sessionCreated(session);
644 } finally {
645
646 ConnectFuture future = (ConnectFuture) session
647 .removeAttribute(SESSION_CREATED_FUTURE);
648 if (future != null) {
649 future.setSession(session);
650 }
651 }
652 }
653
654 @Override
655 public void sessionOpened(NextFilter nextFilter, IoSession session)
656 throws Exception {
657 session.getHandler().sessionOpened(session);
658 }
659
660 @Override
661 public void sessionClosed(NextFilter nextFilter, IoSession session)
662 throws Exception {
663 AbstractIoSession s = (AbstractIoSession) session;
664 try {
665 s.getHandler().sessionClosed(session);
666 } finally {
667 try {
668 s.getWriteRequestQueue().dispose(session);
669 } finally {
670 try {
671 s.getAttributeMap().dispose(session);
672 } finally {
673 try {
674
675 session.getFilterChain().clear();
676 } finally {
677 if (s.getConfig().isUseReadOperation()) {
678 s.offerClosedReadFuture();
679 }
680 }
681 }
682 }
683 }
684 }
685
686 @Override
687 public void sessionIdle(NextFilter nextFilter, IoSession session,
688 IdleStatus status) throws Exception {
689 session.getHandler().sessionIdle(session, status);
690 }
691
692 @Override
693 public void exceptionCaught(NextFilter nextFilter, IoSession session,
694 Throwable cause) throws Exception {
695 AbstractIoSession s = (AbstractIoSession) session;
696 try {
697 s.getHandler().exceptionCaught(s, cause);
698 } finally {
699 if (s.getConfig().isUseReadOperation()) {
700 s.offerFailedReadFuture(cause);
701 }
702 }
703 }
704
705 @Override
706 public void messageReceived(NextFilter nextFilter, IoSession session,
707 Object message) throws Exception {
708 AbstractIoSession s = (AbstractIoSession) session;
709 if (!(message instanceof IoBuffer)) {
710 s.increaseReadMessages(System.currentTimeMillis());
711 } else if (!((IoBuffer) message).hasRemaining()) {
712 s.increaseReadMessages(System.currentTimeMillis());
713 }
714
715 try {
716 session.getHandler().messageReceived(s, message);
717 } finally {
718 if (s.getConfig().isUseReadOperation()) {
719 s.offerReadFuture(message);
720 }
721 }
722 }
723
724 @Override
725 public void messageSent(NextFilter nextFilter, IoSession session,
726 WriteRequest writeRequest) throws Exception {
727 session.getHandler()
728 .messageSent(session, writeRequest.getMessage());
729 }
730
731 @Override
732 public void filterWrite(NextFilter nextFilter, IoSession session,
733 WriteRequest writeRequest) throws Exception {
734 nextFilter.filterWrite(session, writeRequest);
735 }
736
737 @Override
738 public void filterClose(NextFilter nextFilter, IoSession session)
739 throws Exception {
740 nextFilter.filterClose(session);
741 }
742 }
743
744 private class EntryImpl implements Entry {
745 private EntryImpl prevEntry;
746
747 private EntryImpl nextEntry;
748
749 private final String name;
750
751 private IoFilter filter;
752
753 private final NextFilter nextFilter;
754
755 private EntryImpl(EntryImpl prevEntry, EntryImpl nextEntry,
756 String name, IoFilter filter) {
757 if (filter == null) {
758 throw new IllegalArgumentException("filter");
759 }
760 if (name == null) {
761 throw new IllegalArgumentException("name");
762 }
763
764 this.prevEntry = prevEntry;
765 this.nextEntry = nextEntry;
766 this.name = name;
767 this.filter = filter;
768 this.nextFilter = new NextFilter() {
769 public void sessionCreated(IoSession session) {
770 Entry nextEntry = EntryImpl.this.nextEntry;
771 callNextSessionCreated(nextEntry, session);
772 }
773
774 public void sessionOpened(IoSession session) {
775 Entry nextEntry = EntryImpl.this.nextEntry;
776 callNextSessionOpened(nextEntry, session);
777 }
778
779 public void sessionClosed(IoSession session) {
780 Entry nextEntry = EntryImpl.this.nextEntry;
781 callNextSessionClosed(nextEntry, session);
782 }
783
784 public void sessionIdle(IoSession session, IdleStatus status) {
785 Entry nextEntry = EntryImpl.this.nextEntry;
786 callNextSessionIdle(nextEntry, session, status);
787 }
788
789 public void exceptionCaught(IoSession session, Throwable cause) {
790 Entry nextEntry = EntryImpl.this.nextEntry;
791 callNextExceptionCaught(nextEntry, session, cause);
792 }
793
794 public void messageReceived(IoSession session, Object message) {
795 Entry nextEntry = EntryImpl.this.nextEntry;
796 callNextMessageReceived(nextEntry, session, message);
797 }
798
799 public void messageSent(IoSession session,
800 WriteRequest writeRequest) {
801 Entry nextEntry = EntryImpl.this.nextEntry;
802 callNextMessageSent(nextEntry, session, writeRequest);
803 }
804
805 public void filterWrite(IoSession session,
806 WriteRequest writeRequest) {
807 Entry nextEntry = EntryImpl.this.prevEntry;
808 callPreviousFilterWrite(nextEntry, session, writeRequest);
809 }
810
811 public void filterClose(IoSession session) {
812 Entry nextEntry = EntryImpl.this.prevEntry;
813 callPreviousFilterClose(nextEntry, session);
814 }
815
816 public String toString() {
817 return EntryImpl.this.nextEntry.name;
818 }
819 };
820 }
821
822 public String getName() {
823 return name;
824 }
825
826 public IoFilter getFilter() {
827 return filter;
828 }
829
830 private void setFilter(IoFilter filter) {
831 if (filter == null) {
832 throw new IllegalArgumentException("filter");
833 }
834
835 this.filter = filter;
836 }
837
838 public NextFilter getNextFilter() {
839 return nextFilter;
840 }
841
842 @Override
843 public String toString() {
844 StringBuilder sb = new StringBuilder();
845
846
847 sb.append("('").append(getName()).append('\'');
848
849
850 sb.append(", prev: '");
851
852 if (prevEntry != null) {
853 sb.append(prevEntry.name);
854 sb.append(':');
855 sb.append(prevEntry.getFilter().getClass().getSimpleName());
856 } else {
857 sb.append("null");
858 }
859
860
861 sb.append("', next: '");
862
863 if (nextEntry != null) {
864 sb.append(nextEntry.name);
865 sb.append(':');
866 sb.append(nextEntry.getFilter().getClass().getSimpleName());
867 } else {
868 sb.append("null");
869 }
870
871 sb.append("')");
872 return sb.toString();
873 }
874
875 public void addAfter(String name, IoFilter filter) {
876 DefaultIoFilterChain.this.addAfter(getName(), name, filter);
877 }
878
879 public void addBefore(String name, IoFilter filter) {
880 DefaultIoFilterChain.this.addBefore(getName(), name, filter);
881 }
882
883 public void remove() {
884 DefaultIoFilterChain.this.remove(getName());
885 }
886
887 public void replace(IoFilter newFilter) {
888 DefaultIoFilterChain.this.replace(getName(), newFilter);
889 }
890 }
891 }