View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mob;
20  
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.Key;
import java.security.KeyException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.mob.compactions.MobCompactor;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
80  
81  /**
82   * The mob utilities
83   */
84  @InterfaceAudience.Private
85  public class MobUtils {
86  
87    private static final Log LOG = LogFactory.getLog(MobUtils.class);
88  
89    private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT =
90        new ThreadLocal<SimpleDateFormat>() {
91      @Override
92      protected SimpleDateFormat initialValue() {
93        return new SimpleDateFormat("yyyyMMdd");
94      }
95    };
96  
97    /**
98     * Formats a date to a string.
99     * @param date The date.
100    * @return The string format of the date, it's yyyymmdd.
101    */
102   public static String formatDate(Date date) {
103     return LOCAL_FORMAT.get().format(date);
104   }
105 
106   /**
107    * Parses the string to a date.
108    * @param dateString The string format of a date, it's yyyymmdd.
109    * @return A date.
110    * @throws ParseException
111    */
112   public static Date parseDate(String dateString) throws ParseException {
113     return LOCAL_FORMAT.get().parse(dateString);
114   }
115 
116   /**
117    * Whether the current cell is a mob reference cell.
118    * @param cell The current cell.
119    * @return True if the cell has a mob reference tag, false if it doesn't.
120    */
121   public static boolean isMobReferenceCell(Cell cell) {
122     if (cell.getTagsLength() > 0) {
123       Tag tag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
124           TagType.MOB_REFERENCE_TAG_TYPE);
125       return tag != null;
126     }
127     return false;
128   }
129 
130   /**
131    * Gets the table name tag.
132    * @param cell The current cell.
133    * @return The table name tag.
134    */
135   public static Tag getTableNameTag(Cell cell) {
136     if (cell.getTagsLength() > 0) {
137       Tag tag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
138           TagType.MOB_TABLE_NAME_TAG_TYPE);
139       return tag;
140     }
141     return null;
142   }
143 
144   /**
145    * Whether the tag list has a mob reference tag.
146    * @param tags The tag list.
147    * @return True if the list has a mob reference tag, false if it doesn't.
148    */
149   public static boolean hasMobReferenceTag(List<Tag> tags) {
150     if (!tags.isEmpty()) {
151       for (Tag tag : tags) {
152         if (tag.getType() == TagType.MOB_REFERENCE_TAG_TYPE) {
153           return true;
154         }
155       }
156     }
157     return false;
158   }
159 
160   /**
161    * Indicates whether it's a raw scan.
162    * The information is set in the attribute "hbase.mob.scan.raw" of scan.
163    * For a mob cell, in a normal scan the scanners retrieves the mob cell from the mob file.
164    * In a raw scan, the scanner directly returns cell in HBase without retrieve the one in
165    * the mob file.
166    * @param scan The current scan.
167    * @return True if it's a raw scan.
168    */
169   public static boolean isRawMobScan(Scan scan) {
170     byte[] raw = scan.getAttribute(MobConstants.MOB_SCAN_RAW);
171     try {
172       return raw != null && Bytes.toBoolean(raw);
173     } catch (IllegalArgumentException e) {
174       return false;
175     }
176   }
177 
178   /**
179    * Indicates whether it's a reference only scan.
180    * The information is set in the attribute "hbase.mob.scan.ref.only" of scan.
181    * If it's a ref only scan, only the cells with ref tag are returned.
182    * @param scan The current scan.
183    * @return True if it's a ref only scan.
184    */
185   public static boolean isRefOnlyScan(Scan scan) {
186     byte[] refOnly = scan.getAttribute(MobConstants.MOB_SCAN_REF_ONLY);
187     try {
188       return refOnly != null && Bytes.toBoolean(refOnly);
189     } catch (IllegalArgumentException e) {
190       return false;
191     }
192   }
193 
194   /**
195    * Indicates whether the scan contains the information of caching blocks.
196    * The information is set in the attribute "hbase.mob.cache.blocks" of scan.
197    * @param scan The current scan.
198    * @return True when the Scan attribute specifies to cache the MOB blocks.
199    */
200   public static boolean isCacheMobBlocks(Scan scan) {
201     byte[] cache = scan.getAttribute(MobConstants.MOB_CACHE_BLOCKS);
202     try {
203       return cache != null && Bytes.toBoolean(cache);
204     } catch (IllegalArgumentException e) {
205       return false;
206     }
207   }
208 
209   /**
210    * Sets the attribute of caching blocks in the scan.
211    *
212    * @param scan
213    *          The current scan.
214    * @param cacheBlocks
215    *          True, set the attribute of caching blocks into the scan, the scanner with this scan
216    *          caches blocks.
217    *          False, the scanner doesn't cache blocks for this scan.
218    */
219   public static void setCacheMobBlocks(Scan scan, boolean cacheBlocks) {
220     scan.setAttribute(MobConstants.MOB_CACHE_BLOCKS, Bytes.toBytes(cacheBlocks));
221   }
222 
223   /**
224    * Cleans the expired mob files.
225    * Cleans the files whose creation date is older than (current - columnFamily.ttl), and
226    * the minVersions of that column family is 0.
227    * @param fs The current file system.
228    * @param conf The current configuration.
229    * @param tableName The current table name.
230    * @param columnDescriptor The descriptor of the current column family.
231    * @param cacheConfig The cacheConfig that disables the block cache.
232    * @param current The current time.
233    * @throws IOException
234    */
235   public static void cleanExpiredMobFiles(FileSystem fs, Configuration conf, TableName tableName,
236       HColumnDescriptor columnDescriptor, CacheConfig cacheConfig, long current)
237       throws IOException {
238     long timeToLive = columnDescriptor.getTimeToLive();
239     if (Integer.MAX_VALUE == timeToLive) {
240       // no need to clean, because the TTL is not set.
241       return;
242     }
243 
244     Date expireDate = new Date(current - timeToLive * 1000);
245     expireDate = new Date(expireDate.getYear(), expireDate.getMonth(), expireDate.getDate());
246     LOG.info("MOB HFiles older than " + expireDate.toGMTString() + " will be deleted!");
247 
248     FileStatus[] stats = null;
249     Path mobTableDir = FSUtils.getTableDir(getMobHome(conf), tableName);
250     Path path = getMobFamilyPath(conf, tableName, columnDescriptor.getNameAsString());
251     try {
252       stats = fs.listStatus(path);
253     } catch (FileNotFoundException e) {
254       LOG.warn("Failed to find the mob file " + path, e);
255     }
256     if (null == stats) {
257       // no file found
258       return;
259     }
260     List<StoreFile> filesToClean = new ArrayList<StoreFile>();
261     int deletedFileCount = 0;
262     for (FileStatus file : stats) {
263       String fileName = file.getPath().getName();
264       try {
265         MobFileName mobFileName = null;
266         if (!HFileLink.isHFileLink(file.getPath())) {
267           mobFileName = MobFileName.create(fileName);
268         } else {
269           HFileLink hfileLink = HFileLink.buildFromHFileLinkPattern(conf, file.getPath());
270           mobFileName = MobFileName.create(hfileLink.getOriginPath().getName());
271         }
272         Date fileDate = parseDate(mobFileName.getDate());
273         if (LOG.isDebugEnabled()) {
274           LOG.debug("Checking file " + fileName);
275         }
276         if (fileDate.getTime() < expireDate.getTime()) {
277           if (LOG.isDebugEnabled()) {
278             LOG.debug(fileName + " is an expired file");
279           }
280           filesToClean.add(new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE));
281         }
282       } catch (Exception e) {
283         LOG.error("Cannot parse the fileName " + fileName, e);
284       }
285     }
286     if (!filesToClean.isEmpty()) {
287       try {
288         removeMobFiles(conf, fs, tableName, mobTableDir, columnDescriptor.getName(),
289             filesToClean);
290         deletedFileCount = filesToClean.size();
291       } catch (IOException e) {
292         LOG.error("Failed to delete the mob files " + filesToClean, e);
293       }
294     }
295     LOG.info(deletedFileCount + " expired mob files are deleted");
296   }
297 
298   /**
299    * Gets the root dir of the mob files.
300    * It's {HBASE_DIR}/mobdir.
301    * @param conf The current configuration.
302    * @return the root dir of the mob file.
303    */
304   public static Path getMobHome(Configuration conf) {
305     Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
306     return new Path(hbaseDir, MobConstants.MOB_DIR_NAME);
307   }
308 
309   /**
310    * Gets the qualified root dir of the mob files.
311    * @param conf The current configuration.
312    * @return The qualified root dir.
313    * @throws IOException
314    */
315   public static Path getQualifiedMobRootDir(Configuration conf) throws IOException {
316     Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
317     Path mobRootDir = new Path(hbaseDir, MobConstants.MOB_DIR_NAME);
318     FileSystem fs = mobRootDir.getFileSystem(conf);
319     return mobRootDir.makeQualified(fs);
320   }
321 
322   /**
323    * Gets the region dir of the mob files.
324    * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}.
325    * @param conf The current configuration.
326    * @param tableName The current table name.
327    * @return The region dir of the mob files.
328    */
329   public static Path getMobRegionPath(Configuration conf, TableName tableName) {
330     Path tablePath = FSUtils.getTableDir(getMobHome(conf), tableName);
331     HRegionInfo regionInfo = getMobRegionInfo(tableName);
332     return new Path(tablePath, regionInfo.getEncodedName());
333   }
334 
335   /**
336    * Gets the family dir of the mob files.
337    * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}.
338    * @param conf The current configuration.
339    * @param tableName The current table name.
340    * @param familyName The current family name.
341    * @return The family dir of the mob files.
342    */
343   public static Path getMobFamilyPath(Configuration conf, TableName tableName, String familyName) {
344     return new Path(getMobRegionPath(conf, tableName), familyName);
345   }
346 
347   /**
348    * Gets the family dir of the mob files.
349    * It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}.
350    * @param regionPath The path of mob region which is a dummy one.
351    * @param familyName The current family name.
352    * @return The family dir of the mob files.
353    */
354   public static Path getMobFamilyPath(Path regionPath, String familyName) {
355     return new Path(regionPath, familyName);
356   }
357 
358   /**
359    * Gets the HRegionInfo of the mob files.
360    * This is a dummy region. The mob files are not saved in a region in HBase.
361    * This is only used in mob snapshot. It's internally used only.
362    * @param tableName
363    * @return A dummy mob region info.
364    */
365   public static HRegionInfo getMobRegionInfo(TableName tableName) {
366     HRegionInfo info = new HRegionInfo(tableName, MobConstants.MOB_REGION_NAME_BYTES,
367         HConstants.EMPTY_END_ROW, false, 0);
368     return info;
369   }
370 
371   /**
372    * Gets whether the current HRegionInfo is a mob one.
373    * @param regionInfo The current HRegionInfo.
374    * @return If true, the current HRegionInfo is a mob one.
375    */
376   public static boolean isMobRegionInfo(HRegionInfo regionInfo) {
377     return regionInfo == null ? false : getMobRegionInfo(regionInfo.getTable()).getEncodedName()
378         .equals(regionInfo.getEncodedName());
379   }
380 
381   /**
382    * Gets whether the current region name follows the pattern of a mob region name.
383    * @param tableName The current table name.
384    * @param regionName The current region name.
385    * @return True if the current region name follows the pattern of a mob region name.
386    */
387   public static boolean isMobRegionName(TableName tableName, byte[] regionName) {
388     return Bytes.equals(regionName, getMobRegionInfo(tableName).getRegionName());
389   }
390 
391   /**
392    * Gets the working directory of the mob compaction.
393    * @param root The root directory of the mob compaction.
394    * @param jobName The current job name.
395    * @return The directory of the mob compaction for the current job.
396    */
397   public static Path getCompactionWorkingPath(Path root, String jobName) {
398     return new Path(root, jobName);
399   }
400 
401   /**
402    * Archives the mob files.
403    * @param conf The current configuration.
404    * @param fs The current file system.
405    * @param tableName The table name.
406    * @param tableDir The table directory.
407    * @param family The name of the column family.
408    * @param storeFiles The files to be deleted.
409    * @throws IOException
410    */
411   public static void removeMobFiles(Configuration conf, FileSystem fs, TableName tableName,
412       Path tableDir, byte[] family, Collection<StoreFile> storeFiles) throws IOException {
413     HFileArchiver.archiveStoreFiles(conf, fs, getMobRegionInfo(tableName), tableDir, family,
414         storeFiles);
415   }
416 
417   /**
418    * Creates a mob reference KeyValue.
419    * The value of the mob reference KeyValue is mobCellValueSize + mobFileName.
420    * @param cell The original Cell.
421    * @param fileName The mob file name where the mob reference KeyValue is written.
422    * @param tableNameTag The tag of the current table name. It's very important in
423    *                        cloning the snapshot.
424    * @return The mob reference KeyValue.
425    */
426   public static KeyValue createMobRefKeyValue(Cell cell, byte[] fileName, Tag tableNameTag) {
427     // Append the tags to the KeyValue.
428     // The key is same, the value is the filename of the mob file
429     List<Tag> tags = new ArrayList<Tag>();
430     // Add the ref tag as the 1st one.
431     tags.add(MobConstants.MOB_REF_TAG);
432     // Add the tag of the source table name, this table is where this mob file is flushed
433     // from.
434     // It's very useful in cloning the snapshot. When reading from the cloning table, we need to
435     // find the original mob files by this table name. For details please see cloning
436     // snapshot for mob files.
437     tags.add(tableNameTag);
438     // Add the existing tags.
439     tags.addAll(Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()));
440     int valueLength = cell.getValueLength();
441     byte[] refValue = Bytes.add(Bytes.toBytes(valueLength), fileName);
442     KeyValue reference = new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
443         cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
444         cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
445         cell.getTimestamp(), KeyValue.Type.Put, refValue, 0, refValue.length, tags);
446     reference.setSequenceId(cell.getSequenceId());
447     return reference;
448   }
449 
450   /**
451    * Creates a writer for the mob file in temp directory.
452    * @param conf The current configuration.
453    * @param fs The current file system.
454    * @param family The descriptor of the current column family.
455    * @param date The date string, its format is yyyymmmdd.
456    * @param basePath The basic path for a temp directory.
457    * @param maxKeyCount The key count.
458    * @param compression The compression algorithm.
459    * @param startKey The hex string of the start key.
460    * @param cacheConfig The current cache config.
461    * @param cryptoContext The encryption context.
462    * @return The writer for the mob file.
463    * @throws IOException
464    */
465   public static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
466       HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
467       Compression.Algorithm compression, String startKey, CacheConfig cacheConfig,
468       Encryption.Context cryptoContext)
469       throws IOException {
470     MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString()
471         .replaceAll("-", ""));
472     return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
473       cacheConfig, cryptoContext);
474   }
475 
476   /**
477    * Creates a writer for the ref file in temp directory.
478    * @param conf The current configuration.
479    * @param fs The current file system.
480    * @param family The descriptor of the current column family.
481    * @param basePath The basic path for a temp directory.
482    * @param maxKeyCount The key count.
483    * @param cacheConfig The current cache config.
484    * @param cryptoContext The encryption context.
485    * @return The writer for the mob file.
486    * @throws IOException
487    */
488   public static StoreFile.Writer createRefFileWriter(Configuration conf, FileSystem fs,
489     HColumnDescriptor family, Path basePath, long maxKeyCount, CacheConfig cacheConfig,
490     Encryption.Context cryptoContext)
491     throws IOException {
492     HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(true)
493       .withIncludesTags(true).withCompression(family.getCompactionCompression())
494       .withCompressTags(family.isCompressTags()).withChecksumType(HStore.getChecksumType(conf))
495       .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(family.getBlocksize())
496       .withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding())
497       .withEncryptionContext(cryptoContext).withCreateTime(EnvironmentEdgeManager.currentTime())
498       .build();
499     Path tempPath = new Path(basePath, UUID.randomUUID().toString().replaceAll("-", ""));
500     StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, fs).withFilePath(tempPath)
501       .withComparator(CellComparator.COMPARATOR).withBloomType(family.getBloomFilterType())
502       .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
503     return w;
504   }
505 
506   /**
507    * Creates a writer for the mob file in temp directory.
508    * @param conf The current configuration.
509    * @param fs The current file system.
510    * @param family The descriptor of the current column family.
511    * @param date The date string, its format is yyyymmmdd.
512    * @param basePath The basic path for a temp directory.
513    * @param maxKeyCount The key count.
514    * @param compression The compression algorithm.
515    * @param startKey The start key.
516    * @param cacheConfig The current cache config.
517    * @param cryptoContext The encryption context.
518    * @return The writer for the mob file.
519    * @throws IOException
520    */
521   public static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
522       HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
523       Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig,
524       Encryption.Context cryptoContext)
525       throws IOException {
526     MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString()
527         .replaceAll("-", ""));
528     return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
529       cacheConfig, cryptoContext);
530   }
531 
532   /**
533    * Creates a writer for the del file in temp directory.
534    * @param conf The current configuration.
535    * @param fs The current file system.
536    * @param family The descriptor of the current column family.
537    * @param date The date string, its format is yyyymmmdd.
538    * @param basePath The basic path for a temp directory.
539    * @param maxKeyCount The key count.
540    * @param compression The compression algorithm.
541    * @param startKey The start key.
542    * @param cacheConfig The current cache config.
543    * @param cryptoContext The encryption context.
544    * @return The writer for the del file.
545    * @throws IOException
546    */
547   public static StoreFile.Writer createDelFileWriter(Configuration conf, FileSystem fs,
548       HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
549       Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig,
550       Encryption.Context cryptoContext)
551       throws IOException {
552     String suffix = UUID
553       .randomUUID().toString().replaceAll("-", "") + "_del";
554     MobFileName mobFileName = MobFileName.create(startKey, date, suffix);
555     return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
556       cacheConfig, cryptoContext);
557   }
558 
559   /**
560    * Creates a writer for the mob file in temp directory.
561    * @param conf The current configuration.
562    * @param fs The current file system.
563    * @param family The descriptor of the current column family.
564    * @param mobFileName The mob file name.
565    * @param basePath The basic path for a temp directory.
566    * @param maxKeyCount The key count.
567    * @param compression The compression algorithm.
568    * @param cacheConfig The current cache config.
569    * @param cryptoContext The encryption context.
570    * @return The writer for the mob file.
571    * @throws IOException
572    */
573   private static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
574     HColumnDescriptor family, MobFileName mobFileName, Path basePath, long maxKeyCount,
575     Compression.Algorithm compression, CacheConfig cacheConfig, Encryption.Context cryptoContext)
576     throws IOException {
577     HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
578       .withIncludesMvcc(true).withIncludesTags(true)
579       .withCompressTags(family.isCompressTags())
580       .withChecksumType(HStore.getChecksumType(conf))
581       .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(family.getBlocksize())
582       .withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding())
583       .withEncryptionContext(cryptoContext)
584       .withCreateTime(EnvironmentEdgeManager.currentTime()).build();
585 
586     StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, fs)
587       .withFilePath(new Path(basePath, mobFileName.getFileName()))
588       .withComparator(CellComparator.COMPARATOR).withBloomType(BloomType.NONE)
589       .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
590     return w;
591   }
592 
593   /**
594    * Commits the mob file.
595    * @param conf The current configuration.
596    * @param fs The current file system.
597    * @param sourceFile The path where the mob file is saved.
598    * @param targetPath The directory path where the source file is renamed to.
599    * @param cacheConfig The current cache config.
600    * @return The target file path the source file is renamed to.
601    * @throws IOException
602    */
603   public static Path commitFile(Configuration conf, FileSystem fs, final Path sourceFile,
604       Path targetPath, CacheConfig cacheConfig) throws IOException {
605     if (sourceFile == null) {
606       return null;
607     }
608     Path dstPath = new Path(targetPath, sourceFile.getName());
609     validateMobFile(conf, fs, sourceFile, cacheConfig);
610     String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath;
611     LOG.info(msg);
612     Path parent = dstPath.getParent();
613     if (!fs.exists(parent)) {
614       fs.mkdirs(parent);
615     }
616     if (!fs.rename(sourceFile, dstPath)) {
617       throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
618     }
619     return dstPath;
620   }
621 
622   /**
623    * Validates a mob file by opening and closing it.
624    * @param conf The current configuration.
625    * @param fs The current file system.
626    * @param path The path where the mob file is saved.
627    * @param cacheConfig The current cache config.
628    */
629   private static void validateMobFile(Configuration conf, FileSystem fs, Path path,
630       CacheConfig cacheConfig) throws IOException {
631     StoreFile storeFile = null;
632     try {
633       storeFile = new StoreFile(fs, path, conf, cacheConfig, BloomType.NONE);
634       storeFile.createReader();
635     } catch (IOException e) {
636       LOG.error("Failed to open mob file[" + path + "], keep it in temp directory.", e);
637       throw e;
638     } finally {
639       if (storeFile != null) {
640         storeFile.closeReader(false);
641       }
642     }
643   }
644 
645   /**
646    * Indicates whether the current mob ref cell has a valid value.
647    * A mob ref cell has a mob reference tag.
648    * The value of a mob ref cell consists of two parts, real mob value length and mob file name.
649    * The real mob value length takes 4 bytes.
650    * The remaining part is the mob file name.
651    * @param cell The mob ref cell.
652    * @return True if the cell has a valid value.
653    */
654   public static boolean hasValidMobRefCellValue(Cell cell) {
655     return cell.getValueLength() > Bytes.SIZEOF_INT;
656   }
657 
658   /**
659    * Gets the mob value length from the mob ref cell.
660    * A mob ref cell has a mob reference tag.
661    * The value of a mob ref cell consists of two parts, real mob value length and mob file name.
662    * The real mob value length takes 4 bytes.
663    * The remaining part is the mob file name.
664    * @param cell The mob ref cell.
665    * @return The real mob value length.
666    */
667   public static int getMobValueLength(Cell cell) {
668     return Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), Bytes.SIZEOF_INT);
669   }
670 
671   /**
672    * Gets the mob file name from the mob ref cell.
673    * A mob ref cell has a mob reference tag.
674    * The value of a mob ref cell consists of two parts, real mob value length and mob file name.
675    * The real mob value length takes 4 bytes.
676    * The remaining part is the mob file name.
677    * @param cell The mob ref cell.
678    * @return The mob file name.
679    */
680   public static String getMobFileName(Cell cell) {
681     return Bytes.toString(cell.getValueArray(), cell.getValueOffset() + Bytes.SIZEOF_INT,
682         cell.getValueLength() - Bytes.SIZEOF_INT);
683   }
684 
685   /**
686    * Gets the table name used in the table lock.
687    * The table lock name is a dummy one, it's not a table name. It's tableName + ".mobLock".
688    * @param tn The table name.
689    * @return The table name used in table lock.
690    */
691   public static TableName getTableLockName(TableName tn) {
692     byte[] tableName = tn.getName();
693     return TableName.valueOf(Bytes.add(tableName, MobConstants.MOB_TABLE_LOCK_SUFFIX));
694   }
695 
696   /**
697    * Performs the mob compaction.
698    * @param conf the Configuration
699    * @param fs the file system
700    * @param tableName the table the compact
701    * @param hcd the column descriptor
702    * @param pool the thread pool
703    * @param tableLockManager the tableLock manager
704    * @param allFiles Whether add all mob files into the compaction.
705    */
706   public static void doMobCompaction(Configuration conf, FileSystem fs, TableName tableName,
707     HColumnDescriptor hcd, ExecutorService pool, TableLockManager tableLockManager,
708     boolean allFiles) throws IOException {
709     String className = conf.get(MobConstants.MOB_COMPACTOR_CLASS_KEY,
710       PartitionedMobCompactor.class.getName());
711     // instantiate the mob compactor.
712     MobCompactor compactor = null;
713     try {
714       compactor = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
715         Configuration.class, FileSystem.class, TableName.class, HColumnDescriptor.class,
716         ExecutorService.class }, new Object[] { conf, fs, tableName, hcd, pool });
717     } catch (Exception e) {
718       throw new IOException("Unable to load configured mob file compactor '" + className + "'", e);
719     }
720     // compact only for mob-enabled column.
721     // obtain a write table lock before performing compaction to avoid race condition
722     // with major compaction in mob-enabled column.
723     boolean tableLocked = false;
724     TableLock lock = null;
725     try {
726       // the tableLockManager might be null in testing. In that case, it is lock-free.
727       if (tableLockManager != null) {
728         lock = tableLockManager.writeLock(MobUtils.getTableLockName(tableName),
729           "Run MobCompactor");
730         lock.acquire();
731       }
732       tableLocked = true;
733       compactor.compact(allFiles);
734     } catch (Exception e) {
735       LOG.error("Failed to compact the mob files for the column " + hcd.getNameAsString()
736         + " in the table " + tableName.getNameAsString(), e);
737     } finally {
738       if (lock != null && tableLocked) {
739         try {
740           lock.release();
741         } catch (IOException e) {
742           LOG.error(
743             "Failed to release the write lock for the table " + tableName.getNameAsString(), e);
744         }
745       }
746     }
747   }
748 
749   /**
750    * Creates a thread pool.
751    * @param conf the Configuration
752    * @return A thread pool.
753    */
754   public static ExecutorService createMobCompactorThreadPool(Configuration conf) {
755     int maxThreads = conf.getInt(MobConstants.MOB_COMPACTION_THREADS_MAX,
756       MobConstants.DEFAULT_MOB_COMPACTION_THREADS_MAX);
757     if (maxThreads == 0) {
758       maxThreads = 1;
759     }
760     final SynchronousQueue<Runnable> queue = new SynchronousQueue<Runnable>();
761     ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS, queue,
762       Threads.newDaemonThreadFactory("MobCompactor"), new RejectedExecutionHandler() {
763         @Override
764         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
765           try {
766             // waiting for a thread to pick up instead of throwing exceptions.
767             queue.put(r);
768           } catch (InterruptedException e) {
769             throw new RejectedExecutionException(e);
770           }
771         }
772       });
773     ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
774     return pool;
775   }
776 
777   /**
778    * Creates the encyption context.
779    * @param conf The current configuration.
780    * @param family The current column descriptor.
781    * @return The encryption context.
782    * @throws IOException
783    */
784   public static Encryption.Context createEncryptionContext(Configuration conf,
785     HColumnDescriptor family) throws IOException {
786     // TODO the code is repeated, and needs to be unified.
787     Encryption.Context cryptoContext = Encryption.Context.NONE;
788     String cipherName = family.getEncryptionType();
789     if (cipherName != null) {
790       Cipher cipher;
791       Key key;
792       byte[] keyBytes = family.getEncryptionKey();
793       if (keyBytes != null) {
794         // Family provides specific key material
795         String masterKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User
796           .getCurrent().getShortName());
797         try {
798           // First try the master key
799           key = EncryptionUtil.unwrapKey(conf, masterKeyName, keyBytes);
800         } catch (KeyException e) {
801           // If the current master key fails to unwrap, try the alternate, if
802           // one is configured
803           if (LOG.isDebugEnabled()) {
804             LOG.debug("Unable to unwrap key with current master key '" + masterKeyName + "'");
805           }
806           String alternateKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
807           if (alternateKeyName != null) {
808             try {
809               key = EncryptionUtil.unwrapKey(conf, alternateKeyName, keyBytes);
810             } catch (KeyException ex) {
811               throw new IOException(ex);
812             }
813           } else {
814             throw new IOException(e);
815           }
816         }
817         // Use the algorithm the key wants
818         cipher = Encryption.getCipher(conf, key.getAlgorithm());
819         if (cipher == null) {
820           throw new RuntimeException("Cipher '" + key.getAlgorithm() + "' is not available");
821         }
822         // Fail if misconfigured
823         // We use the encryption type specified in the column schema as a sanity check on
824         // what the wrapped key is telling us
825         if (!cipher.getName().equalsIgnoreCase(cipherName)) {
826           throw new RuntimeException("Encryption for family '" + family.getNameAsString()
827             + "' configured with type '" + cipherName + "' but key specifies algorithm '"
828             + cipher.getName() + "'");
829         }
830       } else {
831         // Family does not provide key material, create a random key
832         cipher = Encryption.getCipher(conf, cipherName);
833         if (cipher == null) {
834           throw new RuntimeException("Cipher '" + cipherName + "' is not available");
835         }
836         key = cipher.getRandomKey();
837       }
838       cryptoContext = Encryption.newContext(conf);
839       cryptoContext.setCipher(cipher);
840       cryptoContext.setKey(key);
841     }
842     return cryptoContext;
843   }
844 
845   /**
846    * Checks whether this table has mob-enabled columns.
847    * @param htd The current table descriptor.
848    * @return Whether this table has mob-enabled columns.
849    */
850   public static boolean hasMobColumns(HTableDescriptor htd) {
851     HColumnDescriptor[] hcds = htd.getColumnFamilies();
852     for (HColumnDescriptor hcd : hcds) {
853       if (hcd.isMobEnabled()) {
854         return true;
855       }
856     }
857     return false;
858   }
859 
860   /**
861    * Indicates whether return null value when the mob file is missing or corrupt.
862    * The information is set in the attribute "empty.value.on.mobcell.miss" of scan.
863    * @param scan The current scan.
864    * @return True if the readEmptyValueOnMobCellMiss is enabled.
865    */
866   public static boolean isReadEmptyValueOnMobCellMiss(Scan scan) {
867     byte[] readEmptyValueOnMobCellMiss =
868       scan.getAttribute(MobConstants.EMPTY_VALUE_ON_MOBCELL_MISS);
869     try {
870       return readEmptyValueOnMobCellMiss != null && Bytes.toBoolean(readEmptyValueOnMobCellMiss);
871     } catch (IllegalArgumentException e) {
872       return false;
873     }
874   }
875 
876   /**
877    * Archives mob store files
878    * @param conf The current configuration.
879    * @param fs The current file system.
880    * @param mobRegionInfo The mob family region info.
881    * @param mobFamilyDir The mob family directory.
882    * @param family The name of the column family.
883    * @throws IOException
884    */
885   public static void archiveMobStoreFiles(Configuration conf, FileSystem fs,
886       HRegionInfo mobRegionInfo, Path mobFamilyDir, byte[] family) throws IOException {
887     // disable the block cache.
888     Configuration copyOfConf = HBaseConfiguration.create(conf);
889     copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
890     CacheConfig cacheConfig = new CacheConfig(copyOfConf);
891     FileStatus[] fileStatus = FSUtils.listStatus(fs, mobFamilyDir);
892     List<StoreFile> storeFileList = new ArrayList<StoreFile>();
893     for (FileStatus file : fileStatus) {
894       storeFileList.add(new StoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE));
895     }
896     HFileArchiver.archiveStoreFiles(conf, fs, mobRegionInfo, mobFamilyDir, family, storeFileList);
897   }
898 }