View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase;
21  
22  import static org.apache.hadoop.hbase.util.Bytes.len;
23  
24  import java.io.DataInput;
25  import java.io.DataOutput;
26  import java.io.EOFException;
27  import java.io.IOException;
28  import java.io.InputStream;
29  import java.io.OutputStream;
30  import java.nio.ByteBuffer;
31  import java.util.ArrayList;
32  import java.util.Arrays;
33  import java.util.Comparator;
34  import java.util.HashMap;
35  import java.util.List;
36  import java.util.Map;
37  
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.io.HeapSize;
42  import org.apache.hadoop.hbase.util.ByteBufferUtils;
43  import org.apache.hadoop.hbase.util.Bytes;
44  import org.apache.hadoop.hbase.util.ClassSize;
45  import org.apache.hadoop.io.IOUtils;
46  import org.apache.hadoop.io.RawComparator;
47  
48  import com.google.common.annotations.VisibleForTesting;
49  
50  /**
51   * An HBase Key/Value. This is the fundamental HBase Type.
52   * <p>
53   * HBase applications and users should use the Cell interface and avoid directly using KeyValue and
54   * member functions not defined in Cell.
55   * <p>
56   * If being used client-side, the primary methods to access individual fields are
57   * {@link #getRowArray()}, {@link #getFamilyArray()}, {@link #getQualifierArray()},
58   * {@link #getTimestamp()}, and {@link #getValueArray()}. These methods allocate new byte arrays
59   * and return copies. Avoid their use server-side.
60   * <p>
61   * Instances of this class are immutable. They do not implement Comparable but Comparators are
62   * provided. Comparators change with context, whether user table or a catalog table comparison. Its
63   * critical you use the appropriate comparator. There are Comparators for normal HFiles, Meta's
64   * Hfiles, and bloom filter keys.
65   * <p>
66   * KeyValue wraps a byte array and takes offsets and lengths into passed array at where to start
67   * interpreting the content as KeyValue. The KeyValue format inside a byte array is:
68   * <code>&lt;keylength&gt; &lt;valuelength&gt; &lt;key&gt; &lt;value&gt;</code> Key is further
69   * decomposed as: <code>&lt;rowlength&gt; &lt;row&gt; &lt;columnfamilylength&gt;
70   * &lt;columnfamily&gt; &lt;columnqualifier&gt;
71   * &lt;timestamp&gt; &lt;keytype&gt;</code> The <code>rowlength</code> maximum is
72   * <code>Short.MAX_SIZE</code>, column family length maximum is <code>Byte.MAX_SIZE</code>, and
73   * column qualifier + key length must be &lt; <code>Integer.MAX_SIZE</code>. The column does not
74   * contain the family/qualifier delimiter, {@link #COLUMN_FAMILY_DELIMITER}<br>
75   * KeyValue can optionally contain Tags. When it contains tags, it is added in the byte array after
76   * the value part. The format for this part is: <code>&lt;tagslength&gt;&lt;tagsbytes&gt;</code>.
77   * <code>tagslength</code> maximum is <code>Short.MAX_SIZE</code>. The <code>tagsbytes</code>
78   * contain one or more tags where as each tag is of the form
79   * <code>&lt;taglength&gt;&lt;tagtype&gt;&lt;tagbytes&gt;</code>. <code>tagtype</code> is one byte
80   * and <code>taglength</code> maximum is <code>Short.MAX_SIZE</code> and it includes 1 byte type
81   * length and actual tag bytes length.
82   */
83  @InterfaceAudience.Private
84  public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
85      SettableTimestamp, Streamable {
86    private static final ArrayList<Tag> EMPTY_ARRAY_LIST = new ArrayList<Tag>();
87  
88    private static final Log LOG = LogFactory.getLog(KeyValue.class);
89  
90    /**
91     * Colon character in UTF-8
92     */
93    public static final char COLUMN_FAMILY_DELIMITER = ':';
94  
95    public static final byte[] COLUMN_FAMILY_DELIM_ARRAY =
96      new byte[]{COLUMN_FAMILY_DELIMITER};
97  
98    /**
99     * Comparator for plain key/values; i.e. non-catalog table key/values. Works on Key portion
100    * of KeyValue only.
101    * @deprecated Use {@link CellComparator#COMPARATOR} instead
102    */
103   @Deprecated
104   public static final KVComparator COMPARATOR = new KVComparator();
105   /**
106    * A {@link KVComparator} for <code>hbase:meta</code> catalog table
107    * {@link KeyValue}s.
108    * @deprecated Use {@link CellComparator#META_COMPARATOR} instead
109    */
110   @Deprecated
111   public static final KVComparator META_COMPARATOR = new MetaComparator();
112 
113   /**
114    * Needed for Bloom Filters.
115    *    * @deprecated Use {@link Bytes#BYTES_RAWCOMPARATOR} instead
116    */
117   @Deprecated
118   public static final KVComparator RAW_COMPARATOR = new RawBytesComparator();
119 
120   /** Size of the key length field in bytes*/
121   public static final int KEY_LENGTH_SIZE = Bytes.SIZEOF_INT;
122 
123   /** Size of the key type field in bytes */
124   public static final int TYPE_SIZE = Bytes.SIZEOF_BYTE;
125 
126   /** Size of the row length field in bytes */
127   public static final int ROW_LENGTH_SIZE = Bytes.SIZEOF_SHORT;
128 
129   /** Size of the family length field in bytes */
130   public static final int FAMILY_LENGTH_SIZE = Bytes.SIZEOF_BYTE;
131 
132   /** Size of the timestamp field in bytes */
133   public static final int TIMESTAMP_SIZE = Bytes.SIZEOF_LONG;
134 
135   // Size of the timestamp and type byte on end of a key -- a long + a byte.
136   public static final int TIMESTAMP_TYPE_SIZE = TIMESTAMP_SIZE + TYPE_SIZE;
137 
138   // Size of the length shorts and bytes in key.
139   public static final int KEY_INFRASTRUCTURE_SIZE = ROW_LENGTH_SIZE
140       + FAMILY_LENGTH_SIZE + TIMESTAMP_TYPE_SIZE;
141 
142   // How far into the key the row starts at. First thing to read is the short
143   // that says how long the row is.
144   public static final int ROW_OFFSET =
145     Bytes.SIZEOF_INT /*keylength*/ +
146     Bytes.SIZEOF_INT /*valuelength*/;
147 
148   public static final int ROW_KEY_OFFSET = ROW_OFFSET + ROW_LENGTH_SIZE;
149 
150   // Size of the length ints in a KeyValue datastructure.
151   public static final int KEYVALUE_INFRASTRUCTURE_SIZE = ROW_OFFSET;
152 
153   /** Size of the tags length field in bytes */
154   public static final int TAGS_LENGTH_SIZE = Bytes.SIZEOF_SHORT;
155 
156   public static final int KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE = ROW_OFFSET + TAGS_LENGTH_SIZE;
157 
158   private static final int MAX_TAGS_LENGTH = (2 * Short.MAX_VALUE) + 1;
159 
160   /**
161    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
162    * characteristics would take up for its underlying data structure.
163    *
164    * @param rlength row length
165    * @param flength family length
166    * @param qlength qualifier length
167    * @param vlength value length
168    *
169    * @return the <code>KeyValue</code> data structure length
170    */
171   public static long getKeyValueDataStructureSize(int rlength,
172       int flength, int qlength, int vlength) {
173     return KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE
174         + getKeyDataStructureSize(rlength, flength, qlength) + vlength;
175   }
176 
177   /**
178    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
179    * characteristics would take up for its underlying data structure.
180    *
181    * @param rlength row length
182    * @param flength family length
183    * @param qlength qualifier length
184    * @param vlength value length
185    * @param tagsLength total length of the tags
186    *
187    * @return the <code>KeyValue</code> data structure length
188    */
189   public static long getKeyValueDataStructureSize(int rlength, int flength, int qlength,
190       int vlength, int tagsLength) {
191     if (tagsLength == 0) {
192       return getKeyValueDataStructureSize(rlength, flength, qlength, vlength);
193     }
194     return KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE
195         + getKeyDataStructureSize(rlength, flength, qlength) + vlength + tagsLength;
196   }
197 
198   /**
199    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
200    * characteristics would take up for its underlying data structure.
201    *
202    * @param klength key length
203    * @param vlength value length
204    * @param tagsLength total length of the tags
205    *
206    * @return the <code>KeyValue</code> data structure length
207    */
208   public static long getKeyValueDataStructureSize(int klength, int vlength, int tagsLength) {
209     if (tagsLength == 0) {
210       return KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + klength + vlength;
211     }
212     return KeyValue.KEYVALUE_WITH_TAGS_INFRASTRUCTURE_SIZE + klength + vlength + tagsLength;
213   }
214 
215   /**
216    * Computes the number of bytes that a <code>KeyValue</code> instance with the provided
217    * characteristics would take up in its underlying data structure for the key.
218    *
219    * @param rlength row length
220    * @param flength family length
221    * @param qlength qualifier length
222    *
223    * @return the key data structure length
224    */
225   public static long getKeyDataStructureSize(int rlength, int flength, int qlength) {
226     return KeyValue.KEY_INFRASTRUCTURE_SIZE + rlength + flength + qlength;
227   }
228 
229   /**
230    * Key type.
231    * Has space for other key types to be added later.  Cannot rely on
232    * enum ordinals . They change if item is removed or moved.  Do our own codes.
233    */
234   public static enum Type {
235     Minimum((byte)0),
236     Put((byte)4),
237 
238     Delete((byte)8),
239     DeleteFamilyVersion((byte)10),
240     DeleteColumn((byte)12),
241     DeleteFamily((byte)14),
242 
243     // Maximum is used when searching; you look from maximum on down.
244     Maximum((byte)255);
245 
246     private final byte code;
247 
248     Type(final byte c) {
249       this.code = c;
250     }
251 
252     public byte getCode() {
253       return this.code;
254     }
255 
256     /**
257      * Cannot rely on enum ordinals . They change if item is removed or moved.
258      * Do our own codes.
259      * @param b
260      * @return Type associated with passed code.
261      */
262     public static Type codeToType(final byte b) {
263       for (Type t : Type.values()) {
264         if (t.getCode() == b) {
265           return t;
266         }
267       }
268       throw new RuntimeException("Unknown code " + b);
269     }
270   }
271 
272   /**
273    * Lowest possible key.
274    * Makes a Key with highest possible Timestamp, empty row and column.  No
275    * key can be equal or lower than this one in memstore or in store file.
276    */
277   public static final KeyValue LOWESTKEY =
278     new KeyValue(HConstants.EMPTY_BYTE_ARRAY, HConstants.LATEST_TIMESTAMP);
279 
280   ////
281   // KeyValue core instance fields.
282   protected byte [] bytes = null;  // an immutable byte array that contains the KV
283   protected int offset = 0;  // offset into bytes buffer KV starts at
284   protected int length = 0;  // length of the KV starting from offset.
285 
286   /**
287    * @return True if a delete type, a {@link KeyValue.Type#Delete} or
288    * a {KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
289    * KeyValue type.
290    */
291   public static boolean isDelete(byte t) {
292     return Type.Delete.getCode() <= t && t <= Type.DeleteFamily.getCode();
293   }
294 
295   /** Here be dragons **/
296 
297   /**
298    * used to achieve atomic operations in the memstore.
299    */
300   @Override
301   public long getSequenceId() {
302     return seqId;
303   }
304 
305   @Override
306   public void setSequenceId(long seqId) {
307     this.seqId = seqId;
308   }
309 
310   // multi-version concurrency control version.  default value is 0, aka do not care.
311   private long seqId = 0;
312 
313   /** Dragon time over, return to normal business */
314 
315 
316   /** Writable Constructor -- DO NOT USE */
317   public KeyValue() {}
318 
319   /**
320    * Creates a KeyValue from the start of the specified byte array.
321    * Presumes <code>bytes</code> content is formatted as a KeyValue blob.
322    * @param bytes byte array
323    */
324   public KeyValue(final byte [] bytes) {
325     this(bytes, 0);
326   }
327 
328   /**
329    * Creates a KeyValue from the specified byte array and offset.
330    * Presumes <code>bytes</code> content starting at <code>offset</code> is
331    * formatted as a KeyValue blob.
332    * @param bytes byte array
333    * @param offset offset to start of KeyValue
334    */
335   public KeyValue(final byte [] bytes, final int offset) {
336     this(bytes, offset, getLength(bytes, offset));
337   }
338 
339   /**
340    * Creates a KeyValue from the specified byte array, starting at offset, and
341    * for length <code>length</code>.
342    * @param bytes byte array
343    * @param offset offset to start of the KeyValue
344    * @param length length of the KeyValue
345    */
346   public KeyValue(final byte [] bytes, final int offset, final int length) {
347     this.bytes = bytes;
348     this.offset = offset;
349     this.length = length;
350   }
351 
352   /**
353    * Creates a KeyValue from the specified byte array, starting at offset, and
354    * for length <code>length</code>.
355    *
356    * @param bytes  byte array
357    * @param offset offset to start of the KeyValue
358    * @param length length of the KeyValue
359    * @param ts
360    */
361   public KeyValue(final byte[] bytes, final int offset, final int length, long ts) {
362     this(bytes, offset, length, null, 0, 0, null, 0, 0, ts, Type.Maximum, null, 0, 0, null);
363   }
364 
365   /** Constructors that build a new backing byte array from fields */
366 
367   /**
368    * Constructs KeyValue structure filled with null value.
369    * Sets type to {@link KeyValue.Type#Maximum}
370    * @param row - row key (arbitrary byte array)
371    * @param timestamp
372    */
373   public KeyValue(final byte [] row, final long timestamp) {
374     this(row, null, null, timestamp, Type.Maximum, null);
375   }
376 
377   /**
378    * Constructs KeyValue structure filled with null value.
379    * @param row - row key (arbitrary byte array)
380    * @param timestamp
381    */
382   public KeyValue(final byte [] row, final long timestamp, Type type) {
383     this(row, null, null, timestamp, type, null);
384   }
385 
386   /**
387    * Constructs KeyValue structure filled with null value.
388    * Sets type to {@link KeyValue.Type#Maximum}
389    * @param row - row key (arbitrary byte array)
390    * @param family family name
391    * @param qualifier column qualifier
392    */
393   public KeyValue(final byte [] row, final byte [] family,
394       final byte [] qualifier) {
395     this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum);
396   }
397 
398   /**
399    * Constructs KeyValue structure as a put filled with specified values and
400    * LATEST_TIMESTAMP.
401    * @param row - row key (arbitrary byte array)
402    * @param family family name
403    * @param qualifier column qualifier
404    */
405   public KeyValue(final byte [] row, final byte [] family,
406       final byte [] qualifier, final byte [] value) {
407     this(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Put, value);
408   }
409 
410   /**
411    * Constructs KeyValue structure filled with specified values.
412    * @param row row key
413    * @param family family name
414    * @param qualifier column qualifier
415    * @param timestamp version timestamp
416    * @param type key type
417    * @throws IllegalArgumentException
418    */
419   public KeyValue(final byte[] row, final byte[] family,
420       final byte[] qualifier, final long timestamp, Type type) {
421     this(row, family, qualifier, timestamp, type, null);
422   }
423 
424   /**
425    * Constructs KeyValue structure filled with specified values.
426    * @param row row key
427    * @param family family name
428    * @param qualifier column qualifier
429    * @param timestamp version timestamp
430    * @param value column value
431    * @throws IllegalArgumentException
432    */
433   public KeyValue(final byte[] row, final byte[] family,
434       final byte[] qualifier, final long timestamp, final byte[] value) {
435     this(row, family, qualifier, timestamp, Type.Put, value);
436   }
437 
438   /**
439    * Constructs KeyValue structure filled with specified values.
440    * @param row row key
441    * @param family family name
442    * @param qualifier column qualifier
443    * @param timestamp version timestamp
444    * @param value column value
445    * @param tags tags
446    * @throws IllegalArgumentException
447    */
448   public KeyValue(final byte[] row, final byte[] family,
449       final byte[] qualifier, final long timestamp, final byte[] value,
450       final Tag[] tags) {
451     this(row, family, qualifier, timestamp, value, tags != null ? Arrays.asList(tags) : null);
452   }
453 
454   /**
455    * Constructs KeyValue structure filled with specified values.
456    * @param row row key
457    * @param family family name
458    * @param qualifier column qualifier
459    * @param timestamp version timestamp
460    * @param value column value
461    * @param tags tags non-empty list of tags or null
462    * @throws IllegalArgumentException
463    */
464   public KeyValue(final byte[] row, final byte[] family,
465       final byte[] qualifier, final long timestamp, final byte[] value,
466       final List<Tag> tags) {
467     this(row, 0, row==null ? 0 : row.length,
468       family, 0, family==null ? 0 : family.length,
469       qualifier, 0, qualifier==null ? 0 : qualifier.length,
470       timestamp, Type.Put,
471       value, 0, value==null ? 0 : value.length, tags);
472   }
473 
474   /**
475    * Constructs KeyValue structure filled with specified values.
476    * @param row row key
477    * @param family family name
478    * @param qualifier column qualifier
479    * @param timestamp version timestamp
480    * @param type key type
481    * @param value column value
482    * @throws IllegalArgumentException
483    */
484   public KeyValue(final byte[] row, final byte[] family,
485       final byte[] qualifier, final long timestamp, Type type,
486       final byte[] value) {
487     this(row, 0, len(row),   family, 0, len(family),   qualifier, 0, len(qualifier),
488         timestamp, type,   value, 0, len(value));
489   }
490 
491   /**
492    * Constructs KeyValue structure filled with specified values.
493    * <p>
494    * Column is split into two fields, family and qualifier.
495    * @param row row key
496    * @param family family name
497    * @param qualifier column qualifier
498    * @param timestamp version timestamp
499    * @param type key type
500    * @param value column value
501    * @throws IllegalArgumentException
502    */
503   public KeyValue(final byte[] row, final byte[] family,
504       final byte[] qualifier, final long timestamp, Type type,
505       final byte[] value, final List<Tag> tags) {
506     this(row, family, qualifier, 0, qualifier==null ? 0 : qualifier.length,
507         timestamp, type, value, 0, value==null ? 0 : value.length, tags);
508   }
509 
510   /**
511    * Constructs KeyValue structure filled with specified values.
512    * @param row row key
513    * @param family family name
514    * @param qualifier column qualifier
515    * @param timestamp version timestamp
516    * @param type key type
517    * @param value column value
518    * @throws IllegalArgumentException
519    */
520   public KeyValue(final byte[] row, final byte[] family,
521       final byte[] qualifier, final long timestamp, Type type,
522       final byte[] value, final byte[] tags) {
523     this(row, family, qualifier, 0, qualifier==null ? 0 : qualifier.length,
524         timestamp, type, value, 0, value==null ? 0 : value.length, tags);
525   }
526 
527   /**
528    * Constructs KeyValue structure filled with specified values.
529    * @param row row key
530    * @param family family name
531    * @param qualifier column qualifier
532    * @param qoffset qualifier offset
533    * @param qlength qualifier length
534    * @param timestamp version timestamp
535    * @param type key type
536    * @param value column value
537    * @param voffset value offset
538    * @param vlength value length
539    * @throws IllegalArgumentException
540    */
541   public KeyValue(byte [] row, byte [] family,
542       byte [] qualifier, int qoffset, int qlength, long timestamp, Type type,
543       byte [] value, int voffset, int vlength, List<Tag> tags) {
544     this(row, 0, row==null ? 0 : row.length,
545         family, 0, family==null ? 0 : family.length,
546         qualifier, qoffset, qlength, timestamp, type,
547         value, voffset, vlength, tags);
548   }
549 
550   /**
551    * @param row
552    * @param family
553    * @param qualifier
554    * @param qoffset
555    * @param qlength
556    * @param timestamp
557    * @param type
558    * @param value
559    * @param voffset
560    * @param vlength
561    * @param tags
562    */
563   public KeyValue(byte [] row, byte [] family,
564       byte [] qualifier, int qoffset, int qlength, long timestamp, Type type,
565       byte [] value, int voffset, int vlength, byte[] tags) {
566     this(row, 0, row==null ? 0 : row.length,
567         family, 0, family==null ? 0 : family.length,
568         qualifier, qoffset, qlength, timestamp, type,
569         value, voffset, vlength, tags, 0, tags==null ? 0 : tags.length);
570   }
571 
572   /**
573    * Constructs KeyValue structure filled with specified values.
574    * <p>
575    * Column is split into two fields, family and qualifier.
576    * @param row row key
577    * @throws IllegalArgumentException
578    */
579   public KeyValue(final byte [] row, final int roffset, final int rlength,
580       final byte [] family, final int foffset, final int flength,
581       final byte [] qualifier, final int qoffset, final int qlength,
582       final long timestamp, final Type type,
583       final byte [] value, final int voffset, final int vlength) {
584     this(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
585       qlength, timestamp, type, value, voffset, vlength, null);
586   }
587 
588   /**
589    * Constructs KeyValue structure filled with specified values. Uses the provided buffer as the
590    * data buffer.
591    * <p>
592    * Column is split into two fields, family and qualifier.
593    *
594    * @param buffer the bytes buffer to use
595    * @param boffset buffer offset
596    * @param row row key
597    * @param roffset row offset
598    * @param rlength row length
599    * @param family family name
600    * @param foffset family offset
601    * @param flength family length
602    * @param qualifier column qualifier
603    * @param qoffset qualifier offset
604    * @param qlength qualifier length
605    * @param timestamp version timestamp
606    * @param type key type
607    * @param value column value
608    * @param voffset value offset
609    * @param vlength value length
610    * @param tags non-empty list of tags or null
611    * @throws IllegalArgumentException an illegal value was passed or there is insufficient space
612    * remaining in the buffer
613    */
614   public KeyValue(byte [] buffer, final int boffset,
615       final byte [] row, final int roffset, final int rlength,
616       final byte [] family, final int foffset, final int flength,
617       final byte [] qualifier, final int qoffset, final int qlength,
618       final long timestamp, final Type type,
619       final byte [] value, final int voffset, final int vlength,
620       final Tag[] tags) {
621      this.bytes  = buffer;
622      this.length = writeByteArray(buffer, boffset,
623          row, roffset, rlength,
624          family, foffset, flength, qualifier, qoffset, qlength,
625         timestamp, type, value, voffset, vlength, tags);
626      this.offset = boffset;
627    }
628 
629   /**
630    * Constructs KeyValue structure filled with specified values.
631    * <p>
632    * Column is split into two fields, family and qualifier.
633    * @param row row key
634    * @param roffset row offset
635    * @param rlength row length
636    * @param family family name
637    * @param foffset family offset
638    * @param flength family length
639    * @param qualifier column qualifier
640    * @param qoffset qualifier offset
641    * @param qlength qualifier length
642    * @param timestamp version timestamp
643    * @param type key type
644    * @param value column value
645    * @param voffset value offset
646    * @param vlength value length
647    * @param tags tags
648    * @throws IllegalArgumentException
649    */
650   public KeyValue(final byte [] row, final int roffset, final int rlength,
651       final byte [] family, final int foffset, final int flength,
652       final byte [] qualifier, final int qoffset, final int qlength,
653       final long timestamp, final Type type,
654       final byte [] value, final int voffset, final int vlength,
655       final List<Tag> tags) {
656     this.bytes = createByteArray(row, roffset, rlength,
657         family, foffset, flength, qualifier, qoffset, qlength,
658         timestamp, type, value, voffset, vlength, tags);
659     this.length = bytes.length;
660     this.offset = 0;
661   }
662 
663   /**
664    * @param row
665    * @param roffset
666    * @param rlength
667    * @param family
668    * @param foffset
669    * @param flength
670    * @param qualifier
671    * @param qoffset
672    * @param qlength
673    * @param timestamp
674    * @param type
675    * @param value
676    * @param voffset
677    * @param vlength
678    * @param tags
679    */
680   public KeyValue(final byte [] row, final int roffset, final int rlength,
681       final byte [] family, final int foffset, final int flength,
682       final byte [] qualifier, final int qoffset, final int qlength,
683       final long timestamp, final Type type,
684       final byte [] value, final int voffset, final int vlength,
685       final byte[] tags, final int tagsOffset, final int tagsLength) {
686     this.bytes = createByteArray(row, roffset, rlength,
687         family, foffset, flength, qualifier, qoffset, qlength,
688         timestamp, type, value, voffset, vlength, tags, tagsOffset, tagsLength);
689     this.length = bytes.length;
690     this.offset = 0;
691   }
692 
693   /**
694    * Constructs an empty KeyValue structure, with specified sizes.
695    * This can be used to partially fill up KeyValues.
696    * <p>
697    * Column is split into two fields, family and qualifier.
698    * @param rlength row length
699    * @param flength family length
700    * @param qlength qualifier length
701    * @param timestamp version timestamp
702    * @param type key type
703    * @param vlength value length
704    * @throws IllegalArgumentException
705    */
706   public KeyValue(final int rlength,
707       final int flength,
708       final int qlength,
709       final long timestamp, final Type type,
710       final int vlength) {
711     this(rlength, flength, qlength, timestamp, type, vlength, 0);
712   }
713 
714   /**
715    * Constructs an empty KeyValue structure, with specified sizes.
716    * This can be used to partially fill up KeyValues.
717    * <p>
718    * Column is split into two fields, family and qualifier.
719    * @param rlength row length
720    * @param flength family length
721    * @param qlength qualifier length
722    * @param timestamp version timestamp
723    * @param type key type
724    * @param vlength value length
725    * @param tagsLength
726    * @throws IllegalArgumentException
727    */
728   public KeyValue(final int rlength,
729       final int flength,
730       final int qlength,
731       final long timestamp, final Type type,
732       final int vlength, final int tagsLength) {
733     this.bytes = createEmptyByteArray(rlength, flength, qlength, timestamp, type, vlength,
734         tagsLength);
735     this.length = bytes.length;
736     this.offset = 0;
737   }
738 
739 
740   public KeyValue(byte[] row, int roffset, int rlength,
741                   byte[] family, int foffset, int flength,
742                   ByteBuffer qualifier, long ts, Type type, ByteBuffer value, List<Tag> tags) {
743     this.bytes = createByteArray(row, roffset, rlength, family, foffset, flength,
744         qualifier, 0, qualifier == null ? 0 : qualifier.remaining(), ts, type,
745         value, 0, value == null ? 0 : value.remaining(), tags);
746     this.length = bytes.length;
747     this.offset = 0;
748   }
749 
750   public KeyValue(Cell c) {
751     this(c.getRowArray(), c.getRowOffset(), (int)c.getRowLength(),
752         c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(),
753         c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
754         c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
755         c.getValueLength(), c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
756     this.seqId = c.getSequenceId();
757   }
758 
759   /**
760    * Create an empty byte[] representing a KeyValue
761    * All lengths are preset and can be filled in later.
762    * @param rlength
763    * @param flength
764    * @param qlength
765    * @param timestamp
766    * @param type
767    * @param vlength
768    * @return The newly created byte array.
769    */
770   private static byte[] createEmptyByteArray(final int rlength, int flength,
771       int qlength, final long timestamp, final Type type, int vlength, int tagsLength) {
772     if (rlength > Short.MAX_VALUE) {
773       throw new IllegalArgumentException("Row > " + Short.MAX_VALUE);
774     }
775     if (flength > Byte.MAX_VALUE) {
776       throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE);
777     }
778     // Qualifier length
779     if (qlength > Integer.MAX_VALUE - rlength - flength) {
780       throw new IllegalArgumentException("Qualifier > " + Integer.MAX_VALUE);
781     }
782     checkForTagsLength(tagsLength);
783     // Key length
784     long longkeylength = getKeyDataStructureSize(rlength, flength, qlength);
785     if (longkeylength > Integer.MAX_VALUE) {
786       throw new IllegalArgumentException("keylength " + longkeylength + " > " +
787         Integer.MAX_VALUE);
788     }
789     int keylength = (int)longkeylength;
790     // Value length
791     if (vlength > HConstants.MAXIMUM_VALUE_LENGTH) { // FindBugs INT_VACUOUS_COMPARISON
792       throw new IllegalArgumentException("Valuer > " +
793           HConstants.MAXIMUM_VALUE_LENGTH);
794     }
795 
796     // Allocate right-sized byte array.
797     byte[] bytes= new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
798         tagsLength)];
799     // Write the correct size markers
800     int pos = 0;
801     pos = Bytes.putInt(bytes, pos, keylength);
802     pos = Bytes.putInt(bytes, pos, vlength);
803     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
804     pos += rlength;
805     pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
806     pos += flength + qlength;
807     pos = Bytes.putLong(bytes, pos, timestamp);
808     pos = Bytes.putByte(bytes, pos, type.getCode());
809     pos += vlength;
810     if (tagsLength > 0) {
811       pos = Bytes.putAsShort(bytes, pos, tagsLength);
812     }
813     return bytes;
814   }
815 
816   /**
817    * Checks the parameters passed to a constructor.
818    *
819    * @param row row key
820    * @param rlength row length
821    * @param family family name
822    * @param flength family length
823    * @param qlength qualifier length
824    * @param vlength value length
825    *
826    * @throws IllegalArgumentException an illegal value was passed
827    */
828   private static void checkParameters(final byte [] row, final int rlength,
829       final byte [] family, int flength, int qlength, int vlength)
830           throws IllegalArgumentException {
831     if (rlength > Short.MAX_VALUE) {
832       throw new IllegalArgumentException("Row > " + Short.MAX_VALUE);
833     }
834     if (row == null) {
835       throw new IllegalArgumentException("Row is null");
836     }
837     // Family length
838     flength = family == null ? 0 : flength;
839     if (flength > Byte.MAX_VALUE) {
840       throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE);
841     }
842     // Qualifier length
843     if (qlength > Integer.MAX_VALUE - rlength - flength) {
844       throw new IllegalArgumentException("Qualifier > " + Integer.MAX_VALUE);
845     }
846     // Key length
847     long longKeyLength = getKeyDataStructureSize(rlength, flength, qlength);
848     if (longKeyLength > Integer.MAX_VALUE) {
849       throw new IllegalArgumentException("keylength " + longKeyLength + " > " +
850           Integer.MAX_VALUE);
851     }
852     // Value length
853     if (vlength > HConstants.MAXIMUM_VALUE_LENGTH) { // FindBugs INT_VACUOUS_COMPARISON
854       throw new IllegalArgumentException("Value length " + vlength + " > " +
855           HConstants.MAXIMUM_VALUE_LENGTH);
856     }
857   }
858 
859   /**
860    * Write KeyValue format into the provided byte array.
861    *
862    * @param buffer the bytes buffer to use
863    * @param boffset buffer offset
864    * @param row row key
865    * @param roffset row offset
866    * @param rlength row length
867    * @param family family name
868    * @param foffset family offset
869    * @param flength family length
870    * @param qualifier column qualifier
871    * @param qoffset qualifier offset
872    * @param qlength qualifier length
873    * @param timestamp version timestamp
874    * @param type key type
875    * @param value column value
876    * @param voffset value offset
877    * @param vlength value length
878    *
879    * @return The number of useful bytes in the buffer.
880    *
881    * @throws IllegalArgumentException an illegal value was passed or there is insufficient space
882    * remaining in the buffer
883    */
884   public static int writeByteArray(byte [] buffer, final int boffset,
885       final byte [] row, final int roffset, final int rlength,
886       final byte [] family, final int foffset, int flength,
887       final byte [] qualifier, final int qoffset, int qlength,
888       final long timestamp, final Type type,
889       final byte [] value, final int voffset, int vlength, Tag[] tags) {
890 
891     checkParameters(row, rlength, family, flength, qlength, vlength);
892 
893     // Calculate length of tags area
894     int tagsLength = 0;
895     if (tags != null && tags.length > 0) {
896       for (Tag t: tags) {
897         tagsLength += t.getLength();
898       }
899     }
900     checkForTagsLength(tagsLength);
901     int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
902     int keyValueLength = (int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
903         tagsLength);
904     if (keyValueLength > buffer.length - boffset) {
905       throw new IllegalArgumentException("Buffer size " + (buffer.length - boffset) + " < " +
906           keyValueLength);
907     }
908 
909     // Write key, value and key row length.
910     int pos = boffset;
911     pos = Bytes.putInt(buffer, pos, keyLength);
912     pos = Bytes.putInt(buffer, pos, vlength);
913     pos = Bytes.putShort(buffer, pos, (short)(rlength & 0x0000ffff));
914     pos = Bytes.putBytes(buffer, pos, row, roffset, rlength);
915     pos = Bytes.putByte(buffer, pos, (byte) (flength & 0x0000ff));
916     if (flength != 0) {
917       pos = Bytes.putBytes(buffer, pos, family, foffset, flength);
918     }
919     if (qlength != 0) {
920       pos = Bytes.putBytes(buffer, pos, qualifier, qoffset, qlength);
921     }
922     pos = Bytes.putLong(buffer, pos, timestamp);
923     pos = Bytes.putByte(buffer, pos, type.getCode());
924     if (value != null && value.length > 0) {
925       pos = Bytes.putBytes(buffer, pos, value, voffset, vlength);
926     }
927     // Write the number of tags. If it is 0 then it means there are no tags.
928     if (tagsLength > 0) {
929       pos = Bytes.putAsShort(buffer, pos, tagsLength);
930       for (Tag t : tags) {
931         pos = Bytes.putBytes(buffer, pos, t.getBuffer(), t.getOffset(), t.getLength());
932       }
933     }
934     return keyValueLength;
935   }
936 
937   private static void checkForTagsLength(int tagsLength) {
938     if (tagsLength > MAX_TAGS_LENGTH) {
939       throw new IllegalArgumentException("tagslength "+ tagsLength + " > " + MAX_TAGS_LENGTH);
940     }
941   }
942 
943   /**
944    * Write KeyValue format into a byte array.
945    * @param row row key
946    * @param roffset row offset
947    * @param rlength row length
948    * @param family family name
949    * @param foffset family offset
950    * @param flength family length
951    * @param qualifier column qualifier
952    * @param qoffset qualifier offset
953    * @param qlength qualifier length
954    * @param timestamp version timestamp
955    * @param type key type
956    * @param value column value
957    * @param voffset value offset
958    * @param vlength value length
959    * @return The newly created byte array.
960    */
961   private static byte [] createByteArray(final byte [] row, final int roffset,
962       final int rlength, final byte [] family, final int foffset, int flength,
963       final byte [] qualifier, final int qoffset, int qlength,
964       final long timestamp, final Type type,
965       final byte [] value, final int voffset,
966       int vlength, byte[] tags, int tagsOffset, int tagsLength) {
967 
968     checkParameters(row, rlength, family, flength, qlength, vlength);
969     checkForTagsLength(tagsLength);
970     // Allocate right-sized byte array.
971     int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
972     byte[] bytes = new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
973       tagsLength)];
974     // Write key, value and key row length.
975     int pos = 0;
976     pos = Bytes.putInt(bytes, pos, keyLength);
977     pos = Bytes.putInt(bytes, pos, vlength);
978     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
979     pos = Bytes.putBytes(bytes, pos, row, roffset, rlength);
980     pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
981     if(flength != 0) {
982       pos = Bytes.putBytes(bytes, pos, family, foffset, flength);
983     }
984     if(qlength != 0) {
985       pos = Bytes.putBytes(bytes, pos, qualifier, qoffset, qlength);
986     }
987     pos = Bytes.putLong(bytes, pos, timestamp);
988     pos = Bytes.putByte(bytes, pos, type.getCode());
989     if (value != null && value.length > 0) {
990       pos = Bytes.putBytes(bytes, pos, value, voffset, vlength);
991     }
992     // Add the tags after the value part
993     if (tagsLength > 0) {
994       pos = Bytes.putAsShort(bytes, pos, tagsLength);
995       pos = Bytes.putBytes(bytes, pos, tags, tagsOffset, tagsLength);
996     }
997     return bytes;
998   }
999 
1000   /**
1001    * @param qualifier can be a ByteBuffer or a byte[], or null.
1002    * @param value can be a ByteBuffer or a byte[], or null.
1003    */
1004   private static byte [] createByteArray(final byte [] row, final int roffset,
1005       final int rlength, final byte [] family, final int foffset, int flength,
1006       final Object qualifier, final int qoffset, int qlength,
1007       final long timestamp, final Type type,
1008       final Object value, final int voffset, int vlength, List<Tag> tags) {
1009 
1010     checkParameters(row, rlength, family, flength, qlength, vlength);
1011 
1012     // Calculate length of tags area
1013     int tagsLength = 0;
1014     if (tags != null && !tags.isEmpty()) {
1015       for (Tag t : tags) {
1016         tagsLength += t.getLength();
1017       }
1018     }
1019     checkForTagsLength(tagsLength);
1020     // Allocate right-sized byte array.
1021     int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
1022     byte[] bytes = new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
1023         tagsLength)];
1024 
1025     // Write key, value and key row length.
1026     int pos = 0;
1027     pos = Bytes.putInt(bytes, pos, keyLength);
1028 
1029     pos = Bytes.putInt(bytes, pos, vlength);
1030     pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff));
1031     pos = Bytes.putBytes(bytes, pos, row, roffset, rlength);
1032     pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff));
1033     if(flength != 0) {
1034       pos = Bytes.putBytes(bytes, pos, family, foffset, flength);
1035     }
1036     if (qlength > 0) {
1037       if (qualifier instanceof ByteBuffer) {
1038         pos = Bytes.putByteBuffer(bytes, pos, (ByteBuffer) qualifier);
1039       } else {
1040         pos = Bytes.putBytes(bytes, pos, (byte[]) qualifier, qoffset, qlength);
1041       }
1042     }
1043     pos = Bytes.putLong(bytes, pos, timestamp);
1044     pos = Bytes.putByte(bytes, pos, type.getCode());
1045     if (vlength > 0) {
1046       if (value instanceof ByteBuffer) {
1047         pos = Bytes.putByteBuffer(bytes, pos, (ByteBuffer) value);
1048       } else {
1049         pos = Bytes.putBytes(bytes, pos, (byte[]) value, voffset, vlength);
1050       }
1051     }
1052     // Add the tags after the value part
1053     if (tagsLength > 0) {
1054       pos = Bytes.putAsShort(bytes, pos, tagsLength);
1055       for (Tag t : tags) {
1056         pos = Bytes.putBytes(bytes, pos, t.getBuffer(), t.getOffset(), t.getLength());
1057       }
1058     }
1059     return bytes;
1060   }
1061 
1062   /**
1063    * Needed doing 'contains' on List.  Only compares the key portion, not the value.
1064    */
1065   @Override
1066   public boolean equals(Object other) {
1067     if (!(other instanceof Cell)) {
1068       return false;
1069     }
1070     return CellUtil.equals(this, (Cell)other);
1071   }
1072 
1073   /**
1074    * In line with {@link #equals(Object)}, only uses the key portion, not the value.
1075    */
1076   @Override
1077   public int hashCode() {
1078     return calculateHashForKey(this);
1079   }
1080 
1081   private int calculateHashForKey(Cell cell) {
1082     // pre-calculate the 3 hashes made of byte ranges
1083     int rowHash = Bytes.hashCode(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
1084     int familyHash = Bytes.hashCode(cell.getFamilyArray(), cell.getFamilyOffset(),
1085         cell.getFamilyLength());
1086     int qualifierHash = Bytes.hashCode(cell.getQualifierArray(), cell.getQualifierOffset(),
1087         cell.getQualifierLength());
1088 
1089     // combine the 6 sub-hashes
1090     int hash = 31 * rowHash + familyHash;
1091     hash = 31 * hash + qualifierHash;
1092     hash = 31 * hash + (int) cell.getTimestamp();
1093     hash = 31 * hash + cell.getTypeByte();
1094     return hash;
1095   }
1096 
1097   //---------------------------------------------------------------------------
1098   //
1099   //  KeyValue cloning
1100   //
1101   //---------------------------------------------------------------------------
1102 
1103   /**
1104    * Clones a KeyValue.  This creates a copy, re-allocating the buffer.
1105    * @return Fully copied clone of this KeyValue
1106    * @throws CloneNotSupportedException
1107    */
1108   @Override
1109   public KeyValue clone() throws CloneNotSupportedException {
1110     super.clone();
1111     byte [] b = new byte[this.length];
1112     System.arraycopy(this.bytes, this.offset, b, 0, this.length);
1113     KeyValue ret = new KeyValue(b, 0, b.length);
1114     // Important to clone the memstoreTS as well - otherwise memstore's
1115     // update-in-place methods (eg increment) will end up creating
1116     // new entries
1117     ret.setSequenceId(seqId);
1118     return ret;
1119   }
1120 
1121   /**
1122    * Creates a shallow copy of this KeyValue, reusing the data byte buffer.
1123    * http://en.wikipedia.org/wiki/Object_copy
1124    * @return Shallow copy of this KeyValue
1125    */
1126   public KeyValue shallowCopy() {
1127     KeyValue shallowCopy = new KeyValue(this.bytes, this.offset, this.length);
1128     shallowCopy.setSequenceId(this.seqId);
1129     return shallowCopy;
1130   }
1131 
1132   //---------------------------------------------------------------------------
1133   //
1134   //  String representation
1135   //
1136   //---------------------------------------------------------------------------
1137 
1138   @Override
1139   public String toString() {
1140     if (this.bytes == null || this.bytes.length == 0) {
1141       return "empty";
1142     }
1143     return keyToString(this.bytes, this.offset + ROW_OFFSET, getKeyLength()) + "/vlen="
1144       + getValueLength() + "/seqid=" + seqId;
1145   }
1146 
1147   /**
1148    * @param k Key portion of a KeyValue.
1149    * @return Key as a String, empty string if k is null.
1150    */
1151   public static String keyToString(final byte [] k) {
1152     if (k == null) {
1153       return "";
1154     }
1155     return keyToString(k, 0, k.length);
1156   }
1157 
1158   /**
1159    * Produces a string map for this key/value pair. Useful for programmatic use
1160    * and manipulation of the data stored in an WALKey, for example, printing
1161    * as JSON. Values are left out due to their tendency to be large. If needed,
1162    * they can be added manually.
1163    *
1164    * @return the Map&lt;String,?&gt; containing data from this key
1165    */
1166   public Map<String, Object> toStringMap() {
1167     Map<String, Object> stringMap = new HashMap<String, Object>();
1168     stringMap.put("row", Bytes.toStringBinary(getRowArray(), getRowOffset(), getRowLength()));
1169     stringMap.put("family",
1170       Bytes.toStringBinary(getFamilyArray(), getFamilyOffset(), getFamilyLength()));
1171     stringMap.put("qualifier",
1172       Bytes.toStringBinary(getQualifierArray(), getQualifierOffset(), getQualifierLength()));
1173     stringMap.put("timestamp", getTimestamp());
1174     stringMap.put("vlen", getValueLength());
1175     List<Tag> tags = getTags();
1176     if (tags != null) {
1177       List<String> tagsString = new ArrayList<String>();
1178       for (Tag t : tags) {
1179         tagsString.add((t.getType()) + ":" +Bytes.toStringBinary(t.getValue()));
1180       }
1181       stringMap.put("tag", tagsString);
1182     }
1183     return stringMap;
1184   }
1185 
1186   /**
1187    * Use for logging.
1188    * @param b Key portion of a KeyValue.
1189    * @param o Offset to start of key
1190    * @param l Length of key.
1191    * @return Key as a String.
1192    */
1193   public static String keyToString(final byte [] b, final int o, final int l) {
1194     if (b == null) return "";
1195     int rowlength = Bytes.toShort(b, o);
1196     String row = Bytes.toStringBinary(b, o + Bytes.SIZEOF_SHORT, rowlength);
1197     int columnoffset = o + Bytes.SIZEOF_SHORT + 1 + rowlength;
1198     int familylength = b[columnoffset - 1];
1199     int columnlength = l - ((columnoffset - o) + TIMESTAMP_TYPE_SIZE);
1200     String family = familylength == 0? "":
1201       Bytes.toStringBinary(b, columnoffset, familylength);
1202     String qualifier = columnlength == 0? "":
1203       Bytes.toStringBinary(b, columnoffset + familylength,
1204       columnlength - familylength);
1205     long timestamp = Bytes.toLong(b, o + (l - TIMESTAMP_TYPE_SIZE));
1206     String timestampStr = humanReadableTimestamp(timestamp);
1207     byte type = b[o + l - 1];
1208     return row + "/" + family +
1209       (family != null && family.length() > 0? ":" :"") +
1210       qualifier + "/" + timestampStr + "/" + Type.codeToType(type);
1211   }
1212 
1213   public static String humanReadableTimestamp(final long timestamp) {
1214     if (timestamp == HConstants.LATEST_TIMESTAMP) {
1215       return "LATEST_TIMESTAMP";
1216     }
1217     if (timestamp == HConstants.OLDEST_TIMESTAMP) {
1218       return "OLDEST_TIMESTAMP";
1219     }
1220     return String.valueOf(timestamp);
1221   }
1222 
1223   //---------------------------------------------------------------------------
1224   //
1225   //  Public Member Accessors
1226   //
1227   //---------------------------------------------------------------------------
1228 
1229   /**
1230    * @return The byte array backing this KeyValue.
1231    * @deprecated Since 0.98.0.  Use Cell Interface instead.  Do not presume single backing buffer.
1232    */
1233   @Deprecated
1234   public byte [] getBuffer() {
1235     return this.bytes;
1236   }
1237 
1238   /**
1239    * @return Offset into {@link #getBuffer()} at which this KeyValue starts.
1240    */
1241   public int getOffset() {
1242     return this.offset;
1243   }
1244 
1245   /**
1246    * @return Length of bytes this KeyValue occupies in {@link #getBuffer()}.
1247    */
1248   public int getLength() {
1249     return length;
1250   }
1251 
1252   //---------------------------------------------------------------------------
1253   //
1254   //  Length and Offset Calculators
1255   //
1256   //---------------------------------------------------------------------------
1257 
1258   /**
1259    * Determines the total length of the KeyValue stored in the specified
1260    * byte array and offset.  Includes all headers.
1261    * @param bytes byte array
1262    * @param offset offset to start of the KeyValue
1263    * @return length of entire KeyValue, in bytes
1264    */
1265   private static int getLength(byte [] bytes, int offset) {
1266     int klength = ROW_OFFSET + Bytes.toInt(bytes, offset);
1267     int vlength = Bytes.toInt(bytes, offset + Bytes.SIZEOF_INT);
1268     return klength + vlength;
1269   }
1270 
1271   /**
1272    * @return Key offset in backing buffer..
1273    */
1274   public int getKeyOffset() {
1275     return this.offset + ROW_OFFSET;
1276   }
1277 
1278   public String getKeyString() {
1279     return Bytes.toStringBinary(getBuffer(), getKeyOffset(), getKeyLength());
1280   }
1281 
1282   /**
1283    * @return Length of key portion.
1284    */
1285   public int getKeyLength() {
1286     return Bytes.toInt(this.bytes, this.offset);
1287   }
1288 
1289   /**
1290    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1291    */
1292   @Override
1293   public byte[] getValueArray() {
1294     return bytes;
1295   }
1296 
1297   /**
1298    * @return the value offset
1299    */
1300   @Override
1301   public int getValueOffset() {
1302     int voffset = getKeyOffset() + getKeyLength();
1303     return voffset;
1304   }
1305 
1306   /**
1307    * @return Value length
1308    */
1309   @Override
1310   public int getValueLength() {
1311     int vlength = Bytes.toInt(this.bytes, this.offset + Bytes.SIZEOF_INT);
1312     return vlength;
1313   }
1314 
1315   /**
1316    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1317    */
1318   @Override
1319   public byte[] getRowArray() {
1320     return bytes;
1321   }
1322 
1323   /**
1324    * @return Row offset
1325    */
1326   @Override
1327   public int getRowOffset() {
1328     return this.offset + ROW_KEY_OFFSET;
1329   }
1330 
1331   /**
1332    * @return Row length
1333    */
1334   @Override
1335   public short getRowLength() {
1336     return Bytes.toShort(this.bytes, getKeyOffset());
1337   }
1338 
1339   /**
1340    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1341    */
1342   @Override
1343   public byte[] getFamilyArray() {
1344     return bytes;
1345   }
1346 
1347   /**
1348    * @return Family offset
1349    */
1350   @Override
1351   public int getFamilyOffset() {
1352     return getFamilyOffset(getRowLength());
1353   }
1354 
1355   /**
1356    * @return Family offset
1357    */
1358   private int getFamilyOffset(int rlength) {
1359     return this.offset + ROW_KEY_OFFSET + rlength + Bytes.SIZEOF_BYTE;
1360   }
1361 
1362   /**
1363    * @return Family length
1364    */
1365   @Override
1366   public byte getFamilyLength() {
1367     return getFamilyLength(getFamilyOffset());
1368   }
1369 
1370   /**
1371    * @return Family length
1372    */
1373   public byte getFamilyLength(int foffset) {
1374     return this.bytes[foffset-1];
1375   }
1376 
1377   /**
1378    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1379    */
1380   @Override
1381   public byte[] getQualifierArray() {
1382     return bytes;
1383   }
1384 
1385   /**
1386    * @return Qualifier offset
1387    */
1388   @Override
1389   public int getQualifierOffset() {
1390     return getQualifierOffset(getFamilyOffset());
1391   }
1392 
1393   /**
1394    * @return Qualifier offset
1395    */
1396   private int getQualifierOffset(int foffset) {
1397     return foffset + getFamilyLength(foffset);
1398   }
1399 
1400   /**
1401    * @return Qualifier length
1402    */
1403   @Override
1404   public int getQualifierLength() {
1405     return getQualifierLength(getRowLength(),getFamilyLength());
1406   }
1407 
1408   /**
1409    * @return Qualifier length
1410    */
1411   private int getQualifierLength(int rlength, int flength) {
1412     return getKeyLength() - (int) getKeyDataStructureSize(rlength, flength, 0);
1413   }
1414 
1415   /**
1416    * @return Timestamp offset
1417    */
1418   public int getTimestampOffset() {
1419     return getTimestampOffset(getKeyLength());
1420   }
1421 
1422   /**
1423    * @param keylength Pass if you have it to save on a int creation.
1424    * @return Timestamp offset
1425    */
1426   private int getTimestampOffset(final int keylength) {
1427     return getKeyOffset() + keylength - TIMESTAMP_TYPE_SIZE;
1428   }
1429 
1430   /**
1431    * @return True if this KeyValue has a LATEST_TIMESTAMP timestamp.
1432    */
1433   public boolean isLatestTimestamp() {
1434     return Bytes.equals(getBuffer(), getTimestampOffset(), Bytes.SIZEOF_LONG,
1435       HConstants.LATEST_TIMESTAMP_BYTES, 0, Bytes.SIZEOF_LONG);
1436   }
1437 
1438   /**
1439    * @param now Time to set into <code>this</code> IFF timestamp ==
1440    * {@link HConstants#LATEST_TIMESTAMP} (else, its a noop).
1441    * @return True is we modified this.
1442    */
1443   public boolean updateLatestStamp(final byte [] now) {
1444     if (this.isLatestTimestamp()) {
1445       int tsOffset = getTimestampOffset();
1446       System.arraycopy(now, 0, this.bytes, tsOffset, Bytes.SIZEOF_LONG);
1447       // clear cache or else getTimestamp() possibly returns an old value
1448       return true;
1449     }
1450     return false;
1451   }
1452 
1453   @Override
1454   public void setTimestamp(long ts) {
1455     Bytes.putBytes(this.bytes, this.getTimestampOffset(), Bytes.toBytes(ts), 0, Bytes.SIZEOF_LONG);
1456   }
1457 
1458   @Override
1459   public void setTimestamp(byte[] ts, int tsOffset) {
1460     Bytes.putBytes(this.bytes, this.getTimestampOffset(), ts, tsOffset, Bytes.SIZEOF_LONG);
1461   }
1462 
1463   //---------------------------------------------------------------------------
1464   //
1465   //  Methods that return copies of fields
1466   //
1467   //---------------------------------------------------------------------------
1468 
1469   /**
1470    * Do not use unless you have to. Used internally for compacting and testing. Use
1471    * {@link #getRowArray()}, {@link #getFamilyArray()}, {@link #getQualifierArray()}, and
1472    * {@link #getValueArray()} if accessing a KeyValue client-side.
1473    * @return Copy of the key portion only.
1474    */
1475   public byte [] getKey() {
1476     int keylength = getKeyLength();
1477     byte [] key = new byte[keylength];
1478     System.arraycopy(getBuffer(), getKeyOffset(), key, 0, keylength);
1479     return key;
1480   }
1481 
1482   /**
1483    *
1484    * @return Timestamp
1485    */
1486   @Override
1487   public long getTimestamp() {
1488     return getTimestamp(getKeyLength());
1489   }
1490 
1491   /**
1492    * @param keylength Pass if you have it to save on a int creation.
1493    * @return Timestamp
1494    */
1495   long getTimestamp(final int keylength) {
1496     int tsOffset = getTimestampOffset(keylength);
1497     return Bytes.toLong(this.bytes, tsOffset);
1498   }
1499 
1500   /**
1501    * @return Type of this KeyValue.
1502    */
1503   @Deprecated
1504   public byte getType() {
1505     return getTypeByte();
1506   }
1507 
1508   /**
1509    * @return KeyValue.TYPE byte representation
1510    */
1511   @Override
1512   public byte getTypeByte() {
1513     return this.bytes[this.offset + getKeyLength() - 1 + ROW_OFFSET];
1514   }
1515 
1516   /**
1517    * @return True if a delete type, a {@link KeyValue.Type#Delete} or
1518    * a {KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
1519    * KeyValue type.
1520    */
1521   @Deprecated // use CellUtil#isDelete
1522   public boolean isDelete() {
1523     return KeyValue.isDelete(getType());
1524   }
1525 
1526   /**
1527    * This returns the offset where the tag actually starts.
1528    */
1529   @Override
1530   public int getTagsOffset() {
1531     int tagsLen = getTagsLength();
1532     if (tagsLen == 0) {
1533       return this.offset + this.length;
1534     }
1535     return this.offset + this.length - tagsLen;
1536   }
1537 
1538   /**
1539    * This returns the total length of the tag bytes
1540    */
1541   @Override
1542   public int getTagsLength() {
1543     int tagsLen = this.length - (getKeyLength() + getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE);
1544     if (tagsLen > 0) {
1545       // There are some Tag bytes in the byte[]. So reduce 2 bytes which is added to denote the tags
1546       // length
1547       tagsLen -= TAGS_LENGTH_SIZE;
1548     }
1549     return tagsLen;
1550   }
1551 
1552   /**
1553    * Returns any tags embedded in the KeyValue.  Used in testcases.
1554    * @return The tags
1555    */
1556   public List<Tag> getTags() {
1557     int tagsLength = getTagsLength();
1558     if (tagsLength == 0) {
1559       return EMPTY_ARRAY_LIST;
1560     }
1561     return Tag.asList(getTagsArray(), getTagsOffset(), tagsLength);
1562   }
1563 
1564   /**
1565    * @return the backing array of the entire KeyValue (all KeyValue fields are in a single array)
1566    */
1567   @Override
1568   public byte[] getTagsArray() {
1569     return bytes;
1570   }
1571 
1572   /**
1573    * Creates a new KeyValue that only contains the key portion (the value is
1574    * set to be null).
1575    *
1576    * TODO only used by KeyOnlyFilter -- move there.
1577    * @param lenAsVal replace value with the actual value length (false=empty)
1578    */
1579   public KeyValue createKeyOnly(boolean lenAsVal) {
1580     // KV format:  <keylen:4><valuelen:4><key:keylen><value:valuelen>
1581     // Rebuild as: <keylen:4><0:4><key:keylen>
1582     int dataLen = lenAsVal? Bytes.SIZEOF_INT : 0;
1583     byte [] newBuffer = new byte[getKeyLength() + ROW_OFFSET + dataLen];
1584     System.arraycopy(this.bytes, this.offset, newBuffer, 0,
1585         Math.min(newBuffer.length,this.length));
1586     Bytes.putInt(newBuffer, Bytes.SIZEOF_INT, dataLen);
1587     if (lenAsVal) {
1588       Bytes.putInt(newBuffer, newBuffer.length - dataLen, this.getValueLength());
1589     }
1590     return new KeyValue(newBuffer);
1591   }
1592 
1593   /**
1594    * Splits a column in {@code family:qualifier} form into separate byte arrays. An empty qualifier
1595    * (ie, {@code fam:}) is parsed as <code>{ fam, EMPTY_BYTE_ARRAY }</code> while no delimiter (ie,
1596    * {@code fam}) is parsed as an array of one element, <code>{ fam }</code>.
1597    * <p>
1598    * Don't forget, HBase DOES support empty qualifiers. (see HBASE-9549)
1599    * </p>
1600    * <p>
1601    * Not recommend to be used as this is old-style API.
1602    * </p>
1603    * @param c The column.
1604    * @return The parsed column.
1605    */
1606   public static byte [][] parseColumn(byte [] c) {
1607     final int index = getDelimiter(c, 0, c.length, COLUMN_FAMILY_DELIMITER);
1608     if (index == -1) {
1609       // If no delimiter, return array of size 1
1610       return new byte [][] { c };
1611     } else if(index == c.length - 1) {
1612       // family with empty qualifier, return array size 2
1613       byte [] family = new byte[c.length-1];
1614       System.arraycopy(c, 0, family, 0, family.length);
1615       return new byte [][] { family, HConstants.EMPTY_BYTE_ARRAY};
1616     }
1617     // Family and column, return array size 2
1618     final byte [][] result = new byte [2][];
1619     result[0] = new byte [index];
1620     System.arraycopy(c, 0, result[0], 0, index);
1621     final int len = c.length - (index + 1);
1622     result[1] = new byte[len];
1623     System.arraycopy(c, index + 1 /* Skip delimiter */, result[1], 0, len);
1624     return result;
1625   }
1626 
1627   /**
1628    * Makes a column in family:qualifier form from separate byte arrays.
1629    * <p>
1630    * Not recommended for usage as this is old-style API.
1631    * @param family
1632    * @param qualifier
1633    * @return family:qualifier
1634    */
1635   public static byte [] makeColumn(byte [] family, byte [] qualifier) {
1636     return Bytes.add(family, COLUMN_FAMILY_DELIM_ARRAY, qualifier);
1637   }
1638 
1639   /**
1640    * @param b
1641    * @param delimiter
1642    * @return Index of delimiter having started from start of <code>b</code>
1643    * moving rightward.
1644    */
1645   public static int getDelimiter(final byte [] b, int offset, final int length,
1646       final int delimiter) {
1647     if (b == null) {
1648       throw new IllegalArgumentException("Passed buffer is null");
1649     }
1650     int result = -1;
1651     for (int i = offset; i < length + offset; i++) {
1652       if (b[i] == delimiter) {
1653         result = i;
1654         break;
1655       }
1656     }
1657     return result;
1658   }
1659 
1660   /**
1661    * Find index of passed delimiter walking from end of buffer backwards.
1662    * @param b
1663    * @param delimiter
1664    * @return Index of delimiter
1665    */
1666   public static int getDelimiterInReverse(final byte [] b, final int offset,
1667       final int length, final int delimiter) {
1668     if (b == null) {
1669       throw new IllegalArgumentException("Passed buffer is null");
1670     }
1671     int result = -1;
1672     for (int i = (offset + length) - 1; i >= offset; i--) {
1673       if (b[i] == delimiter) {
1674         result = i;
1675         break;
1676       }
1677     }
1678     return result;
1679   }
1680 
1681   /**
1682    * A {@link KVComparator} for <code>hbase:meta</code> catalog table
1683    * {@link KeyValue}s.
1684    * @deprecated : {@link CellComparator#META_COMPARATOR} to be used
1685    */
1686   @Deprecated
1687   public static class MetaComparator extends KVComparator {
1688     /**
1689      * Compare key portion of a {@link KeyValue} for keys in <code>hbase:meta</code>
1690      * table.
1691      */
1692     @Override
1693     public int compare(final Cell left, final Cell right) {
1694       return CellComparator.META_COMPARATOR.compareKeyIgnoresMvcc(left, right);
1695     }
1696 
1697     @Override
1698     public int compareOnlyKeyPortion(Cell left, Cell right) {
1699       return compare(left, right);
1700     }
1701 
1702     @Override
1703     public int compareRows(byte [] left, int loffset, int llength,
1704         byte [] right, int roffset, int rlength) {
1705       int leftDelimiter = getDelimiter(left, loffset, llength,
1706           HConstants.DELIMITER);
1707       int rightDelimiter = getDelimiter(right, roffset, rlength,
1708           HConstants.DELIMITER);
1709       // Compare up to the delimiter
1710       int lpart = (leftDelimiter < 0 ? llength :leftDelimiter - loffset);
1711       int rpart = (rightDelimiter < 0 ? rlength :rightDelimiter - roffset);
1712       int result = Bytes.compareTo(left, loffset, lpart, right, roffset, rpart);
1713       if (result != 0) {
1714         return result;
1715       } else {
1716         if (leftDelimiter < 0 && rightDelimiter >= 0) {
1717           return -1;
1718         } else if (rightDelimiter < 0 && leftDelimiter >= 0) {
1719           return 1;
1720         } else if (leftDelimiter < 0 && rightDelimiter < 0) {
1721           return 0;
1722         }
1723       }
1724       // Compare middle bit of the row.
1725       // Move past delimiter
1726       leftDelimiter++;
1727       rightDelimiter++;
1728       int leftFarDelimiter = getDelimiterInReverse(left, leftDelimiter,
1729           llength - (leftDelimiter - loffset), HConstants.DELIMITER);
1730       int rightFarDelimiter = getDelimiterInReverse(right,
1731           rightDelimiter, rlength - (rightDelimiter - roffset),
1732           HConstants.DELIMITER);
1733       // Now compare middlesection of row.
1734       lpart = (leftFarDelimiter < 0 ? llength + loffset: leftFarDelimiter) - leftDelimiter;
1735       rpart = (rightFarDelimiter < 0 ? rlength + roffset: rightFarDelimiter)- rightDelimiter;
1736       result = super.compareRows(left, leftDelimiter, lpart, right, rightDelimiter, rpart);
1737       if (result != 0) {
1738         return result;
1739       }  else {
1740         if (leftDelimiter < 0 && rightDelimiter >= 0) {
1741           return -1;
1742         } else if (rightDelimiter < 0 && leftDelimiter >= 0) {
1743           return 1;
1744         } else if (leftDelimiter < 0 && rightDelimiter < 0) {
1745           return 0;
1746         }
1747       }
1748       // Compare last part of row, the rowid.
1749       leftFarDelimiter++;
1750       rightFarDelimiter++;
1751       result = Bytes.compareTo(left, leftFarDelimiter, llength - (leftFarDelimiter - loffset),
1752           right, rightFarDelimiter, rlength - (rightFarDelimiter - roffset));
1753       return result;
1754     }
1755 
1756     /**
1757      * Don't do any fancy Block Index splitting tricks.
1758      */
1759     @Override
1760     public byte[] getShortMidpointKey(final byte[] leftKey, final byte[] rightKey) {
1761       return Arrays.copyOf(rightKey, rightKey.length);
1762     }
1763 
1764     /**
1765      * The HFileV2 file format's trailer contains this class name.  We reinterpret this and
1766      * instantiate the appropriate comparator.
1767      * TODO: With V3 consider removing this.
1768      * @return legacy class name for FileFileTrailer#comparatorClassName
1769      */
1770     @Override
1771     public String getLegacyKeyComparatorName() {
1772       return "org.apache.hadoop.hbase.KeyValue$MetaKeyComparator";
1773     }
1774 
1775     @Override
1776     protected Object clone() throws CloneNotSupportedException {
1777       return new MetaComparator();
1778     }
1779 
1780     /**
1781      * Override the row key comparison to parse and compare the meta row key parts.
1782      */
1783     @Override
1784     protected int compareRowKey(final Cell l, final Cell r) {
1785       byte[] left = l.getRowArray();
1786       int loffset = l.getRowOffset();
1787       int llength = l.getRowLength();
1788       byte[] right = r.getRowArray();
1789       int roffset = r.getRowOffset();
1790       int rlength = r.getRowLength();
1791       return compareRows(left, loffset, llength, right, roffset, rlength);
1792     }
1793   }
1794 
1795   /**
1796    * Compare KeyValues.  When we compare KeyValues, we only compare the Key
1797    * portion.  This means two KeyValues with same Key but different Values are
1798    * considered the same as far as this Comparator is concerned.
1799    * @deprecated : Use {@link CellComparator}.
1800    */
1801   @Deprecated
1802   public static class KVComparator implements RawComparator<Cell>, SamePrefixComparator<byte[]> {
1803 
1804     /**
1805      * The HFileV2 file format's trailer contains this class name.  We reinterpret this and
1806      * instantiate the appropriate comparator.
1807      * TODO: With V3 consider removing this.
1808      * @return legacy class name for FileFileTrailer#comparatorClassName
1809      */
1810     public String getLegacyKeyComparatorName() {
1811       return "org.apache.hadoop.hbase.KeyValue$KeyComparator";
1812     }
1813 
1814     @Override // RawComparator
1815     public int compare(byte[] l, int loff, int llen, byte[] r, int roff, int rlen) {
1816       return compareFlatKey(l,loff,llen, r,roff,rlen);
1817     }
1818 
1819 
1820     /**
1821      * Compares the only the user specified portion of a Key.  This is overridden by MetaComparator.
1822      * @param left
1823      * @param right
1824      * @return 0 if equal, &lt;0 if left smaller, &gt;0 if right smaller
1825      */
1826     protected int compareRowKey(final Cell left, final Cell right) {
1827       return CellComparator.COMPARATOR.compareRows(left, right);
1828     }
1829 
1830     /**
1831      * Compares left to right assuming that left,loffset,llength and right,roffset,rlength are
1832      * full KVs laid out in a flat byte[]s.
1833      * @param left
1834      * @param loffset
1835      * @param llength
1836      * @param right
1837      * @param roffset
1838      * @param rlength
1839      * @return  0 if equal, &lt;0 if left smaller, &gt;0 if right smaller
1840      */
1841     public int compareFlatKey(byte[] left, int loffset, int llength,
1842         byte[] right, int roffset, int rlength) {
1843       // Compare row
1844       short lrowlength = Bytes.toShort(left, loffset);
1845       short rrowlength = Bytes.toShort(right, roffset);
1846       int compare = compareRows(left, loffset + Bytes.SIZEOF_SHORT,
1847           lrowlength, right, roffset + Bytes.SIZEOF_SHORT, rrowlength);
1848       if (compare != 0) {
1849         return compare;
1850       }
1851 
1852       // Compare the rest of the two KVs without making any assumptions about
1853       // the common prefix. This function will not compare rows anyway, so we
1854       // don't need to tell it that the common prefix includes the row.
1855       return compareWithoutRow(0, left, loffset, llength, right, roffset,
1856           rlength, rrowlength);
1857     }
1858 
1859     public int compareFlatKey(byte[] left, byte[] right) {
1860       return compareFlatKey(left, 0, left.length, right, 0, right.length);
1861     }
1862 
1863     // compare a key against row/fam/qual/ts/type
1864     public int compareKey(Cell cell,
1865         byte[] row, int roff, int rlen,
1866         byte[] fam, int foff, int flen,
1867         byte[] col, int coff, int clen,
1868         long ts, byte type) {
1869 
1870       int compare = compareRows(
1871         cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
1872         row, roff, rlen);
1873       if (compare != 0) {
1874         return compare;
1875       }
1876       // If the column is not specified, the "minimum" key type appears the
1877       // latest in the sorted order, regardless of the timestamp. This is used
1878       // for specifying the last key/value in a given row, because there is no
1879       // "lexicographically last column" (it would be infinitely long). The
1880       // "maximum" key type does not need this behavior.
1881       if (cell.getFamilyLength() + cell.getQualifierLength() == 0
1882           && cell.getTypeByte() == Type.Minimum.getCode()) {
1883         // left is "bigger", i.e. it appears later in the sorted order
1884         return 1;
1885       }
1886       if (flen+clen == 0 && type == Type.Minimum.getCode()) {
1887         return -1;
1888       }
1889 
1890       compare = compareFamilies(
1891         cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
1892         fam, foff, flen);
1893       if (compare != 0) {
1894         return compare;
1895       }
1896       compare = compareColumns(
1897         cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
1898         col, coff, clen);
1899       if (compare != 0) {
1900         return compare;
1901       }
1902       // Next compare timestamps.
1903       compare = compareTimestamps(cell.getTimestamp(), ts);
1904       if (compare != 0) {
1905         return compare;
1906       }
1907 
1908       // Compare types. Let the delete types sort ahead of puts; i.e. types
1909       // of higher numbers sort before those of lesser numbers. Maximum (255)
1910       // appears ahead of everything, and minimum (0) appears after
1911       // everything.
1912       return (0xff & type) - (0xff & cell.getTypeByte());
1913     }
1914 
1915     public int compareOnlyKeyPortion(Cell left, Cell right) {
1916       return CellComparator.COMPARATOR.compareKeyIgnoresMvcc(left, right);
1917     }
1918 
1919     /**
1920      * Compares the Key of a cell -- with fields being more significant in this order:
1921      * rowkey, colfam/qual, timestamp, type, mvcc
1922      */
1923     @Override
1924     public int compare(final Cell left, final Cell right) {
1925       int compare = CellComparator.COMPARATOR.compare(left, right);
1926       return compare;
1927     }
1928 
1929     public int compareTimestamps(final Cell left, final Cell right) {
1930       return CellComparator.compareTimestamps(left, right);
1931     }
1932 
1933     /**
1934      * @param left
1935      * @param right
1936      * @return Result comparing rows.
1937      */
1938     public int compareRows(final Cell left, final Cell right) {
1939       return compareRows(left.getRowArray(),left.getRowOffset(), left.getRowLength(),
1940       right.getRowArray(), right.getRowOffset(), right.getRowLength());
1941     }
1942 
1943     /**
1944      * Get the b[],o,l for left and right rowkey portions and compare.
1945      * @param left
1946      * @param loffset
1947      * @param llength
1948      * @param right
1949      * @param roffset
1950      * @param rlength
1951      * @return 0 if equal, &lt;0 if left smaller, &gt;0 if right smaller
1952      */
1953     public int compareRows(byte [] left, int loffset, int llength,
1954         byte [] right, int roffset, int rlength) {
1955       return Bytes.compareTo(left, loffset, llength, right, roffset, rlength);
1956     }
1957 
1958     int compareColumns(final Cell left, final short lrowlength, final Cell right,
1959         final short rrowlength) {
1960       return CellComparator.compareColumns(left, right);
1961     }
1962 
1963     protected int compareColumns(
1964         byte [] left, int loffset, int llength, final int lfamilylength,
1965         byte [] right, int roffset, int rlength, final int rfamilylength) {
1966       // Compare family portion first.
1967       int diff = Bytes.compareTo(left, loffset, lfamilylength,
1968         right, roffset, rfamilylength);
1969       if (diff != 0) {
1970         return diff;
1971       }
1972       // Compare qualifier portion
1973       return Bytes.compareTo(left, loffset + lfamilylength,
1974         llength - lfamilylength,
1975         right, roffset + rfamilylength, rlength - rfamilylength);
1976       }
1977 
1978     static int compareTimestamps(final long ltimestamp, final long rtimestamp) {
1979       // The below older timestamps sorting ahead of newer timestamps looks
1980       // wrong but it is intentional. This way, newer timestamps are first
1981       // found when we iterate over a memstore and newer versions are the
1982       // first we trip over when reading from a store file.
1983       if (ltimestamp < rtimestamp) {
1984         return 1;
1985       } else if (ltimestamp > rtimestamp) {
1986         return -1;
1987       }
1988       return 0;
1989     }
1990 
1991     /**
1992      * Overridden
1993      * @param commonPrefix
1994      * @param left
1995      * @param loffset
1996      * @param llength
1997      * @param right
1998      * @param roffset
1999      * @param rlength
2000      * @return 0 if equal, &lt;0 if left smaller, &gt;0 if right smaller
2001      */
2002     @Override // SamePrefixComparator
2003     public int compareIgnoringPrefix(int commonPrefix, byte[] left,
2004         int loffset, int llength, byte[] right, int roffset, int rlength) {
2005       // Compare row
2006       short lrowlength = Bytes.toShort(left, loffset);
2007       short rrowlength;
2008 
2009       int comparisonResult = 0;
2010       if (commonPrefix < ROW_LENGTH_SIZE) {
2011         // almost nothing in common
2012         rrowlength = Bytes.toShort(right, roffset);
2013         comparisonResult = compareRows(left, loffset + ROW_LENGTH_SIZE,
2014             lrowlength, right, roffset + ROW_LENGTH_SIZE, rrowlength);
2015       } else { // the row length is the same
2016         rrowlength = lrowlength;
2017         if (commonPrefix < ROW_LENGTH_SIZE + rrowlength) {
2018           // The rows are not the same. Exclude the common prefix and compare
2019           // the rest of the two rows.
2020           int common = commonPrefix - ROW_LENGTH_SIZE;
2021           comparisonResult = compareRows(
2022               left, loffset + common + ROW_LENGTH_SIZE, lrowlength - common,
2023               right, roffset + common + ROW_LENGTH_SIZE, rrowlength - common);
2024         }
2025       }
2026       if (comparisonResult != 0) {
2027         return comparisonResult;
2028       }
2029 
2030       assert lrowlength == rrowlength;
2031       return compareWithoutRow(commonPrefix, left, loffset, llength, right,
2032           roffset, rlength, lrowlength);
2033     }
2034 
2035     /**
2036      * Compare columnFamily, qualifier, timestamp, and key type (everything
2037      * except the row). This method is used both in the normal comparator and
2038      * the "same-prefix" comparator. Note that we are assuming that row portions
2039      * of both KVs have already been parsed and found identical, and we don't
2040      * validate that assumption here.
2041      * @param commonPrefix
2042      *          the length of the common prefix of the two key-values being
2043      *          compared, including row length and row
2044      */
2045     private int compareWithoutRow(int commonPrefix, byte[] left, int loffset,
2046         int llength, byte[] right, int roffset, int rlength, short rowlength) {
2047       /***
2048        * KeyValue Format and commonLength:
2049        * |_keyLen_|_valLen_|_rowLen_|_rowKey_|_famiLen_|_fami_|_Quali_|....
2050        * ------------------|-------commonLength--------|--------------
2051        */
2052       int commonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + rowlength;
2053 
2054       // commonLength + TIMESTAMP_TYPE_SIZE
2055       int commonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + commonLength;
2056       // ColumnFamily + Qualifier length.
2057       int lcolumnlength = llength - commonLengthWithTSAndType;
2058       int rcolumnlength = rlength - commonLengthWithTSAndType;
2059 
2060       byte ltype = left[loffset + (llength - 1)];
2061       byte rtype = right[roffset + (rlength - 1)];
2062 
2063       // If the column is not specified, the "minimum" key type appears the
2064       // latest in the sorted order, regardless of the timestamp. This is used
2065       // for specifying the last key/value in a given row, because there is no
2066       // "lexicographically last column" (it would be infinitely long). The
2067       // "maximum" key type does not need this behavior.
2068       if (lcolumnlength == 0 && ltype == Type.Minimum.getCode()) {
2069         // left is "bigger", i.e. it appears later in the sorted order
2070         return 1;
2071       }
2072       if (rcolumnlength == 0 && rtype == Type.Minimum.getCode()) {
2073         return -1;
2074       }
2075 
2076       int lfamilyoffset = commonLength + loffset;
2077       int rfamilyoffset = commonLength + roffset;
2078 
2079       // Column family length.
2080       int lfamilylength = left[lfamilyoffset - 1];
2081       int rfamilylength = right[rfamilyoffset - 1];
2082       // If left family size is not equal to right family size, we need not
2083       // compare the qualifiers.
2084       boolean sameFamilySize = (lfamilylength == rfamilylength);
2085       int common = 0;
2086       if (commonPrefix > 0) {
2087         common = Math.max(0, commonPrefix - commonLength);
2088         if (!sameFamilySize) {
2089           // Common should not be larger than Math.min(lfamilylength,
2090           // rfamilylength).
2091           common = Math.min(common, Math.min(lfamilylength, rfamilylength));
2092         } else {
2093           common = Math.min(common, Math.min(lcolumnlength, rcolumnlength));
2094         }
2095       }
2096       if (!sameFamilySize) {
2097         // comparing column family is enough.
2098         return Bytes.compareTo(left, lfamilyoffset + common, lfamilylength
2099             - common, right, rfamilyoffset + common, rfamilylength - common);
2100       }
2101       // Compare family & qualifier together.
2102       final int comparison = Bytes.compareTo(left, lfamilyoffset + common,
2103           lcolumnlength - common, right, rfamilyoffset + common,
2104           rcolumnlength - common);
2105       if (comparison != 0) {
2106         return comparison;
2107       }
2108 
2109       ////
2110       // Next compare timestamps.
2111       long ltimestamp = Bytes.toLong(left,
2112           loffset + (llength - TIMESTAMP_TYPE_SIZE));
2113       long rtimestamp = Bytes.toLong(right,
2114           roffset + (rlength - TIMESTAMP_TYPE_SIZE));
2115       int compare = compareTimestamps(ltimestamp, rtimestamp);
2116       if (compare != 0) {
2117         return compare;
2118       }
2119 
2120       // Compare types. Let the delete types sort ahead of puts; i.e. types
2121       // of higher numbers sort before those of lesser numbers. Maximum (255)
2122       // appears ahead of everything, and minimum (0) appears after
2123       // everything.
2124       return (0xff & rtype) - (0xff & ltype);
2125     }
2126 
2127     protected int compareFamilies(final byte[] left, final int loffset, final int lfamilylength,
2128         final byte[] right, final int roffset, final int rfamilylength) {
2129       int diff = Bytes.compareTo(left, loffset, lfamilylength, right, roffset, rfamilylength);
2130       return diff;
2131     }
2132 
2133     protected int compareColumns(final byte[] left, final int loffset, final int lquallength,
2134         final byte[] right, final int roffset, final int rquallength) {
2135       int diff = Bytes.compareTo(left, loffset, lquallength, right, roffset, rquallength);
2136       return diff;
2137     }
2138     /**
2139      * Compares the row and column of two keyvalues for equality
2140      * @param left
2141      * @param right
2142      * @return True if same row and column.
2143      */
2144     public boolean matchingRowColumn(final Cell left,
2145         final Cell right) {
2146       short lrowlength = left.getRowLength();
2147       short rrowlength = right.getRowLength();
2148 
2149       // TsOffset = end of column data. just comparing Row+CF length of each
2150       if ((left.getRowLength() + left.getFamilyLength() + left.getQualifierLength()) != (right
2151           .getRowLength() + right.getFamilyLength() + right.getQualifierLength())) {
2152         return false;
2153       }
2154 
2155       if (!matchingRows(left, lrowlength, right, rrowlength)) {
2156         return false;
2157       }
2158 
2159       int lfoffset = left.getFamilyOffset();
2160       int rfoffset = right.getFamilyOffset();
2161       int lclength = left.getQualifierLength();
2162       int rclength = right.getQualifierLength();
2163       int lfamilylength = left.getFamilyLength();
2164       int rfamilylength = right.getFamilyLength();
2165       int diff = compareFamilies(left.getFamilyArray(), lfoffset, lfamilylength,
2166           right.getFamilyArray(), rfoffset, rfamilylength);
2167       if (diff != 0) {
2168         return false;
2169       } else {
2170         diff = compareColumns(left.getQualifierArray(), left.getQualifierOffset(), lclength,
2171             right.getQualifierArray(), right.getQualifierOffset(), rclength);
2172         return diff == 0;
2173       }
2174     }
2175 
2176     /**
2177      * Compares the row of two keyvalues for equality
2178      * @param left
2179      * @param right
2180      * @return True if rows match.
2181      */
2182     public boolean matchingRows(final Cell left, final Cell right) {
2183       short lrowlength = left.getRowLength();
2184       short rrowlength = right.getRowLength();
2185       return matchingRows(left, lrowlength, right, rrowlength);
2186     }
2187 
2188     /**
2189      * @param left
2190      * @param lrowlength
2191      * @param right
2192      * @param rrowlength
2193      * @return True if rows match.
2194      */
2195     private boolean matchingRows(final Cell left, final short lrowlength,
2196         final Cell right, final short rrowlength) {
2197       return lrowlength == rrowlength &&
2198           matchingRows(left.getRowArray(), left.getRowOffset(), lrowlength,
2199               right.getRowArray(), right.getRowOffset(), rrowlength);
2200     }
2201 
2202     /**
2203      * Compare rows. Just calls Bytes.equals, but it's good to have this encapsulated.
2204      * @param left Left row array.
2205      * @param loffset Left row offset.
2206      * @param llength Left row length.
2207      * @param right Right row array.
2208      * @param roffset Right row offset.
2209      * @param rlength Right row length.
2210      * @return Whether rows are the same row.
2211      */
2212     public boolean matchingRows(final byte [] left, final int loffset, final int llength,
2213         final byte [] right, final int roffset, final int rlength) {
2214       return Bytes.equals(left, loffset, llength, right, roffset, rlength);
2215     }
2216 
2217     public byte[] calcIndexKey(byte[] lastKeyOfPreviousBlock, byte[] firstKeyInBlock) {
2218       byte[] fakeKey = getShortMidpointKey(lastKeyOfPreviousBlock, firstKeyInBlock);
2219       if (compareFlatKey(fakeKey, firstKeyInBlock) > 0) {
2220         LOG.error("Unexpected getShortMidpointKey result, fakeKey:"
2221             + Bytes.toStringBinary(fakeKey) + ", firstKeyInBlock:"
2222             + Bytes.toStringBinary(firstKeyInBlock));
2223         return firstKeyInBlock;
2224       }
2225       if (lastKeyOfPreviousBlock != null && compareFlatKey(lastKeyOfPreviousBlock, fakeKey) >= 0) {
2226         LOG.error("Unexpected getShortMidpointKey result, lastKeyOfPreviousBlock:" +
2227             Bytes.toStringBinary(lastKeyOfPreviousBlock) + ", fakeKey:" +
2228             Bytes.toStringBinary(fakeKey));
2229         return firstKeyInBlock;
2230       }
2231       return fakeKey;
2232     }
2233 
2234     /**
2235      * This is a HFile block index key optimization.
2236      * @param leftKey
2237      * @param rightKey
2238      * @return 0 if equal, &lt;0 if left smaller, &gt;0 if right smaller
2239      * @deprecated Since 0.99.2;
2240      */
2241     @Deprecated
2242     public byte[] getShortMidpointKey(final byte[] leftKey, final byte[] rightKey) {
2243       if (rightKey == null) {
2244         throw new IllegalArgumentException("rightKey can not be null");
2245       }
2246       if (leftKey == null) {
2247         return Arrays.copyOf(rightKey, rightKey.length);
2248       }
2249       if (compareFlatKey(leftKey, rightKey) >= 0) {
2250         throw new IllegalArgumentException("Unexpected input, leftKey:" + Bytes.toString(leftKey)
2251           + ", rightKey:" + Bytes.toString(rightKey));
2252       }
2253 
2254       short leftRowLength = Bytes.toShort(leftKey, 0);
2255       short rightRowLength = Bytes.toShort(rightKey, 0);
2256       int leftCommonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + leftRowLength;
2257       int rightCommonLength = ROW_LENGTH_SIZE + FAMILY_LENGTH_SIZE + rightRowLength;
2258       int leftCommonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + leftCommonLength;
2259       int rightCommonLengthWithTSAndType = TIMESTAMP_TYPE_SIZE + rightCommonLength;
2260       int leftColumnLength = leftKey.length - leftCommonLengthWithTSAndType;
2261       int rightColumnLength = rightKey.length - rightCommonLengthWithTSAndType;
2262       // rows are equal
2263       if (leftRowLength == rightRowLength && compareRows(leftKey, ROW_LENGTH_SIZE, leftRowLength,
2264         rightKey, ROW_LENGTH_SIZE, rightRowLength) == 0) {
2265         // Compare family & qualifier together.
2266         int comparison = Bytes.compareTo(leftKey, leftCommonLength, leftColumnLength, rightKey,
2267           rightCommonLength, rightColumnLength);
2268         // same with "row + family + qualifier", return rightKey directly
2269         if (comparison == 0) {
2270           return Arrays.copyOf(rightKey, rightKey.length);
2271         }
2272         // "family + qualifier" are different, generate a faked key per rightKey
2273         byte[] newKey = Arrays.copyOf(rightKey, rightKey.length);
2274         Bytes.putLong(newKey, rightKey.length - TIMESTAMP_TYPE_SIZE, HConstants.LATEST_TIMESTAMP);
2275         Bytes.putByte(newKey, rightKey.length - TYPE_SIZE, Type.Maximum.getCode());
2276         return newKey;
2277       }
2278       // rows are different
2279       short minLength = leftRowLength < rightRowLength ? leftRowLength : rightRowLength;
2280       short diffIdx = 0;
2281       while (diffIdx < minLength
2282           && leftKey[ROW_LENGTH_SIZE + diffIdx] == rightKey[ROW_LENGTH_SIZE + diffIdx]) {
2283         diffIdx++;
2284       }
2285       byte[] newRowKey = null;
2286       if (diffIdx >= minLength) {
2287         // leftKey's row is prefix of rightKey's.
2288         newRowKey = new byte[diffIdx + 1];
2289         System.arraycopy(rightKey, ROW_LENGTH_SIZE, newRowKey, 0, diffIdx + 1);
2290       } else {
2291         int diffByte = leftKey[ROW_LENGTH_SIZE + diffIdx];
2292         if ((0xff & diffByte) < 0xff && (diffByte + 1) <
2293             (rightKey[ROW_LENGTH_SIZE + diffIdx] & 0xff)) {
2294           newRowKey = new byte[diffIdx + 1];
2295           System.arraycopy(leftKey, ROW_LENGTH_SIZE, newRowKey, 0, diffIdx);
2296           newRowKey[diffIdx] = (byte) (diffByte + 1);
2297         } else {
2298           newRowKey = new byte[diffIdx + 1];
2299           System.arraycopy(rightKey, ROW_LENGTH_SIZE, newRowKey, 0, diffIdx + 1);
2300         }
2301       }
2302       return new KeyValue(newRowKey, null, null, HConstants.LATEST_TIMESTAMP,
2303         Type.Maximum).getKey();
2304     }
2305 
2306     @Override
2307     protected Object clone() throws CloneNotSupportedException {
2308       super.clone();
2309       return new KVComparator();
2310     }
2311 
2312   }
2313 
2314   /**
2315    * @param b
2316    * @return A KeyValue made of a byte array that holds the key-only part.
2317    * Needed to convert hfile index members to KeyValues.
2318    */
2319   public static KeyValue createKeyValueFromKey(final byte [] b) {
2320     return createKeyValueFromKey(b, 0, b.length);
2321   }
2322 
2323   /**
2324    * @param bb
2325    * @return A KeyValue made of a byte buffer that holds the key-only part.
2326    * Needed to convert hfile index members to KeyValues.
2327    */
2328   public static KeyValue createKeyValueFromKey(final ByteBuffer bb) {
2329     return createKeyValueFromKey(bb.array(), bb.arrayOffset(), bb.limit());
2330   }
2331 
2332   /**
2333    * @param b
2334    * @param o
2335    * @param l
2336    * @return A KeyValue made of a byte array that holds the key-only part.
2337    * Needed to convert hfile index members to KeyValues.
2338    */
2339   public static KeyValue createKeyValueFromKey(final byte [] b, final int o,
2340       final int l) {
2341     byte [] newb = new byte[l + ROW_OFFSET];
2342     System.arraycopy(b, o, newb, ROW_OFFSET, l);
2343     Bytes.putInt(newb, 0, l);
2344     Bytes.putInt(newb, Bytes.SIZEOF_INT, 0);
2345     return new KeyValue(newb);
2346   }
2347 
2348   /**
2349    * @param in Where to read bytes from.  Creates a byte array to hold the KeyValue
2350    * backing bytes copied from the steam.
2351    * @return KeyValue created by deserializing from <code>in</code> OR if we find a length
2352    * of zero, we will return null which can be useful marking a stream as done.
2353    * @throws IOException
2354    */
2355   public static KeyValue create(final DataInput in) throws IOException {
2356     return create(in.readInt(), in);
2357   }
2358 
2359   /**
2360    * Create a KeyValue reading <code>length</code> from <code>in</code>
2361    * @param length
2362    * @param in
2363    * @return Created KeyValue OR if we find a length of zero, we will return null which
2364    * can be useful marking a stream as done.
2365    * @throws IOException
2366    */
2367   public static KeyValue create(int length, final DataInput in) throws IOException {
2368 
2369     if (length <= 0) {
2370       if (length == 0) return null;
2371       throw new IOException("Failed read " + length + " bytes, stream corrupt?");
2372     }
2373 
2374     // This is how the old Writables.readFrom used to deserialize.  Didn't even vint.
2375     byte [] bytes = new byte[length];
2376     in.readFully(bytes);
2377     return new KeyValue(bytes, 0, length);
2378   }
2379 
2380   /**
2381    * Create a new KeyValue by copying existing cell and adding new tags
2382    * @param c
2383    * @param newTags
2384    * @return a new KeyValue instance with new tags
2385    */
2386   public static KeyValue cloneAndAddTags(Cell c, List<Tag> newTags) {
2387     List<Tag> existingTags = null;
2388     if(c.getTagsLength() > 0) {
2389       existingTags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
2390       existingTags.addAll(newTags);
2391     } else {
2392       existingTags = newTags;
2393     }
2394     return new KeyValue(c.getRowArray(), c.getRowOffset(), (int)c.getRowLength(),
2395       c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(),
2396       c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
2397       c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
2398       c.getValueLength(), existingTags);
2399   }
2400 
2401   /**
2402    * Create a KeyValue reading from the raw InputStream.
2403    * Named <code>iscreate</code> so doesn't clash with {@link #create(DataInput)}
2404    * @param in
2405    * @return Created KeyValue or throws an exception
2406    * @throws IOException
2407    * {@link Deprecated} As of 1.2. Use {@link KeyValueUtil#iscreate(InputStream, boolean)} instead.
2408    */
2409   @Deprecated
2410   public static KeyValue iscreate(final InputStream in) throws IOException {
2411     byte [] intBytes = new byte[Bytes.SIZEOF_INT];
2412     int bytesRead = 0;
2413     while (bytesRead < intBytes.length) {
2414       int n = in.read(intBytes, bytesRead, intBytes.length - bytesRead);
2415       if (n < 0) {
2416         if (bytesRead == 0) {
2417           throw new EOFException();
2418         }
2419         throw new IOException("Failed read of int, read " + bytesRead + " bytes");
2420       }
2421       bytesRead += n;
2422     }
2423     // TODO: perhaps some sanity check is needed here.
2424     byte [] bytes = new byte[Bytes.toInt(intBytes)];
2425     IOUtils.readFully(in, bytes, 0, bytes.length);
2426     return new KeyValue(bytes, 0, bytes.length);
2427   }
2428 
2429   /**
2430    * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable.
2431    * @param kv
2432    * @param out
2433    * @return Length written on stream
2434    * @throws IOException
2435    * @see #create(DataInput) for the inverse function
2436    */
2437   public static long write(final KeyValue kv, final DataOutput out) throws IOException {
2438     // This is how the old Writables write used to serialize KVs.  Need to figure way to make it
2439     // work for all implementations.
2440     int length = kv.getLength();
2441     out.writeInt(length);
2442     out.write(kv.getBuffer(), kv.getOffset(), length);
2443     return length + Bytes.SIZEOF_INT;
2444   }
2445 
2446   /**
2447    * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable but do
2448    * not require a {@link DataOutput}, just take plain {@link OutputStream}
2449    * Named <code>oswrite</code> so does not clash with {@link #write(KeyValue, DataOutput)}
2450    * @param kv
2451    * @param out
2452    * @param withTags
2453    * @return Length written on stream
2454    * @throws IOException
2455    * @see #create(DataInput) for the inverse function
2456    * @see #write(KeyValue, DataOutput)
2457    * @see KeyValueUtil#oswrite(Cell, OutputStream, boolean)
2458    * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0.
2459    *             Instead use {@link #write(OutputStream, boolean)}
2460    */
2461   @Deprecated
2462   public static long oswrite(final KeyValue kv, final OutputStream out, final boolean withTags)
2463       throws IOException {
2464     return kv.write(out, withTags);
2465   }
2466 
2467   @Override
2468   public int write(OutputStream out) throws IOException {
2469     return write(out, true);
2470   }
2471 
2472   @Override
2473   public int write(OutputStream out, boolean withTags) throws IOException {
2474     // In KeyValueUtil#oswrite we do a Cell serialization as KeyValue. Any changes doing here, pls
2475     // check KeyValueUtil#oswrite also and do necessary changes.
2476     int length = this.length;
2477     if (!withTags) {
2478       length = this.getKeyLength() + this.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE;
2479     }
2480     ByteBufferUtils.putInt(out, length);
2481     out.write(this.bytes, this.offset, length);
2482     return length + Bytes.SIZEOF_INT;
2483   }
2484 
2485   /**
2486    * Comparator that compares row component only of a KeyValue.
2487    */
2488   public static class RowOnlyComparator implements Comparator<KeyValue> {
2489     final KVComparator comparator;
2490 
2491     public RowOnlyComparator(final KVComparator c) {
2492       this.comparator = c;
2493     }
2494 
2495     @Override
2496     public int compare(KeyValue left, KeyValue right) {
2497       return comparator.compareRows(left, right);
2498     }
2499   }
2500 
2501 
2502   /**
2503    * Avoids redundant comparisons for better performance.
2504    *
2505    * TODO get rid of this wart
2506    */
2507   public interface SamePrefixComparator<T> {
2508     /**
2509      * Compare two keys assuming that the first n bytes are the same.
2510      * @param commonPrefix How many bytes are the same.
2511      */
2512     int compareIgnoringPrefix(int commonPrefix, byte[] left, int loffset, int llength,
2513         byte[] right, int roffset, int rlength
2514     );
2515   }
2516 
2517   /**
2518    * @deprecated  Not to be used for any comparsions
2519    */
2520   @Deprecated
2521   public static class RawBytesComparator extends KVComparator {
2522     /**
2523      * The HFileV2 file format's trailer contains this class name.  We reinterpret this and
2524      * instantiate the appropriate comparator.
2525      * TODO: With V3 consider removing this.
2526      * @return legacy class name for FileFileTrailer#comparatorClassName
2527      */
2528     @Override
2529     public String getLegacyKeyComparatorName() {
2530       return "org.apache.hadoop.hbase.util.Bytes$ByteArrayComparator";
2531     }
2532 
2533     /**
2534      * @deprecated Since 0.99.2.
2535      */
2536     @Override
2537     @Deprecated
2538     public int compareFlatKey(byte[] left, int loffset, int llength, byte[] right,
2539         int roffset, int rlength) {
2540       return Bytes.BYTES_RAWCOMPARATOR.compare(left,  loffset, llength, right, roffset, rlength);
2541     }
2542 
2543     @Override
2544     public int compare(Cell left, Cell right) {
2545       return compareOnlyKeyPortion(left, right);
2546     }
2547 
2548     @Override
2549     @VisibleForTesting
2550     public int compareOnlyKeyPortion(Cell left, Cell right) {
2551       int c = Bytes.BYTES_RAWCOMPARATOR.compare(left.getRowArray(), left.getRowOffset(),
2552           left.getRowLength(), right.getRowArray(), right.getRowOffset(), right.getRowLength());
2553       if (c != 0) {
2554         return c;
2555       }
2556       c = Bytes.BYTES_RAWCOMPARATOR.compare(left.getFamilyArray(), left.getFamilyOffset(),
2557           left.getFamilyLength(), right.getFamilyArray(), right.getFamilyOffset(),
2558           right.getFamilyLength());
2559       if (c != 0) {
2560         return c;
2561       }
2562       c = Bytes.BYTES_RAWCOMPARATOR.compare(left.getQualifierArray(), left.getQualifierOffset(),
2563           left.getQualifierLength(), right.getQualifierArray(), right.getQualifierOffset(),
2564           right.getQualifierLength());
2565       if (c != 0) {
2566         return c;
2567       }
2568       c = compareTimestamps(left.getTimestamp(), right.getTimestamp());
2569       if (c != 0) {
2570         return c;
2571       }
2572       return (0xff & left.getTypeByte()) - (0xff & right.getTypeByte());
2573     }
2574 
2575     @Override
2576     public byte[] calcIndexKey(byte[] lastKeyOfPreviousBlock, byte[] firstKeyInBlock) {
2577       return firstKeyInBlock;
2578     }
2579 
2580   }
2581 
2582   /**
2583    * HeapSize implementation
2584    *
2585    * We do not count the bytes in the rowCache because it should be empty for a KeyValue in the
2586    * MemStore.
2587    */
2588   @Override
2589   public long heapSize() {
2590     int sum = 0;
2591     sum += ClassSize.OBJECT;// the KeyValue object itself
2592     sum += ClassSize.REFERENCE;// pointer to "bytes"
2593     sum += ClassSize.align(ClassSize.ARRAY);// "bytes"
2594     sum += ClassSize.align(length);// number of bytes of data in the "bytes" array
2595     sum += 2 * Bytes.SIZEOF_INT;// offset, length
2596     sum += Bytes.SIZEOF_LONG;// memstoreTS
2597     return ClassSize.align(sum);
2598   }
2599 
2600   /**
2601    * A simple form of KeyValue that creates a keyvalue with only the key part of the byte[]
2602    * Mainly used in places where we need to compare two cells.  Avoids copying of bytes
2603    * In places like block index keys, we need to compare the key byte[] with a cell.
2604    * Hence create a Keyvalue(aka Cell) that would help in comparing as two cells
2605    */
2606   public static class KeyOnlyKeyValue extends KeyValue {
2607     private short rowLen = -1;
2608     public KeyOnlyKeyValue() {
2609 
2610     }
2611     public KeyOnlyKeyValue(byte[] b) {
2612       this(b, 0, b.length);
2613     }
2614 
2615     public KeyOnlyKeyValue(byte[] b, int offset, int length) {
2616       this.bytes = b;
2617       this.length = length;
2618       this.offset = offset;
2619       this.rowLen = Bytes.toShort(this.bytes, this.offset);
2620     }
2621 
2622     @Override
2623     public int getKeyOffset() {
2624       return this.offset;
2625     }
2626 
2627     /**
2628      * A setter that helps to avoid object creation every time and whenever
2629      * there is a need to create new KeyOnlyKeyValue.
2630      * @param key
2631      * @param offset
2632      * @param length
2633      */
2634     public void setKey(byte[] key, int offset, int length) {
2635       this.bytes = key;
2636       this.offset = offset;
2637       this.length = length;
2638       this.rowLen = Bytes.toShort(this.bytes, this.offset);
2639     }
2640 
2641     @Override
2642     public byte[] getKey() {
2643       int keylength = getKeyLength();
2644       byte[] key = new byte[keylength];
2645       System.arraycopy(this.bytes, getKeyOffset(), key, 0, keylength);
2646       return key;
2647     }
2648 
2649     @Override
2650     public byte[] getRowArray() {
2651       return bytes;
2652     }
2653 
2654     @Override
2655     public int getRowOffset() {
2656       return getKeyOffset() + Bytes.SIZEOF_SHORT;
2657     }
2658 
2659     @Override
2660     public byte[] getFamilyArray() {
2661       return bytes;
2662     }
2663 
2664     @Override
2665     public byte getFamilyLength() {
2666       return this.bytes[getFamilyOffset() - 1];
2667     }
2668 
2669     @Override
2670     public int getFamilyOffset() {
2671       return this.offset + Bytes.SIZEOF_SHORT + getRowLength() + Bytes.SIZEOF_BYTE;
2672     }
2673 
2674     @Override
2675     public byte[] getQualifierArray() {
2676       return bytes;
2677     }
2678 
2679     @Override
2680     public int getQualifierLength() {
2681       return getQualifierLength(getRowLength(), getFamilyLength());
2682     }
2683 
2684     @Override
2685     public int getQualifierOffset() {
2686       return getFamilyOffset() + getFamilyLength();
2687     }
2688 
2689     @Override
2690     public int getKeyLength() {
2691       return length;
2692     }
2693 
2694     @Override
2695     public short getRowLength() {
2696       return rowLen;
2697     }
2698 
2699     @Override
2700     public byte getTypeByte() {
2701       return this.bytes[this.offset + getKeyLength() - 1];
2702     }
2703 
2704     private int getQualifierLength(int rlength, int flength) {
2705       return getKeyLength() - (int) getKeyDataStructureSize(rlength, flength, 0);
2706     }
2707 
2708     @Override
2709     public long getTimestamp() {
2710       int tsOffset = getTimestampOffset();
2711       return Bytes.toLong(this.bytes, tsOffset);
2712     }
2713 
2714     @Override
2715     public int getTimestampOffset() {
2716       return getKeyOffset() + getKeyLength() - TIMESTAMP_TYPE_SIZE;
2717     }
2718 
2719     @Override
2720     public byte[] getTagsArray() {
2721       return HConstants.EMPTY_BYTE_ARRAY;
2722     }
2723 
2724     @Override
2725     public int getTagsOffset() {
2726       return 0;
2727     }
2728 
2729     @Override
2730     public byte[] getValueArray() {
2731       throw new IllegalArgumentException("KeyOnlyKeyValue does not work with values.");
2732     }
2733 
2734     @Override
2735     public int getValueOffset() {
2736       throw new IllegalArgumentException("KeyOnlyKeyValue does not work with values.");
2737     }
2738 
2739     @Override
2740     public int getValueLength() {
2741       throw new IllegalArgumentException("KeyOnlyKeyValue does not work with values.");
2742     }
2743 
2744     @Override
2745     public int getTagsLength() {
2746       return 0;
2747     }
2748 
2749     @Override
2750     public String toString() {
2751       if (this.bytes == null || this.bytes.length == 0) {
2752         return "empty";
2753       }
2754       return keyToString(this.bytes, this.offset, getKeyLength()) + "/vlen=0/mvcc=0";
2755     }
2756 
2757     @Override
2758     public int hashCode() {
2759       return super.hashCode();
2760     }
2761 
2762     @Override
2763     public boolean equals(Object other) {
2764       return super.equals(other);
2765     }
2766 
2767     @Override
2768     public long heapSize() {
2769       return super.heapSize() + Bytes.SIZEOF_SHORT;
2770     }
2771   }
2772 }