View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.lang.management.ManagementFactory;
23  import java.lang.management.RuntimeMXBean;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.NavigableSet;
29  import java.util.SortedSet;
30  import java.util.concurrent.atomic.AtomicLong;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.Cell;
36  import org.apache.hadoop.hbase.CellComparator;
37  import org.apache.hadoop.hbase.CellUtil;
38  import org.apache.hadoop.hbase.HBaseConfiguration;
39  import org.apache.hadoop.hbase.KeyValue;
40  import org.apache.hadoop.hbase.KeyValueUtil;
41  import org.apache.hadoop.hbase.classification.InterfaceAudience;
42  import org.apache.hadoop.hbase.client.Scan;
43  import org.apache.hadoop.hbase.util.ByteRange;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.apache.hadoop.hbase.util.ClassSize;
46  import org.apache.hadoop.hbase.util.CollectionBackedScanner;
47  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
48  import org.apache.hadoop.hbase.util.ReflectionUtils;
49  import org.apache.htrace.Trace;
50  
51  /**
52   * The MemStore holds in-memory modifications to the Store.  Modifications
53   * are {@link Cell}s.  When asked to flush, current memstore is moved
54   * to snapshot and is cleared.  We continue to serve edits out of new memstore
55   * and backing snapshot until flusher reports in that the flush succeeded. At
56   * this point we let the snapshot go.
57   *  <p>
58   * The MemStore functions should not be called in parallel. Callers should hold
59   *  write and read locks. This is done in {@link HStore}.
60   *  </p>
61   *
62   * TODO: Adjust size of the memstore when we remove items because they have
63   * been deleted.
64   * TODO: With new KVSLS, need to make sure we update HeapSize with difference
65   * in KV size.
66   */
67  @InterfaceAudience.Private
68  public class DefaultMemStore implements MemStore {
69    private static final Log LOG = LogFactory.getLog(DefaultMemStore.class);
70    static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled";
71    private static final boolean USEMSLAB_DEFAULT = true;
72    static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class";
73  
74    private Configuration conf;
75  
76    // MemStore.  Use a CellSkipListSet rather than SkipListSet because of the
77    // better semantics.  The Map will overwrite if passed a key it already had
78    // whereas the Set will not add new Cell if key is same though value might be
79    // different.  Value is not important -- just make sure always same
80    // reference passed.
81    volatile CellSkipListSet cellSet;
82  
83    // Snapshot of memstore.  Made for flusher.
84    volatile CellSkipListSet snapshot;
85  
86    final CellComparator comparator;
87  
88    // Used to track own heapSize
89    final AtomicLong size;
90    private volatile long snapshotSize;
91  
92    // Used to track when to flush
93    volatile long timeOfOldestEdit = Long.MAX_VALUE;
94  
95    TimeRangeTracker timeRangeTracker;
96    TimeRangeTracker snapshotTimeRangeTracker;
97  
98    volatile MemStoreLAB allocator;
99    volatile MemStoreLAB snapshotAllocator;
100   volatile long snapshotId;
101   volatile boolean tagsPresent;
102 
103   /**
104    * Default constructor. Used for tests.
105    */
106   public DefaultMemStore() {
107     this(HBaseConfiguration.create(), CellComparator.COMPARATOR);
108   }
109 
110   /**
111    * Constructor.
112    * @param c Comparator
113    */
114   public DefaultMemStore(final Configuration conf,
115                   final CellComparator c) {
116     this.conf = conf;
117     this.comparator = c;
118     this.cellSet = new CellSkipListSet(c);
119     this.snapshot = new CellSkipListSet(c);
120     timeRangeTracker = new TimeRangeTracker();
121     snapshotTimeRangeTracker = new TimeRangeTracker();
122     this.size = new AtomicLong(DEEP_OVERHEAD);
123     this.snapshotSize = 0;
124     if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
125       String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
126       this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
127           new Class[] { Configuration.class }, new Object[] { conf });
128     } else {
129       this.allocator = null;
130     }
131   }
132 
133   void dump() {
134     for (Cell cell: this.cellSet) {
135       LOG.info(cell);
136     }
137     for (Cell cell: this.snapshot) {
138       LOG.info(cell);
139     }
140   }
141 
142   /**
143    * Creates a snapshot of the current memstore.
144    * Snapshot must be cleared by call to {@link #clearSnapshot(long)}
145    */
146   @Override
147   public MemStoreSnapshot snapshot() {
148     // If snapshot currently has entries, then flusher failed or didn't call
149     // cleanup.  Log a warning.
150     if (!this.snapshot.isEmpty()) {
151       LOG.warn("Snapshot called again without clearing previous. " +
152           "Doing nothing. Another ongoing flush or did we fail last attempt?");
153     } else {
154       this.snapshotId = EnvironmentEdgeManager.currentTime();
155       this.snapshotSize = keySize();
156       if (!this.cellSet.isEmpty()) {
157         this.snapshot = this.cellSet;
158         this.cellSet = new CellSkipListSet(this.comparator);
159         this.snapshotTimeRangeTracker = this.timeRangeTracker;
160         this.timeRangeTracker = new TimeRangeTracker();
161         // Reset heap to not include any keys
162         this.size.set(DEEP_OVERHEAD);
163         this.snapshotAllocator = this.allocator;
164         // Reset allocator so we get a fresh buffer for the new memstore
165         if (allocator != null) {
166           String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
167           this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
168               new Class[] { Configuration.class }, new Object[] { conf });
169         }
170         timeOfOldestEdit = Long.MAX_VALUE;
171       }
172     }
173     MemStoreSnapshot memStoreSnapshot = new MemStoreSnapshot(this.snapshotId, snapshot.size(), this.snapshotSize,
174         this.snapshotTimeRangeTracker, new CollectionBackedScanner(snapshot, this.comparator),
175         this.tagsPresent);
176     this.tagsPresent = false;
177     return memStoreSnapshot;
178   }
179 
180   /**
181    * The passed snapshot was successfully persisted; it can be let go.
182    * @param id Id of the snapshot to clean out.
183    * @throws UnexpectedStateException
184    * @see #snapshot()
185    */
186   @Override
187   public void clearSnapshot(long id) throws UnexpectedStateException {
188     MemStoreLAB tmpAllocator = null;
189     if (this.snapshotId != id) {
190       throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed "
191           + id);
192     }
193     // OK. Passed in snapshot is same as current snapshot. If not-empty,
194     // create a new snapshot and let the old one go.
195     if (!this.snapshot.isEmpty()) {
196       this.snapshot = new CellSkipListSet(this.comparator);
197       this.snapshotTimeRangeTracker = new TimeRangeTracker();
198     }
199     this.snapshotSize = 0;
200     this.snapshotId = -1;
201     if (this.snapshotAllocator != null) {
202       tmpAllocator = this.snapshotAllocator;
203       this.snapshotAllocator = null;
204     }
205     if (tmpAllocator != null) {
206       tmpAllocator.close();
207     }
208   }
209 
210   @Override
211   public long getFlushableSize() {
212     return this.snapshotSize > 0 ? this.snapshotSize : keySize();
213   }
214 
215   @Override
216   public long getSnapshotSize() {
217     return this.snapshotSize;
218   }
219 
220   /**
221    * Write an update
222    * @param cell
223    * @return approximate size of the passed Cell.
224    */
225   @Override
226   public long add(Cell cell) {
227     Cell toAdd = maybeCloneWithAllocator(cell);
228     return internalAdd(toAdd);
229   }
230 
231   @Override
232   public long timeOfOldestEdit() {
233     return timeOfOldestEdit;
234   }
235 
236   private boolean addToCellSet(Cell e) {
237     boolean b = this.cellSet.add(e);
238     // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call.
239     // When we use ACL CP or Visibility CP which deals with Tags during
240     // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not
241     // parse the byte[] to identify the tags length.
242     if(e.getTagsLength() > 0) {
243       tagsPresent = true;
244     }
245     setOldestEditTimeToNow();
246     return b;
247   }
248 
249   private boolean removeFromCellSet(Cell e) {
250     boolean b = this.cellSet.remove(e);
251     setOldestEditTimeToNow();
252     return b;
253   }
254 
255   void setOldestEditTimeToNow() {
256     if (timeOfOldestEdit == Long.MAX_VALUE) {
257       timeOfOldestEdit = EnvironmentEdgeManager.currentTime();
258     }
259   }
260 
261   /**
262    * Internal version of add() that doesn't clone Cells with the
263    * allocator, and doesn't take the lock.
264    *
265    * Callers should ensure they already have the read lock taken
266    */
267   private long internalAdd(final Cell toAdd) {
268     long s = heapSizeChange(toAdd, addToCellSet(toAdd));
269     timeRangeTracker.includeTimestamp(toAdd);
270     this.size.addAndGet(s);
271     return s;
272   }
273 
274   private Cell maybeCloneWithAllocator(Cell cell) {
275     if (allocator == null) {
276       return cell;
277     }
278 
279     int len = KeyValueUtil.length(cell);
280     ByteRange alloc = allocator.allocateBytes(len);
281     if (alloc == null) {
282       // The allocation was too large, allocator decided
283       // not to do anything with it.
284       return cell;
285     }
286     assert alloc.getBytes() != null;
287     KeyValueUtil.appendToByteArray(cell, alloc.getBytes(), alloc.getOffset());
288     KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len);
289     newKv.setSequenceId(cell.getSequenceId());
290     return newKv;
291   }
292 
293   /**
294    * Remove n key from the memstore. Only cells that have the same key and the
295    * same memstoreTS are removed.  It is ok to not update timeRangeTracker
296    * in this call. It is possible that we can optimize this method by using
297    * tailMap/iterator, but since this method is called rarely (only for
298    * error recovery), we can leave those optimization for the future.
299    * @param cell
300    */
301   @Override
302   public void rollback(Cell cell) {
303     // If the key is in the snapshot, delete it. We should not update
304     // this.size, because that tracks the size of only the memstore and
305     // not the snapshot. The flush of this snapshot to disk has not
306     // yet started because Store.flush() waits for all rwcc transactions to
307     // commit before starting the flush to disk.
308     Cell found = this.snapshot.get(cell);
309     if (found != null && found.getSequenceId() == cell.getSequenceId()) {
310       this.snapshot.remove(cell);
311       long sz = heapSizeChange(cell, true);
312       this.snapshotSize -= sz;
313     }
314     // If the key is in the memstore, delete it. Update this.size.
315     found = this.cellSet.get(cell);
316     if (found != null && found.getSequenceId() == cell.getSequenceId()) {
317       removeFromCellSet(cell);
318       long s = heapSizeChange(cell, true);
319       this.size.addAndGet(-s);
320     }
321   }
322 
323   /**
324    * Write a delete
325    * @param deleteCell
326    * @return approximate size of the passed key and value.
327    */
328   @Override
329   public long delete(Cell deleteCell) {
330     long s = 0;
331     Cell toAdd = maybeCloneWithAllocator(deleteCell);
332     s += heapSizeChange(toAdd, addToCellSet(toAdd));
333     timeRangeTracker.includeTimestamp(toAdd);
334     this.size.addAndGet(s);
335     return s;
336   }
337 
338   /**
339    * @param cell Find the row that comes after this one.  If null, we return the
340    * first.
341    * @return Next row or null if none found.
342    */
343   Cell getNextRow(final Cell cell) {
344     return getLowest(getNextRow(cell, this.cellSet), getNextRow(cell, this.snapshot));
345   }
346 
347   /*
348    * @param a
349    * @param b
350    * @return Return lowest of a or b or null if both a and b are null
351    */
352   private Cell getLowest(final Cell a, final Cell b) {
353     if (a == null) {
354       return b;
355     }
356     if (b == null) {
357       return a;
358     }
359     return comparator.compareRows(a, b) <= 0? a: b;
360   }
361 
362   /*
363    * @param key Find row that follows this one.  If null, return first.
364    * @param map Set to look in for a row beyond <code>row</code>.
365    * @return Next row or null if none found.  If one found, will be a new
366    * KeyValue -- can be destroyed by subsequent calls to this method.
367    */
368   private Cell getNextRow(final Cell key,
369       final NavigableSet<Cell> set) {
370     Cell result = null;
371     SortedSet<Cell> tail = key == null? set: set.tailSet(key);
372     // Iterate until we fall into the next row; i.e. move off current row
373     for (Cell cell: tail) {
374       if (comparator.compareRows(cell, key) <= 0)
375         continue;
376       // Note: Not suppressing deletes or expired cells.  Needs to be handled
377       // by higher up functions.
378       result = cell;
379       break;
380     }
381     return result;
382   }
383 
384   /**
385    * Only used by tests. TODO: Remove
386    *
387    * Given the specs of a column, update it, first by inserting a new record,
388    * then removing the old one.  Since there is only 1 KeyValue involved, the memstoreTS
389    * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
390    * store will ensure that the insert/delete each are atomic. A scanner/reader will either
391    * get the new value, or the old value and all readers will eventually only see the new
392    * value after the old was removed.
393    *
394    * @param row
395    * @param family
396    * @param qualifier
397    * @param newValue
398    * @param now
399    * @return  Timestamp
400    */
401   @Override
402   public long updateColumnValue(byte[] row,
403                                 byte[] family,
404                                 byte[] qualifier,
405                                 long newValue,
406                                 long now) {
407     Cell firstCell = KeyValueUtil.createFirstOnRow(row, family, qualifier);
408     // Is there a Cell in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
409     SortedSet<Cell> snSs = snapshot.tailSet(firstCell);
410     if (!snSs.isEmpty()) {
411       Cell snc = snSs.first();
412       // is there a matching Cell in the snapshot?
413       if (CellUtil.matchingRow(snc, firstCell) && CellUtil.matchingQualifier(snc, firstCell)) {
414         if (snc.getTimestamp() == now) {
415           // poop,
416           now += 1;
417         }
418       }
419     }
420 
421     // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
422     // But the timestamp should also be max(now, mostRecentTsInMemstore)
423 
424     // so we cant add the new Cell w/o knowing what's there already, but we also
425     // want to take this chance to delete some cells. So two loops (sad)
426 
427     SortedSet<Cell> ss = cellSet.tailSet(firstCell);
428     for (Cell cell : ss) {
429       // if this isnt the row we are interested in, then bail:
430       if (!CellUtil.matchingColumn(cell, family, qualifier)
431           || !CellUtil.matchingRow(cell, firstCell)) {
432         break; // rows dont match, bail.
433       }
434 
435       // if the qualifier matches and it's a put, just RM it out of the cellSet.
436       if (cell.getTypeByte() == KeyValue.Type.Put.getCode() &&
437           cell.getTimestamp() > now && CellUtil.matchingQualifier(firstCell, cell)) {
438         now = cell.getTimestamp();
439       }
440     }
441 
442     // create or update (upsert) a new Cell with
443     // 'now' and a 0 memstoreTS == immediately visible
444     List<Cell> cells = new ArrayList<Cell>(1);
445     cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
446     return upsert(cells, 1L);
447   }
448 
449   /**
450    * Update or insert the specified KeyValues.
451    * <p>
452    * For each KeyValue, insert into MemStore.  This will atomically upsert the
453    * value for that row/family/qualifier.  If a KeyValue did already exist,
454    * it will then be removed.
455    * <p>
456    * Currently the memstoreTS is kept at 0 so as each insert happens, it will
457    * be immediately visible.  May want to change this so it is atomic across
458    * all KeyValues.
459    * <p>
460    * This is called under row lock, so Get operations will still see updates
461    * atomically.  Scans will only see each KeyValue update as atomic.
462    *
463    * @param cells
464    * @param readpoint readpoint below which we can safely remove duplicate KVs
465    * @return change in memstore size
466    */
467   @Override
468   public long upsert(Iterable<Cell> cells, long readpoint) {
469     long size = 0;
470     for (Cell cell : cells) {
471       size += upsert(cell, readpoint);
472     }
473     return size;
474   }
475 
476   /**
477    * Inserts the specified KeyValue into MemStore and deletes any existing
478    * versions of the same row/family/qualifier as the specified KeyValue.
479    * <p>
480    * First, the specified KeyValue is inserted into the Memstore.
481    * <p>
482    * If there are any existing KeyValues in this MemStore with the same row,
483    * family, and qualifier, they are removed.
484    * <p>
485    * Callers must hold the read lock.
486    *
487    * @param cell
488    * @return change in size of MemStore
489    */
490   private long upsert(Cell cell, long readpoint) {
491     // Add the Cell to the MemStore
492     // Use the internalAdd method here since we (a) already have a lock
493     // and (b) cannot safely use the MSLAB here without potentially
494     // hitting OOME - see TestMemStore.testUpsertMSLAB for a
495     // test that triggers the pathological case if we don't avoid MSLAB
496     // here.
497     long addedSize = internalAdd(cell);
498 
499     // Get the Cells for the row/family/qualifier regardless of timestamp.
500     // For this case we want to clean up any other puts
501     Cell firstCell = KeyValueUtil.createFirstOnRow(
502         cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
503         cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
504         cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
505     SortedSet<Cell> ss = cellSet.tailSet(firstCell);
506     Iterator<Cell> it = ss.iterator();
507     // versions visible to oldest scanner
508     int versionsVisible = 0;
509     while ( it.hasNext() ) {
510       Cell cur = it.next();
511 
512       if (cell == cur) {
513         // ignore the one just put in
514         continue;
515       }
516       // check that this is the row and column we are interested in, otherwise bail
517       if (CellUtil.matchingRow(cell, cur) && CellUtil.matchingQualifier(cell, cur)) {
518         // only remove Puts that concurrent scanners cannot possibly see
519         if (cur.getTypeByte() == KeyValue.Type.Put.getCode() &&
520             cur.getSequenceId() <= readpoint) {
521           if (versionsVisible >= 1) {
522             // if we get here we have seen at least one version visible to the oldest scanner,
523             // which means we can prove that no scanner will see this version
524 
525             // false means there was a change, so give us the size.
526             long delta = heapSizeChange(cur, true);
527             addedSize -= delta;
528             this.size.addAndGet(-delta);
529             it.remove();
530             setOldestEditTimeToNow();
531           } else {
532             versionsVisible++;
533           }
534         }
535       } else {
536         // past the row or column, done
537         break;
538       }
539     }
540     return addedSize;
541   }
542 
543   /**
544    * @return scanner on memstore and snapshot in this order.
545    */
546   @Override
547   public List<KeyValueScanner> getScanners(long readPt) {
548     return Collections.<KeyValueScanner> singletonList(new MemStoreScanner(readPt));
549   }
550 
551   /**
552    * Check if this memstore may contain the required keys
553    * @param scan
554    * @return False if the key definitely does not exist in this Memstore
555    */
556   public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) {
557     return (timeRangeTracker.includesTimeRange(scan.getTimeRange()) ||
558         snapshotTimeRangeTracker.includesTimeRange(scan.getTimeRange()))
559         && (Math.max(timeRangeTracker.getMaximumTimestamp(),
560                      snapshotTimeRangeTracker.getMaximumTimestamp()) >=
561             oldestUnexpiredTS);
562   }
563 
564   /*
565    * MemStoreScanner implements the KeyValueScanner.
566    * It lets the caller scan the contents of a memstore -- both current
567    * map and snapshot.
568    * This behaves as if it were a real scanner but does not maintain position.
569    */
570   protected class MemStoreScanner extends NonLazyKeyValueScanner {
571     // Next row information for either cellSet or snapshot
572     private Cell cellSetNextRow = null;
573     private Cell snapshotNextRow = null;
574 
575     // last iterated Cells for cellSet and snapshot (to restore iterator state after reseek)
576     private Cell cellSetItRow = null;
577     private Cell snapshotItRow = null;
578     
579     // iterator based scanning.
580     private Iterator<Cell> cellSetIt;
581     private Iterator<Cell> snapshotIt;
582 
583     // The cellSet and snapshot at the time of creating this scanner
584     private CellSkipListSet cellSetAtCreation;
585     private CellSkipListSet snapshotAtCreation;
586 
587     // the pre-calculated Cell to be returned by peek() or next()
588     private Cell theNext;
589 
590     // The allocator and snapshot allocator at the time of creating this scanner
591     volatile MemStoreLAB allocatorAtCreation;
592     volatile MemStoreLAB snapshotAllocatorAtCreation;
593     
594     // A flag represents whether could stop skipping Cells for MVCC
595     // if have encountered the next row. Only used for reversed scan
596     private boolean stopSkippingCellsIfNextRow = false;
597 
598     private long readPoint;
599 
600     /*
601     Some notes...
602 
603      So memstorescanner is fixed at creation time. this includes pointers/iterators into
604     existing kvset/snapshot.  during a snapshot creation, the kvset is null, and the
605     snapshot is moved.  since kvset is null there is no point on reseeking on both,
606       we can save us the trouble. During the snapshot->hfile transition, the memstore
607       scanner is re-created by StoreScanner#updateReaders().  StoreScanner should
608       potentially do something smarter by adjusting the existing memstore scanner.
609 
610       But there is a greater problem here, that being once a scanner has progressed
611       during a snapshot scenario, we currently iterate past the kvset then 'finish' up.
612       if a scan lasts a little while, there is a chance for new entries in kvset to
613       become available but we will never see them.  This needs to be handled at the
614       StoreScanner level with coordination with MemStoreScanner.
615 
616       Currently, this problem is only partly managed: during the small amount of time
617       when the StoreScanner has not yet created a new MemStoreScanner, we will miss
618       the adds to kvset in the MemStoreScanner.
619     */
620 
621     MemStoreScanner(long readPoint) {
622       super();
623 
624       this.readPoint = readPoint;
625       cellSetAtCreation = cellSet;
626       snapshotAtCreation = snapshot;
627       if (allocator != null) {
628         this.allocatorAtCreation = allocator;
629         this.allocatorAtCreation.incScannerCount();
630       }
631       if (snapshotAllocator != null) {
632         this.snapshotAllocatorAtCreation = snapshotAllocator;
633         this.snapshotAllocatorAtCreation.incScannerCount();
634       }
635       if (Trace.isTracing() && Trace.currentSpan() != null) {
636         Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner");
637       }
638     }
639 
640     /**
641      * Lock on 'this' must be held by caller.
642      * @param it
643      * @return Next Cell
644      */
645     private Cell getNext(Iterator<Cell> it) {
646       Cell startCell = theNext;
647       Cell v = null;
648       try {
649         while (it.hasNext()) {
650           v = it.next();
651           if (v.getSequenceId() <= this.readPoint) {
652             return v;
653           }
654           if (stopSkippingCellsIfNextRow && startCell != null
655               && comparator.compareRows(v, startCell) > 0) {
656             return null;
657           }
658         }
659 
660         return null;
661       } finally {
662         if (v != null) {
663           // in all cases, remember the last Cell iterated to
664           if (it == snapshotIt) {
665             snapshotItRow = v;
666           } else {
667             cellSetItRow = v;
668           }
669         }
670       }
671     }
672 
673     /**
674      *  Set the scanner at the seek key.
675      *  Must be called only once: there is no thread safety between the scanner
676      *   and the memStore.
677      * @param key seek value
678      * @return false if the key is null or if there is no data
679      */
680     @Override
681     public synchronized boolean seek(Cell key) {
682       if (key == null) {
683         close();
684         return false;
685       }
686       // kvset and snapshot will never be null.
687       // if tailSet can't find anything, SortedSet is empty (not null).
688       cellSetIt = cellSetAtCreation.tailSet(key).iterator();
689       snapshotIt = snapshotAtCreation.tailSet(key).iterator();
690       cellSetItRow = null;
691       snapshotItRow = null;
692 
693       return seekInSubLists(key);
694     }
695 
696 
697     /**
698      * (Re)initialize the iterators after a seek or a reseek.
699      */
700     private synchronized boolean seekInSubLists(Cell key){
701       cellSetNextRow = getNext(cellSetIt);
702       snapshotNextRow = getNext(snapshotIt);
703 
704       // Calculate the next value
705       theNext = getLowest(cellSetNextRow, snapshotNextRow);
706 
707       // has data
708       return (theNext != null);
709     }
710 
711 
712     /**
713      * Move forward on the sub-lists set previously by seek.
714      * @param key seek value (should be non-null)
715      * @return true if there is at least one KV to read, false otherwise
716      */
717     @Override
718     public synchronized boolean reseek(Cell key) {
719       /*
720       See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
721       This code is executed concurrently with flush and puts, without locks.
722       Two points must be known when working on this code:
723       1) It's not possible to use the 'kvTail' and 'snapshot'
724        variables, as they are modified during a flush.
725       2) The ideal implementation for performance would use the sub skip list
726        implicitly pointed by the iterators 'kvsetIt' and
727        'snapshotIt'. Unfortunately the Java API does not offer a method to
728        get it. So we remember the last keys we iterated to and restore
729        the reseeked set to at least that point.
730        */
731       cellSetIt = cellSetAtCreation.tailSet(getHighest(key, cellSetItRow)).iterator();
732       snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator();
733 
734       return seekInSubLists(key);
735     }
736 
737 
738     @Override
739     public synchronized Cell peek() {
740       //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
741       return theNext;
742     }
743 
744     @Override
745     public synchronized Cell next() {
746       if (theNext == null) {
747           return null;
748       }
749 
750       final Cell ret = theNext;
751 
752       // Advance one of the iterators
753       if (theNext == cellSetNextRow) {
754         cellSetNextRow = getNext(cellSetIt);
755       } else {
756         snapshotNextRow = getNext(snapshotIt);
757       }
758 
759       // Calculate the next value
760       theNext = getLowest(cellSetNextRow, snapshotNextRow);
761 
762       //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
763       //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
764       //    getLowest() + " threadpoint=" + readpoint);
765       return ret;
766     }
767 
768     /*
769      * Returns the lower of the two key values, or null if they are both null.
770      * This uses comparator.compare() to compare the KeyValue using the memstore
771      * comparator.
772      */
773     private Cell getLowest(Cell first, Cell second) {
774       if (first == null && second == null) {
775         return null;
776       }
777       if (first != null && second != null) {
778         int compare = comparator.compare(first, second);
779         return (compare <= 0 ? first : second);
780       }
781       return (first != null ? first : second);
782     }
783 
784     /*
785      * Returns the higher of the two cells, or null if they are both null.
786      * This uses comparator.compare() to compare the Cell using the memstore
787      * comparator.
788      */
789     private Cell getHighest(Cell first, Cell second) {
790       if (first == null && second == null) {
791         return null;
792       }
793       if (first != null && second != null) {
794         int compare = comparator.compare(first, second);
795         return (compare > 0 ? first : second);
796       }
797       return (first != null ? first : second);
798     }
799 
800     public synchronized void close() {
801       this.cellSetNextRow = null;
802       this.snapshotNextRow = null;
803 
804       this.cellSetIt = null;
805       this.snapshotIt = null;
806       
807       if (allocatorAtCreation != null) {
808         this.allocatorAtCreation.decScannerCount();
809         this.allocatorAtCreation = null;
810       }
811       if (snapshotAllocatorAtCreation != null) {
812         this.snapshotAllocatorAtCreation.decScannerCount();
813         this.snapshotAllocatorAtCreation = null;
814       }
815 
816       this.cellSetItRow = null;
817       this.snapshotItRow = null;
818     }
819 
820     /**
821      * MemStoreScanner returns max value as sequence id because it will
822      * always have the latest data among all files.
823      */
824     @Override
825     public long getSequenceID() {
826       return Long.MAX_VALUE;
827     }
828 
829     @Override
830     public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns,
831         long oldestUnexpiredTS) {
832       return shouldSeek(scan, oldestUnexpiredTS);
833     }
834 
835     /**
836      * Seek scanner to the given key first. If it returns false(means
837      * peek()==null) or scanner's peek row is bigger than row of given key, seek
838      * the scanner to the previous row of given key
839      */
840     @Override
841     public synchronized boolean backwardSeek(Cell key) {
842       seek(key);
843       if (peek() == null || comparator.compareRows(peek(), key) > 0) {
844         return seekToPreviousRow(key);
845       }
846       return true;
847     }
848 
849     /**
850      * Separately get the KeyValue before the specified key from kvset and
851      * snapshotset, and use the row of higher one as the previous row of
852      * specified key, then seek to the first KeyValue of previous row
853      */
854     @Override
855     public synchronized boolean seekToPreviousRow(Cell originalKey) {
856       boolean keepSeeking = false;
857       Cell key = originalKey;
858       do {
859         Cell firstKeyOnRow = CellUtil.createFirstOnRow(key);
860         SortedSet<Cell> cellHead = cellSetAtCreation.headSet(firstKeyOnRow);
861         Cell cellSetBeforeRow = cellHead.isEmpty() ? null : cellHead.last();
862         SortedSet<Cell> snapshotHead = snapshotAtCreation
863             .headSet(firstKeyOnRow);
864         Cell snapshotBeforeRow = snapshotHead.isEmpty() ? null : snapshotHead
865             .last();
866         Cell lastCellBeforeRow = getHighest(cellSetBeforeRow, snapshotBeforeRow);
867         if (lastCellBeforeRow == null) {
868           theNext = null;
869           return false;
870         }
871         Cell firstKeyOnPreviousRow = CellUtil.createFirstOnRow(lastCellBeforeRow);
872         this.stopSkippingCellsIfNextRow = true;
873         seek(firstKeyOnPreviousRow);
874         this.stopSkippingCellsIfNextRow = false;
875         if (peek() == null
876             || comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) {
877           keepSeeking = true;
878           key = firstKeyOnPreviousRow;
879           continue;
880         } else {
881           keepSeeking = false;
882         }
883       } while (keepSeeking);
884       return true;
885     }
886 
887     @Override
888     public synchronized boolean seekToLastRow() {
889       Cell first = cellSetAtCreation.isEmpty() ? null : cellSetAtCreation
890           .last();
891       Cell second = snapshotAtCreation.isEmpty() ? null
892           : snapshotAtCreation.last();
893       Cell higherCell = getHighest(first, second);
894       if (higherCell == null) {
895         return false;
896       }
897       Cell firstCellOnLastRow = CellUtil.createFirstOnRow(higherCell);
898       if (seek(firstCellOnLastRow)) {
899         return true;
900       } else {
901         return seekToPreviousRow(higherCell);
902       }
903 
904     }
905   }
906 
907   public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
908       + (9 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN);
909 
910   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
911       ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +
912       (2 * ClassSize.CELL_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
913 
914   /*
915    * Calculate how the MemStore size has changed.  Includes overhead of the
916    * backing Map.
917    * @param cell
918    * @param notpresent True if the cell was NOT present in the set.
919    * @return Size
920    */
921   static long heapSizeChange(final Cell cell, final boolean notpresent) {
922     return notpresent ? ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY
923         + CellUtil.estimatedHeapSizeOf(cell)) : 0;
924   }
925 
926   private long keySize() {
927     return heapSize() - DEEP_OVERHEAD;
928   }
929 
930   /**
931    * Get the entire heap usage for this MemStore not including keys in the
932    * snapshot.
933    */
934   @Override
935   public long heapSize() {
936     return size.get();
937   }
938 
939   @Override
940   public long size() {
941     return heapSize();
942   }
943 
944   /**
945    * Code to help figure if our approximation of object heap sizes is close
946    * enough.  See hbase-900.  Fills memstores then waits so user can heap
947    * dump and bring up resultant hprof in something like jprofiler which
948    * allows you get 'deep size' on objects.
949    * @param args main args
950    */
951   public static void main(String [] args) {
952     RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
953     LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
954       runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
955     LOG.info("vmInputArguments=" + runtime.getInputArguments());
956     DefaultMemStore memstore1 = new DefaultMemStore();
957     // TODO: x32 vs x64
958     long size = 0;
959     final int count = 10000;
960     byte [] fam = Bytes.toBytes("col");
961     byte [] qf = Bytes.toBytes("umn");
962     byte [] empty = new byte[0];
963     for (int i = 0; i < count; i++) {
964       // Give each its own ts
965       size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
966     }
967     LOG.info("memstore1 estimated size=" + size);
968     for (int i = 0; i < count; i++) {
969       size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
970     }
971     LOG.info("memstore1 estimated size (2nd loading of same data)=" + size);
972     // Make a variably sized memstore.
973     DefaultMemStore memstore2 = new DefaultMemStore();
974     for (int i = 0; i < count; i++) {
975       size += memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, new byte[i]));
976     }
977     LOG.info("memstore2 estimated size=" + size);
978     final int seconds = 30;
979     LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
980     for (int i = 0; i < seconds; i++) {
981       // Thread.sleep(1000);
982     }
983     LOG.info("Exiting.");
984   }
985 
986 }