View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver;
19  
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.mob.MobCacheConfig;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobFile;
import org.apache.hadoop.hbase.mob.MobFileName;
import org.apache.hadoop.hbase.mob.MobStoreEngine;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.IdLock;
66  
67  /**
68   * The store implementation to save MOBs (medium objects), it extends the HStore.
69   * When a descriptor of a column family has the value "IS_MOB", it means this column family
70   * is a mob one. When a HRegion instantiate a store for this column family, the HMobStore is
71   * created.
72   * HMobStore is almost the same with the HStore except using different types of scanners.
73   * In the method of getScanner, the MobStoreScanner and MobReversedStoreScanner are returned.
74   * In these scanners, a additional seeks in the mob files should be performed after the seek
75   * to HBase is done.
76   * The store implements how we save MOBs by extending HStore. When a descriptor
77   * of a column family has the value "IS_MOB", it means this column family is a mob one. When a
78   * HRegion instantiate a store for this column family, the HMobStore is created. HMobStore is
79   * almost the same with the HStore except using different types of scanners. In the method of
80   * getScanner, the MobStoreScanner and MobReversedStoreScanner are returned. In these scanners, a
81   * additional seeks in the mob files should be performed after the seek in HBase is done.
82   */
83  @InterfaceAudience.Private
84  public class HMobStore extends HStore {
85    private static final Log LOG = LogFactory.getLog(HMobStore.class);
86    private MobCacheConfig mobCacheConfig;
87    private Path homePath;
88    private Path mobFamilyPath;
89    private volatile long cellsCountCompactedToMob = 0;
90    private volatile long cellsCountCompactedFromMob = 0;
91    private volatile long cellsSizeCompactedToMob = 0;
92    private volatile long cellsSizeCompactedFromMob = 0;
93    private volatile long mobFlushCount = 0;
94    private volatile long mobFlushedCellsCount = 0;
95    private volatile long mobFlushedCellsSize = 0;
96    private volatile long mobScanCellsCount = 0;
97    private volatile long mobScanCellsSize = 0;
98    private HColumnDescriptor family;
99    private TableLockManager tableLockManager;
100   private TableName tableLockName;
101   private Map<String, List<Path>> map = new ConcurrentHashMap<String, List<Path>>();
102   private final IdLock keyLock = new IdLock();
103 
104   public HMobStore(final HRegion region, final HColumnDescriptor family,
105       final Configuration confParam) throws IOException {
106     super(region, family, confParam);
107     this.family = family;
108     this.mobCacheConfig = (MobCacheConfig) cacheConf;
109     this.homePath = MobUtils.getMobHome(conf);
110     this.mobFamilyPath = MobUtils.getMobFamilyPath(conf, this.getTableName(),
111         family.getNameAsString());
112     List<Path> locations = new ArrayList<Path>(2);
113     locations.add(mobFamilyPath);
114     TableName tn = region.getTableDesc().getTableName();
115     locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils.getMobRegionInfo(tn)
116         .getEncodedName(), family.getNameAsString()));
117     map.put(Bytes.toString(tn.getName()), locations);
118     if (region.getRegionServerServices() != null) {
119       tableLockManager = region.getRegionServerServices().getTableLockManager();
120       tableLockName = MobUtils.getTableLockName(getTableName());
121     }
122   }
123 
124   /**
125    * Creates the mob cache config.
126    */
127   @Override
128   protected void createCacheConf(HColumnDescriptor family) {
129     cacheConf = new MobCacheConfig(conf, family);
130   }
131 
132   /**
133    * Gets current config.
134    */
135   public Configuration getConfiguration() {
136     return this.conf;
137   }
138 
139   /**
140    * Gets the MobStoreScanner or MobReversedStoreScanner. In these scanners, a additional seeks in
141    * the mob files should be performed after the seek in HBase is done.
142    */
143   @Override
144   protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
145       long readPt, KeyValueScanner scanner) throws IOException {
146     if (scanner == null) {
147       if (MobUtils.isRefOnlyScan(scan)) {
148         Filter refOnlyFilter = new MobReferenceOnlyFilter();
149         Filter filter = scan.getFilter();
150         if (filter != null) {
151           scan.setFilter(new FilterList(filter, refOnlyFilter));
152         } else {
153           scan.setFilter(refOnlyFilter);
154         }
155       }
156       scanner = scan.isReversed() ? new ReversedMobStoreScanner(this, getScanInfo(), scan,
157           targetCols, readPt) : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt);
158     }
159     return scanner;
160   }
161 
162   /**
163    * Creates the mob store engine.
164    */
165   @Override
166   protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf,
167       CellComparator cellComparator) throws IOException {
168     MobStoreEngine engine = new MobStoreEngine();
169     engine.createComponents(conf, store, cellComparator);
170     return engine;
171   }
172 
173   /**
174    * Gets the temp directory.
175    * @return The temp directory.
176    */
177   private Path getTempDir() {
178     return new Path(homePath, MobConstants.TEMP_DIR_NAME);
179   }
180 
181   /**
182    * Creates the writer for the mob file in temp directory.
183    * @param date The latest date of written cells.
184    * @param maxKeyCount The key count.
185    * @param compression The compression algorithm.
186    * @param startKey The start key.
187    * @return The writer for the mob file.
188    * @throws IOException
189    */
190   public StoreFile.Writer createWriterInTmp(Date date, long maxKeyCount,
191       Compression.Algorithm compression, byte[] startKey) throws IOException {
192     if (startKey == null) {
193       startKey = HConstants.EMPTY_START_ROW;
194     }
195     Path path = getTempDir();
196     return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey);
197   }
198 
199   /**
200    * Creates the writer for the del file in temp directory.
201    * The del file keeps tracking the delete markers. Its name has a suffix _del,
202    * the format is [0-9a-f]+(_del)?.
203    * @param date The latest date of written cells.
204    * @param maxKeyCount The key count.
205    * @param compression The compression algorithm.
206    * @param startKey The start key.
207    * @return The writer for the del file.
208    * @throws IOException
209    */
210   public StoreFile.Writer createDelFileWriterInTmp(Date date, long maxKeyCount,
211       Compression.Algorithm compression, byte[] startKey) throws IOException {
212     if (startKey == null) {
213       startKey = HConstants.EMPTY_START_ROW;
214     }
215     Path path = getTempDir();
216     String suffix = UUID
217         .randomUUID().toString().replaceAll("-", "") + "_del";
218     MobFileName mobFileName = MobFileName.create(startKey, MobUtils.formatDate(date), suffix);
219     return createWriterInTmp(mobFileName, path, maxKeyCount, compression);
220   }
221 
222   /**
223    * Creates the writer for the mob file in temp directory.
224    * @param date The date string, its format is yyyymmmdd.
225    * @param basePath The basic path for a temp directory.
226    * @param maxKeyCount The key count.
227    * @param compression The compression algorithm.
228    * @param startKey The start key.
229    * @return The writer for the mob file.
230    * @throws IOException
231    */
232   public StoreFile.Writer createWriterInTmp(String date, Path basePath, long maxKeyCount,
233       Compression.Algorithm compression, byte[] startKey) throws IOException {
234     MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID()
235         .toString().replaceAll("-", ""));
236     return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression);
237   }
238 
239   /**
240    * Creates the writer for the mob file in temp directory.
241    * @param mobFileName The mob file name.
242    * @param basePath The basic path for a temp directory.
243    * @param maxKeyCount The key count.
244    * @param compression The compression algorithm.
245    * @return The writer for the mob file.
246    * @throws IOException
247    */
248   public StoreFile.Writer createWriterInTmp(MobFileName mobFileName, Path basePath,
249       long maxKeyCount, Compression.Algorithm compression) throws IOException {
250     final CacheConfig writerCacheConf = mobCacheConfig;
251     HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
252         .withIncludesMvcc(true).withIncludesTags(true)
253         .withCompressTags(family.isCompressTags())
254         .withChecksumType(checksumType)
255         .withBytesPerCheckSum(bytesPerChecksum)
256         .withBlockSize(blocksize)
257         .withHBaseCheckSum(true).withDataBlockEncoding(getFamily().getDataBlockEncoding())
258         .withEncryptionContext(cryptoContext)
259         .withCreateTime(EnvironmentEdgeManager.currentTime()).build();
260 
261     StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf, region.getFilesystem())
262         .withFilePath(new Path(basePath, mobFileName.getFileName()))
263         .withComparator(CellComparator.COMPARATOR).withBloomType(BloomType.NONE)
264         .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
265     return w;
266   }
267 
268   /**
269    * Commits the mob file.
270    * @param sourceFile The source file.
271    * @param targetPath The directory path where the source file is renamed to.
272    * @throws IOException
273    */
274   public void commitFile(final Path sourceFile, Path targetPath) throws IOException {
275     if (sourceFile == null) {
276       return;
277     }
278     Path dstPath = new Path(targetPath, sourceFile.getName());
279     validateMobFile(sourceFile);
280     String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath;
281     LOG.info(msg);
282     Path parent = dstPath.getParent();
283     if (!region.getFilesystem().exists(parent)) {
284       region.getFilesystem().mkdirs(parent);
285     }
286     if (!region.getFilesystem().rename(sourceFile, dstPath)) {
287       throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
288     }
289   }
290 
291   /**
292    * Validates a mob file by opening and closing it.
293    *
294    * @param path the path to the mob file
295    */
296   private void validateMobFile(Path path) throws IOException {
297     StoreFile storeFile = null;
298     try {
299       storeFile =
300           new StoreFile(region.getFilesystem(), path, conf, this.mobCacheConfig, BloomType.NONE);
301       storeFile.createReader();
302     } catch (IOException e) {
303       LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e);
304       throw e;
305     } finally {
306       if (storeFile != null) {
307         storeFile.closeReader(false);
308       }
309     }
310   }
311 
312   /**
313    * Reads the cell from the mob file, and the read point does not count.
314    * This is used for DefaultMobStoreCompactor where we can read empty value for the missing cell.
315    * @param reference The cell found in the HBase, its value is a path to a mob file.
316    * @param cacheBlocks Whether the scanner should cache blocks.
317    * @return The cell found in the mob file.
318    * @throws IOException
319    */
320   public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException {
321     return resolve(reference, cacheBlocks, -1, true);
322   }
323 
324   /**
325    * Reads the cell from the mob file.
326    * @param reference The cell found in the HBase, its value is a path to a mob file.
327    * @param cacheBlocks Whether the scanner should cache blocks.
328    * @param readPt the read point.
329    * @param readEmptyValueOnMobCellMiss Whether return null value when the mob file is
330    *        missing or corrupt.
331    * @return The cell found in the mob file.
332    * @throws IOException
333    */
334   public Cell resolve(Cell reference, boolean cacheBlocks, long readPt,
335     boolean readEmptyValueOnMobCellMiss) throws IOException {
336     Cell result = null;
337     if (MobUtils.hasValidMobRefCellValue(reference)) {
338       String fileName = MobUtils.getMobFileName(reference);
339       Tag tableNameTag = MobUtils.getTableNameTag(reference);
340       if (tableNameTag != null) {
341         byte[] tableName = tableNameTag.getValue();
342         String tableNameString = Bytes.toString(tableName);
343         List<Path> locations = map.get(tableNameString);
344         if (locations == null) {
345           IdLock.Entry lockEntry = keyLock.getLockEntry(tableNameString.hashCode());
346           try {
347             locations = map.get(tableNameString);
348             if (locations == null) {
349               locations = new ArrayList<Path>(2);
350               TableName tn = TableName.valueOf(tableName);
351               locations.add(MobUtils.getMobFamilyPath(conf, tn, family.getNameAsString()));
352               locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils
353                   .getMobRegionInfo(tn).getEncodedName(), family.getNameAsString()));
354               map.put(tableNameString, locations);
355             }
356           } finally {
357             keyLock.releaseLockEntry(lockEntry);
358           }
359         }
360         result = readCell(locations, fileName, reference, cacheBlocks, readPt,
361           readEmptyValueOnMobCellMiss);
362       }
363     }
364     if (result == null) {
365       LOG.warn("The KeyValue result is null, assemble a new KeyValue with the same row,family,"
366           + "qualifier,timestamp,type and tags but with an empty value to return.");
367       result = new KeyValue(reference.getRowArray(), reference.getRowOffset(),
368           reference.getRowLength(), reference.getFamilyArray(), reference.getFamilyOffset(),
369           reference.getFamilyLength(), reference.getQualifierArray(),
370           reference.getQualifierOffset(), reference.getQualifierLength(), reference.getTimestamp(),
371           Type.codeToType(reference.getTypeByte()), HConstants.EMPTY_BYTE_ARRAY,
372           0, 0, reference.getTagsArray(), reference.getTagsOffset(),
373           reference.getTagsLength());
374     }
375     return result;
376   }
377 
378   /**
379    * Reads the cell from a mob file.
380    * The mob file might be located in different directories.
381    * 1. The working directory.
382    * 2. The archive directory.
383    * Reads the cell from the files located in both of the above directories.
384    * @param locations The possible locations where the mob files are saved.
385    * @param fileName The file to be read.
386    * @param search The cell to be searched.
387    * @param cacheMobBlocks Whether the scanner should cache blocks.
388    * @param readPt the read point.
389    * @param readEmptyValueOnMobCellMiss Whether return null value when the mob file is
390    *        missing or corrupt.
391    * @return The found cell. Null if there's no such a cell.
392    * @throws IOException
393    */
394   private Cell readCell(List<Path> locations, String fileName, Cell search, boolean cacheMobBlocks,
395     long readPt, boolean readEmptyValueOnMobCellMiss) throws IOException {
396     FileSystem fs = getFileSystem();
397     Throwable throwable = null;
398     for (Path location : locations) {
399       MobFile file = null;
400       Path path = new Path(location, fileName);
401       try {
402         file = mobCacheConfig.getMobFileCache().openFile(fs, path, mobCacheConfig);
403         return readPt != -1 ? file.readCell(search, cacheMobBlocks, readPt) : file.readCell(search,
404           cacheMobBlocks);
405       } catch (IOException e) {
406         mobCacheConfig.getMobFileCache().evictFile(fileName);
407         throwable = e;
408         if ((e instanceof FileNotFoundException) ||
409             (e.getCause() instanceof FileNotFoundException)) {
410           LOG.warn("Fail to read the cell, the mob file " + path + " doesn't exist", e);
411         } else if (e instanceof CorruptHFileException) {
412           LOG.error("The mob file " + path + " is corrupt", e);
413           break;
414         } else {
415           throw e;
416         }
417       } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
418         mobCacheConfig.getMobFileCache().evictFile(fileName);
419         LOG.warn("Fail to read the cell", e);
420         throwable = e;
421       } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
422         mobCacheConfig.getMobFileCache().evictFile(fileName);
423         LOG.warn("Fail to read the cell", e);
424         throwable = e;
425       } finally {
426         if (file != null) {
427           mobCacheConfig.getMobFileCache().closeFile(file);
428         }
429       }
430     }
431     LOG.error("The mob file " + fileName + " could not be found in the locations " + locations
432       + " or it is corrupt");
433     if (readEmptyValueOnMobCellMiss) {
434       return null;
435     } else if (throwable instanceof IOException) {
436       throw (IOException) throwable;
437     } else {
438       throw new IOException(throwable);
439     }
440   }
441 
442   /**
443    * Gets the mob file path.
444    * @return The mob file path.
445    */
446   public Path getPath() {
447     return mobFamilyPath;
448   }
449 
450   /**
451    * The compaction in the store of mob.
452    * The cells in this store contains the path of the mob files. There might be race
453    * condition between the major compaction and the sweeping in mob files.
454    * In order to avoid this, we need mutually exclude the running of the major compaction and
455    * sweeping in mob files.
456    * The minor compaction is not affected.
457    * The major compaction is marked as retainDeleteMarkers when a sweeping is in progress.
458    */
459   @Override
460   public List<StoreFile> compact(CompactionContext compaction,
461       CompactionThroughputController throughputController) throws IOException {
462     // If it's major compaction, try to find whether there's a sweeper is running
463     // If yes, mark the major compaction as retainDeleteMarkers
464     if (compaction.getRequest().isAllFiles()) {
465       // Use the Zookeeper to coordinate.
466       // 1. Acquire a operation lock.
467       //   1.1. If no, mark the major compaction as retainDeleteMarkers and continue the compaction.
468       //   1.2. If the lock is obtained, search the node of sweeping.
469       //      1.2.1. If the node is there, the sweeping is in progress, mark the major
470       //             compaction as retainDeleteMarkers and continue the compaction.
471       //      1.2.2. If the node is not there, add a child to the major compaction node, and
472       //             run the compaction directly.
473       TableLock lock = null;
474       if (tableLockManager != null) {
475         lock = tableLockManager.readLock(tableLockName, "Major compaction in HMobStore");
476       }
477       boolean tableLocked = false;
478       String tableName = getTableName().getNameAsString();
479       if (lock != null) {
480         try {
481           LOG.info("Start to acquire a read lock for the table[" + tableName
482               + "], ready to perform the major compaction");
483           lock.acquire();
484           tableLocked = true;
485         } catch (Exception e) {
486           LOG.error("Fail to lock the table " + tableName, e);
487         }
488       } else {
489         // If the tableLockManager is null, mark the tableLocked as true.
490         tableLocked = true;
491       }
492       try {
493         if (!tableLocked) {
494           LOG.warn("Cannot obtain the table lock, maybe a sweep tool is running on this table["
495               + tableName + "], forcing the delete markers to be retained");
496           compaction.getRequest().forceRetainDeleteMarkers();
497         }
498         return super.compact(compaction, throughputController);
499       } finally {
500         if (tableLocked && lock != null) {
501           try {
502             lock.release();
503           } catch (IOException e) {
504             LOG.error("Fail to release the table lock " + tableName, e);
505           }
506         }
507       }
508     } else {
509       // If it's not a major compaction, continue the compaction.
510       return super.compact(compaction, throughputController);
511     }
512   }
513 
514   public void updateCellsCountCompactedToMob(long count) {
515     cellsCountCompactedToMob += count;
516   }
517 
518   public long getCellsCountCompactedToMob() {
519     return cellsCountCompactedToMob;
520   }
521 
522   public void updateCellsCountCompactedFromMob(long count) {
523     cellsCountCompactedFromMob += count;
524   }
525 
526   public long getCellsCountCompactedFromMob() {
527     return cellsCountCompactedFromMob;
528   }
529 
530   public void updateCellsSizeCompactedToMob(long size) {
531     cellsSizeCompactedToMob += size;
532   }
533 
534   public long getCellsSizeCompactedToMob() {
535     return cellsSizeCompactedToMob;
536   }
537 
538   public void updateCellsSizeCompactedFromMob(long size) {
539     cellsSizeCompactedFromMob += size;
540   }
541 
542   public long getCellsSizeCompactedFromMob() {
543     return cellsSizeCompactedFromMob;
544   }
545 
546   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT")
547   public void updateMobFlushCount() {
548     mobFlushCount++;
549   }
550 
551   public long getMobFlushCount() {
552     return mobFlushCount;
553   }
554 
555   public void updateMobFlushedCellsCount(long count) {
556     mobFlushedCellsCount += count;
557   }
558 
559   public long getMobFlushedCellsCount() {
560     return mobFlushedCellsCount;
561   }
562 
563   public void updateMobFlushedCellsSize(long size) {
564     mobFlushedCellsSize += size;
565   }
566 
567   public long getMobFlushedCellsSize() {
568     return mobFlushedCellsSize;
569   }
570 
571   public void updateMobScanCellsCount(long count) {
572     mobScanCellsCount += count;
573   }
574 
575   public long getMobScanCellsCount() {
576     return mobScanCellsCount;
577   }
578 
579   public void updateMobScanCellsSize(long size) {
580     mobScanCellsSize += size;
581   }
582 
583   public long getMobScanCellsSize() {
584     return mobScanCellsSize;
585   }
586 }