1   /*
2    * Copyright 2009 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.accumulo.core.file.blockfile.cache;
21  
22  import java.lang.ref.WeakReference;
23  import java.util.PriorityQueue;
24  import java.util.concurrent.ConcurrentHashMap;
25  import java.util.concurrent.Executors;
26  import java.util.concurrent.ScheduledExecutorService;
27  import java.util.concurrent.TimeUnit;
28  import java.util.concurrent.atomic.AtomicLong;
29  import java.util.concurrent.locks.ReentrantLock;
30  
31  import org.apache.accumulo.core.util.NamingThreadFactory;
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  
35  /**
36   * A block cache implementation that is memory-aware using {@link HeapSize}, memory-bound using an LRU eviction algorithm, and concurrent: backed by a
37   * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving constant-time {@link #cacheBlock} and {@link #getBlock} operations.
38   * <p>
39   * 
40   * Contains three levels of block priority to allow for scan-resistance and in-memory families. A block is added with the inMemory flag if necessary; otherwise
41   * it is given single-access priority. Once a block is accessed again, it is promoted to multiple-access priority. This is used to prevent scans from thrashing
42   * the cache, adding a least-frequently-used element to the eviction algorithm.
43   * <p>
44   * 
45   * Each priority is given its own chunk of the total cache to ensure fairness during eviction. Each priority will retain close to its maximum size; however, if
46   * any priority is not using its entire chunk, the others are able to grow beyond their chunk size.
47   * <p>
48   * 
49   * Instantiated at a minimum with the total size and average block size. All sizes are in bytes. The block size is not especially important as this cache is
50   * fully dynamic in its sizing of blocks. It is only used for pre-allocating data structures and in initial heap estimation of the map.
51   * <p>
52   * 
53   * The detailed constructor defines the fraction of the cache given to each of the three priorities (the three factors should total 1.0). It also sets the levels
54   * that trigger and control the eviction thread.
55   * <p>
56   * 
57   * The acceptable size is the cache size level which triggers the eviction process to start. It evicts enough blocks to get the size below the minimum size
58   * specified.
59   * <p>
60   * 
61   * Eviction happens in a separate thread and involves a single full scan of the map. It determines how many bytes must be freed to reach the minimum size, and
62   * then, while scanning, collects the least-recently-used blocks from each of the three priorities (up to that many bytes per priority, so potentially three
63   * times the bytes to free). It then uses the priority chunk sizes to evict fairly according to the relative sizes and usage of each priority.
64   */
65  public class LruBlockCache implements BlockCache, HeapSize {
66    
67    static final Log LOG = LogFactory.getLog(LruBlockCache.class);
68    
69    /** Default Configuration Parameters */
70    
71    /** Backing Concurrent Map Configuration */
72    static final float DEFAULT_LOAD_FACTOR = 0.75f;
73    static final int DEFAULT_CONCURRENCY_LEVEL = 16;
74    
75    /** Eviction thresholds */
76    static final float DEFAULT_MIN_FACTOR = 0.75f;
77    static final float DEFAULT_ACCEPTABLE_FACTOR = 0.85f;
78    
79    /** Priority buckets */
80    static final float DEFAULT_SINGLE_FACTOR = 0.25f;
81    static final float DEFAULT_MULTI_FACTOR = 0.50f;
82    static final float DEFAULT_MEMORY_FACTOR = 0.25f;
83    
84    /** Statistics thread */
85    static final int statThreadPeriod = 60;
86    
87    /** Concurrent map (the cache) */
88    private final ConcurrentHashMap<String,CachedBlock> map;
89    
90    /** Eviction lock (locked when eviction in process) */
91    private final ReentrantLock evictionLock = new ReentrantLock(true);
92    
93    /** Volatile boolean to track if we are in an eviction process or not */
94    private volatile boolean evictionInProgress = false;
95    
96    /** Eviction thread */
97    private final EvictionThread evictionThread;
98    
99    /** Statistics thread schedule pool (for heavy debugging, could remove) */
100   private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1, new NamingThreadFactory("LRUBlockCacheStats"));
101   
102   /** Current size of cache */
103   private final AtomicLong size;
104   
105   /** Current number of cached elements */
106   private final AtomicLong elements;
107   
108   /** Cache access count (sequential ID) */
109   private final AtomicLong count;
110   
111   /** Cache statistics */
112   private final CacheStats stats;
113   
114   /** Maximum allowable size of cache (block put if size > max, evict) */
115   private long maxSize;
116   
117   /** Approximate block size */
118   private long blockSize;
119   
120   /** Acceptable size of cache (no evictions if size < acceptable) */
121   private float acceptableFactor;
122   
123   /** Minimum threshold of cache (when evicting, evict until size < min) */
124   private float minFactor;
125   
126   /** Single access bucket size */
127   private float singleFactor;
128   
129   /** Multiple access bucket size */
130   private float multiFactor;
131   
132   /** In-memory bucket size */
133   private float memoryFactor;
134   
135   /** Overhead of the structure itself */
136   private long overhead;
137   
138   /**
139    * Default constructor. Specify maximum size and expected average block size (approximation is fine).
140    * 
141    * <p>
142    * All other factors will be calculated based on defaults specified in this class.
143    * 
144    * @param maxSize
145    *          maximum size of cache, in bytes
146    * @param blockSize
147    *          approximate size of each block, in bytes
148    */
149   public LruBlockCache(long maxSize, long blockSize) {
150     this(maxSize, blockSize, true);
151   }
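
As a rough usage sketch (not part of this source file), the default constructor above might be called as follows. The 64 MB and 64 KB figures are invented for illustration; with the class defaults, eviction starts once the cache grows past 85% of maxSize and frees blocks down to 75%, with the 25%/50%/25% single/multi/in-memory split described in the class comment.

    // Illustration only: a 64 MB cache with an assumed ~64 KB average block size.
    long maxSize = 64L * 1024 * 1024;
    long blockSize = 64L * 1024;
    LruBlockCache cache = new LruBlockCache(maxSize, blockSize);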
152   
153   /**
154    * Constructor used for testing. Allows disabling of the eviction thread.
155    */
156   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
157     this(maxSize, blockSize, evictionThread, (int) Math.ceil(1.2 * maxSize / blockSize), DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL, DEFAULT_MIN_FACTOR,
158         DEFAULT_ACCEPTABLE_FACTOR, DEFAULT_SINGLE_FACTOR, DEFAULT_MULTI_FACTOR, DEFAULT_MEMORY_FACTOR);
159   }
160   
161   /**
162    * Configurable constructor. Use this constructor if not using defaults.
163    * 
164    * @param maxSize
165    *          maximum size of this cache, in bytes
166    * @param blockSize
167    *          expected average size of blocks, in bytes
168    * @param evictionThread
169   *          whether to run evictions in a background thread or not
170   * @param mapInitialSize
171   *          initial size of backing ConcurrentHashMap
172   * @param mapLoadFactor
173   *          initial load factor of backing ConcurrentHashMap
174   * @param mapConcurrencyLevel
175   *          initial concurrency level of the backing ConcurrentHashMap
176   * @param minFactor
177   *          percentage of total size that eviction will evict down to
178    * @param acceptableFactor
179    *          percentage of total size that triggers eviction
180    * @param singleFactor
181    *          percentage of total size for single-access blocks
182    * @param multiFactor
183    *          percentage of total size for multiple-access blocks
184    * @param memoryFactor
185    *          percentage of total size for in-memory blocks
186    */
187   public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel, float minFactor,
188       float acceptableFactor, float singleFactor, float multiFactor, float memoryFactor) {
189     if (Math.abs(singleFactor + multiFactor + memoryFactor - 1f) > 0.00001f) {
190       throw new IllegalArgumentException("Single, multi, and memory factors should total 1.0");
191     }
192     if (minFactor >= acceptableFactor) {
193       throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
194     }
195     if (minFactor >= 1.0f || acceptableFactor >= 1.0f) {
196       throw new IllegalArgumentException("all factors must be < 1");
197     }
198     this.maxSize = maxSize;
199     this.blockSize = blockSize;
200     map = new ConcurrentHashMap<String,CachedBlock>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel);
201     this.minFactor = minFactor;
202     this.acceptableFactor = acceptableFactor;
203     this.singleFactor = singleFactor;
204     this.multiFactor = multiFactor;
205     this.memoryFactor = memoryFactor;
206     this.stats = new CacheStats();
207     this.count = new AtomicLong(0);
208     this.elements = new AtomicLong(0);
209     this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
210     this.size = new AtomicLong(this.overhead);
211     
212     if (evictionThread) {
213       this.evictionThread = new EvictionThread(this);
214       this.evictionThread.start();
215       while (!this.evictionThread.running()) {
216         try {
217           Thread.sleep(10);
218         } catch (InterruptedException ex) {
219           throw new RuntimeException(ex);
220         }
221       }
222     } else {
223       this.evictionThread = null;
224     }
225     this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
226   }
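
Callers that need non-default tuning can invoke the detailed constructor directly. The fragment below is only a sketch with hypothetical values; it shows one set of arguments that satisfies the validation above (the three priority factors total 1.0 and minFactor < acceptableFactor < 1).

    // Hypothetical tuning: no background eviction thread, a tighter eviction band,
    // and half of the cache reserved for in-memory blocks.
    LruBlockCache cache = new LruBlockCache(
        128L * 1024 * 1024, // maxSize: 128 MB
        64L * 1024,         // blockSize: ~64 KB expected average
        false,              // evictionThread: evict inline rather than in a background thread
        4096,               // mapInitialSize
        0.75f,              // mapLoadFactor
        16,                 // mapConcurrencyLevel
        0.70f,              // minFactor: evict down to 70% of maxSize
        0.80f,              // acceptableFactor: start evicting above 80% of maxSize
        0.25f,              // singleFactor  \
        0.25f,              // multiFactor    } the three factors total 1.0
        0.50f);             // memoryFactor  /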
227   
228   public void setMaxSize(long maxSize) {
229     this.maxSize = maxSize;
230     if (this.size.get() > acceptableSize() && !evictionInProgress) {
231       runEviction();
232     }
233   }
234   
235   // BlockCache implementation
236   
237   /**
238    * Cache the block with the specified name and buffer.
239    * <p>
240    * It is assumed this will NEVER be called on an already cached block. If it is, it is assumed that the same exact block is being reinserted due to a race
241    * condition; the existing entry is kept (its access time and the duplicate-read count are updated) and the size of the cache is not modified.
242    * 
243    * @param blockName
244    *          block name
245    * @param buf
246    *          block buffer
247    * @param inMemory
248    *          if block is in-memory
249    */
250   public CacheEntry cacheBlock(String blockName, byte buf[], boolean inMemory) {
251     CachedBlock cb = map.get(blockName);
252     if (cb != null) {
253       stats.duplicateReads();
254       cb.access(count.incrementAndGet());
255       
256     } else {
257       cb = new CachedBlock(blockName, buf, count.incrementAndGet(), inMemory);
258       long newSize = size.addAndGet(cb.heapSize());
259       map.put(blockName, cb);
260       elements.incrementAndGet();
261       if (newSize > acceptableSize() && !evictionInProgress) {
262         runEviction();
263       }
264     }
265     
266     return cb;
267   }
268   
269   /**
270    * Cache the block with the specified name and buffer.
271    * <p>
272    * It is assumed this will NEVER be called on an already cached block. If it is, it is assumed that the same exact block is being reinserted due to a race
273    * condition; the existing entry is kept (its access time and the duplicate-read count are updated) and the size of the cache is not modified.
274    * 
275    * @param blockName
276    *          block name
277    * @param buf
278    *          block buffer
279    */
280   public CacheEntry cacheBlock(String blockName, byte buf[]) {
281     return cacheBlock(blockName, buf, false);
282   }
283   
284   /**
285    * Get the buffer of the block with the specified name.
286    * 
287    * @param blockName
288    *          block name
289    * @return buffer of specified block name, or null if not in cache
290    */
291   
292   public CachedBlock getBlock(String blockName) {
293     CachedBlock cb = map.get(blockName);
294     if (cb == null) {
295       stats.miss();
296       return null;
297     }
298     stats.hit();
299     cb.access(count.incrementAndGet());
300     return cb;
301   }
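
Together, getBlock and cacheBlock support the usual check-then-load pattern. The helper below is a hypothetical sketch: readBlockFromFile stands in for whatever expensive read the caller would otherwise perform, and it assumes the cached bytes are exposed through CacheEntry.getBuffer().

    byte[] readThroughCache(LruBlockCache cache, String blockName) {
      CacheEntry ce = cache.getBlock(blockName);  // records a hit or a miss in the stats
      if (ce != null) {
        return ce.getBuffer();                    // assumed accessor for the cached bytes
      }
      byte[] buf = readBlockFromFile(blockName);  // hypothetical loader for a missed block
      cache.cacheBlock(blockName, buf);           // enters at single-access priority until touched again
      return buf;
    }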
302   
303   protected long evictBlock(CachedBlock block) {
304     map.remove(block.getName());
305     size.addAndGet(-1 * block.heapSize());
306     elements.decrementAndGet();
307     stats.evicted();
308     return block.heapSize();
309   }
310   
311   /**
312    * Thread-safe call to run the eviction process; uses the eviction thread if one exists, otherwise evicts inline.
313    */
314   private void runEviction() {
315     if (evictionThread == null) {
316       evict();
317     } else {
318       evictionThread.evict();
319     }
320   }
321   
322   /**
323    * Eviction method.
324    */
325   void evict() {
326     
327     // Ensure only one eviction at a time
328     if (!evictionLock.tryLock())
329       return;
330     
331     try {
332       evictionInProgress = true;
333       
334       long bytesToFree = size.get() - minSize();
335       
336       LOG.debug("Block cache LRU eviction started.  Attempting to free " + bytesToFree + " bytes");
337       
338       if (bytesToFree <= 0)
339         return;
340       
341       // Instantiate priority buckets
342       BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize, singleSize());
343       BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize, multiSize());
344       BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize, memorySize());
345       
346       // Scan entire map putting into appropriate buckets
347       for (CachedBlock cachedBlock : map.values()) {
348         switch (cachedBlock.getPriority()) {
349           case SINGLE: {
350             bucketSingle.add(cachedBlock);
351             break;
352           }
353           case MULTI: {
354             bucketMulti.add(cachedBlock);
355             break;
356           }
357           case MEMORY: {
358             bucketMemory.add(cachedBlock);
359             break;
360           }
361         }
362       }
363       
364       PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<BlockBucket>(3);
365       
366       bucketQueue.add(bucketSingle);
367       bucketQueue.add(bucketMulti);
368       bucketQueue.add(bucketMemory);
369       
370       int remainingBuckets = 3;
371       long bytesFreed = 0;
372       
373       BlockBucket bucket;
374       while ((bucket = bucketQueue.poll()) != null) {
375         long overflow = bucket.overflow();
376         if (overflow > 0) {
377           long bucketBytesToFree = Math.min(overflow, (long) Math.ceil((bytesToFree - bytesFreed) / (double) remainingBuckets));
378           bytesFreed += bucket.free(bucketBytesToFree);
379         }
380         remainingBuckets--;
381       }
382       
383       float singleMB = ((float) bucketSingle.totalSize()) / ((float) (1024 * 1024));
384       float multiMB = ((float) bucketMulti.totalSize()) / ((float) (1024 * 1024));
385       float memoryMB = ((float) bucketMemory.totalSize()) / ((float) (1024 * 1024));
386       
387       LOG.debug("Block cache LRU eviction completed. " + "Freed " + bytesFreed + " bytes.  " + "Priority Sizes: " + "Single=" + singleMB + "MB ("
388           + bucketSingle.totalSize() + "), " + "Multi=" + multiMB + "MB (" + bucketMulti.totalSize() + "), " + "Memory=" + memoryMB + "MB ("
389           + bucketMemory.totalSize() + ")");
390       
391     } finally {
392       stats.evict();
393       evictionInProgress = false;
394       evictionLock.unlock();
395     }
396   }
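
To make the apportioning above concrete, here is a small worked example with invented figures. It applies the same Math.min(overflow, ceil(remaining / buckets)) rule that the loop in evict() uses as it polls the buckets in order of increasing overflow.

    // Suppose 30 MB must be freed and the buckets poll with these overflows, smallest first:
    long bytesToFree = 30L * 1024 * 1024;
    long[] overflows = {5L * 1024 * 1024, 20L * 1024 * 1024, 40L * 1024 * 1024};
    long bytesFreed = 0;
    int remainingBuckets = 3;
    for (long overflow : overflows) {
      if (overflow > 0) {
        // each bucket gives up at most its own overflow, or its even share of what is still needed
        long bucketBytesToFree = Math.min(overflow, (long) Math.ceil((bytesToFree - bytesFreed) / (double) remainingBuckets));
        bytesFreed += bucketBytesToFree;
      }
      remainingBuckets--;
    }
    // Result: the first bucket frees its entire 5 MB overflow, and the remaining 25 MB is
    // split evenly as 12.5 MB from each of the other two buckets.

(In the real method, BlockBucket.free may release slightly more than requested, since it evicts whole blocks until the target is reached.)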
397   
398   /**
399    * Used to group blocks into priority buckets. There will be a BlockBucket for each priority (single, multi, memory). Once bucketed, the eviction algorithm
400    * takes the appropriate number of elements out of each according to configuration parameters and their relative sizes.
401    */
402   private class BlockBucket implements Comparable<BlockBucket> {
403     
404     private CachedBlockQueue queue;
405     private long totalSize = 0;
406     private long bucketSize;
407     
408     public BlockBucket(long bytesToFree, long blockSize, long bucketSize) {
409       this.bucketSize = bucketSize;
410       queue = new CachedBlockQueue(bytesToFree, blockSize);
411       totalSize = 0;
412     }
413     
414     public void add(CachedBlock block) {
415       totalSize += block.heapSize();
416       queue.add(block);
417     }
418     
419     public long free(long toFree) {
420       CachedBlock[] blocks = queue.get();
421       long freedBytes = 0;
422       for (int i = 0; i < blocks.length; i++) {
423         freedBytes += evictBlock(blocks[i]);
424         if (freedBytes >= toFree) {
425           return freedBytes;
426         }
427       }
428       return freedBytes;
429     }
430     
431     public long overflow() {
432       return totalSize - bucketSize;
433     }
434     
435     public long totalSize() {
436       return totalSize;
437     }
438     
439     public int compareTo(BlockBucket that) {
440       if (this.overflow() == that.overflow())
441         return 0;
442       return this.overflow() > that.overflow() ? 1 : -1;
443     }
444   }
445   
446   /**
447    * Get the maximum size of this cache.
448    * 
449    * @return max size in bytes
450    */
451   public long getMaxSize() {
452     return this.maxSize;
453   }
454   
455   /**
456    * Get the current size of this cache.
457    * 
458    * @return current size in bytes
459    */
460   public long getCurrentSize() {
461     return this.size.get();
462   }
463   
464   /**
465    * Get the amount of free space remaining in this cache.
466    * 
467    * @return free size in bytes
468    */
469   public long getFreeSize() {
470     return getMaxSize() - getCurrentSize();
471   }
472   
473   /**
474    * Get the size of this cache (number of cached blocks)
475    * 
476    * @return number of cached blocks
477    */
478   public long size() {
479     return this.elements.get();
480   }
481   
482   /**
483    * Get the number of eviction runs that have occurred
484    */
485   public long getEvictionCount() {
486     return this.stats.getEvictionCount();
487   }
488   
489   /**
490    * Get the number of blocks that have been evicted during the lifetime of this cache.
491    */
492   public long getEvictedCount() {
493     return this.stats.getEvictedCount();
494   }
495   
496   /*
497    * Eviction thread. Sits in a waiting state until an eviction is triggered when the cache size grows above the acceptable level.
498    * 
499    * The thread is triggered into action by {@link LruBlockCache#runEviction()}.
500    */
501   private static class EvictionThread extends Thread {
502     private WeakReference<LruBlockCache> cache;
503     private boolean running = false;
504     
505     public EvictionThread(LruBlockCache cache) {
506       super("LruBlockCache.EvictionThread");
507       setDaemon(true);
508       this.cache = new WeakReference<LruBlockCache>(cache);
509     }
510     
511     public synchronized boolean running() {
512       return running;
513     }
514     
515     @Override
516     public void run() {
517       while (true) {
518         synchronized (this) {
519           running = true;
520           try {
521             this.wait();
522           } catch (InterruptedException e) {}
523         }
524         LruBlockCache cache = this.cache.get();
525         if (cache == null)
526           break;
527         cache.evict();
528       }
529     }
530     
531     public void evict() {
532       synchronized (this) {
533         this.notify();
534       }
535     }
536   }
537   
538   /*
539    * Statistics thread. Periodically prints the cache statistics to the log.
540    */
541   private static class StatisticsThread extends Thread {
542     LruBlockCache lru;
543     
544     public StatisticsThread(LruBlockCache lru) {
545       super("LruBlockCache.StatisticsThread");
546       setDaemon(true);
547       this.lru = lru;
548     }
549     
550     @Override
551     public void run() {
552       lru.logStats();
553     }
554   }
555   
556   public void logStats() {
557     // Log size
558     long totalSize = heapSize();
559     long freeSize = maxSize - totalSize;
560     float sizeMB = ((float) totalSize) / ((float) (1024 * 1024));
561     float freeMB = ((float) freeSize) / ((float) (1024 * 1024));
562     float maxMB = ((float) maxSize) / ((float) (1024 * 1024));
563     LruBlockCache.LOG.debug("Cache Stats: Sizes: " + "Total=" + sizeMB + "MB (" + totalSize + "), " + "Free=" + freeMB + "MB (" + freeSize + "), " + "Max="
564         + maxMB + "MB (" + maxSize + ")" + ", Counts: " + "Blocks=" + size() + ", " + "Access=" + stats.getRequestCount() + ", " + "Hit=" + stats.getHitCount()
565         + ", " + "Miss=" + stats.getMissCount() + ", " + "Evictions=" + stats.getEvictionCount() + ", " + "Evicted=" + stats.getEvictedCount() + ", Ratios: "
566         + "Hit Ratio=" + stats.getHitRatio() * 100 + "%, " + "Miss Ratio=" + stats.getMissRatio() * 100 + "%, " + "Evicted/Run=" + stats.evictedPerEviction()
567         + ", " + "Duplicate Reads=" + stats.getDuplicateReads());
568   }
569   
570   /**
571    * Get counter statistics for this cache.
572    * 
573    * <p>
574    * Includes: total accesses, hits, misses, evicted blocks, and runs of the eviction processes.
575    */
576   public CacheStats getStats() {
577     return this.stats;
578   }
579   
580   public static class CacheStats {
581     private final AtomicLong accessCount = new AtomicLong(0);
582     private final AtomicLong hitCount = new AtomicLong(0);
583     private final AtomicLong missCount = new AtomicLong(0);
584     private final AtomicLong evictionCount = new AtomicLong(0);
585     private final AtomicLong evictedCount = new AtomicLong(0);
586     private final AtomicLong duplicateReads = new AtomicLong(0);
587     
588     public void miss() {
589       missCount.incrementAndGet();
590       accessCount.incrementAndGet();
591     }
592     
593     public void hit() {
594       hitCount.incrementAndGet();
595       accessCount.incrementAndGet();
596     }
597     
598     public void evict() {
599       evictionCount.incrementAndGet();
600     }
601     
602     public void duplicateReads() {
603       duplicateReads.incrementAndGet();
604     }
605     
606     public void evicted() {
607       evictedCount.incrementAndGet();
608     }
609     
610     public long getRequestCount() {
611       return accessCount.get();
612     }
613     
614     public long getMissCount() {
615       return missCount.get();
616     }
617     
618     public long getHitCount() {
619       return hitCount.get();
620     }
621     
622     public long getEvictionCount() {
623       return evictionCount.get();
624     }
625     
626     public long getDuplicateReads() {
627       return duplicateReads.get();
628     }
629     
630     public long getEvictedCount() {
631       return evictedCount.get();
632     }
633     
634     public double getHitRatio() {
635       return ((float) getHitCount() / (float) getRequestCount());
636     }
637     
638     public double getMissRatio() {
639       return ((float) getMissCount() / (float) getRequestCount());
640     }
641     
642     public double evictedPerEviction() {
643       return (float) ((float) getEvictedCount() / (float) getEvictionCount());
644     }
645   }
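
A caller that wants to monitor the cache could poll these counters through getStats(); the snippet below is a hypothetical sketch (the cache variable is assumed to already exist).

    LruBlockCache.CacheStats stats = cache.getStats();
    System.out.printf("requests=%d, hit ratio=%.1f%%, evictions=%d, evicted/run=%.1f%n",
        stats.getRequestCount(), stats.getHitRatio() * 100, stats.getEvictionCount(), stats.evictedPerEviction());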
646   
647   public final static long CACHE_FIXED_OVERHEAD = ClassSize.align((3 * SizeConstants.SIZEOF_LONG) + (8 * ClassSize.REFERENCE)
648       + (5 * SizeConstants.SIZEOF_FLOAT) + SizeConstants.SIZEOF_BOOLEAN + ClassSize.OBJECT);
649   
650   // HeapSize implementation
651   public long heapSize() {
652     return getCurrentSize();
653   }
654   
655   public static long calculateOverhead(long maxSize, long blockSize, int concurrency) {
656     return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP + ((int) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY)
657         + (concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
658   }
659   
660   // Simple calculators of sizes given factors and maxSize
661   
662   private long acceptableSize() {
663     return (long) Math.floor(this.maxSize * this.acceptableFactor);
664   }
665   
666   private long minSize() {
667     return (long) Math.floor(this.maxSize * this.minFactor);
668   }
669   
670   private long singleSize() {
671     return (long) Math.floor(this.maxSize * this.singleFactor * this.minFactor);
672   }
673   
674   private long multiSize() {
675     return (long) Math.floor(this.maxSize * this.multiFactor * this.minFactor);
676   }
677   
678   private long memorySize() {
679     return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
680   }
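
The calculators above are simple products of maxSize and the configured factors; note that each priority chunk is additionally scaled by minFactor. As a worked example with the class defaults and an invented 100 MB maxSize:

    long maxSize = 100L * 1024 * 1024;
    long acceptable = (long) Math.floor(maxSize * 0.85f);         // ~85 MB: eviction starts above this
    long min        = (long) Math.floor(maxSize * 0.75f);         // ~75 MB: eviction frees down to this
    long single     = (long) Math.floor(maxSize * 0.25f * 0.75f); // ~18.75 MB chunk for single-access blocks
    long multi      = (long) Math.floor(maxSize * 0.50f * 0.75f); // ~37.5 MB chunk for multi-access blocks
    long memory     = (long) Math.floor(maxSize * 0.25f * 0.75f); // ~18.75 MB chunk for in-memory blocks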
681   
682   public void shutdown() {
683     this.scheduleThreadPool.shutdown();
684   }
685 }