1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.filter.executor;
21
22 import java.util.concurrent.atomic.AtomicInteger;
23
24 import org.apache.mina.core.session.IoEvent;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28
29
30
31
32
33 public class IoEventQueueThrottle implements IoEventQueueHandler {
34
35 private final static Logger LOGGER = LoggerFactory.getLogger(IoEventQueueThrottle.class);
36
37
38 private final IoEventSizeEstimator eventSizeEstimator;
39
40 private volatile int threshold;
41
42 private final Object lock = new Object();
43 private final AtomicInteger counter = new AtomicInteger();
44 private int waiters;
45
46 public IoEventQueueThrottle() {
47 this(new DefaultIoEventSizeEstimator(), 65536);
48 }
49
50 public IoEventQueueThrottle(int threshold) {
51 this(new DefaultIoEventSizeEstimator(), threshold);
52 }
53
54 public IoEventQueueThrottle(IoEventSizeEstimator eventSizeEstimator, int threshold) {
55 if (eventSizeEstimator == null) {
56 throw new NullPointerException("eventSizeEstimator");
57 }
58 this.eventSizeEstimator = eventSizeEstimator;
59
60 setThreshold(threshold);
61 }
62
63 public IoEventSizeEstimator getEventSizeEstimator() {
64 return eventSizeEstimator;
65 }
66
67 public int getThreshold() {
68 return threshold;
69 }
70
71 public int getCounter() {
72 return counter.get();
73 }
74
75 public void setThreshold(int threshold) {
76 if (threshold <= 0) {
77 throw new IllegalArgumentException("threshold: " + threshold);
78 }
79 this.threshold = threshold;
80 }
81
82 public boolean accept(Object source, IoEvent event) {
83 return true;
84 }
85
86 public void offered(Object source, IoEvent event) {
87 int eventSize = estimateSize(event);
88 int currentCounter = counter.addAndGet(eventSize);
89 logState();
90
91 if (currentCounter >= threshold) {
92 block();
93 }
94 }
95
96 public void polled(Object source, IoEvent event) {
97 int eventSize = estimateSize(event);
98 int currentCounter = counter.addAndGet(-eventSize);
99
100 logState();
101
102 if (currentCounter < threshold) {
103 unblock();
104 }
105 }
106
107 private int estimateSize(IoEvent event) {
108 int size = getEventSizeEstimator().estimateSize(event);
109 if (size < 0) {
110 throw new IllegalStateException(
111 IoEventSizeEstimator.class.getSimpleName() + " returned " +
112 "a negative value (" + size + "): " + event);
113 }
114 return size;
115 }
116
117 private void logState() {
118 if (LOGGER.isDebugEnabled()) {
119 LOGGER.debug(Thread.currentThread().getName() + " state: " + counter.get() + " / " + getThreshold());
120 }
121 }
122
123 protected void block() {
124 if (LOGGER.isDebugEnabled()) {
125 LOGGER.debug(Thread.currentThread().getName() + " blocked: " + counter.get() + " >= " + threshold);
126 }
127
128 synchronized (lock) {
129 while (counter.get() >= threshold) {
130 waiters ++;
131 try {
132 lock.wait();
133 } catch (InterruptedException e) {
134
135 } finally {
136 waiters --;
137 }
138 }
139 }
140
141 if (LOGGER.isDebugEnabled()) {
142 LOGGER.debug(Thread.currentThread().getName() + " unblocked: " + counter.get() + " < " + threshold);
143 }
144 }
145
146 protected void unblock() {
147 synchronized (lock) {
148 if (waiters > 0) {
149 lock.notify();
150 }
151 }
152 }
153 }