1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.executor;
21
22 import java.util.concurrent.atomic.AtomicInteger;
23
24 import org.apache.mina.core.session.IoEvent;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28
29
30
31
32
33
34 public class IoEventQueueThrottle implements IoEventQueueHandler {
35
36 private final Logger logger = LoggerFactory.getLogger(getClass());
37
38 private final IoEventSizeEstimator eventSizeEstimator;
39 private volatile int threshold;
40
41 private final Object lock = new Object();
42 private final AtomicInteger counter = new AtomicInteger();
43 private int waiters;
44
45 public IoEventQueueThrottle() {
46 this(new DefaultIoEventSizeEstimator(), 65536);
47 }
48
49 public IoEventQueueThrottle(int threshold) {
50 this(new DefaultIoEventSizeEstimator(), threshold);
51 }
52
53 public IoEventQueueThrottle(IoEventSizeEstimator eventSizeEstimator, int threshold) {
54 if (eventSizeEstimator == null) {
55 throw new NullPointerException("eventSizeEstimator");
56 }
57 this.eventSizeEstimator = eventSizeEstimator;
58
59 setThreshold(threshold);
60 }
61
62 public IoEventSizeEstimator getEventSizeEstimator() {
63 return eventSizeEstimator;
64 }
65
66 public int getThreshold() {
67 return threshold;
68 }
69
70 public int getCounter() {
71 return counter.get();
72 }
73
74 public void setThreshold(int threshold) {
75 if (threshold <= 0) {
76 throw new IllegalArgumentException("threshold: " + threshold);
77 }
78 this.threshold = threshold;
79 }
80
81 public boolean accept(Object source, IoEvent event) {
82 return true;
83 }
84
85 public void offered(Object source, IoEvent event) {
86 int eventSize = estimateSize(event);
87 int currentCounter = counter.addAndGet(eventSize);
88 logState();
89
90 if (currentCounter >= threshold) {
91 block();
92 }
93 }
94
95 public void polled(Object source, IoEvent event) {
96 int eventSize = estimateSize(event);
97 int currentCounter = counter.addAndGet(-eventSize);
98
99 logState();
100
101 if (currentCounter < threshold) {
102 unblock();
103 }
104 }
105
106 private int estimateSize(IoEvent event) {
107 int size = getEventSizeEstimator().estimateSize(event);
108 if (size < 0) {
109 throw new IllegalStateException(
110 IoEventSizeEstimator.class.getSimpleName() + " returned " +
111 "a negative value (" + size + "): " + event);
112 }
113 return size;
114 }
115
116 private void logState() {
117 if (logger.isDebugEnabled()) {
118 logger.debug(Thread.currentThread().getName() + " state: " + counter.get() + " / " + getThreshold());
119 }
120 }
121
122 protected void block() {
123 if (logger.isDebugEnabled()) {
124 logger.debug(Thread.currentThread().getName() + " blocked: " + counter.get() + " >= " + threshold);
125 }
126
127 synchronized (lock) {
128 while (counter.get() >= threshold) {
129 waiters ++;
130 try {
131 lock.wait();
132 } catch (InterruptedException e) {
133
134 } finally {
135 waiters --;
136 }
137 }
138 }
139
140 if (logger.isDebugEnabled()) {
141 logger.debug(Thread.currentThread().getName() + " unblocked: " + counter.get() + " < " + threshold);
142 }
143 }
144
145 protected void unblock() {
146 synchronized (lock) {
147 if (waiters > 0) {
148 lock.notify();
149 }
150 }
151 }
152 }