1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.buffer;
21
22 import java.nio.ByteBuffer;
23 import java.nio.ByteOrder;
24 import java.util.HashMap;
25 import java.util.Map;
26 import java.util.Queue;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 public class CachedBufferAllocator implements IoBufferAllocator {
61
62 private static final int DEFAULT_MAX_POOL_SIZE = 8;
63
64 private static final int DEFAULT_MAX_CACHED_BUFFER_SIZE = 1 << 18;
65
66 private final int maxPoolSize;
67
68 private final int maxCachedBufferSize;
69
70 private final ThreadLocal<Map<Integer, Queue<CachedBuffer>>> heapBuffers;
71
72 private final ThreadLocal<Map<Integer, Queue<CachedBuffer>>> directBuffers;
73
74
75
76
77
78 public CachedBufferAllocator() {
79 this(DEFAULT_MAX_POOL_SIZE, DEFAULT_MAX_CACHED_BUFFER_SIZE);
80 }
81
82
83
84
85
86
87
88
89
90
91 public CachedBufferAllocator(int maxPoolSize, int maxCachedBufferSize) {
92 if (maxPoolSize < 0) {
93 throw new IllegalArgumentException("maxPoolSize: " + maxPoolSize);
94 }
95
96 if (maxCachedBufferSize < 0) {
97 throw new IllegalArgumentException("maxCachedBufferSize: " + maxCachedBufferSize);
98 }
99
100 this.maxPoolSize = maxPoolSize;
101 this.maxCachedBufferSize = maxCachedBufferSize;
102
103 this.heapBuffers = new ThreadLocal<Map<Integer, Queue<CachedBuffer>>>() {
104 @Override
105 protected Map<Integer, Queue<CachedBuffer>> initialValue() {
106 return newPoolMap();
107 }
108 };
109
110 this.directBuffers = new ThreadLocal<Map<Integer, Queue<CachedBuffer>>>() {
111 @Override
112 protected Map<Integer, Queue<CachedBuffer>> initialValue() {
113 return newPoolMap();
114 }
115 };
116 }
117
118
119
120
121
122 public int getMaxPoolSize() {
123 return maxPoolSize;
124 }
125
126
127
128
129
130
131 public int getMaxCachedBufferSize() {
132 return maxCachedBufferSize;
133 }
134
135 Map<Integer, Queue<CachedBuffer>> newPoolMap() {
136 Map<Integer, Queue<CachedBuffer>> poolMap = new HashMap<Integer, Queue<CachedBuffer>>();
137 int poolSize = maxPoolSize == 0 ? DEFAULT_MAX_POOL_SIZE : maxPoolSize;
138
139 for (int i = 0; i < 31; i++) {
140 poolMap.put(1 << i, new ConcurrentLinkedQueue<CachedBuffer>());
141 }
142
143 poolMap.put(0, new ConcurrentLinkedQueue<CachedBuffer>());
144 poolMap.put(Integer.MAX_VALUE, new ConcurrentLinkedQueue<CachedBuffer>());
145
146 return poolMap;
147 }
148
149 public IoBuffer allocate(int requestedCapacity, boolean direct) {
150 int actualCapacity = IoBuffer.normalizeCapacity(requestedCapacity);
151 IoBuffer buf;
152
153 if ((maxCachedBufferSize != 0) && (actualCapacity > maxCachedBufferSize)) {
154 if (direct) {
155 buf = wrap(ByteBuffer.allocateDirect(actualCapacity));
156 } else {
157 buf = wrap(ByteBuffer.allocate(actualCapacity));
158 }
159 } else {
160 Queue<CachedBuffer> pool;
161
162 if (direct) {
163 pool = directBuffers.get().get(actualCapacity);
164 } else {
165 pool = heapBuffers.get().get(actualCapacity);
166 }
167
168
169 buf = pool.poll();
170
171 if (buf != null) {
172 buf.clear();
173 buf.setAutoExpand(false);
174 buf.order(ByteOrder.BIG_ENDIAN);
175 } else {
176 if (direct) {
177 buf = wrap(ByteBuffer.allocateDirect(actualCapacity));
178 } else {
179 buf = wrap(ByteBuffer.allocate(actualCapacity));
180 }
181 }
182 }
183
184 buf.limit(requestedCapacity);
185 return buf;
186 }
187
188 public ByteBuffer allocateNioBuffer(int capacity, boolean direct) {
189 return allocate(capacity, direct).buf();
190 }
191
192 public IoBuffer wrap(ByteBuffer nioBuffer) {
193 return new CachedBuffer(nioBuffer);
194 }
195
196 public void dispose() {
197
198 }
199
200 private class CachedBuffer extends AbstractIoBuffer {
201 private final Thread ownerThread;
202
203 private ByteBuffer buf;
204
205 protected CachedBuffer(ByteBuffer buf) {
206 super(CachedBufferAllocator.this, buf.capacity());
207 this.ownerThread = Thread.currentThread();
208 this.buf = buf;
209 buf.order(ByteOrder.BIG_ENDIAN);
210 }
211
212 protected CachedBuffer(CachedBuffer parent, ByteBuffer buf) {
213 super(parent);
214 this.ownerThread = Thread.currentThread();
215 this.buf = buf;
216 }
217
218 @Override
219 public ByteBuffer buf() {
220 if (buf == null) {
221 throw new IllegalStateException("Buffer has been freed already.");
222 }
223 return buf;
224 }
225
226 @Override
227 protected void buf(ByteBuffer buf) {
228 ByteBuffer oldBuf = this.buf;
229 this.buf = buf;
230 free(oldBuf);
231 }
232
233 @Override
234 protected IoBuffer duplicate0() {
235 return new CachedBuffer(this, buf().duplicate());
236 }
237
238 @Override
239 protected IoBuffer slice0() {
240 return new CachedBuffer(this, buf().slice());
241 }
242
243 @Override
244 protected IoBuffer asReadOnlyBuffer0() {
245 return new CachedBuffer(this, buf().asReadOnlyBuffer());
246 }
247
248 @Override
249 public byte[] array() {
250 return buf().array();
251 }
252
253 @Override
254 public int arrayOffset() {
255 return buf().arrayOffset();
256 }
257
258 @Override
259 public boolean hasArray() {
260 return buf().hasArray();
261 }
262
263 @Override
264 public void free() {
265 free(buf);
266 buf = null;
267 }
268
269 private void free(ByteBuffer oldBuf) {
270 if ((oldBuf == null) || ((maxCachedBufferSize != 0) && (oldBuf.capacity() > maxCachedBufferSize))
271 || oldBuf.isReadOnly() || isDerived() || (Thread.currentThread() != ownerThread)) {
272 return;
273 }
274
275
276 Queue<CachedBuffer> pool;
277
278 if (oldBuf.isDirect()) {
279 pool = directBuffers.get().get(oldBuf.capacity());
280 } else {
281 pool = heapBuffers.get().get(oldBuf.capacity());
282 }
283
284 if (pool == null) {
285 return;
286 }
287
288
289 if ((maxPoolSize == 0) || (pool.size() < maxPoolSize)) {
290 pool.offer(new CachedBuffer(oldBuf));
291 }
292 }
293 }
294 }