1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.executor;
21
22 import java.util.concurrent.atomic.AtomicInteger;
23
24 import org.apache.mina.core.session.IoEvent;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28
29
30
31
32
33 public class IoEventQueueThrottle implements IoEventQueueHandler {
34
35 private static final Logger LOGGER = LoggerFactory.getLogger(IoEventQueueThrottle.class);
36
37
38 private final IoEventSizeEstimator eventSizeEstimator;
39
40 private volatile int threshold;
41
42 private final Object lock = new Object();
43
44
45 private final AtomicInteger counter = new AtomicInteger();
46
47 private int waiters;
48
49
50
51
52 public IoEventQueueThrottle() {
53 this(new DefaultIoEventSizeEstimator(), 65536);
54 }
55
56
57
58
59
60
61 public IoEventQueueThrottle(int threshold) {
62 this(new DefaultIoEventSizeEstimator(), threshold);
63 }
64
65
66
67
68
69
70
71 public IoEventQueueThrottle(IoEventSizeEstimator eventSizeEstimator, int threshold) {
72 if (eventSizeEstimator == null) {
73 throw new IllegalArgumentException("eventSizeEstimator");
74 }
75
76 this.eventSizeEstimator = eventSizeEstimator;
77
78 setThreshold(threshold);
79 }
80
81
82
83
84 public IoEventSizeEstimator getEventSizeEstimator() {
85 return eventSizeEstimator;
86 }
87
88
89
90
91 public int getThreshold() {
92 return threshold;
93 }
94
95
96
97
98 public int getCounter() {
99 return counter.get();
100 }
101
102
103
104
105
106
107 public void setThreshold(int threshold) {
108 if (threshold <= 0) {
109 throw new IllegalArgumentException("threshold: " + threshold);
110 }
111
112 this.threshold = threshold;
113 }
114
115
116
117
118 @Override
119 public boolean accept(Object source, IoEvent event) {
120 return true;
121 }
122
123
124
125
126 @Override
127 public void offered(Object source, IoEvent event) {
128 int eventSize = estimateSize(event);
129 int currentCounter = counter.addAndGet(eventSize);
130 logState();
131
132 if (currentCounter >= threshold) {
133 block();
134 }
135 }
136
137
138
139
140 @Override
141 public void polled(Object source, IoEvent event) {
142 int eventSize = estimateSize(event);
143 int currentCounter = counter.addAndGet(-eventSize);
144
145 logState();
146
147 if (currentCounter < threshold) {
148 unblock();
149 }
150 }
151
152 private int estimateSize(IoEvent event) {
153 int size = getEventSizeEstimator().estimateSize(event);
154
155 if (size < 0) {
156 throw new IllegalStateException(IoEventSizeEstimator.class.getSimpleName() + " returned "
157 + "a negative value (" + size + "): " + event);
158 }
159
160 return size;
161 }
162
163 private void logState() {
164 if (LOGGER.isDebugEnabled()) {
165 LOGGER.debug(Thread.currentThread().getName() + " state: " + counter.get() + " / " + getThreshold());
166 }
167 }
168
169 protected void block() {
170 if (LOGGER.isDebugEnabled()) {
171 LOGGER.debug(Thread.currentThread().getName() + " blocked: " + counter.get() + " >= " + threshold);
172 }
173
174 synchronized (lock) {
175 while (counter.get() >= threshold) {
176 waiters++;
177 try {
178 lock.wait();
179 } catch (InterruptedException e) {
180
181 } finally {
182 waiters--;
183 }
184 }
185 }
186
187 if (LOGGER.isDebugEnabled()) {
188 LOGGER.debug(Thread.currentThread().getName() + " unblocked: " + counter.get() + " < " + threshold);
189 }
190 }
191
192 protected void unblock() {
193 synchronized (lock) {
194 if (waiters > 0) {
195 lock.notifyAll();
196 }
197 }
198 }
199 }