1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.buffer;
21
22 import java.nio.ByteBuffer;
23 import java.nio.ByteOrder;
24 import java.util.HashMap;
25 import java.util.Map;
26 import java.util.Queue;
27
28 import org.apache.mina.util.CircularQueue;
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62 public class CachedBufferAllocator implements IoBufferAllocator {
63
64 private static final int DEFAULT_MAX_POOL_SIZE = 8;
65 private static final int DEFAULT_MAX_CACHED_BUFFER_SIZE = 1 << 18;
66
67 private final int maxPoolSize;
68 private final int maxCachedBufferSize;
69
70 private final ThreadLocal<Map<Integer, Queue<CachedBuffer>>> heapBuffers;
71 private final ThreadLocal<Map<Integer, Queue<CachedBuffer>>> directBuffers;
72
73
74
75
76
77 public CachedBufferAllocator() {
78 this(DEFAULT_MAX_POOL_SIZE, DEFAULT_MAX_CACHED_BUFFER_SIZE);
79 }
80
81
82
83
84
85
86
87
88
89
90 public CachedBufferAllocator(int maxPoolSize, int maxCachedBufferSize) {
91 if (maxPoolSize < 0) {
92 throw new IllegalArgumentException("maxPoolSize: " + maxPoolSize);
93 }
94 if (maxCachedBufferSize < 0) {
95 throw new IllegalArgumentException("maxCachedBufferSize: " + maxCachedBufferSize);
96 }
97
98 this.maxPoolSize = maxPoolSize;
99 this.maxCachedBufferSize = maxCachedBufferSize;
100
101 this.heapBuffers = new ThreadLocal<Map<Integer, Queue<CachedBuffer>>>() {
102 @Override
103 protected Map<Integer, Queue<CachedBuffer>> initialValue() {
104 return newPoolMap();
105 }
106 };
107 this.directBuffers = new ThreadLocal<Map<Integer, Queue<CachedBuffer>>>() {
108 @Override
109 protected Map<Integer, Queue<CachedBuffer>> initialValue() {
110 return newPoolMap();
111 }
112 };
113 }
114
115
116
117
118
119 public int getMaxPoolSize() {
120 return maxPoolSize;
121 }
122
123
124
125
126
127
128 public int getMaxCachedBufferSize() {
129 return maxCachedBufferSize;
130 }
131
132 private Map<Integer, Queue<CachedBuffer>> newPoolMap() {
133 Map<Integer, Queue<CachedBuffer>> poolMap =
134 new HashMap<Integer, Queue<CachedBuffer>>();
135 int poolSize = maxPoolSize == 0? DEFAULT_MAX_POOL_SIZE : maxPoolSize;
136 for (int i = 0; i < 31; i ++) {
137 poolMap.put(1 << i, new CircularQueue<CachedBuffer>(poolSize));
138 }
139 poolMap.put(0, new CircularQueue<CachedBuffer>(poolSize));
140 poolMap.put(Integer.MAX_VALUE, new CircularQueue<CachedBuffer>(poolSize));
141 return poolMap;
142 }
143
144 public IoBuffer allocate(int requestedCapacity, boolean direct) {
145 int actualCapacity = IoBuffer.normalizeCapacity(requestedCapacity);
146 IoBuffer buf ;
147 if (maxCachedBufferSize != 0 && actualCapacity > maxCachedBufferSize) {
148 if (direct) {
149 buf = wrap(ByteBuffer.allocateDirect(actualCapacity));
150 } else {
151 buf = wrap(ByteBuffer.allocate(actualCapacity));
152 }
153 } else {
154 Queue<CachedBuffer> pool;
155 if (direct) {
156 pool = directBuffers.get().get(actualCapacity);
157 } else {
158 pool = heapBuffers.get().get(actualCapacity);
159 }
160
161
162 buf = pool.poll();
163 if (buf != null) {
164 buf.clear();
165 buf.setAutoExpand(false);
166 buf.order(ByteOrder.BIG_ENDIAN);
167 } else {
168 if (direct) {
169 buf = wrap(ByteBuffer.allocateDirect(actualCapacity));
170 } else {
171 buf = wrap(ByteBuffer.allocate(actualCapacity));
172 }
173 }
174 }
175
176 buf.limit(requestedCapacity);
177 return buf;
178 }
179
180 public ByteBuffer allocateNioBuffer(int capacity, boolean direct) {
181 return allocate(capacity, direct).buf();
182 }
183
184 public IoBuffer wrap(ByteBuffer nioBuffer) {
185 return new CachedBuffer(nioBuffer);
186 }
187
188 public void dispose() {
189 }
190
191 private class CachedBuffer extends AbstractIoBuffer {
192 private final Thread ownerThread;
193 private ByteBuffer buf;
194
195 protected CachedBuffer(ByteBuffer buf) {
196 super(CachedBufferAllocator.this, buf.capacity());
197 this.ownerThread = Thread.currentThread();
198 this.buf = buf;
199 buf.order(ByteOrder.BIG_ENDIAN);
200 }
201
202 protected CachedBuffer(CachedBuffer parent, ByteBuffer buf) {
203 super(parent);
204 this.ownerThread = Thread.currentThread();
205 this.buf = buf;
206 }
207
208 @Override
209 public ByteBuffer buf() {
210 if (buf == null) {
211 throw new IllegalStateException("Buffer has been freed already.");
212 }
213 return buf;
214 }
215
216 @Override
217 protected void buf(ByteBuffer buf) {
218 ByteBuffer oldBuf = this.buf;
219 this.buf = buf;
220 free(oldBuf);
221 }
222
223 @Override
224 protected IoBuffer duplicate0() {
225 return new CachedBuffer(this, buf().duplicate());
226 }
227
228 @Override
229 protected IoBuffer slice0() {
230 return new CachedBuffer(this, buf().slice());
231 }
232
233 @Override
234 protected IoBuffer asReadOnlyBuffer0() {
235 return new CachedBuffer(this, buf().asReadOnlyBuffer());
236 }
237
238 @Override
239 public byte[] array() {
240 return buf().array();
241 }
242
243 @Override
244 public int arrayOffset() {
245 return buf().arrayOffset();
246 }
247
248 @Override
249 public boolean hasArray() {
250 return buf().hasArray();
251 }
252
253 @Override
254 public void free() {
255 free(buf);
256 buf = null;
257 }
258
259 private void free(ByteBuffer oldBuf) {
260 if (oldBuf == null || oldBuf.capacity() > maxCachedBufferSize ||
261 oldBuf.isReadOnly() || isDerived() ||
262 Thread.currentThread() != ownerThread) {
263 return;
264 }
265
266
267 Queue<CachedBuffer> pool;
268 if (oldBuf.isDirect()) {
269 pool = directBuffers.get().get(oldBuf.capacity());
270 } else {
271 pool = heapBuffers.get().get(oldBuf.capacity());
272 }
273
274 if (pool == null) {
275 return;
276 }
277
278
279 if (maxPoolSize == 0 || pool.size() < maxPoolSize) {
280 pool.offer(new CachedBuffer(oldBuf));
281 }
282 }
283 }
284 }