1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.accumulo.core.file.blockfile.cache;
21
22 import java.lang.ref.WeakReference;
23 import java.util.PriorityQueue;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ScheduledExecutorService;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.AtomicLong;
29 import java.util.concurrent.locks.ReentrantLock;
30
31 import org.apache.accumulo.core.util.NamingThreadFactory;
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34
35 /**
36 * A block cache implementation that is memory-aware using {@link HeapSize}, memory-bound using an LRU eviction algorithm, and concurrent: backed by a
37 * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving constant-time {@link #cacheBlock} and {@link #getBlock} operations.
38 * <p>
39 *
40 * Contains three levels of block priority to allow for scan-resistance and in-memory families. A block is added with an inMemory flag if necessary, otherwise a
 * block becomes a single access priority. Once a block is accessed again, it changes to multiple access. This is used to prevent scans from thrashing the
42 * cache, adding a least-frequently-used element to the eviction algorithm.
43 * <p>
44 *
45 * Each priority is given its own chunk of the total cache to ensure fairness during eviction. Each priority will retain close to its maximum size, however, if
46 * any priority is not using its entire chunk the others are able to grow beyond their chunk size.
47 * <p>
48 *
49 * Instantiated at a minimum with the total size and average block size. All sizes are in bytes. The block size is not especially important as this cache is
50 * fully dynamic in its sizing of blocks. It is only used for pre-allocating data structures and in initial heap estimation of the map.
51 * <p>
52 *
53 * The detailed constructor defines the sizes for the three priorities (they should total to the maximum size defined). It also sets the levels that trigger and
54 * control the eviction thread.
55 * <p>
56 *
57 * The acceptable size is the cache size level which triggers the eviction process to start. It evicts enough blocks to get the size below the minimum size
58 * specified.
59 * <p>
60 *
61 * Eviction happens in a separate thread and involves a single full-scan of the map. It determines how many bytes must be freed to reach the minimum size, and
62 * then while scanning determines the fewest least-recently-used blocks necessary from each of the three priorities (would be 3 times bytes to free). It then
63 * uses the priority chunk sizes to evict fairly according to the relative sizes and usage.
64 */
65 public class LruBlockCache implements BlockCache, HeapSize {
66
67 static final Log LOG = LogFactory.getLog(LruBlockCache.class);
68
69 /** Default Configuration Parameters */
70
71 /** Backing Concurrent Map Configuration */
72 static final float DEFAULT_LOAD_FACTOR = 0.75f;
73 static final int DEFAULT_CONCURRENCY_LEVEL = 16;
74
75 /** Eviction thresholds */
76 static final float DEFAULT_MIN_FACTOR = 0.75f;
77 static final float DEFAULT_ACCEPTABLE_FACTOR = 0.85f;
78
79 /** Priority buckets */
80 static final float DEFAULT_SINGLE_FACTOR = 0.25f;
81 static final float DEFAULT_MULTI_FACTOR = 0.50f;
82 static final float DEFAULT_MEMORY_FACTOR = 0.25f;
83
84 /** Statistics thread */
85 static final int statThreadPeriod = 60;
86
87 /** Concurrent map (the cache) */
88 private final ConcurrentHashMap<String,CachedBlock> map;
89
90 /** Eviction lock (locked when eviction in process) */
91 private final ReentrantLock evictionLock = new ReentrantLock(true);
92
93 /** Volatile boolean to track if we are in an eviction process or not */
94 private volatile boolean evictionInProgress = false;
95
96 /** Eviction thread */
97 private final EvictionThread evictionThread;
98
99 /** Statistics thread schedule pool (for heavy debugging, could remove) */
100 private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1, new NamingThreadFactory("LRUBlockCacheStats"));
101
102 /** Current size of cache */
103 private final AtomicLong size;
104
105 /** Current number of cached elements */
106 private final AtomicLong elements;
107
108 /** Cache access count (sequential ID) */
109 private final AtomicLong count;
110
111 /** Cache statistics */
112 private final CacheStats stats;
113
114 /** Maximum allowable size of cache (block put if size > max, evict) */
115 private long maxSize;
116
117 /** Approximate block size */
118 private long blockSize;
119
120 /** Acceptable size of cache (no evictions if size < acceptable) */
121 private float acceptableFactor;
122
123 /** Minimum threshold of cache (when evicting, evict until size < min) */
124 private float minFactor;
125
126 /** Single access bucket size */
127 private float singleFactor;
128
129 /** Multiple access bucket size */
130 private float multiFactor;
131
132 /** In-memory bucket size */
133 private float memoryFactor;
134
135 /** Overhead of the structure itself */
136 private long overhead;
137
/**
 * Simplest constructor: only the capacity and an approximate block size are required.
 *
 * <p>
 * The eviction thread is enabled and every other setting takes the default declared in this class.
 *
 * @param maxSize
 *          maximum size of cache, in bytes
 * @param blockSize
 *          approximate size of each block, in bytes
 */
public LruBlockCache(long maxSize, long blockSize) {
  this(maxSize, blockSize, true);
}
152
/**
 * Constructor used for testing. Identical to the two-argument form except that the background eviction thread can be switched off.
 *
 * <p>
 * The backing map is pre-sized for 1.2 times the number of blocks of {@code blockSize} that fit in {@code maxSize}; all remaining settings use the class
 * defaults.
 *
 * @param maxSize
 *          maximum size of cache, in bytes
 * @param blockSize
 *          approximate size of each block, in bytes
 * @param evictionThread
 *          whether to run evictions in a background thread
 */
public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
  this(maxSize, blockSize, evictionThread,
      (int) Math.ceil(1.2 * maxSize / blockSize),
      DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
      DEFAULT_MIN_FACTOR, DEFAULT_ACCEPTABLE_FACTOR,
      DEFAULT_SINGLE_FACTOR, DEFAULT_MULTI_FACTOR, DEFAULT_MEMORY_FACTOR);
}
160
161 /**
162 * Configurable constructor. Use this constructor if not using defaults.
163 *
164 * @param maxSize
165 * maximum size of this cache, in bytes
166 * @param blockSize
167 * expected average size of blocks, in bytes
168 * @param evictionThread
169 * whether to run evictions in a bg thread or not
170 * @param mapInitialSize
171 * initial size of backing ConcurrentHashMap
172 * @param mapLoadFactor
173 * initial load factor of backing ConcurrentHashMap
174 * @param mapConcurrencyLevel
175 * initial concurrency factor for backing CHM
176 * @param minFactor
177 * percentage of total size that eviction will evict until
178 * @param acceptableFactor
179 * percentage of total size that triggers eviction
180 * @param singleFactor
181 * percentage of total size for single-access blocks
182 * @param multiFactor
183 * percentage of total size for multiple-access blocks
184 * @param memoryFactor
185 * percentage of total size for in-memory blocks
186 */
187 public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel, float minFactor,
188 float acceptableFactor, float singleFactor, float multiFactor, float memoryFactor) {
189 if (singleFactor + multiFactor + memoryFactor != 1) {
190 throw new IllegalArgumentException("Single, multi, and memory factors " + " should total 1.0");
191 }
192 if (minFactor >= acceptableFactor) {
193 throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
194 }
195 if (minFactor >= 1.0f || acceptableFactor >= 1.0f) {
196 throw new IllegalArgumentException("all factors must be < 1");
197 }
198 this.maxSize = maxSize;
199 this.blockSize = blockSize;
200 map = new ConcurrentHashMap<String,CachedBlock>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel);
201 this.minFactor = minFactor;
202 this.acceptableFactor = acceptableFactor;
203 this.singleFactor = singleFactor;
204 this.multiFactor = multiFactor;
205 this.memoryFactor = memoryFactor;
206 this.stats = new CacheStats();
207 this.count = new AtomicLong(0);
208 this.elements = new AtomicLong(0);
209 this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
210 this.size = new AtomicLong(this.overhead);
211
212 if (evictionThread) {
213 this.evictionThread = new EvictionThread(this);
214 this.evictionThread.start();
215 while (!this.evictionThread.running()) {
216 try {
217 Thread.sleep(10);
218 } catch (InterruptedException ex) {
219 throw new RuntimeException(ex);
220 }
221 }
222 } else {
223 this.evictionThread = null;
224 }
225 this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
226 }
227
228 public void setMaxSize(long maxSize) {
229 this.maxSize = maxSize;
230 if (this.size.get() > acceptableSize() && !evictionInProgress) {
231 runEviction();
232 }
233 }
234
235
236
237 /**
238 * Cache the block with the specified name and buffer.
239 * <p>
240 * It is assumed this will NEVER be called on an already cached block. If that is done, it is assumed that you are reinserting the same exact block due to a
241 * race condition and will update the buffer but not modify the size of the cache.
242 *
243 * @param blockName
244 * block name
245 * @param buf
246 * block buffer
247 * @param inMemory
248 * if block is in-memory
249 */
250 public CacheEntry cacheBlock(String blockName, byte buf[], boolean inMemory) {
251 CachedBlock cb = map.get(blockName);
252 if (cb != null) {
253 stats.duplicateReads();
254 cb.access(count.incrementAndGet());
255
256 } else {
257 cb = new CachedBlock(blockName, buf, count.incrementAndGet(), inMemory);
258 long newSize = size.addAndGet(cb.heapSize());
259 map.put(blockName, cb);
260 elements.incrementAndGet();
261 if (newSize > acceptableSize() && !evictionInProgress) {
262 runEviction();
263 }
264 }
265
266 return cb;
267 }
268
269 /**
270 * Cache the block with the specified name and buffer.
271 * <p>
272 * It is assumed this will NEVER be called on an already cached block. If that is done, it is assumed that you are reinserting the same exact block due to a
273 * race condition and will update the buffer but not modify the size of the cache.
274 *
275 * @param blockName
276 * block name
277 * @param buf
278 * block buffer
279 */
280 public CacheEntry cacheBlock(String blockName, byte buf[]) {
281 return cacheBlock(blockName, buf, false);
282 }
283
284 /**
285 * Get the buffer of the block with the specified name.
286 *
287 * @param blockName
288 * block name
289 * @return buffer of specified block name, or null if not in cache
290 */
291
292 public CachedBlock getBlock(String blockName) {
293 CachedBlock cb = map.get(blockName);
294 if (cb == null) {
295 stats.miss();
296 return null;
297 }
298 stats.hit();
299 cb.access(count.incrementAndGet());
300 return cb;
301 }
302
303 protected long evictBlock(CachedBlock block) {
304 map.remove(block.getName());
305 size.addAndGet(-1 * block.heapSize());
306 elements.decrementAndGet();
307 stats.evicted();
308 return block.heapSize();
309 }
310
311 /**
312 * Multi-threaded call to run the eviction process.
313 */
314 private void runEviction() {
315 if (evictionThread == null) {
316 evict();
317 } else {
318 evictionThread.evict();
319 }
320 }
321
322 /**
323 * Eviction method.
324 */
325 void evict() {
326
327
328 if (!evictionLock.tryLock())
329 return;
330
331 try {
332 evictionInProgress = true;
333
334 long bytesToFree = size.get() - minSize();
335
336 LOG.debug("Block cache LRU eviction started. Attempting to free " + bytesToFree + " bytes");
337
338 if (bytesToFree <= 0)
339 return;
340
341
342 BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize, singleSize());
343 BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize, multiSize());
344 BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize, memorySize());
345
346
347 for (CachedBlock cachedBlock : map.values()) {
348 switch (cachedBlock.getPriority()) {
349 case SINGLE: {
350 bucketSingle.add(cachedBlock);
351 break;
352 }
353 case MULTI: {
354 bucketMulti.add(cachedBlock);
355 break;
356 }
357 case MEMORY: {
358 bucketMemory.add(cachedBlock);
359 break;
360 }
361 }
362 }
363
364 PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<BlockBucket>(3);
365
366 bucketQueue.add(bucketSingle);
367 bucketQueue.add(bucketMulti);
368 bucketQueue.add(bucketMemory);
369
370 int remainingBuckets = 3;
371 long bytesFreed = 0;
372
373 BlockBucket bucket;
374 while ((bucket = bucketQueue.poll()) != null) {
375 long overflow = bucket.overflow();
376 if (overflow > 0) {
377 long bucketBytesToFree = Math.min(overflow, (long) Math.ceil((bytesToFree - bytesFreed) / (double) remainingBuckets));
378 bytesFreed += bucket.free(bucketBytesToFree);
379 }
380 remainingBuckets--;
381 }
382
383 float singleMB = ((float) bucketSingle.totalSize()) / ((float) (1024 * 1024));
384 float multiMB = ((float) bucketMulti.totalSize()) / ((float) (1024 * 1024));
385 float memoryMB = ((float) bucketMemory.totalSize()) / ((float) (1024 * 1024));
386
387 LOG.debug("Block cache LRU eviction completed. " + "Freed " + bytesFreed + " bytes. " + "Priority Sizes: " + "Single=" + singleMB + "MB ("
388 + bucketSingle.totalSize() + "), " + "Multi=" + multiMB + "MB (" + bucketMulti.totalSize() + ")," + "Memory=" + memoryMB + "MB ("
389 + bucketMemory.totalSize() + ")");
390
391 } finally {
392 stats.evict();
393 evictionInProgress = false;
394 evictionLock.unlock();
395 }
396 }
397
398 /**
399 * Used to group blocks into priority buckets. There will be a BlockBucket for each priority (single, multi, memory). Once bucketed, the eviction algorithm
400 * takes the appropriate number of elements out of each according to configuration parameters and their relatives sizes.
401 */
402 private class BlockBucket implements Comparable<BlockBucket> {
403
404 private CachedBlockQueue queue;
405 private long totalSize = 0;
406 private long bucketSize;
407
408 public BlockBucket(long bytesToFree, long blockSize, long bucketSize) {
409 this.bucketSize = bucketSize;
410 queue = new CachedBlockQueue(bytesToFree, blockSize);
411 totalSize = 0;
412 }
413
414 public void add(CachedBlock block) {
415 totalSize += block.heapSize();
416 queue.add(block);
417 }
418
419 public long free(long toFree) {
420 CachedBlock[] blocks = queue.get();
421 long freedBytes = 0;
422 for (int i = 0; i < blocks.length; i++) {
423 freedBytes += evictBlock(blocks[i]);
424 if (freedBytes >= toFree) {
425 return freedBytes;
426 }
427 }
428 return freedBytes;
429 }
430
431 public long overflow() {
432 return totalSize - bucketSize;
433 }
434
435 public long totalSize() {
436 return totalSize;
437 }
438
439 public int compareTo(BlockBucket that) {
440 if (this.overflow() == that.overflow())
441 return 0;
442 return this.overflow() > that.overflow() ? 1 : -1;
443 }
444 }
445
446 /**
447 * Get the maximum size of this cache.
448 *
449 * @return max size in bytes
450 */
451 public long getMaxSize() {
452 return this.maxSize;
453 }
454
455 /**
456 * Get the current size of this cache.
457 *
458 * @return current size in bytes
459 */
460 public long getCurrentSize() {
461 return this.size.get();
462 }
463
464 /**
465 * Get the current size of this cache.
466 *
467 * @return current size in bytes
468 */
469 public long getFreeSize() {
470 return getMaxSize() - getCurrentSize();
471 }
472
473 /**
474 * Get the size of this cache (number of cached blocks)
475 *
476 * @return number of cached blocks
477 */
478 public long size() {
479 return this.elements.get();
480 }
481
482 /**
483 * Get the number of eviction runs that have occurred
484 */
485 public long getEvictionCount() {
486 return this.stats.getEvictionCount();
487 }
488
489 /**
490 * Get the number of blocks that have been evicted during the lifetime of this cache.
491 */
492 public long getEvictedCount() {
493 return this.stats.getEvictedCount();
494 }
495
496
497
498
499
500
501 private static class EvictionThread extends Thread {
502 private WeakReference<LruBlockCache> cache;
503 private boolean running = false;
504
505 public EvictionThread(LruBlockCache cache) {
506 super("LruBlockCache.EvictionThread");
507 setDaemon(true);
508 this.cache = new WeakReference<LruBlockCache>(cache);
509 }
510
511 public synchronized boolean running() {
512 return running;
513 }
514
515 @Override
516 public void run() {
517 while (true) {
518 synchronized (this) {
519 running = true;
520 try {
521 this.wait();
522 } catch (InterruptedException e) {}
523 }
524 LruBlockCache cache = this.cache.get();
525 if (cache == null)
526 break;
527 cache.evict();
528 }
529 }
530
531 public void evict() {
532 synchronized (this) {
533 this.notify();
534 }
535 }
536 }
537
538
539
540
541 private static class StatisticsThread extends Thread {
542 LruBlockCache lru;
543
544 public StatisticsThread(LruBlockCache lru) {
545 super("LruBlockCache.StatisticsThread");
546 setDaemon(true);
547 this.lru = lru;
548 }
549
550 @Override
551 public void run() {
552 lru.logStats();
553 }
554 }
555
556 public void logStats() {
557
558 long totalSize = heapSize();
559 long freeSize = maxSize - totalSize;
560 float sizeMB = ((float) totalSize) / ((float) (1024 * 1024));
561 float freeMB = ((float) freeSize) / ((float) (1024 * 1024));
562 float maxMB = ((float) maxSize) / ((float) (1024 * 1024));
563 LruBlockCache.LOG.debug("Cache Stats: Sizes: " + "Total=" + sizeMB + "MB (" + totalSize + "), " + "Free=" + freeMB + "MB (" + freeSize + "), " + "Max="
564 + maxMB + "MB (" + maxSize + ")" + ", Counts: " + "Blocks=" + size() + ", " + "Access=" + stats.getRequestCount() + ", " + "Hit=" + stats.getHitCount()
565 + ", " + "Miss=" + stats.getMissCount() + ", " + "Evictions=" + stats.getEvictionCount() + ", " + "Evicted=" + stats.getEvictedCount() + ", Ratios: "
566 + "Hit Ratio=" + stats.getHitRatio() * 100 + "%, " + "Miss Ratio=" + stats.getMissRatio() * 100 + "%, " + "Evicted/Run=" + stats.evictedPerEviction()
567 + ", " + "Duplicate Reads=" + stats.getDuplicateReads());
568 }
569
570 /**
571 * Get counter statistics for this cache.
572 *
573 * <p>
574 * Includes: total accesses, hits, misses, evicted blocks, and runs of the eviction processes.
575 */
576 public CacheStats getStats() {
577 return this.stats;
578 }
579
580 public static class CacheStats {
581 private final AtomicLong accessCount = new AtomicLong(0);
582 private final AtomicLong hitCount = new AtomicLong(0);
583 private final AtomicLong missCount = new AtomicLong(0);
584 private final AtomicLong evictionCount = new AtomicLong(0);
585 private final AtomicLong evictedCount = new AtomicLong(0);
586 private final AtomicLong duplicateReads = new AtomicLong(0);
587
588 public void miss() {
589 missCount.incrementAndGet();
590 accessCount.incrementAndGet();
591 }
592
593 public void hit() {
594 hitCount.incrementAndGet();
595 accessCount.incrementAndGet();
596 }
597
598 public void evict() {
599 evictionCount.incrementAndGet();
600 }
601
602 public void duplicateReads() {
603 duplicateReads.incrementAndGet();
604 }
605
606 public void evicted() {
607 evictedCount.incrementAndGet();
608 }
609
610 public long getRequestCount() {
611 return accessCount.get();
612 }
613
614 public long getMissCount() {
615 return missCount.get();
616 }
617
618 public long getHitCount() {
619 return hitCount.get();
620 }
621
622 public long getEvictionCount() {
623 return evictionCount.get();
624 }
625
626 public long getDuplicateReads() {
627 return duplicateReads.get();
628 }
629
630 public long getEvictedCount() {
631 return evictedCount.get();
632 }
633
634 public double getHitRatio() {
635 return ((float) getHitCount() / (float) getRequestCount());
636 }
637
638 public double getMissRatio() {
639 return ((float) getMissCount() / (float) getRequestCount());
640 }
641
642 public double evictedPerEviction() {
643 return (float) ((float) getEvictedCount() / (float) getEvictionCount());
644 }
645 }
646
647 public final static long CACHE_FIXED_OVERHEAD = ClassSize.align((3 * SizeConstants.SIZEOF_LONG) + (8 * ClassSize.REFERENCE)
648 + (5 * SizeConstants.SIZEOF_FLOAT) + SizeConstants.SIZEOF_BOOLEAN + ClassSize.OBJECT);
649
650
651 public long heapSize() {
652 return getCurrentSize();
653 }
654
655 public static long calculateOverhead(long maxSize, long blockSize, int concurrency) {
656 return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP + ((int) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY)
657 + (concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
658 }
659
660
661
662 private long acceptableSize() {
663 return (long) Math.floor(this.maxSize * this.acceptableFactor);
664 }
665
666 private long minSize() {
667 return (long) Math.floor(this.maxSize * this.minFactor);
668 }
669
670 private long singleSize() {
671 return (long) Math.floor(this.maxSize * this.singleFactor * this.minFactor);
672 }
673
674 private long multiSize() {
675 return (long) Math.floor(this.maxSize * this.multiFactor * this.minFactor);
676 }
677
678 private long memorySize() {
679 return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
680 }
681
682 public void shutdown() {
683 this.scheduleThreadPool.shutdown();
684 }
685 }