/*
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 *
 */
package org.apache.mina.core.buffer;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * An {@link IoBufferAllocator} that caches the buffers which are likely to
 * be reused during auto-expansion of the buffers.
 * <p>
 * In {@link SimpleBufferAllocator}, the underlying {@link ByteBuffer} of
 * the {@link IoBuffer} is reallocated on its capacity change, which means
 * the newly allocated bigger {@link ByteBuffer} replaces the old small
 * {@link ByteBuffer}.  Consequently, the old {@link ByteBuffer} is marked
 * for garbage collection.
 * <p>
 * This is not a problem in most cases, as long as the capacity change doesn't
 * happen frequently.  However, once it happens too often, it burdens the
 * VM and the cost of filling the newly allocated {@link ByteBuffer} with
 * {@code NUL} surpasses the cost of accessing the cache.  On a machine with
 * two dual-core Opteron Italy 270 processors, {@link CachedBufferAllocator}
 * outperformed {@link SimpleBufferAllocator} in the following situations:
 * <ul>
 * <li>when a 32-byte buffer is expanded 4 or more times,</li>
 * <li>when a 64-byte buffer is expanded 4 or more times,</li>
 * <li>when a 128-byte buffer is expanded 2 or more times,</li>
 * <li>and when a 256-byte or bigger buffer is expanded 1 or more times.</li>
 * </ul>
 * Please note that the observation above is subject to change in a different
 * environment.
 * <p>
 * {@link CachedBufferAllocator} uses {@link ThreadLocal} to store the cached
 * buffers, allocates only buffers whose capacity is a power of 2, and provides
 * a performance advantage if {@link IoBuffer#free()} is called properly.
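 * <p>
 * A minimal usage sketch (this assumes the allocator is installed globally
 * through {@link IoBuffer#setAllocator(IoBufferAllocator)}, which is the
 * typical way to enable it):
 * <pre>
 * // Install the caching allocator for all subsequent IoBuffer allocations.
 * IoBuffer.setAllocator(new CachedBufferAllocator());
 *
 * IoBuffer buf = IoBuffer.allocate(64);
 * buf.putInt(42);
 * buf.flip();
 * // ... read from the buffer ...
 * buf.free(); // returns the underlying ByteBuffer to the per-thread cache
 * </pre>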
 *
 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
 */
public class CachedBufferAllocator implements IoBufferAllocator {

    private static final int DEFAULT_MAX_POOL_SIZE = 8;

    private static final int DEFAULT_MAX_CACHED_BUFFER_SIZE = 1 << 18; // 256KB

    private final int maxPoolSize;

    private final int maxCachedBufferSize;

    private final ThreadLocal<Map<Integer, Queue<CachedBuffer>>> heapBuffers;

    private final ThreadLocal<Map<Integer, Queue<CachedBuffer>>> directBuffers;

    /**
     * Creates a new instance with the default parameters
     * ({@literal #DEFAULT_MAX_POOL_SIZE} and {@literal #DEFAULT_MAX_CACHED_BUFFER_SIZE}).
     */
    public CachedBufferAllocator() {
        this(DEFAULT_MAX_POOL_SIZE, DEFAULT_MAX_CACHED_BUFFER_SIZE);
    }

    /**
     * Creates a new instance.
     *
     * @param maxPoolSize the maximum number of buffers with the same capacity per thread.
     *                    <tt>0</tt> disables this limitation.
     * @param maxCachedBufferSize the maximum capacity of a cached buffer.
     *                            A buffer whose capacity is bigger than this value is
     *                            not pooled. <tt>0</tt> disables this limitation.
     */
    public CachedBufferAllocator(int maxPoolSize, int maxCachedBufferSize) {
        if (maxPoolSize < 0) {
            throw new IllegalArgumentException("maxPoolSize: " + maxPoolSize);
        }

        if (maxCachedBufferSize < 0) {
            throw new IllegalArgumentException("maxCachedBufferSize: " + maxCachedBufferSize);
        }

        this.maxPoolSize = maxPoolSize;
        this.maxCachedBufferSize = maxCachedBufferSize;

        this.heapBuffers = new ThreadLocal<Map<Integer, Queue<CachedBuffer>>>() {
            @Override
            protected Map<Integer, Queue<CachedBuffer>> initialValue() {
                return newPoolMap();
            }
        };

        this.directBuffers = new ThreadLocal<Map<Integer, Queue<CachedBuffer>>>() {
            @Override
            protected Map<Integer, Queue<CachedBuffer>> initialValue() {
                return newPoolMap();
            }
        };
    }

    /**
     * @return the maximum number of buffers with the same capacity per thread.
     * <tt>0</tt> means 'no limitation'.
     */
    public int getMaxPoolSize() {
        return maxPoolSize;
    }

    /**
     * @return the maximum capacity of a cached buffer.  A buffer whose
     * capacity is bigger than this value is not pooled.  <tt>0</tt> means
     * 'no limitation'.
     */
    public int getMaxCachedBufferSize() {
        return maxCachedBufferSize;
    }

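    /**
     * Creates the per-thread pool map.  It pre-populates one queue for every
     * power-of-2 capacity (up to 2^30), plus the special capacities 0 and
     * {@link Integer#MAX_VALUE}, so a lookup by normalized capacity always
     * finds a queue for a cacheable size.
     */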
    Map<Integer, Queue<CachedBuffer>> newPoolMap() {
        Map<Integer, Queue<CachedBuffer>> poolMap = new HashMap<>();

        for (int i = 0; i < 31; i++) {
            poolMap.put(1 << i, new ConcurrentLinkedQueue<CachedBuffer>());
        }

        poolMap.put(0, new ConcurrentLinkedQueue<CachedBuffer>());
        poolMap.put(Integer.MAX_VALUE, new ConcurrentLinkedQueue<CachedBuffer>());

        return poolMap;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public IoBuffer allocate(int requestedCapacity, boolean direct) {
        int actualCapacity = IoBuffer.normalizeCapacity(requestedCapacity);
        IoBuffer buf;

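        // Buffers larger than maxCachedBufferSize are never cached, so they are
        // allocated (and later garbage-collected) directly.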
        if ((maxCachedBufferSize != 0) && (actualCapacity > maxCachedBufferSize)) {
            if (direct) {
                buf = wrap(ByteBuffer.allocateDirect(actualCapacity));
            } else {
                buf = wrap(ByteBuffer.allocate(actualCapacity));
            }
        } else {
            Queue<CachedBuffer> pool;

            if (direct) {
                pool = directBuffers.get().get(actualCapacity);
            } else {
                pool = heapBuffers.get().get(actualCapacity);
            }

            // Recycle if possible.
            buf = pool.poll();

            if (buf != null) {
                buf.clear();
                buf.setAutoExpand(false);
                buf.order(ByteOrder.BIG_ENDIAN);
            } else {
                if (direct) {
                    buf = wrap(ByteBuffer.allocateDirect(actualCapacity));
                } else {
                    buf = wrap(ByteBuffer.allocate(actualCapacity));
                }
            }
        }

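        // The underlying buffer may be bigger than requested (its capacity is
        // normalized to a power of 2), so the limit is set to the requested size.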
        buf.limit(requestedCapacity);
        return buf;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public ByteBuffer allocateNioBuffer(int capacity, boolean direct) {
        return allocate(capacity, direct).buf();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public IoBuffer wrap(ByteBuffer nioBuffer) {
        return new CachedBuffer(nioBuffer);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void dispose() {
        // Do nothing
    }

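    /**
     * The {@link IoBuffer} implementation returned by this allocator.  When
     * freed, it tries to return its underlying {@link ByteBuffer} to the
     * per-thread pool of the thread that allocated it.
     */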
    private class CachedBuffer extends AbstractIoBuffer {
        private final Thread ownerThread;

        private ByteBuffer buf;

        protected CachedBuffer(ByteBuffer buf) {
            super(CachedBufferAllocator.this, buf.capacity());
            this.ownerThread = Thread.currentThread();
            this.buf = buf;
            buf.order(ByteOrder.BIG_ENDIAN);
        }

        protected CachedBuffer(CachedBuffer parent, ByteBuffer buf) {
            super(parent);
            this.ownerThread = Thread.currentThread();
            this.buf = buf;
        }

        @Override
        public ByteBuffer buf() {
            if (buf == null) {
                throw new IllegalStateException("Buffer has been freed already.");
            }
            return buf;
        }

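        // Invoked by AbstractIoBuffer when the underlying ByteBuffer is replaced
        // (e.g. on a capacity change); the old buffer is offered back to the cache.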
        @Override
        protected void buf(ByteBuffer buf) {
            ByteBuffer oldBuf = this.buf;
            this.buf = buf;
            free(oldBuf);
        }

        @Override
        protected IoBuffer duplicate0() {
            return new CachedBuffer(this, buf().duplicate());
        }

        @Override
        protected IoBuffer slice0() {
            return new CachedBuffer(this, buf().slice());
        }

        @Override
        protected IoBuffer asReadOnlyBuffer0() {
            return new CachedBuffer(this, buf().asReadOnlyBuffer());
        }

        @Override
        public byte[] array() {
            return buf().array();
        }

        @Override
        public int arrayOffset() {
            return buf().arrayOffset();
        }

        @Override
        public boolean hasArray() {
            return buf().hasArray();
        }

        @Override
        public void free() {
            free(buf);
            buf = null;
        }

        private void free(ByteBuffer oldBuf) {
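            // Only cache buffers that are safe to reuse: not already freed, within the
            // size limit, writable, not derived (a duplicate, slice, or read-only view
            // shares its content with the original), and freed by the owning thread.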
            if ((oldBuf == null) || ((maxCachedBufferSize != 0) && (oldBuf.capacity() > maxCachedBufferSize))
                    || oldBuf.isReadOnly() || isDerived() || (Thread.currentThread() != ownerThread)) {
                return;
            }

            // Add to the cache.
            Queue<CachedBuffer> pool;

            if (oldBuf.isDirect()) {
                pool = directBuffers.get().get(oldBuf.capacity());
            } else {
                pool = heapBuffers.get().get(oldBuf.capacity());
            }

            if (pool == null) {
                return;
            }

            // Restrict the size of the pool to prevent OOM.
            if ((maxPoolSize == 0) || (pool.size() < maxPoolSize)) {
                pool.offer(new CachedBuffer(oldBuf));
            }
        }
    }
}