
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.io.hfile;
20  
21  import java.lang.ref.WeakReference;
22  import java.nio.ByteBuffer;
23  import java.util.EnumMap;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.PriorityQueue;
28  import java.util.SortedSet;
29  import java.util.TreeSet;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.Executors;
32  import java.util.concurrent.ScheduledExecutorService;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicLong;
35  import java.util.concurrent.locks.ReentrantLock;
36  
37  import org.apache.commons.logging.Log;
38  import org.apache.commons.logging.LogFactory;
39  import org.apache.hadoop.conf.Configuration;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.io.HeapSize;
42  import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
43  import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.apache.hadoop.hbase.util.ClassSize;
46  import org.apache.hadoop.hbase.util.HasThread;
47  import org.apache.hadoop.util.StringUtils;
48  import org.codehaus.jackson.annotate.JsonIgnoreProperties;
49  
50  import com.google.common.annotations.VisibleForTesting;
51  import com.google.common.base.Objects;
52  import com.google.common.util.concurrent.ThreadFactoryBuilder;
53  
54  /**
55   * A block cache implementation that is memory-aware using {@link HeapSize},
56   * memory-bound using an LRU eviction algorithm, and concurrent: backed by a
57   * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving
58   * constant-time {@link #cacheBlock} and {@link #getBlock} operations.<p>
59   *
60   * Contains three levels of block priority to allow for scan-resistance and in-memory
61   * column families {@link org.apache.hadoop.hbase.HColumnDescriptor#setInMemory(boolean)}
62   * (an in-memory column family is a column family that should be served from memory if
63   * possible): single-access, multi-access, and in-memory priority.
64   * A block is added with in-memory priority if
65   * {@link org.apache.hadoop.hbase.HColumnDescriptor#isInMemory()}; otherwise a block becomes
66   * single-access priority the first time it is read into this block cache.  If a block is
67   * accessed again while in the cache, it is marked as a multi-access priority block.  This
68   * delineation of blocks is used to prevent scans from thrashing the cache, adding a
69   * least-frequently-used element to the eviction algorithm.<p>
70   *
71   * Each priority is given its own chunk of the total cache to ensure
72   * fairness during eviction.  Each priority will retain close to its maximum
73   * size, however, if any priority is not using its entire chunk the others
74   * are able to grow beyond their chunk size.<p>
75   *
76   * Instantiated at a minimum with the total size and average block size.
77   * All sizes are in bytes.  The block size is not especially important as this
78   * cache is fully dynamic in its sizing of blocks.  It is only used for
79   * pre-allocating data structures and in initial heap estimation of the map.<p>
80   *
81   * The detailed constructor defines the sizes for the three priorities (they
82   * should total to the <code>maximum size</code> defined).  It also sets the levels that
83   * trigger and control the eviction thread.<p>
84   *
85   * The <code>acceptable size</code> is the cache size level which triggers the eviction
86   * process to start.  It evicts enough blocks to get the size below the
87   * minimum size specified.<p>
88   *
89   * Eviction happens in a separate thread and involves a single full-scan
90   * of the map.  It determines how many bytes must be freed to reach the minimum
91   * size, and then while scanning determines the fewest least-recently-used
92   * blocks necessary from each of the three priorities (which could be up to three times
93   * the bytes that need to be freed).  It then uses the priority chunk sizes to evict
94   * fairly according to the relative sizes and usage.
95   */
96  @InterfaceAudience.Private
97  @JsonIgnoreProperties({"encodingCountsForTest"})
98  public class LruBlockCache implements ResizableBlockCache, HeapSize {
99  
100   private static final Log LOG = LogFactory.getLog(LruBlockCache.class);
101 
102   /**
103    * Percentage of total size that eviction will evict until; e.g. if set to .8, then we will keep
104    * evicting during an eviction run till the cache size is down to 80% of the total.
105    */
106   static final String LRU_MIN_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.min.factor";
107 
108   /**
109    * Acceptable size of cache (no evictions if size < acceptable)
110    */
111   static final String LRU_ACCEPTABLE_FACTOR_CONFIG_NAME = "hbase.lru.blockcache.acceptable.factor";
112 
113   static final String LRU_SINGLE_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.single.percentage";
114   static final String LRU_MULTI_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.multi.percentage";
115   static final String LRU_MEMORY_PERCENTAGE_CONFIG_NAME = "hbase.lru.blockcache.memory.percentage";
116 
117   /**
118    * Configuration key to force data blocks of in-memory hfiles to always be kept in the
119    * cache (unless the in-memory blocks alone would overflow it). Unlike inMemory, which is
120    * a column-family configuration, inMemoryForceMode is a cluster-wide configuration
121    */
122   static final String LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME = "hbase.lru.rs.inmemoryforcemode";
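      // Illustrative sketch of how these keys can be set on a Configuration before constructing
      // the cache; the numeric values below are arbitrary examples, not recommended settings.
      //
      //   Configuration conf = HBaseConfiguration.create();
      //   conf.setFloat(LRU_MIN_FACTOR_CONFIG_NAME, 0.90f);        // evict down to 90% of capacity
      //   conf.setFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, 0.95f); // start evicting above 95%
      //   conf.setBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, true);
      //   LruBlockCache cache = new LruBlockCache(maxSize, blockSize, conf);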
123 
124   /** Default Configuration Parameters*/
125 
126   /** Backing Concurrent Map Configuration */
127   static final float DEFAULT_LOAD_FACTOR = 0.75f;
128   static final int DEFAULT_CONCURRENCY_LEVEL = 16;
129 
130   /** Eviction thresholds */
131   static final float DEFAULT_MIN_FACTOR = 0.95f;
132   static final float DEFAULT_ACCEPTABLE_FACTOR = 0.99f;
133 
134   /** Priority buckets */
135   static final float DEFAULT_SINGLE_FACTOR = 0.25f;
136   static final float DEFAULT_MULTI_FACTOR = 0.50f;
137   static final float DEFAULT_MEMORY_FACTOR = 0.25f;
138 
139   static final boolean DEFAULT_IN_MEMORY_FORCE_MODE = false;
140 
141   /** Statistics thread */
142   static final int statThreadPeriod = 60 * 5;
143 
144   /** Concurrent map (the cache) */
145   private final Map<BlockCacheKey,LruCachedBlock> map;
146 
147   /** Eviction lock (locked when eviction in process) */
148   private final ReentrantLock evictionLock = new ReentrantLock(true);
149 
150   /** Volatile boolean to track if we are in an eviction process or not */
151   private volatile boolean evictionInProgress = false;
152 
153   /** Eviction thread */
154   private final EvictionThread evictionThread;
155 
156   /** Statistics thread schedule pool (for heavy debugging, could remove) */
157   private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1,
158     new ThreadFactoryBuilder().setNameFormat("LruBlockCacheStatsExecutor").setDaemon(true).build());
159 
160   /** Current size of cache */
161   private final AtomicLong size;
162 
163   /** Current number of cached elements */
164   private final AtomicLong elements;
165 
166   /** Cache access count (sequential ID) */
167   private final AtomicLong count;
168 
169   /** Cache statistics */
170   private final CacheStats stats;
171 
172   /** Maximum allowable size of cache (block put if size > max, evict) */
173   private long maxSize;
174 
175   /** Approximate block size */
176   private long blockSize;
177 
178   /** Acceptable size of cache (no evictions if size < acceptable) */
179   private float acceptableFactor;
180 
181   /** Minimum threshold of cache (when evicting, evict until size < min) */
182   private float minFactor;
183 
184   /** Single access bucket size */
185   private float singleFactor;
186 
187   /** Multiple access bucket size */
188   private float multiFactor;
189 
190   /** In-memory bucket size */
191   private float memoryFactor;
192 
193   /** Overhead of the structure itself */
194   private long overhead;
195 
196   /** Whether in-memory hfile's data block has higher priority when evicting */
197   private boolean forceInMemory;
198 
199   /** Where to send victims (blocks evicted/missing from the cache) */
200   private BlockCache victimHandler = null;
201 
202   /**
203    * Default constructor.  Specify maximum size and expected average block
204    * size (approximation is fine).
205    *
206    * <p>All other factors will be calculated based on defaults specified in
207    * this class.
208    * @param maxSize maximum size of cache, in bytes
209    * @param blockSize approximate size of each block, in bytes
210    */
211   public LruBlockCache(long maxSize, long blockSize) {
212     this(maxSize, blockSize, true);
213   }
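      // Illustrative usage sketch; the sizes, file name and block value below are arbitrary
      // examples, not taken from this class.
      //
      //   LruBlockCache cache = new LruBlockCache(1024L * 1024 * 1024, 64 * 1024);
      //   BlockCacheKey key = new BlockCacheKey("exampleHFile", 0);
      //   cache.cacheBlock(key, block);                             // cached with single-access priority
      //   Cacheable hit = cache.getBlock(key, true, false, true);   // a hit promotes it to multi-access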
214 
215   /**
216    * Constructor used for testing.  Allows disabling of the eviction thread.
217    */
218   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
219     this(maxSize, blockSize, evictionThread,
220         (int)Math.ceil(1.2*maxSize/blockSize),
221         DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
222         DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
223         DEFAULT_SINGLE_FACTOR,
224         DEFAULT_MULTI_FACTOR,
225         DEFAULT_MEMORY_FACTOR,
226         false
227         );
228   }
229 
230   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
231     this(maxSize, blockSize, evictionThread,
232         (int)Math.ceil(1.2*maxSize/blockSize),
233         DEFAULT_LOAD_FACTOR,
234         DEFAULT_CONCURRENCY_LEVEL,
235         conf.getFloat(LRU_MIN_FACTOR_CONFIG_NAME, DEFAULT_MIN_FACTOR),
236         conf.getFloat(LRU_ACCEPTABLE_FACTOR_CONFIG_NAME, DEFAULT_ACCEPTABLE_FACTOR),
237         conf.getFloat(LRU_SINGLE_PERCENTAGE_CONFIG_NAME, DEFAULT_SINGLE_FACTOR),
238         conf.getFloat(LRU_MULTI_PERCENTAGE_CONFIG_NAME, DEFAULT_MULTI_FACTOR),
239         conf.getFloat(LRU_MEMORY_PERCENTAGE_CONFIG_NAME, DEFAULT_MEMORY_FACTOR),
240         conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE)
241         );
242   }
243 
244   public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
245     this(maxSize, blockSize, true, conf);
246   }
247 
248   /**
249    * Configurable constructor.  Use this constructor if not using defaults.
250    * @param maxSize maximum size of this cache, in bytes
251    * @param blockSize expected average size of blocks, in bytes
252    * @param evictionThread whether to run evictions in a bg thread or not
253    * @param mapInitialSize initial size of backing ConcurrentHashMap
254    * @param mapLoadFactor initial load factor of backing ConcurrentHashMap
255    * @param mapConcurrencyLevel initial concurrency factor for backing CHM
256    * @param minFactor percentage of total size that eviction will evict until
257    * @param acceptableFactor percentage of total size that triggers eviction
258    * @param singleFactor percentage of total size for single-access blocks
259    * @param multiFactor percentage of total size for multiple-access blocks
260    * @param memoryFactor percentage of total size for in-memory blocks
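       * @param forceInMemory whether in-memory hfile data blocks have higher priority when evicting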
261    */
262   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread,
263       int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel,
264       float minFactor, float acceptableFactor, float singleFactor,
265       float multiFactor, float memoryFactor, boolean forceInMemory) {
266     if(singleFactor + multiFactor + memoryFactor != 1 ||
267         singleFactor < 0 || multiFactor < 0 || memoryFactor < 0) {
268       throw new IllegalArgumentException("Single, multi, and memory factors " +
269           " should be non-negative and total 1.0");
270     }
271     if(minFactor >= acceptableFactor) {
272       throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
273     }
274     if(minFactor >= 1.0f || acceptableFactor >= 1.0f) {
275       throw new IllegalArgumentException("all factors must be < 1");
276     }
277     this.maxSize = maxSize;
278     this.blockSize = blockSize;
279     this.forceInMemory = forceInMemory;
280     map = new ConcurrentHashMap<BlockCacheKey,LruCachedBlock>(mapInitialSize,
281         mapLoadFactor, mapConcurrencyLevel);
282     this.minFactor = minFactor;
283     this.acceptableFactor = acceptableFactor;
284     this.singleFactor = singleFactor;
285     this.multiFactor = multiFactor;
286     this.memoryFactor = memoryFactor;
287     this.stats = new CacheStats(this.getClass().getSimpleName());
288     this.count = new AtomicLong(0);
289     this.elements = new AtomicLong(0);
290     this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
291     this.size = new AtomicLong(this.overhead);
292     if(evictionThread) {
293       this.evictionThread = new EvictionThread(this);
294       this.evictionThread.start(); // FindBugs SC_START_IN_CTOR
295     } else {
296       this.evictionThread = null;
297     }
298     // TODO: Add means of turning this off.  Bit obnoxious running thread just to make a log
299     // every five minutes.
300     this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
301         statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
302   }
303 
304   @Override
305   public void setMaxSize(long maxSize) {
306     this.maxSize = maxSize;
307     if(this.size.get() > acceptableSize() && !evictionInProgress) {
308       runEviction();
309     }
310   }
311 
312   // BlockCache implementation
313 
314   /**
315    * Cache the block with the specified name and buffer.
316    * <p>
317    * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)
318    * this can happen, for which we compare the buffer contents.
319    * @param cacheKey block's cache key
320    * @param buf block buffer
321    * @param inMemory if block is in-memory
322    * @param cacheDataInL1 in a multi-tier cache deployment, whether the block should be cached in the L1 tier
323    */
324   @Override
325   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
326       final boolean cacheDataInL1) {
327     LruCachedBlock cb = map.get(cacheKey);
328     if (cb != null) {
329       // compare the contents, if they are not equal, we are in big trouble
330       if (compare(buf, cb.getBuffer()) != 0) {
331         throw new RuntimeException("Cached block contents differ, which should not have happened."
332           + "cacheKey:" + cacheKey);
333       }
334       String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();
335       msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
336       LOG.warn(msg);
337       return;
338     }
339     cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
340     long newSize = updateSizeMetrics(cb, false);
341     map.put(cacheKey, cb);
342     long val = elements.incrementAndGet();
343     if (LOG.isTraceEnabled()) {
344       long size = map.size();
345       assertCounterSanity(size, val);
346     }
347     if (newSize > acceptableSize() && !evictionInProgress) {
348       runEviction();
349     }
350   }
351 
352   /**
353    * Sanity-checking for parity between actual block cache content and metrics.
354    * Intended only for use with TRACE level logging and -ea JVM.
355    */
356   private static void assertCounterSanity(long mapSize, long counterVal) {
357     if (counterVal < 0) {
358       LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal +
359         ", mapSize=" + mapSize);
360       return;
361     }
362     if (mapSize < Integer.MAX_VALUE) {
363       double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.);
364       if (pct_diff > 0.05) {
365         LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal +
366           ", mapSize=" + mapSize);
367       }
368     }
369   }
370 
371   private int compare(Cacheable left, Cacheable right) {
372     ByteBuffer l = ByteBuffer.allocate(left.getSerializedLength());
373     left.serialize(l);
374     ByteBuffer r = ByteBuffer.allocate(right.getSerializedLength());
375     right.serialize(r);
376     return Bytes.compareTo(l.array(), l.arrayOffset(), l.limit(),
377       r.array(), r.arrayOffset(), r.limit());
378   }
379 
380   /**
381    * Cache the block with the specified name and buffer.
382    * <p>
383    * @param cacheKey block's cache key
384    * @param buf block buffer
385    */
386   public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
387     cacheBlock(cacheKey, buf, false, false);
388   }
389 
390   /**
391    * Helper function that updates the local size counter and also updates any
392    * per-cf or per-blocktype metrics it can discern from the given
393    * {@link LruCachedBlock}.
394    *
395    * @param cb the block whose heap size is added to (or, on eviction, subtracted from) the total
396    * @param evict whether the block is being evicted rather than cached
397    */
398   protected long updateSizeMetrics(LruCachedBlock cb, boolean evict) {
399     long heapsize = cb.heapSize();
400     if (evict) {
401       heapsize *= -1;
402     }
403     return size.addAndGet(heapsize);
404   }
405 
406   /**
407    * Get the buffer of the block with the specified name.
408    * @param cacheKey block's cache key
409    * @param caching true if the caller caches blocks on cache misses
410    * @param repeat Whether this is a repeat lookup for the same block
411    *        (used to avoid double counting cache misses when doing double-check locking)
412    * @param updateCacheMetrics Whether to update cache metrics or not
413    * @return buffer of specified cache key, or null if not in cache
414    */
415   @Override
416   public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
417       boolean updateCacheMetrics) {
418     LruCachedBlock cb = map.get(cacheKey);
419     if (cb == null) {
420       if (!repeat && updateCacheMetrics) stats.miss(caching, cacheKey.isPrimary());
421       // If there is another block cache then try and read there.
422       // However if this is a retry ( second time in double checked locking )
423       // And it's already a miss then the l2 will also be a miss.
424       if (victimHandler != null && !repeat) {
425         Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
426 
427         // Promote this to L1.
428         if (result != null && caching) {
429           cacheBlock(cacheKey, result, /* inMemory = */ false, /* cacheData = */ true);
430         }
431         return result;
432       }
433       return null;
434     }
435     if (updateCacheMetrics) stats.hit(caching, cacheKey.isPrimary());
436     cb.access(count.incrementAndGet());
437     return cb.getBuffer();
438   }
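      // Illustrative sketch of the double-checked locking pattern the 'repeat' flag supports;
      // 'loadLock' and 'readBlockFromDisk' are hypothetical names, not part of this class.
      //
      //   Cacheable block = cache.getBlock(key, true, /* repeat= */ false, true);
      //   if (block == null) {
      //     synchronized (loadLock) {
      //       // second lookup under the lock; repeat=true avoids double counting the miss
      //       block = cache.getBlock(key, true, /* repeat= */ true, true);
      //       if (block == null) {
      //         block = readBlockFromDisk(key);
      //         cache.cacheBlock(key, block);
      //       }
      //     }
      //   }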
439 
440   /**
441    * Whether the cache contains the block with the specified cacheKey.
442    * @param cacheKey the cache key of the block
443    * @return true if the cache contains the block
444    */
445   public boolean containsBlock(BlockCacheKey cacheKey) {
446     return map.containsKey(cacheKey);
447   }
448 
449   @Override
450   public boolean evictBlock(BlockCacheKey cacheKey) {
451     LruCachedBlock cb = map.get(cacheKey);
452     if (cb == null) return false;
453     evictBlock(cb, false);
454     return true;
455   }
456 
457   /**
458    * Evicts all blocks for a specific HFile. This is an
459    * expensive operation implemented as a linear-time search through all blocks
460    * in the cache. Ideally this should be a search in a log-access-time map.
461    *
462    * <p>
463    * This is used for evict-on-close to remove all blocks of a specific HFile.
464    *
465    * @return the number of blocks evicted
466    */
467   @Override
468   public int evictBlocksByHfileName(String hfileName) {
469     int numEvicted = 0;
470     for (BlockCacheKey key : map.keySet()) {
471       if (key.getHfileName().equals(hfileName)) {
472         if (evictBlock(key))
473           ++numEvicted;
474       }
475     }
476     if (victimHandler != null) {
477       numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
478     }
479     return numEvicted;
480   }
481 
482   /**
483    * Evict the block; if a victim handler exists and the block may be read again later,
484    * the block is handed off to the victim handler.
485    * @param block the block to evict
486    * @param evictedByEvictionProcess true if the given block is evicted by the
487    *          EvictionThread
488    * @return the heap size of the evicted block
489    */
490   protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
491     map.remove(block.getCacheKey());
492     updateSizeMetrics(block, true);
493     long val = elements.decrementAndGet();
494     if (LOG.isTraceEnabled()) {
495       long size = map.size();
496       assertCounterSanity(size, val);
497     }
498     stats.evicted(block.getCachedTime(), block.getCacheKey().isPrimary());
499     if (evictedByEvictionProcess && victimHandler != null) {
500       if (victimHandler instanceof BucketCache) {
501         boolean wait = getCurrentSize() < acceptableSize();
502         boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
503         ((BucketCache)victimHandler).cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
504             inMemory, wait);
505       } else {
506         victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
507       }
508     }
509     return block.heapSize();
510   }
511 
512   /**
513    * Multi-threaded call to run the eviction process.
514    */
515   private void runEviction() {
516     if(evictionThread == null) {
517       evict();
518     } else {
519       evictionThread.evict();
520     }
521   }
522 
523   /**
524    * Eviction method.
525    */
526   void evict() {
527 
528     // Ensure only one eviction at a time
529     if(!evictionLock.tryLock()) return;
530 
531     try {
532       evictionInProgress = true;
533       long currentSize = this.size.get();
534       long bytesToFree = currentSize - minSize();
535 
536       if (LOG.isTraceEnabled()) {
537         LOG.trace("Block cache LRU eviction started; Attempting to free " +
538           StringUtils.byteDesc(bytesToFree) + " of total=" +
539           StringUtils.byteDesc(currentSize));
540       }
541 
542       if(bytesToFree <= 0) return;
543 
544       // Instantiate priority buckets
545       BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize,
546           singleSize());
547       BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize,
548           multiSize());
549       BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize,
550           memorySize());
551 
552       // Scan entire map putting into appropriate buckets
553       for(LruCachedBlock cachedBlock : map.values()) {
554         switch(cachedBlock.getPriority()) {
555           case SINGLE: {
556             bucketSingle.add(cachedBlock);
557             break;
558           }
559           case MULTI: {
560             bucketMulti.add(cachedBlock);
561             break;
562           }
563           case MEMORY: {
564             bucketMemory.add(cachedBlock);
565             break;
566           }
567         }
568       }
569 
570       long bytesFreed = 0;
571       if (forceInMemory || memoryFactor > 0.999f) {
572         long s = bucketSingle.totalSize();
573         long m = bucketMulti.totalSize();
574         if (bytesToFree > (s + m)) {
575           // this means we need to evict blocks in memory bucket to make room,
576           // so the single and multi buckets will be emptied
577           bytesFreed = bucketSingle.free(s);
578           bytesFreed += bucketMulti.free(m);
579           if (LOG.isTraceEnabled()) {
580             LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
581               " from single and multi buckets");
582           }
583           bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
584           if (LOG.isTraceEnabled()) {
585             LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
586               " total from all three buckets ");
587           }
588         } else {
589           // this means no blocks in the memory bucket need to be evicted,
590           // and we try our best to keep the size ratio between the single bucket
591           // and the multi bucket close to 1:2
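              // Worked example with purely illustrative numbers: if s = 400, m = 500 and
              // bytesToFree = 300, then bytesRemain = 600.  Since 3*s > bytesRemain and
              // 3*m > 2*bytesRemain, both shortcut branches below are skipped: we free
              // s - bytesRemain/3 = 200 from the single bucket and the remaining 100 from
              // the multi bucket, leaving 200 (single) and 400 (multi), i.e. the 1:2 ratio.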
592           long bytesRemain = s + m - bytesToFree;
593           if (3 * s <= bytesRemain) {
594             // single-bucket is small enough that no eviction happens for it
595             // hence all eviction goes from multi-bucket
596             bytesFreed = bucketMulti.free(bytesToFree);
597           } else if (3 * m <= 2 * bytesRemain) {
598             // multi-bucket is small enough that no eviction happens for it
599             // hence all eviction goes from single-bucket
600             bytesFreed = bucketSingle.free(bytesToFree);
601           } else {
602             // both buckets need to evict some blocks
603             bytesFreed = bucketSingle.free(s - bytesRemain / 3);
604             if (bytesFreed < bytesToFree) {
605               bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);
606             }
607           }
608         }
609       } else {
610         PriorityQueue<BlockBucket> bucketQueue =
611           new PriorityQueue<BlockBucket>(3);
612 
613         bucketQueue.add(bucketSingle);
614         bucketQueue.add(bucketMulti);
615         bucketQueue.add(bucketMemory);
616 
617         int remainingBuckets = 3;
618 
619         BlockBucket bucket;
620         while((bucket = bucketQueue.poll()) != null) {
621           long overflow = bucket.overflow();
622           if(overflow > 0) {
623             long bucketBytesToFree = Math.min(overflow,
624                 (bytesToFree - bytesFreed) / remainingBuckets);
625             bytesFreed += bucket.free(bucketBytesToFree);
626           }
627           remainingBuckets--;
628         }
629       }
630 
631       if (LOG.isTraceEnabled()) {
632         long single = bucketSingle.totalSize();
633         long multi = bucketMulti.totalSize();
634         long memory = bucketMemory.totalSize();
635         LOG.trace("Block cache LRU eviction completed; " +
636           "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
637           "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
638           "single=" + StringUtils.byteDesc(single) + ", " +
639           "multi=" + StringUtils.byteDesc(multi) + ", " +
640           "memory=" + StringUtils.byteDesc(memory));
641       }
642     } finally {
643       stats.evict();
644       evictionInProgress = false;
645       evictionLock.unlock();
646     }
647   }
648 
649   @Override
650   public String toString() {
651     return Objects.toStringHelper(this)
652       .add("blockCount", getBlockCount())
653       .add("currentSize", getCurrentSize())
654       .add("freeSize", getFreeSize())
655       .add("maxSize", getMaxSize())
656       .add("heapSize", heapSize())
657       .add("minSize", minSize())
658       .add("minFactor", minFactor)
659       .add("multiSize", multiSize())
660       .add("multiFactor", multiFactor)
661       .add("singleSize", singleSize())
662       .add("singleFactor", singleFactor)
663       .toString();
664   }
665 
666   /**
667    * Used to group blocks into priority buckets.  There will be a BlockBucket
668    * for each priority (single, multi, memory).  Once bucketed, the eviction
669    * algorithm takes the appropriate number of elements out of each according
670    * to configuration parameters and their relative sizes.
671    */
672   private class BlockBucket implements Comparable<BlockBucket> {
673 
674     private final String name;
675     private LruCachedBlockQueue queue;
676     private long totalSize = 0;
677     private long bucketSize;
678 
679     public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) {
680       this.name = name;
681       this.bucketSize = bucketSize;
682       queue = new LruCachedBlockQueue(bytesToFree, blockSize);
683       totalSize = 0;
684     }
685 
686     public void add(LruCachedBlock block) {
687       totalSize += block.heapSize();
688       queue.add(block);
689     }
690 
691     public long free(long toFree) {
692       if (LOG.isTraceEnabled()) {
693         LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this);
694       }
695       LruCachedBlock cb;
696       long freedBytes = 0;
697       while ((cb = queue.pollLast()) != null) {
698         freedBytes += evictBlock(cb, true);
699         if (freedBytes >= toFree) {
700           return freedBytes;
701         }
702       }
703       if (LOG.isTraceEnabled()) {
704         LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this);
705       }
706       return freedBytes;
707     }
708 
709     public long overflow() {
710       return totalSize - bucketSize;
711     }
712 
713     public long totalSize() {
714       return totalSize;
715     }
716 
717     public int compareTo(BlockBucket that) {
718       if(this.overflow() == that.overflow()) return 0;
719       return this.overflow() > that.overflow() ? 1 : -1;
720     }
721 
722     @Override
723     public boolean equals(Object that) {
724       if (that == null || !(that instanceof BlockBucket)){
725         return false;
726       }
727       return compareTo((BlockBucket)that) == 0;
728     }
729 
730     @Override
731     public int hashCode() {
732       return Objects.hashCode(name, bucketSize, queue, totalSize);
733     }
734 
735     @Override
736     public String toString() {
737       return Objects.toStringHelper(this)
738         .add("name", name)
739         .add("totalSize", StringUtils.byteDesc(totalSize))
740         .add("bucketSize", StringUtils.byteDesc(bucketSize))
741         .toString();
742     }
743   }
744 
745   /**
746    * Get the maximum size of this cache.
747    * @return max size in bytes
748    */
749   public long getMaxSize() {
750     return this.maxSize;
751   }
752 
753   @Override
754   public long getCurrentSize() {
755     return this.size.get();
756   }
757 
758   @Override
759   public long getFreeSize() {
760     return getMaxSize() - getCurrentSize();
761   }
762 
763   @Override
764   public long size() {
765     return getMaxSize();
766   }
767 
768   @Override
769   public long getBlockCount() {
770     return this.elements.get();
771   }
772 
773   EvictionThread getEvictionThread() {
774     return this.evictionThread;
775   }
776 
777   /*
778    * Eviction thread.  Sits in waiting state until an eviction is triggered
779    * when the cache size grows above the acceptable level.<p>
780    *
781    * Thread is triggered into action by {@link LruBlockCache#runEviction()}
782    */
783   static class EvictionThread extends HasThread {
784     private WeakReference<LruBlockCache> cache;
785     private volatile boolean go = true;
786     // flag set after entering the run method, used for tests
787     private boolean enteringRun = false;
788 
789     public EvictionThread(LruBlockCache cache) {
790       super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
791       setDaemon(true);
792       this.cache = new WeakReference<LruBlockCache>(cache);
793     }
794 
795     @Override
796     public void run() {
797       enteringRun = true;
798       while (this.go) {
799         synchronized(this) {
800           try {
801             this.wait(1000 * 10/*Don't wait for ever*/);
802           } catch(InterruptedException e) {
803             LOG.warn("Interrupted eviction thread ", e);
804             Thread.currentThread().interrupt();
805           }
806         }
807         LruBlockCache cache = this.cache.get();
808         if (cache == null) break;
809         cache.evict();
810       }
811     }
812 
813     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
814         justification="This is what we want")
815     public void evict() {
816       synchronized(this) {
817         this.notifyAll();
818       }
819     }
820 
821     synchronized void shutdown() {
822       this.go = false;
823       this.notifyAll();
824     }
825 
826     /**
827      * Used in tests.
828      */
829     boolean isEnteringRun() {
830       return this.enteringRun;
831     }
832   }
833 
834   /*
835    * Statistics thread.  Periodically prints the cache statistics to the log.
836    */
837   static class StatisticsThread extends Thread {
838     private final LruBlockCache lru;
839 
840     public StatisticsThread(LruBlockCache lru) {
841       super("LruBlockCacheStats");
842       setDaemon(true);
843       this.lru = lru;
844     }
845 
846     @Override
847     public void run() {
848       lru.logStats();
849     }
850   }
851 
852   public void logStats() {
853     // Log size
854     long totalSize = heapSize();
855     long freeSize = maxSize - totalSize;
856     LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " +
857         "freeSize=" + StringUtils.byteDesc(freeSize) + ", " +
858         "max=" + StringUtils.byteDesc(this.maxSize) + ", " +
859         "blockCount=" + getBlockCount() + ", " +
860         "accesses=" + stats.getRequestCount() + ", " +
861         "hits=" + stats.getHitCount() + ", " +
862         "hitRatio=" + (stats.getHitCount() == 0 ?
863           "0" : (StringUtils.formatPercent(stats.getHitRatio(), 2)+ ", ")) + ", " +
864         "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
865         "cachingHits=" + stats.getHitCachingCount() + ", " +
866         "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
867           "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) +
868         "evictions=" + stats.getEvictionCount() + ", " +
869         "evicted=" + stats.getEvictedCount() + ", " +
870         "evictedPerRun=" + stats.evictedPerEviction());
871   }
872 
873   /**
874    * Get counter statistics for this cache.
875    *
876    * <p>Includes: total accesses, hits, misses, evicted blocks, and runs
877    * of the eviction processes.
878    */
879   public CacheStats getStats() {
880     return this.stats;
881   }
882 
883   public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
884       (3 * Bytes.SIZEOF_LONG) + (9 * ClassSize.REFERENCE) +
885       (5 * Bytes.SIZEOF_FLOAT) + Bytes.SIZEOF_BOOLEAN
886       + ClassSize.OBJECT);
887 
888   @Override
889   public long heapSize() {
890     return getCurrentSize();
891   }
892 
893   public static long calculateOverhead(long maxSize, long blockSize, int concurrency){
894     // FindBugs ICAST_INTEGER_MULTIPLY_CAST_TO_LONG
895     return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP +
896         ((long)Math.ceil(maxSize*1.2/blockSize)
897             * ClassSize.CONCURRENT_HASHMAP_ENTRY) +
898         ((long)concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
899   }
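      // Illustrative arithmetic: with maxSize = 1 GB and blockSize = 64 KB, the map is sized
      // for ceil(1.2 * 2^30 / 2^16) = 19,661 entries, so the precomputed overhead is the fixed
      // class overhead plus the ConcurrentHashMap overhead, 19,661 entry sizes and
      // 'concurrency' segment sizes.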
900 
901   @Override
902   public Iterator<CachedBlock> iterator() {
903     final Iterator<LruCachedBlock> iterator = map.values().iterator();
904 
905     return new Iterator<CachedBlock>() {
906       private final long now = System.nanoTime();
907 
908       @Override
909       public boolean hasNext() {
910         return iterator.hasNext();
911       }
912 
913       @Override
914       public CachedBlock next() {
915         final LruCachedBlock b = iterator.next();
916         return new CachedBlock() {
917           @Override
918           public String toString() {
919             return BlockCacheUtil.toString(this, now);
920           }
921 
922           @Override
923           public BlockPriority getBlockPriority() {
924             return b.getPriority();
925           }
926 
927           @Override
928           public BlockType getBlockType() {
929             return b.getBuffer().getBlockType();
930           }
931 
932           @Override
933           public long getOffset() {
934             return b.getCacheKey().getOffset();
935           }
936 
937           @Override
938           public long getSize() {
939             return b.getBuffer().heapSize();
940           }
941 
942           @Override
943           public long getCachedTime() {
944             return b.getCachedTime();
945           }
946 
947           @Override
948           public String getFilename() {
949             return b.getCacheKey().getHfileName();
950           }
951 
952           @Override
953           public int compareTo(CachedBlock other) {
954             int diff = this.getFilename().compareTo(other.getFilename());
955             if (diff != 0) return diff;
956             diff = (int)(this.getOffset() - other.getOffset());
957             if (diff != 0) return diff;
958             if (other.getCachedTime() < 0 || this.getCachedTime() < 0) {
959               throw new IllegalStateException("" + this.getCachedTime() + ", " +
960                 other.getCachedTime());
961             }
962             return (int)(other.getCachedTime() - this.getCachedTime());
963           }
964 
965           @Override
966           public int hashCode() {
967             return b.hashCode();
968           }
969 
970           @Override
971           public boolean equals(Object obj) {
972             if (obj instanceof CachedBlock) {
973               CachedBlock cb = (CachedBlock)obj;
974               return compareTo(cb) == 0;
975             } else {
976               return false;
977             }
978           }
979         };
980       }
981 
982       @Override
983       public void remove() {
984         throw new UnsupportedOperationException();
985       }
986     };
987   }
988 
989   // Simple calculators of sizes given factors and maxSize
990 
991   long acceptableSize() {
992     return (long)Math.floor(this.maxSize * this.acceptableFactor);
993   }
994   private long minSize() {
995     return (long)Math.floor(this.maxSize * this.minFactor);
996   }
997   private long singleSize() {
998     return (long)Math.floor(this.maxSize * this.singleFactor * this.minFactor);
999   }
1000   private long multiSize() {
1001     return (long)Math.floor(this.maxSize * this.multiFactor * this.minFactor);
1002   }
1003   private long memorySize() {
1004     return (long)Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
1005   }
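      // Illustrative arithmetic: with maxSize = 1 GB, the default minFactor = 0.95,
      // acceptableFactor = 0.99 and the 0.25/0.50/0.25 priority split, eviction is triggered
      // once the cache exceeds ~0.99 GB and frees blocks until it is back under ~0.95 GB; the
      // single/multi/memory chunks come out to roughly 0.24/0.48/0.24 GB.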
1006 
1007   public void shutdown() {
1008     if (victimHandler != null)
1009       victimHandler.shutdown();
1010     this.scheduleThreadPool.shutdown();
1011     for (int i = 0; i < 10; i++) {
1012       if (!this.scheduleThreadPool.isShutdown()) {
1013         try {
1014           Thread.sleep(10);
1015         } catch (InterruptedException e) {
1016           LOG.warn("Interrupted while sleeping");
1017           Thread.currentThread().interrupt();
1018           break;
1019         }
1020       }
1021     }
1022 
1023     if (!this.scheduleThreadPool.isShutdown()) {
1024       List<Runnable> runnables = this.scheduleThreadPool.shutdownNow();
1025       LOG.debug("Still running " + runnables);
1026     }
1027     this.evictionThread.shutdown();
1028   }
1029 
1030   /** Clears the cache. Used in tests. */
1031   @VisibleForTesting
1032   public void clearCache() {
1033     this.map.clear();
1034     this.elements.set(0);
1035   }
1036 
1037   /**
1038    * Used in testing. May be very inefficient.
1039    * @return the set of cached file names
1040    */
1041   @VisibleForTesting
1042   SortedSet<String> getCachedFileNamesForTest() {
1043     SortedSet<String> fileNames = new TreeSet<String>();
1044     for (BlockCacheKey cacheKey : map.keySet()) {
1045       fileNames.add(cacheKey.getHfileName());
1046     }
1047     return fileNames;
1048   }
1049 
1050   @VisibleForTesting
1051   Map<BlockType, Integer> getBlockTypeCountsForTest() {
1052     Map<BlockType, Integer> counts =
1053         new EnumMap<BlockType, Integer>(BlockType.class);
1054     for (LruCachedBlock cb : map.values()) {
1055       BlockType blockType = ((Cacheable)cb.getBuffer()).getBlockType();
1056       Integer count = counts.get(blockType);
1057       counts.put(blockType, (count == null ? 0 : count) + 1);
1058     }
1059     return counts;
1060   }
1061 
1062   @VisibleForTesting
1063   public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
1064     Map<DataBlockEncoding, Integer> counts =
1065         new EnumMap<DataBlockEncoding, Integer>(DataBlockEncoding.class);
1066     for (LruCachedBlock block : map.values()) {
1067       DataBlockEncoding encoding =
1068               ((HFileBlock) block.getBuffer()).getDataBlockEncoding();
1069       Integer count = counts.get(encoding);
1070       counts.put(encoding, (count == null ? 0 : count) + 1);
1071     }
1072     return counts;
1073   }
1074 
1075   public void setVictimCache(BlockCache handler) {
1076     assert victimHandler == null;
1077     victimHandler = handler;
1078   }
1079 
1080   @VisibleForTesting
1081   Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
1082     return map;
1083   }
1084 
1085   BlockCache getVictimHandler() {
1086     return this.victimHandler;
1087   }
1088 
1089   @Override
1090   public BlockCache[] getBlockCaches() {
1091     return null;
1092   }
1093 
1094   @Override
1095   public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
1096     // There is no SHARED type here. Just return
1097   }
1098 }