1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.core.buffer;
21
22 import java.nio.ByteBuffer;
23 import java.nio.ByteOrder;
24 import java.util.HashMap;
25 import java.util.Map;
26 import java.util.Queue;
27
28 import org.apache.mina.util.CircularQueue;
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61 public class CachedBufferAllocator implements IoBufferAllocator {
62
63 private static final int DEFAULT_MAX_POOL_SIZE = 8;
64 private static final int DEFAULT_MAX_CACHED_BUFFER_SIZE = 1 << 18;
65
66 private final int maxPoolSize;
67 private final int maxCachedBufferSize;
68
69 private final ThreadLocal<Map<Integer, Queue<CachedBuffer>>> heapBuffers;
70 private final ThreadLocal<Map<Integer, Queue<CachedBuffer>>> directBuffers;
71
72
73
74
75
76 public CachedBufferAllocator() {
77 this(DEFAULT_MAX_POOL_SIZE, DEFAULT_MAX_CACHED_BUFFER_SIZE);
78 }
79
80
81
82
83
84
85
86
87
88
89 public CachedBufferAllocator(int maxPoolSize, int maxCachedBufferSize) {
90 if (maxPoolSize < 0) {
91 throw new IllegalArgumentException("maxPoolSize: " + maxPoolSize);
92 }
93 if (maxCachedBufferSize < 0) {
94 throw new IllegalArgumentException("maxCachedBufferSize: " + maxCachedBufferSize);
95 }
96
97 this.maxPoolSize = maxPoolSize;
98 this.maxCachedBufferSize = maxCachedBufferSize;
99
100 this.heapBuffers = new ThreadLocal<Map<Integer, Queue<CachedBuffer>>>() {
101 @Override
102 protected Map<Integer, Queue<CachedBuffer>> initialValue() {
103 return newPoolMap();
104 }
105 };
106 this.directBuffers = new ThreadLocal<Map<Integer, Queue<CachedBuffer>>>() {
107 @Override
108 protected Map<Integer, Queue<CachedBuffer>> initialValue() {
109 return newPoolMap();
110 }
111 };
112 }
113
114
115
116
117
118 public int getMaxPoolSize() {
119 return maxPoolSize;
120 }
121
122
123
124
125
126
127 public int getMaxCachedBufferSize() {
128 return maxCachedBufferSize;
129 }
130
131 Map<Integer, Queue<CachedBuffer>> newPoolMap() {
132 Map<Integer, Queue<CachedBuffer>> poolMap =
133 new HashMap<Integer, Queue<CachedBuffer>>();
134 int poolSize = maxPoolSize == 0? DEFAULT_MAX_POOL_SIZE : maxPoolSize;
135 for (int i = 0; i < 31; i ++) {
136 poolMap.put(1 << i, new CircularQueue<CachedBuffer>(poolSize));
137 }
138 poolMap.put(0, new CircularQueue<CachedBuffer>(poolSize));
139 poolMap.put(Integer.MAX_VALUE, new CircularQueue<CachedBuffer>(poolSize));
140 return poolMap;
141 }
142
143 public IoBuffer allocate(int requestedCapacity, boolean direct) {
144 int actualCapacity = IoBuffer.normalizeCapacity(requestedCapacity);
145 IoBuffer buf ;
146 if (maxCachedBufferSize != 0 && actualCapacity > maxCachedBufferSize) {
147 if (direct) {
148 buf = wrap(ByteBuffer.allocateDirect(actualCapacity));
149 } else {
150 buf = wrap(ByteBuffer.allocate(actualCapacity));
151 }
152 } else {
153 Queue<CachedBuffer> pool;
154 if (direct) {
155 pool = directBuffers.get().get(actualCapacity);
156 } else {
157 pool = heapBuffers.get().get(actualCapacity);
158 }
159
160
161 buf = pool.poll();
162 if (buf != null) {
163 buf.clear();
164 buf.setAutoExpand(false);
165 buf.order(ByteOrder.BIG_ENDIAN);
166 } else {
167 if (direct) {
168 buf = wrap(ByteBuffer.allocateDirect(actualCapacity));
169 } else {
170 buf = wrap(ByteBuffer.allocate(actualCapacity));
171 }
172 }
173 }
174
175 buf.limit(requestedCapacity);
176 return buf;
177 }
178
179 public ByteBuffer allocateNioBuffer(int capacity, boolean direct) {
180 return allocate(capacity, direct).buf();
181 }
182
183 public IoBuffer wrap(ByteBuffer nioBuffer) {
184 return new CachedBuffer(nioBuffer);
185 }
186
187 public void dispose() {
188
189 }
190
191 private class CachedBuffer extends AbstractIoBuffer {
192 private final Thread ownerThread;
193 private ByteBuffer buf;
194
195 protected CachedBuffer(ByteBuffer buf) {
196 super(CachedBufferAllocator.this, buf.capacity());
197 this.ownerThread = Thread.currentThread();
198 this.buf = buf;
199 buf.order(ByteOrder.BIG_ENDIAN);
200 }
201
202 protected CachedBuffer(CachedBuffer parent, ByteBuffer buf) {
203 super(parent);
204 this.ownerThread = Thread.currentThread();
205 this.buf = buf;
206 }
207
208 @Override
209 public ByteBuffer buf() {
210 if (buf == null) {
211 throw new IllegalStateException("Buffer has been freed already.");
212 }
213 return buf;
214 }
215
216 @Override
217 protected void buf(ByteBuffer buf) {
218 ByteBuffer oldBuf = this.buf;
219 this.buf = buf;
220 free(oldBuf);
221 }
222
223 @Override
224 protected IoBuffer duplicate0() {
225 return new CachedBuffer(this, buf().duplicate());
226 }
227
228 @Override
229 protected IoBuffer slice0() {
230 return new CachedBuffer(this, buf().slice());
231 }
232
233 @Override
234 protected IoBuffer asReadOnlyBuffer0() {
235 return new CachedBuffer(this, buf().asReadOnlyBuffer());
236 }
237
238 @Override
239 public byte[] array() {
240 return buf().array();
241 }
242
243 @Override
244 public int arrayOffset() {
245 return buf().arrayOffset();
246 }
247
248 @Override
249 public boolean hasArray() {
250 return buf().hasArray();
251 }
252
253 @Override
254 public void free() {
255 free(buf);
256 buf = null;
257 }
258
259 private void free(ByteBuffer oldBuf) {
260 if (oldBuf == null || oldBuf.capacity() > maxCachedBufferSize ||
261 oldBuf.isReadOnly() || isDerived() ||
262 Thread.currentThread() != ownerThread) {
263 return;
264 }
265
266
267 Queue<CachedBuffer> pool;
268 if (oldBuf.isDirect()) {
269 pool = directBuffers.get().get(oldBuf.capacity());
270 } else {
271 pool = heapBuffers.get().get(oldBuf.capacity());
272 }
273
274 if (pool == null) {
275 return;
276 }
277
278
279 if (maxPoolSize == 0 || pool.size() < maxPoolSize) {
280 pool.offer(new CachedBuffer(oldBuf));
281 }
282 }
283 }
284 }