1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver;
19
20 import java.io.FileNotFoundException;
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Date;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.NavigableSet;
27 import java.util.UUID;
28 import java.util.concurrent.ConcurrentHashMap;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.fs.FileSystem;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.hbase.Cell;
36 import org.apache.hadoop.hbase.CellComparator;
37 import org.apache.hadoop.hbase.HColumnDescriptor;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.KeyValue;
40 import org.apache.hadoop.hbase.KeyValue.Type;
41 import org.apache.hadoop.hbase.TableName;
42 import org.apache.hadoop.hbase.Tag;
43 import org.apache.hadoop.hbase.classification.InterfaceAudience;
44 import org.apache.hadoop.hbase.client.Scan;
45 import org.apache.hadoop.hbase.filter.Filter;
46 import org.apache.hadoop.hbase.filter.FilterList;
47 import org.apache.hadoop.hbase.io.compress.Compression;
48 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
49 import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
50 import org.apache.hadoop.hbase.io.hfile.HFileContext;
51 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
52 import org.apache.hadoop.hbase.master.TableLockManager;
53 import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
54 import org.apache.hadoop.hbase.mob.MobCacheConfig;
55 import org.apache.hadoop.hbase.mob.MobConstants;
56 import org.apache.hadoop.hbase.mob.MobFile;
57 import org.apache.hadoop.hbase.mob.MobFileName;
58 import org.apache.hadoop.hbase.mob.MobStoreEngine;
59 import org.apache.hadoop.hbase.mob.MobUtils;
60 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
61 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
62 import org.apache.hadoop.hbase.util.Bytes;
63 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
64 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
65 import org.apache.hadoop.hbase.util.IdLock;
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83 @InterfaceAudience.Private
84 public class HMobStore extends HStore {
85 private static final Log LOG = LogFactory.getLog(HMobStore.class);
86 private MobCacheConfig mobCacheConfig;
87 private Path homePath;
88 private Path mobFamilyPath;
89 private volatile long cellsCountCompactedToMob = 0;
90 private volatile long cellsCountCompactedFromMob = 0;
91 private volatile long cellsSizeCompactedToMob = 0;
92 private volatile long cellsSizeCompactedFromMob = 0;
93 private volatile long mobFlushCount = 0;
94 private volatile long mobFlushedCellsCount = 0;
95 private volatile long mobFlushedCellsSize = 0;
96 private volatile long mobScanCellsCount = 0;
97 private volatile long mobScanCellsSize = 0;
98 private HColumnDescriptor family;
99 private TableLockManager tableLockManager;
100 private TableName tableLockName;
101 private Map<String, List<Path>> map = new ConcurrentHashMap<String, List<Path>>();
102 private final IdLock keyLock = new IdLock();
103
104 public HMobStore(final HRegion region, final HColumnDescriptor family,
105 final Configuration confParam) throws IOException {
106 super(region, family, confParam);
107 this.family = family;
108 this.mobCacheConfig = (MobCacheConfig) cacheConf;
109 this.homePath = MobUtils.getMobHome(conf);
110 this.mobFamilyPath = MobUtils.getMobFamilyPath(conf, this.getTableName(),
111 family.getNameAsString());
112 List<Path> locations = new ArrayList<Path>(2);
113 locations.add(mobFamilyPath);
114 TableName tn = region.getTableDesc().getTableName();
115 locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils.getMobRegionInfo(tn)
116 .getEncodedName(), family.getNameAsString()));
117 map.put(Bytes.toString(tn.getName()), locations);
118 if (region.getRegionServerServices() != null) {
119 tableLockManager = region.getRegionServerServices().getTableLockManager();
120 tableLockName = MobUtils.getTableLockName(getTableName());
121 }
122 }
123
124
125
126
127 @Override
128 protected void createCacheConf(HColumnDescriptor family) {
129 cacheConf = new MobCacheConfig(conf, family);
130 }
131
132
133
134
135 public Configuration getConfiguration() {
136 return this.conf;
137 }
138
139
140
141
142
143 @Override
144 protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
145 long readPt, KeyValueScanner scanner) throws IOException {
146 if (scanner == null) {
147 if (MobUtils.isRefOnlyScan(scan)) {
148 Filter refOnlyFilter = new MobReferenceOnlyFilter();
149 Filter filter = scan.getFilter();
150 if (filter != null) {
151 scan.setFilter(new FilterList(filter, refOnlyFilter));
152 } else {
153 scan.setFilter(refOnlyFilter);
154 }
155 }
156 scanner = scan.isReversed() ? new ReversedMobStoreScanner(this, getScanInfo(), scan,
157 targetCols, readPt) : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt);
158 }
159 return scanner;
160 }
161
162
163
164
165 @Override
166 protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf,
167 CellComparator cellComparator) throws IOException {
168 MobStoreEngine engine = new MobStoreEngine();
169 engine.createComponents(conf, store, cellComparator);
170 return engine;
171 }
172
173
174
175
176
177 private Path getTempDir() {
178 return new Path(homePath, MobConstants.TEMP_DIR_NAME);
179 }
180
181
182
183
184
185
186
187
188
189
190 public StoreFile.Writer createWriterInTmp(Date date, long maxKeyCount,
191 Compression.Algorithm compression, byte[] startKey) throws IOException {
192 if (startKey == null) {
193 startKey = HConstants.EMPTY_START_ROW;
194 }
195 Path path = getTempDir();
196 return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey);
197 }
198
199
200
201
202
203
204
205
206
207
208
209
210 public StoreFile.Writer createDelFileWriterInTmp(Date date, long maxKeyCount,
211 Compression.Algorithm compression, byte[] startKey) throws IOException {
212 if (startKey == null) {
213 startKey = HConstants.EMPTY_START_ROW;
214 }
215 Path path = getTempDir();
216 String suffix = UUID
217 .randomUUID().toString().replaceAll("-", "") + "_del";
218 MobFileName mobFileName = MobFileName.create(startKey, MobUtils.formatDate(date), suffix);
219 return createWriterInTmp(mobFileName, path, maxKeyCount, compression);
220 }
221
222
223
224
225
226
227
228
229
230
231
232 public StoreFile.Writer createWriterInTmp(String date, Path basePath, long maxKeyCount,
233 Compression.Algorithm compression, byte[] startKey) throws IOException {
234 MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID()
235 .toString().replaceAll("-", ""));
236 return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression);
237 }
238
239
240
241
242
243
244
245
246
247
248 public StoreFile.Writer createWriterInTmp(MobFileName mobFileName, Path basePath,
249 long maxKeyCount, Compression.Algorithm compression) throws IOException {
250 final CacheConfig writerCacheConf = mobCacheConfig;
251 HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
252 .withIncludesMvcc(true).withIncludesTags(true)
253 .withCompressTags(family.isCompressTags())
254 .withChecksumType(checksumType)
255 .withBytesPerCheckSum(bytesPerChecksum)
256 .withBlockSize(blocksize)
257 .withHBaseCheckSum(true).withDataBlockEncoding(getFamily().getDataBlockEncoding())
258 .withEncryptionContext(cryptoContext)
259 .withCreateTime(EnvironmentEdgeManager.currentTime()).build();
260
261 StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf, region.getFilesystem())
262 .withFilePath(new Path(basePath, mobFileName.getFileName()))
263 .withComparator(CellComparator.COMPARATOR).withBloomType(BloomType.NONE)
264 .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
265 return w;
266 }
267
268
269
270
271
272
273
274 public void commitFile(final Path sourceFile, Path targetPath) throws IOException {
275 if (sourceFile == null) {
276 return;
277 }
278 Path dstPath = new Path(targetPath, sourceFile.getName());
279 validateMobFile(sourceFile);
280 String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath;
281 LOG.info(msg);
282 Path parent = dstPath.getParent();
283 if (!region.getFilesystem().exists(parent)) {
284 region.getFilesystem().mkdirs(parent);
285 }
286 if (!region.getFilesystem().rename(sourceFile, dstPath)) {
287 throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
288 }
289 }
290
291
292
293
294
295
296 private void validateMobFile(Path path) throws IOException {
297 StoreFile storeFile = null;
298 try {
299 storeFile =
300 new StoreFile(region.getFilesystem(), path, conf, this.mobCacheConfig, BloomType.NONE);
301 storeFile.createReader();
302 } catch (IOException e) {
303 LOG.error("Fail to open mob file[" + path + "], keep it in temp directory.", e);
304 throw e;
305 } finally {
306 if (storeFile != null) {
307 storeFile.closeReader(false);
308 }
309 }
310 }
311
312
313
314
315
316
317
318
319
320 public Cell resolve(Cell reference, boolean cacheBlocks) throws IOException {
321 return resolve(reference, cacheBlocks, -1, true);
322 }
323
324
325
326
327
328
329
330
331
332
333
334 public Cell resolve(Cell reference, boolean cacheBlocks, long readPt,
335 boolean readEmptyValueOnMobCellMiss) throws IOException {
336 Cell result = null;
337 if (MobUtils.hasValidMobRefCellValue(reference)) {
338 String fileName = MobUtils.getMobFileName(reference);
339 Tag tableNameTag = MobUtils.getTableNameTag(reference);
340 if (tableNameTag != null) {
341 byte[] tableName = tableNameTag.getValue();
342 String tableNameString = Bytes.toString(tableName);
343 List<Path> locations = map.get(tableNameString);
344 if (locations == null) {
345 IdLock.Entry lockEntry = keyLock.getLockEntry(tableNameString.hashCode());
346 try {
347 locations = map.get(tableNameString);
348 if (locations == null) {
349 locations = new ArrayList<Path>(2);
350 TableName tn = TableName.valueOf(tableName);
351 locations.add(MobUtils.getMobFamilyPath(conf, tn, family.getNameAsString()));
352 locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils
353 .getMobRegionInfo(tn).getEncodedName(), family.getNameAsString()));
354 map.put(tableNameString, locations);
355 }
356 } finally {
357 keyLock.releaseLockEntry(lockEntry);
358 }
359 }
360 result = readCell(locations, fileName, reference, cacheBlocks, readPt,
361 readEmptyValueOnMobCellMiss);
362 }
363 }
364 if (result == null) {
365 LOG.warn("The KeyValue result is null, assemble a new KeyValue with the same row,family,"
366 + "qualifier,timestamp,type and tags but with an empty value to return.");
367 result = new KeyValue(reference.getRowArray(), reference.getRowOffset(),
368 reference.getRowLength(), reference.getFamilyArray(), reference.getFamilyOffset(),
369 reference.getFamilyLength(), reference.getQualifierArray(),
370 reference.getQualifierOffset(), reference.getQualifierLength(), reference.getTimestamp(),
371 Type.codeToType(reference.getTypeByte()), HConstants.EMPTY_BYTE_ARRAY,
372 0, 0, reference.getTagsArray(), reference.getTagsOffset(),
373 reference.getTagsLength());
374 }
375 return result;
376 }
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394 private Cell readCell(List<Path> locations, String fileName, Cell search, boolean cacheMobBlocks,
395 long readPt, boolean readEmptyValueOnMobCellMiss) throws IOException {
396 FileSystem fs = getFileSystem();
397 Throwable throwable = null;
398 for (Path location : locations) {
399 MobFile file = null;
400 Path path = new Path(location, fileName);
401 try {
402 file = mobCacheConfig.getMobFileCache().openFile(fs, path, mobCacheConfig);
403 return readPt != -1 ? file.readCell(search, cacheMobBlocks, readPt) : file.readCell(search,
404 cacheMobBlocks);
405 } catch (IOException e) {
406 mobCacheConfig.getMobFileCache().evictFile(fileName);
407 throwable = e;
408 if ((e instanceof FileNotFoundException) ||
409 (e.getCause() instanceof FileNotFoundException)) {
410 LOG.warn("Fail to read the cell, the mob file " + path + " doesn't exist", e);
411 } else if (e instanceof CorruptHFileException) {
412 LOG.error("The mob file " + path + " is corrupt", e);
413 break;
414 } else {
415 throw e;
416 }
417 } catch (NullPointerException e) {
418 mobCacheConfig.getMobFileCache().evictFile(fileName);
419 LOG.warn("Fail to read the cell", e);
420 throwable = e;
421 } catch (AssertionError e) {
422 mobCacheConfig.getMobFileCache().evictFile(fileName);
423 LOG.warn("Fail to read the cell", e);
424 throwable = e;
425 } finally {
426 if (file != null) {
427 mobCacheConfig.getMobFileCache().closeFile(file);
428 }
429 }
430 }
431 LOG.error("The mob file " + fileName + " could not be found in the locations " + locations
432 + " or it is corrupt");
433 if (readEmptyValueOnMobCellMiss) {
434 return null;
435 } else if (throwable instanceof IOException) {
436 throw (IOException) throwable;
437 } else {
438 throw new IOException(throwable);
439 }
440 }
441
442
443
444
445
446 public Path getPath() {
447 return mobFamilyPath;
448 }
449
450
451
452
453
454
455
456
457
458
459 @Override
460 public List<StoreFile> compact(CompactionContext compaction,
461 CompactionThroughputController throughputController) throws IOException {
462
463
464 if (compaction.getRequest().isAllFiles()) {
465
466
467
468
469
470
471
472
473 TableLock lock = null;
474 if (tableLockManager != null) {
475 lock = tableLockManager.readLock(tableLockName, "Major compaction in HMobStore");
476 }
477 boolean tableLocked = false;
478 String tableName = getTableName().getNameAsString();
479 if (lock != null) {
480 try {
481 LOG.info("Start to acquire a read lock for the table[" + tableName
482 + "], ready to perform the major compaction");
483 lock.acquire();
484 tableLocked = true;
485 } catch (Exception e) {
486 LOG.error("Fail to lock the table " + tableName, e);
487 }
488 } else {
489
490 tableLocked = true;
491 }
492 try {
493 if (!tableLocked) {
494 LOG.warn("Cannot obtain the table lock, maybe a sweep tool is running on this table["
495 + tableName + "], forcing the delete markers to be retained");
496 compaction.getRequest().forceRetainDeleteMarkers();
497 }
498 return super.compact(compaction, throughputController);
499 } finally {
500 if (tableLocked && lock != null) {
501 try {
502 lock.release();
503 } catch (IOException e) {
504 LOG.error("Fail to release the table lock " + tableName, e);
505 }
506 }
507 }
508 } else {
509
510 return super.compact(compaction, throughputController);
511 }
512 }
513
514 public void updateCellsCountCompactedToMob(long count) {
515 cellsCountCompactedToMob += count;
516 }
517
518 public long getCellsCountCompactedToMob() {
519 return cellsCountCompactedToMob;
520 }
521
522 public void updateCellsCountCompactedFromMob(long count) {
523 cellsCountCompactedFromMob += count;
524 }
525
526 public long getCellsCountCompactedFromMob() {
527 return cellsCountCompactedFromMob;
528 }
529
530 public void updateCellsSizeCompactedToMob(long size) {
531 cellsSizeCompactedToMob += size;
532 }
533
534 public long getCellsSizeCompactedToMob() {
535 return cellsSizeCompactedToMob;
536 }
537
538 public void updateCellsSizeCompactedFromMob(long size) {
539 cellsSizeCompactedFromMob += size;
540 }
541
542 public long getCellsSizeCompactedFromMob() {
543 return cellsSizeCompactedFromMob;
544 }
545
546 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT")
547 public void updateMobFlushCount() {
548 mobFlushCount++;
549 }
550
551 public long getMobFlushCount() {
552 return mobFlushCount;
553 }
554
555 public void updateMobFlushedCellsCount(long count) {
556 mobFlushedCellsCount += count;
557 }
558
559 public long getMobFlushedCellsCount() {
560 return mobFlushedCellsCount;
561 }
562
563 public void updateMobFlushedCellsSize(long size) {
564 mobFlushedCellsSize += size;
565 }
566
567 public long getMobFlushedCellsSize() {
568 return mobFlushedCellsSize;
569 }
570
571 public void updateMobScanCellsCount(long count) {
572 mobScanCellsCount += count;
573 }
574
575 public long getMobScanCellsCount() {
576 return mobScanCellsCount;
577 }
578
579 public void updateMobScanCellsSize(long size) {
580 mobScanCellsSize += size;
581 }
582
583 public long getMobScanCellsSize() {
584 return mobScanCellsSize;
585 }
586 }