1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.buffer;
21
22 import java.nio.ByteBuffer;
23 import java.nio.ByteOrder;
24 import java.util.HashMap;
25 import java.util.Map;
26 import java.util.Queue;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 public class CachedBufferAllocator implements IoBufferAllocator {
61
62 private static final int DEFAULT_MAX_POOL_SIZE = 8;
63
64 private static final int DEFAULT_MAX_CACHED_BUFFER_SIZE = 1 << 18;
65
66 private final int maxPoolSize;
67
68 private final int maxCachedBufferSize;
69
70 private final ThreadLocal<Map<Integer, Queue<CachedBuffer>>> heapBuffers;
71
72 private final ThreadLocal<Map<Integer, Queue<CachedBuffer>>> directBuffers;
73
74
75
76
77
78 public CachedBufferAllocator() {
79 this(DEFAULT_MAX_POOL_SIZE, DEFAULT_MAX_CACHED_BUFFER_SIZE);
80 }
81
82
83
84
85
86
87
88
89
90
91 public CachedBufferAllocator(int maxPoolSize, int maxCachedBufferSize) {
92 if (maxPoolSize < 0) {
93 throw new IllegalArgumentException("maxPoolSize: " + maxPoolSize);
94 }
95
96 if (maxCachedBufferSize < 0) {
97 throw new IllegalArgumentException("maxCachedBufferSize: " + maxCachedBufferSize);
98 }
99
100 this.maxPoolSize = maxPoolSize;
101 this.maxCachedBufferSize = maxCachedBufferSize;
102
103 this.heapBuffers = new ThreadLocal<Map<Integer, Queue<CachedBuffer>>>() {
104 @Override
105 protected Map<Integer, Queue<CachedBuffer>> initialValue() {
106 return newPoolMap();
107 }
108 };
109
110 this.directBuffers = new ThreadLocal<Map<Integer, Queue<CachedBuffer>>>() {
111 @Override
112 protected Map<Integer, Queue<CachedBuffer>> initialValue() {
113 return newPoolMap();
114 }
115 };
116 }
117
118
119
120
121
122 public int getMaxPoolSize() {
123 return maxPoolSize;
124 }
125
126
127
128
129
130
131 public int getMaxCachedBufferSize() {
132 return maxCachedBufferSize;
133 }
134
135 Map<Integer, Queue<CachedBuffer>> newPoolMap() {
136 Map<Integer, Queue<CachedBuffer>> poolMap = new HashMap<>();
137
138 for (int i = 0; i < 31; i++) {
139 poolMap.put(1 << i, new ConcurrentLinkedQueue<CachedBuffer>());
140 }
141
142 poolMap.put(0, new ConcurrentLinkedQueue<CachedBuffer>());
143 poolMap.put(Integer.MAX_VALUE, new ConcurrentLinkedQueue<CachedBuffer>());
144
145 return poolMap;
146 }
147
148
149
150
151 @Override
152 public IoBuffer allocate(int requestedCapacity, boolean direct) {
153 int actualCapacity = IoBuffer.normalizeCapacity(requestedCapacity);
154 IoBuffer buf;
155
156 if ((maxCachedBufferSize != 0) && (actualCapacity > maxCachedBufferSize)) {
157 if (direct) {
158 buf = wrap(ByteBuffer.allocateDirect(actualCapacity));
159 } else {
160 buf = wrap(ByteBuffer.allocate(actualCapacity));
161 }
162 } else {
163 Queue<CachedBuffer> pool;
164
165 if (direct) {
166 pool = directBuffers.get().get(actualCapacity);
167 } else {
168 pool = heapBuffers.get().get(actualCapacity);
169 }
170
171
172 buf = pool.poll();
173
174 if (buf != null) {
175 buf.clear();
176 buf.setAutoExpand(false);
177 buf.order(ByteOrder.BIG_ENDIAN);
178 } else {
179 if (direct) {
180 buf = wrap(ByteBuffer.allocateDirect(actualCapacity));
181 } else {
182 buf = wrap(ByteBuffer.allocate(actualCapacity));
183 }
184 }
185 }
186
187 buf.limit(requestedCapacity);
188 return buf;
189 }
190
191
192
193
194 @Override
195 public ByteBuffer allocateNioBuffer(int capacity, boolean direct) {
196 return allocate(capacity, direct).buf();
197 }
198
199
200
201
202 @Override
203 public IoBuffer wrap(ByteBuffer nioBuffer) {
204 return new CachedBuffer(nioBuffer);
205 }
206
207
208
209
210 @Override
211 public void dispose() {
212
213 }
214
215 private class CachedBuffer extends AbstractIoBuffer {
216 private final Thread ownerThread;
217
218 private ByteBuffer buf;
219
220 protected CachedBuffer(ByteBuffer buf) {
221 super(CachedBufferAllocator.this, buf.capacity());
222 this.ownerThread = Thread.currentThread();
223 this.buf = buf;
224 buf.order(ByteOrder.BIG_ENDIAN);
225 }
226
227 protected CachedBuffer(CachedBuffer parent, ByteBuffer buf) {
228 super(parent);
229 this.ownerThread = Thread.currentThread();
230 this.buf = buf;
231 }
232
233 @Override
234 public ByteBuffer buf() {
235 if (buf == null) {
236 throw new IllegalStateException("Buffer has been freed already.");
237 }
238 return buf;
239 }
240
241 @Override
242 protected void buf(ByteBuffer buf) {
243 ByteBuffer oldBuf = this.buf;
244 this.buf = buf;
245 free(oldBuf);
246 }
247
248 @Override
249 protected IoBuffer duplicate0() {
250 return new CachedBuffer(this, buf().duplicate());
251 }
252
253 @Override
254 protected IoBuffer slice0() {
255 return new CachedBuffer(this, buf().slice());
256 }
257
258 @Override
259 protected IoBuffer asReadOnlyBuffer0() {
260 return new CachedBuffer(this, buf().asReadOnlyBuffer());
261 }
262
263 @Override
264 public byte[] array() {
265 return buf().array();
266 }
267
268 @Override
269 public int arrayOffset() {
270 return buf().arrayOffset();
271 }
272
273 @Override
274 public boolean hasArray() {
275 return buf().hasArray();
276 }
277
278 @Override
279 public void free() {
280 free(buf);
281 buf = null;
282 }
283
284 private void free(ByteBuffer oldBuf) {
285 if ((oldBuf == null) || ((maxCachedBufferSize != 0) && (oldBuf.capacity() > maxCachedBufferSize))
286 || oldBuf.isReadOnly() || isDerived() || (Thread.currentThread() != ownerThread)) {
287 return;
288 }
289
290
291 Queue<CachedBuffer> pool;
292
293 if (oldBuf.isDirect()) {
294 pool = directBuffers.get().get(oldBuf.capacity());
295 } else {
296 pool = heapBuffers.get().get(oldBuf.capacity());
297 }
298
299 if (pool == null) {
300 return;
301 }
302
303
304 if ((maxPoolSize == 0) || (pool.size() < maxPoolSize)) {
305 pool.offer(new CachedBuffer(oldBuf));
306 }
307 }
308 }
309 }