/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.mina.core.buffer;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * An {@link IoBufferAllocator} that caches the buffers which are likely to
 * be reused during auto-expansion of the buffers.
 * <p>
 * In {@link SimpleBufferAllocator}, the underlying {@link ByteBuffer} of
 * the {@link IoBuffer} is reallocated on its capacity change, which means
 * the newly allocated bigger {@link ByteBuffer} replaces the old small
 * {@link ByteBuffer}. Consequently, the old {@link ByteBuffer} is marked
 * for garbage collection.
 * <p>
 * It's not a problem in most cases as long as the capacity change doesn't
 * happen frequently. However, once it happens too often, it burdens the
 * VM and the cost of filling the newly allocated {@link ByteBuffer} with
 * {@code NUL} surpass the cost of accessing the cache. In 2 dual-core
 * Opteron Italy 270 processors, {@link CachedBufferAllocator} outperformed
 * {@link SimpleBufferAllocator} in the following situation:
 * <ul>
 * <li>when a 32 bytes buffer is expanded 4 or more times,</li>
 * <li>when a 64 bytes buffer is expanded 4 or more times,</li>
 * <li>when a 128 bytes buffer is expanded 2 or more times,</li>
 * <li>and when a 256 bytes or bigger buffer is expanded 1 or more times.</li>
 * </ul>
 * Please note the observation above is subject to change in a different
 * environment.
 * <p>
 * {@link CachedBufferAllocator} uses {@link ThreadLocal} to store the cached
 * buffer, allocates buffers whose capacity is power of 2 only and provides
 * performance advantage if {@link IoBuffer#free()} is called properly.
 *
 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
 */
public class CachedBufferAllocator implements IoBufferAllocator {

    /** Default cap on the number of pooled buffers per capacity per thread. */
    private static final int DEFAULT_MAX_POOL_SIZE = 8;

    /** Default cap on the capacity of a pooled buffer. */
    private static final int DEFAULT_MAX_CACHED_BUFFER_SIZE = 1 << 18; // 256KB

    private final int maxPoolSize;

    private final int maxCachedBufferSize;

    /** Per-thread pools of recyclable heap buffers, keyed by capacity. */
    private final ThreadLocal<Map<Integer, Queue<CachedBuffer>>> heapBuffers;

    /** Per-thread pools of recyclable direct buffers, keyed by capacity. */
    private final ThreadLocal<Map<Integer, Queue<CachedBuffer>>> directBuffers;

    /**
     * Creates a new instance with the default parameters
     * ({@link #DEFAULT_MAX_POOL_SIZE} and {@link #DEFAULT_MAX_CACHED_BUFFER_SIZE}).
     */
    public CachedBufferAllocator() {
        this(DEFAULT_MAX_POOL_SIZE, DEFAULT_MAX_CACHED_BUFFER_SIZE);
    }

    /**
     * Creates a new instance.
     *
     * @param maxPoolSize the maximum number of buffers with the same capacity per thread.
     *                    <tt>0</tt> disables this limitation.
     * @param maxCachedBufferSize the maximum capacity of a cached buffer.
     *                            A buffer whose capacity is bigger than this value is
     *                            not pooled. <tt>0</tt> disables this limitation.
     * @throws IllegalArgumentException if either argument is negative
     */
    public CachedBufferAllocator(int maxPoolSize, int maxCachedBufferSize) {
        if (maxPoolSize < 0) {
            throw new IllegalArgumentException("maxPoolSize: " + maxPoolSize);
        }

        if (maxCachedBufferSize < 0) {
            throw new IllegalArgumentException("maxCachedBufferSize: " + maxCachedBufferSize);
        }

        this.maxPoolSize = maxPoolSize;
        this.maxCachedBufferSize = maxCachedBufferSize;

        // Each thread gets its own pool maps, so pool access needs no
        // cross-thread synchronization on the map itself.
        this.heapBuffers = new ThreadLocal<Map<Integer, Queue<CachedBuffer>>>() {
            @Override
            protected Map<Integer, Queue<CachedBuffer>> initialValue() {
                return newPoolMap();
            }
        };

        this.directBuffers = new ThreadLocal<Map<Integer, Queue<CachedBuffer>>>() {
            @Override
            protected Map<Integer, Queue<CachedBuffer>> initialValue() {
                return newPoolMap();
            }
        };
    }

    /**
     * Returns the maximum number of buffers with the same capacity per thread.
     * <tt>0</tt> means 'no limitation'.
     *
     * @return the maximum pool size per capacity per thread
     */
    public int getMaxPoolSize() {
        return maxPoolSize;
    }

    /**
     * Returns the maximum capacity of a cached buffer. A buffer whose
     * capacity is bigger than this value is not pooled. <tt>0</tt> means
     * 'no limitation'.
     *
     * @return the maximum capacity of a pooled buffer
     */
    public int getMaxCachedBufferSize() {
        return maxCachedBufferSize;
    }

    /**
     * Builds a pool map pre-populated with an empty queue for every capacity
     * an allocation can normalize to: each power of two up to {@code 1 << 30},
     * plus {@code 0} and {@code Integer.MAX_VALUE}. Because every normalized
     * capacity has an entry, {@link #allocate(int, boolean)} never sees a
     * {@code null} pool.
     */
    Map<Integer, Queue<CachedBuffer>> newPoolMap() {
        Map<Integer, Queue<CachedBuffer>> poolMap = new HashMap<Integer, Queue<CachedBuffer>>();

        for (int i = 0; i < 31; i++) {
            poolMap.put(1 << i, new ConcurrentLinkedQueue<CachedBuffer>());
        }

        poolMap.put(0, new ConcurrentLinkedQueue<CachedBuffer>());
        poolMap.put(Integer.MAX_VALUE, new ConcurrentLinkedQueue<CachedBuffer>());

        return poolMap;
    }

    @Override
    public IoBuffer allocate(int requestedCapacity, boolean direct) {
        int actualCapacity = IoBuffer.normalizeCapacity(requestedCapacity);
        IoBuffer buf;

        if ((maxCachedBufferSize != 0) && (actualCapacity > maxCachedBufferSize)) {
            // Too big to ever be pooled; allocate directly.
            if (direct) {
                buf = wrap(ByteBuffer.allocateDirect(actualCapacity));
            } else {
                buf = wrap(ByteBuffer.allocate(actualCapacity));
            }
        } else {
            Queue<CachedBuffer> pool;

            if (direct) {
                pool = directBuffers.get().get(actualCapacity);
            } else {
                pool = heapBuffers.get().get(actualCapacity);
            }

            // Recycle if possible.
            buf = pool.poll();

            if (buf != null) {
                // Reset the recycled buffer to a pristine state.
                buf.clear();
                buf.setAutoExpand(false);
                buf.order(ByteOrder.BIG_ENDIAN);
            } else {
                if (direct) {
                    buf = wrap(ByteBuffer.allocateDirect(actualCapacity));
                } else {
                    buf = wrap(ByteBuffer.allocate(actualCapacity));
                }
            }
        }

        // Expose only the requested capacity; the rest stays as slack.
        buf.limit(requestedCapacity);
        return buf;
    }

    @Override
    public ByteBuffer allocateNioBuffer(int capacity, boolean direct) {
        return allocate(capacity, direct).buf();
    }

    @Override
    public IoBuffer wrap(ByteBuffer nioBuffer) {
        return new CachedBuffer(nioBuffer);
    }

    @Override
    public void dispose() {
        // Do nothing
    }

    /**
     * An {@link IoBuffer} whose underlying {@link ByteBuffer} is returned to
     * the owner thread's pool when freed, instead of being discarded.
     */
    private class CachedBuffer extends AbstractIoBuffer {
        /** The thread that created this buffer; only it may re-pool the buffer. */
        private final Thread ownerThread;

        private ByteBuffer buf;

        protected CachedBuffer(ByteBuffer buf) {
            super(CachedBufferAllocator.this, buf.capacity());
            this.ownerThread = Thread.currentThread();
            this.buf = buf;
            buf.order(ByteOrder.BIG_ENDIAN);
        }

        protected CachedBuffer(CachedBuffer parent, ByteBuffer buf) {
            super(parent);
            this.ownerThread = Thread.currentThread();
            this.buf = buf;
        }

        @Override
        public ByteBuffer buf() {
            if (buf == null) {
                throw new IllegalStateException("Buffer has been freed already.");
            }
            return buf;
        }

        @Override
        protected void buf(ByteBuffer buf) {
            // Swapping in a new backing buffer (e.g. on expansion): try to
            // recycle the old one.
            ByteBuffer oldBuf = this.buf;
            this.buf = buf;
            free(oldBuf);
        }

        @Override
        protected IoBuffer duplicate0() {
            return new CachedBuffer(this, buf().duplicate());
        }

        @Override
        protected IoBuffer slice0() {
            return new CachedBuffer(this, buf().slice());
        }

        @Override
        protected IoBuffer asReadOnlyBuffer0() {
            return new CachedBuffer(this, buf().asReadOnlyBuffer());
        }

        @Override
        public byte[] array() {
            return buf().array();
        }

        @Override
        public int arrayOffset() {
            return buf().arrayOffset();
        }

        @Override
        public boolean hasArray() {
            return buf().hasArray();
        }

        @Override
        public void free() {
            free(buf);
            buf = null;
        }

        /**
         * Returns {@code oldBuf} to the current thread's pool when eligible:
         * not already freed, within the size cap, writable, not a derived
         * view, and freed by the thread that allocated it.
         */
        private void free(ByteBuffer oldBuf) {
            if ((oldBuf == null) || ((maxCachedBufferSize != 0) && (oldBuf.capacity() > maxCachedBufferSize))
                    || oldBuf.isReadOnly() || isDerived() || (Thread.currentThread() != ownerThread)) {
                return;
            }

            // Add to the cache.
            Queue<CachedBuffer> pool;

            if (oldBuf.isDirect()) {
                pool = directBuffers.get().get(oldBuf.capacity());
            } else {
                pool = heapBuffers.get().get(oldBuf.capacity());
            }

            if (pool == null) {
                return;
            }

            // Restrict the size of the pool to prevent OOM.
            if ((maxPoolSize == 0) || (pool.size() < maxPoolSize)) {
                pool.offer(new CachedBuffer(oldBuf));
            }
        }
    }
}