1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.executor;
21
22 import java.util.concurrent.atomic.AtomicInteger;
23
24 import org.apache.mina.core.session.IoEvent;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28
29
30
31
32
33 public class IoEventQueueThrottle implements IoEventQueueHandler {
34
35 private final static Logger LOGGER = LoggerFactory.getLogger(IoEventQueueThrottle.class);
36
37
38 private final IoEventSizeEstimator eventSizeEstimator;
39
40 private volatile int threshold;
41
42 private final Object lock = new Object();
43
44 private final AtomicInteger counter = new AtomicInteger();
45
46 private int waiters;
47
48 public IoEventQueueThrottle() {
49 this(new DefaultIoEventSizeEstimator(), 65536);
50 }
51
52 public IoEventQueueThrottle(int threshold) {
53 this(new DefaultIoEventSizeEstimator(), threshold);
54 }
55
56 public IoEventQueueThrottle(IoEventSizeEstimator eventSizeEstimator, int threshold) {
57 if (eventSizeEstimator == null) {
58 throw new IllegalArgumentException("eventSizeEstimator");
59 }
60
61 this.eventSizeEstimator = eventSizeEstimator;
62
63 setThreshold(threshold);
64 }
65
66 public IoEventSizeEstimator getEventSizeEstimator() {
67 return eventSizeEstimator;
68 }
69
70 public int getThreshold() {
71 return threshold;
72 }
73
74 public int getCounter() {
75 return counter.get();
76 }
77
78 public void setThreshold(int threshold) {
79 if (threshold <= 0) {
80 throw new IllegalArgumentException("threshold: " + threshold);
81 }
82
83 this.threshold = threshold;
84 }
85
86 public boolean accept(Object source, IoEvent event) {
87 return true;
88 }
89
90 public void offered(Object source, IoEvent event) {
91 int eventSize = estimateSize(event);
92 int currentCounter = counter.addAndGet(eventSize);
93 logState();
94
95 if (currentCounter >= threshold) {
96 block();
97 }
98 }
99
100 public void polled(Object source, IoEvent event) {
101 int eventSize = estimateSize(event);
102 int currentCounter = counter.addAndGet(-eventSize);
103
104 logState();
105
106 if (currentCounter < threshold) {
107 unblock();
108 }
109 }
110
111 private int estimateSize(IoEvent event) {
112 int size = getEventSizeEstimator().estimateSize(event);
113
114 if (size < 0) {
115 throw new IllegalStateException(IoEventSizeEstimator.class.getSimpleName() + " returned "
116 + "a negative value (" + size + "): " + event);
117 }
118
119 return size;
120 }
121
122 private void logState() {
123 if (LOGGER.isDebugEnabled()) {
124 LOGGER.debug(Thread.currentThread().getName() + " state: " + counter.get() + " / " + getThreshold());
125 }
126 }
127
128 protected void block() {
129 if (LOGGER.isDebugEnabled()) {
130 LOGGER.debug(Thread.currentThread().getName() + " blocked: " + counter.get() + " >= " + threshold);
131 }
132
133 synchronized (lock) {
134 while (counter.get() >= threshold) {
135 waiters++;
136 try {
137 lock.wait();
138 } catch (InterruptedException e) {
139
140 } finally {
141 waiters--;
142 }
143 }
144 }
145
146 if (LOGGER.isDebugEnabled()) {
147 LOGGER.debug(Thread.currentThread().getName() + " unblocked: " + counter.get() + " < " + threshold);
148 }
149 }
150
151 protected void unblock() {
152 synchronized (lock) {
153 if (waiters > 0) {
154 lock.notifyAll();
155 }
156 }
157 }
158 }