/*
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 *
 */
package org.apache.mina.core.buffer;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

import org.apache.mina.util.CircularQueue;

/**
 * An {@link IoBufferAllocator} that caches the buffers which are likely to
 * be reused during auto-expansion of the buffers.
 * <p>
 * In {@link SimpleBufferAllocator}, the underlying {@link ByteBuffer} of
 * an {@link IoBuffer} is reallocated whenever its capacity changes: a newly
 * allocated, bigger {@link ByteBuffer} replaces the old, smaller
 * {@link ByteBuffer}, and the old one becomes eligible for garbage
 * collection.
 * <p>
 * This is not a problem in most cases, as long as capacity changes don't
 * happen frequently.  However, when they happen too often, they burden the
 * VM, and the cost of filling the newly allocated {@link ByteBuffer} with
 * {@code NUL} surpasses the cost of accessing the cache.  On a machine with
 * two dual-core Opteron Italy 270 processors, {@link CachedBufferAllocator}
 * outperformed {@link SimpleBufferAllocator} in the following situations:
 * <ul>
 * <li>when a 32-byte buffer is expanded 4 or more times,</li>
 * <li>when a 64-byte buffer is expanded 4 or more times,</li>
 * <li>when a 128-byte buffer is expanded 2 or more times,</li>
 * <li>and when a 256-byte or bigger buffer is expanded 1 or more times.</li>
 * </ul>
 * Please note that the observations above may differ in other environments.
 * <p>
 * {@link CachedBufferAllocator} uses a {@link ThreadLocal} to store the
 * cached buffers, allocates only buffers whose capacity is a power of 2,
 * and provides a performance advantage only if {@link IoBuffer#free()} is
 * called properly.
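 * <p>
 * A minimal usage sketch (assuming the allocator is installed globally via
 * {@link IoBuffer#setAllocator(IoBufferAllocator)}; the capacity below is an
 * arbitrary example):
 * <pre>
 * // Install the caching allocator for all subsequently allocated IoBuffers.
 * IoBuffer.setAllocator(new CachedBufferAllocator());
 *
 * IoBuffer buf = IoBuffer.allocate(1024);
 * try {
 *     // ... use the buffer ...
 * } finally {
 *     // Returning the buffer to the per-thread cache is what makes caching effective.
 *     buf.free();
 * }
 * </pre>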
 *
 * @author The Apache MINA Project (dev@mina.apache.org)
 * @version $Rev: 671827 $, $Date: 2008-06-26 10:49:48 +0200 (jeu, 26 jun 2008) $
 */
public class CachedBufferAllocator implements IoBufferAllocator {

    private static final int DEFAULT_MAX_POOL_SIZE = 8;
    private static final int DEFAULT_MAX_CACHED_BUFFER_SIZE = 1 << 18; // 256KB

    private final int maxPoolSize;
    private final int maxCachedBufferSize;

    private final ThreadLocal<Map<Integer, Queue<CachedBuffer>>> heapBuffers;
    private final ThreadLocal<Map<Integer, Queue<CachedBuffer>>> directBuffers;

    /**
     * Creates a new instance with the default parameters
     * ({@literal #DEFAULT_MAX_POOL_SIZE} and {@literal #DEFAULT_MAX_CACHED_BUFFER_SIZE}).
     */
    public CachedBufferAllocator() {
        this(DEFAULT_MAX_POOL_SIZE, DEFAULT_MAX_CACHED_BUFFER_SIZE);
    }

    /**
     * Creates a new instance.
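     * <p>
     * An illustrative configuration (the values below are examples, not
     * recommendations):
     * <pre>
     * // Cache at most 16 buffers per capacity and per thread, and never
     * // cache a buffer larger than 64 KiB (65536 bytes).
     * IoBuffer.setAllocator(new CachedBufferAllocator(16, 65536));
     * </pre>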
     *
     * @param maxPoolSize the maximum number of buffers with the same capacity per thread.
     *                    <tt>0</tt> disables this limitation.
     * @param maxCachedBufferSize the maximum capacity of a cached buffer.
     *                            A buffer whose capacity is bigger than this value is
     *                            not pooled. <tt>0</tt> disables this limitation.
     */
    public CachedBufferAllocator(int maxPoolSize, int maxCachedBufferSize) {
        if (maxPoolSize < 0) {
            throw new IllegalArgumentException("maxPoolSize: " + maxPoolSize);
        }
        if (maxCachedBufferSize < 0) {
            throw new IllegalArgumentException("maxCachedBufferSize: " + maxCachedBufferSize);
        }

        this.maxPoolSize = maxPoolSize;
        this.maxCachedBufferSize = maxCachedBufferSize;

        this.heapBuffers = new ThreadLocal<Map<Integer, Queue<CachedBuffer>>>() {
            @Override
            protected Map<Integer, Queue<CachedBuffer>> initialValue() {
                return newPoolMap();
            }
        };
        this.directBuffers = new ThreadLocal<Map<Integer, Queue<CachedBuffer>>>() {
            @Override
            protected Map<Integer, Queue<CachedBuffer>> initialValue() {
                return newPoolMap();
            }
        };
    }

    /**
     * Returns the maximum number of buffers with the same capacity per thread.
     * <tt>0</tt> means 'no limitation'.
     */
    public int getMaxPoolSize() {
        return maxPoolSize;
    }

    /**
     * Returns the maximum capacity of a cached buffer.  A buffer whose
     * capacity is bigger than this value is not pooled.  <tt>0</tt> means
     * 'no limitation'.
     */
    public int getMaxCachedBufferSize() {
        return maxCachedBufferSize;
    }

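    /**
     * Creates a per-thread pool map keyed by buffer capacity.  The keys are 0,
     * the powers of two up to 2^30, and {@link Integer#MAX_VALUE}, which is
     * intended to cover every capacity {@link IoBuffer#normalizeCapacity(int)}
     * can return.
     */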
    private Map<Integer, Queue<CachedBuffer>> newPoolMap() {
        Map<Integer, Queue<CachedBuffer>> poolMap =
            new HashMap<Integer, Queue<CachedBuffer>>();
        int poolSize = maxPoolSize == 0 ? DEFAULT_MAX_POOL_SIZE : maxPoolSize;
        for (int i = 0; i < 31; i++) {
            poolMap.put(1 << i, new CircularQueue<CachedBuffer>(poolSize));
        }
        poolMap.put(0, new CircularQueue<CachedBuffer>(poolSize));
        poolMap.put(Integer.MAX_VALUE, new CircularQueue<CachedBuffer>(poolSize));
        return poolMap;
    }

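    /**
     * Allocates a buffer whose capacity is the requested capacity normalized
     * to a power of two.  If the normalized capacity does not exceed
     * {@link #getMaxCachedBufferSize()}, a buffer previously freed by the
     * calling thread is reused when one is available; otherwise a new buffer
     * is allocated.  The limit of the returned buffer is set to the originally
     * requested capacity.
     */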
    public IoBuffer allocate(int requestedCapacity, boolean direct) {
        int actualCapacity = IoBuffer.normalizeCapacity(requestedCapacity);
        IoBuffer buf;
        if (maxCachedBufferSize != 0 && actualCapacity > maxCachedBufferSize) {
            if (direct) {
                buf = wrap(ByteBuffer.allocateDirect(actualCapacity));
            } else {
                buf = wrap(ByteBuffer.allocate(actualCapacity));
            }
        } else {
            Queue<CachedBuffer> pool;
            if (direct) {
                pool = directBuffers.get().get(actualCapacity);
            } else {
                pool = heapBuffers.get().get(actualCapacity);
            }

            // Recycle if possible.
            buf = pool.poll();
            if (buf != null) {
                buf.clear();
                buf.setAutoExpand(false);
                buf.order(ByteOrder.BIG_ENDIAN);
            } else {
                if (direct) {
                    buf = wrap(ByteBuffer.allocateDirect(actualCapacity));
                } else {
                    buf = wrap(ByteBuffer.allocate(actualCapacity));
                }
            }
        }

        buf.limit(requestedCapacity);
        return buf;
    }

    public ByteBuffer allocateNioBuffer(int capacity, boolean direct) {
        return allocate(capacity, direct).buf();
    }

    public IoBuffer wrap(ByteBuffer nioBuffer) {
        return new CachedBuffer(nioBuffer);
    }

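    /**
     * Does nothing: the caches are per-thread ({@link ThreadLocal}) and do not
     * require explicit disposal.
     */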
    public void dispose() {
    }

    private class CachedBuffer extends AbstractIoBuffer {
        private final Thread ownerThread;
        private ByteBuffer buf;

        protected CachedBuffer(ByteBuffer buf) {
            super(CachedBufferAllocator.this, buf.capacity());
            this.ownerThread = Thread.currentThread();
            this.buf = buf;
            buf.order(ByteOrder.BIG_ENDIAN);
        }

        protected CachedBuffer(CachedBuffer parent, ByteBuffer buf) {
            super(parent);
            this.ownerThread = Thread.currentThread();
            this.buf = buf;
        }

        @Override
        public ByteBuffer buf() {
            if (buf == null) {
                throw new IllegalStateException("Buffer has been freed already.");
            }
            return buf;
        }

        @Override
        protected void buf(ByteBuffer buf) {
            ByteBuffer oldBuf = this.buf;
            this.buf = buf;
            free(oldBuf);
        }

        @Override
        protected IoBuffer duplicate0() {
            return new CachedBuffer(this, buf().duplicate());
        }

        @Override
        protected IoBuffer slice0() {
            return new CachedBuffer(this, buf().slice());
        }

        @Override
        protected IoBuffer asReadOnlyBuffer0() {
            return new CachedBuffer(this, buf().asReadOnlyBuffer());
        }

        @Override
        public byte[] array() {
            return buf().array();
        }

        @Override
        public int arrayOffset() {
            return buf().arrayOffset();
        }

        @Override
        public boolean hasArray() {
            return buf().hasArray();
        }

        @Override
        public void free() {
            free(buf);
            buf = null;
        }

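        /**
         * Offers the given buffer back to the calling thread's cache.  The
         * buffer is silently dropped when it is too large to cache, read-only,
         * derived, owned by another thread, has no registered pool for its
         * capacity, or when that pool is already full.
         */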
        private void free(ByteBuffer oldBuf) {
            // A maxCachedBufferSize of 0 means 'no limitation' (see allocate()).
            if (oldBuf == null ||
                (maxCachedBufferSize != 0 && oldBuf.capacity() > maxCachedBufferSize) ||
                oldBuf.isReadOnly() || isDerived() ||
                Thread.currentThread() != ownerThread) {
                return;
            }

            // Add to the cache.
            Queue<CachedBuffer> pool;
            if (oldBuf.isDirect()) {
                pool = directBuffers.get().get(oldBuf.capacity());
            } else {
                pool = heapBuffers.get().get(oldBuf.capacity());
            }

            if (pool == null) {
                return;
            }

            // Restrict the size of the pool to prevent OOM.
            if (maxPoolSize == 0 || pool.size() < maxPoolSize) {
                pool.offer(new CachedBuffer(oldBuf));
            }
        }
    }
}