001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.core.buffer;
021
022import java.nio.ByteBuffer;
023import java.nio.ByteOrder;
024import java.util.HashMap;
025import java.util.Map;
026import java.util.Queue;
027import java.util.concurrent.ConcurrentLinkedQueue;
028
029/**
030 * An {@link IoBufferAllocator} that caches the buffers which are likely to
031 * be reused during auto-expansion of the buffers.
032 * <p>
033 * In {@link SimpleBufferAllocator}, the underlying {@link ByteBuffer} of
034 * the {@link IoBuffer} is reallocated on its capacity change, which means
035 * the newly allocated bigger {@link ByteBuffer} replaces the old small
036 * {@link ByteBuffer}.  Consequently, the old {@link ByteBuffer} is marked
037 * for garbage collection.
038 * <p>
039 * It's not a problem in most cases as long as the capacity change doesn't
040 * happen frequently.  However, once it happens too often, it burdens the
041 * VM and the cost of filling the newly allocated {@link ByteBuffer} with
042 * {@code NUL} surpass the cost of accessing the cache.  In 2 dual-core
043 * Opteron Italy 270 processors, {@link CachedBufferAllocator} outperformed
044 * {@link SimpleBufferAllocator} in the following situation:
045 * <ul>
046 * <li>when a 32 bytes buffer is expanded 4 or more times,</li> 
047 * <li>when a 64 bytes buffer is expanded 4 or more times,</li>
048 * <li>when a 128 bytes buffer is expanded 2 or more times,</li>
049 * <li>and when a 256 bytes or bigger buffer is expanded 1 or more times.</li>
050 * </ul>
051 * Please note the observation above is subject to change in a different
052 * environment.
053 * <p>
054 * {@link CachedBufferAllocator} uses {@link ThreadLocal} to store the cached
055 * buffer, allocates buffers whose capacity is power of 2 only and provides
056 * performance advantage if {@link IoBuffer#free()} is called properly.
057 *
058 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
059 */
060public class CachedBufferAllocator implements IoBufferAllocator {
061
062    private static final int DEFAULT_MAX_POOL_SIZE = 8;
063
064    private static final int DEFAULT_MAX_CACHED_BUFFER_SIZE = 1 << 18; // 256KB
065
066    private final int maxPoolSize;
067
068    private final int maxCachedBufferSize;
069
070    private final ThreadLocal<Map<Integer, Queue<CachedBuffer>>> heapBuffers;
071
072    private final ThreadLocal<Map<Integer, Queue<CachedBuffer>>> directBuffers;
073
074    /**
075     * Creates a new instance with the default parameters
076     * ({@literal #DEFAULT_MAX_POOL_SIZE} and {@literal #DEFAULT_MAX_CACHED_BUFFER_SIZE}). 
077     */
078    public CachedBufferAllocator() {
079        this(DEFAULT_MAX_POOL_SIZE, DEFAULT_MAX_CACHED_BUFFER_SIZE);
080    }
081
082    /**
083     * Creates a new instance.
084     * 
085     * @param maxPoolSize the maximum number of buffers with the same capacity per thread.
086     *                    <tt>0</tt> disables this limitation.
087     * @param maxCachedBufferSize the maximum capacity of a cached buffer.
088     *                            A buffer whose capacity is bigger than this value is
089     *                            not pooled. <tt>0</tt> disables this limitation.
090     */
091    public CachedBufferAllocator(int maxPoolSize, int maxCachedBufferSize) {
092        if (maxPoolSize < 0) {
093            throw new IllegalArgumentException("maxPoolSize: " + maxPoolSize);
094        }
095
096        if (maxCachedBufferSize < 0) {
097            throw new IllegalArgumentException("maxCachedBufferSize: " + maxCachedBufferSize);
098        }
099
100        this.maxPoolSize = maxPoolSize;
101        this.maxCachedBufferSize = maxCachedBufferSize;
102
103        this.heapBuffers = new ThreadLocal<Map<Integer, Queue<CachedBuffer>>>() {
104            @Override
105            protected Map<Integer, Queue<CachedBuffer>> initialValue() {
106                return newPoolMap();
107            }
108        };
109
110        this.directBuffers = new ThreadLocal<Map<Integer, Queue<CachedBuffer>>>() {
111            @Override
112            protected Map<Integer, Queue<CachedBuffer>> initialValue() {
113                return newPoolMap();
114            }
115        };
116    }
117
118    /**
119     * @return the maximum number of buffers with the same capacity per thread.
120     * <tt>0</tt> means 'no limitation'.
121     */
122    public int getMaxPoolSize() {
123        return maxPoolSize;
124    }
125
126    /**
127     * @return the maximum capacity of a cached buffer.  A buffer whose
128     * capacity is bigger than this value is not pooled.  <tt>0</tt> means
129     * 'no limitation'.
130     */
131    public int getMaxCachedBufferSize() {
132        return maxCachedBufferSize;
133    }
134
135    Map<Integer, Queue<CachedBuffer>> newPoolMap() {
136        Map<Integer, Queue<CachedBuffer>> poolMap = new HashMap<Integer, Queue<CachedBuffer>>();
137
138        for (int i = 0; i < 31; i++) {
139            poolMap.put(1 << i, new ConcurrentLinkedQueue<CachedBuffer>());
140        }
141
142        poolMap.put(0, new ConcurrentLinkedQueue<CachedBuffer>());
143        poolMap.put(Integer.MAX_VALUE, new ConcurrentLinkedQueue<CachedBuffer>());
144
145        return poolMap;
146    }
147
148    public IoBuffer allocate(int requestedCapacity, boolean direct) {
149        int actualCapacity = IoBuffer.normalizeCapacity(requestedCapacity);
150        IoBuffer buf;
151
152        if ((maxCachedBufferSize != 0) && (actualCapacity > maxCachedBufferSize)) {
153            if (direct) {
154                buf = wrap(ByteBuffer.allocateDirect(actualCapacity));
155            } else {
156                buf = wrap(ByteBuffer.allocate(actualCapacity));
157            }
158        } else {
159            Queue<CachedBuffer> pool;
160
161            if (direct) {
162                pool = directBuffers.get().get(actualCapacity);
163            } else {
164                pool = heapBuffers.get().get(actualCapacity);
165            }
166
167            // Recycle if possible.
168            buf = pool.poll();
169
170            if (buf != null) {
171                buf.clear();
172                buf.setAutoExpand(false);
173                buf.order(ByteOrder.BIG_ENDIAN);
174            } else {
175                if (direct) {
176                    buf = wrap(ByteBuffer.allocateDirect(actualCapacity));
177                } else {
178                    buf = wrap(ByteBuffer.allocate(actualCapacity));
179                }
180            }
181        }
182
183        buf.limit(requestedCapacity);
184        return buf;
185    }
186
187    public ByteBuffer allocateNioBuffer(int capacity, boolean direct) {
188        return allocate(capacity, direct).buf();
189    }
190
191    public IoBuffer wrap(ByteBuffer nioBuffer) {
192        return new CachedBuffer(nioBuffer);
193    }
194
195    public void dispose() {
196        // Do nothing
197    }
198
199    private class CachedBuffer extends AbstractIoBuffer {
200        private final Thread ownerThread;
201
202        private ByteBuffer buf;
203
204        protected CachedBuffer(ByteBuffer buf) {
205            super(CachedBufferAllocator.this, buf.capacity());
206            this.ownerThread = Thread.currentThread();
207            this.buf = buf;
208            buf.order(ByteOrder.BIG_ENDIAN);
209        }
210
211        protected CachedBuffer(CachedBuffer parent, ByteBuffer buf) {
212            super(parent);
213            this.ownerThread = Thread.currentThread();
214            this.buf = buf;
215        }
216
217        @Override
218        public ByteBuffer buf() {
219            if (buf == null) {
220                throw new IllegalStateException("Buffer has been freed already.");
221            }
222            return buf;
223        }
224
225        @Override
226        protected void buf(ByteBuffer buf) {
227            ByteBuffer oldBuf = this.buf;
228            this.buf = buf;
229            free(oldBuf);
230        }
231
232        @Override
233        protected IoBuffer duplicate0() {
234            return new CachedBuffer(this, buf().duplicate());
235        }
236
237        @Override
238        protected IoBuffer slice0() {
239            return new CachedBuffer(this, buf().slice());
240        }
241
242        @Override
243        protected IoBuffer asReadOnlyBuffer0() {
244            return new CachedBuffer(this, buf().asReadOnlyBuffer());
245        }
246
247        @Override
248        public byte[] array() {
249            return buf().array();
250        }
251
252        @Override
253        public int arrayOffset() {
254            return buf().arrayOffset();
255        }
256
257        @Override
258        public boolean hasArray() {
259            return buf().hasArray();
260        }
261
262        @Override
263        public void free() {
264            free(buf);
265            buf = null;
266        }
267
268        private void free(ByteBuffer oldBuf) {
269            if ((oldBuf == null) || ((maxCachedBufferSize != 0) && (oldBuf.capacity() > maxCachedBufferSize))
270                    || oldBuf.isReadOnly() || isDerived() || (Thread.currentThread() != ownerThread)) {
271                return;
272            }
273
274            // Add to the cache.
275            Queue<CachedBuffer> pool;
276
277            if (oldBuf.isDirect()) {
278                pool = directBuffers.get().get(oldBuf.capacity());
279            } else {
280                pool = heapBuffers.get().get(oldBuf.capacity());
281            }
282
283            if (pool == null) {
284                return;
285            }
286
287            // Restrict the size of the pool to prevent OOM.
288            if ((maxPoolSize == 0) || (pool.size() < maxPoolSize)) {
289                pool.offer(new CachedBuffer(oldBuf));
290            }
291        }
292    }
293}