1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.io.EOFException;
22 import java.io.FileNotFoundException;
23 import java.io.IOException;
24 import java.io.InterruptedIOException;
25 import java.lang.reflect.Constructor;
26 import java.text.ParseException;
27 import java.util.AbstractList;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collection;
31 import java.util.Collections;
32 import java.util.HashMap;
33 import java.util.HashSet;
34 import java.util.Iterator;
35 import java.util.List;
36 import java.util.ListIterator;
37 import java.util.Map;
38 import java.util.Map.Entry;
39 import java.util.NavigableMap;
40 import java.util.NavigableSet;
41 import java.util.RandomAccess;
42 import java.util.Set;
43 import java.util.TreeMap;
44 import java.util.concurrent.Callable;
45 import java.util.concurrent.CompletionService;
46 import java.util.concurrent.ConcurrentHashMap;
47 import java.util.concurrent.ConcurrentMap;
48 import java.util.concurrent.ConcurrentSkipListMap;
49 import java.util.concurrent.ExecutionException;
50 import java.util.concurrent.ExecutorCompletionService;
51 import java.util.concurrent.ExecutorService;
52 import java.util.concurrent.Executors;
53 import java.util.concurrent.Future;
54 import java.util.concurrent.FutureTask;
55 import java.util.concurrent.ThreadFactory;
56 import java.util.concurrent.ThreadPoolExecutor;
57 import java.util.concurrent.TimeUnit;
58 import java.util.concurrent.TimeoutException;
59 import java.util.concurrent.atomic.AtomicBoolean;
60 import java.util.concurrent.atomic.AtomicInteger;
61 import java.util.concurrent.atomic.AtomicLong;
62 import java.util.concurrent.locks.Lock;
63 import java.util.concurrent.locks.ReadWriteLock;
64 import java.util.concurrent.locks.ReentrantReadWriteLock;
65
66 import org.apache.commons.logging.Log;
67 import org.apache.commons.logging.LogFactory;
68 import org.apache.hadoop.conf.Configuration;
69 import org.apache.hadoop.fs.FileStatus;
70 import org.apache.hadoop.fs.FileSystem;
71 import org.apache.hadoop.fs.Path;
72 import org.apache.hadoop.hbase.Cell;
73 import org.apache.hadoop.hbase.CellComparator;
74 import org.apache.hadoop.hbase.CellScanner;
75 import org.apache.hadoop.hbase.CellUtil;
76 import org.apache.hadoop.hbase.CompoundConfiguration;
77 import org.apache.hadoop.hbase.DoNotRetryIOException;
78 import org.apache.hadoop.hbase.DroppedSnapshotException;
79 import org.apache.hadoop.hbase.HBaseConfiguration;
80 import org.apache.hadoop.hbase.HColumnDescriptor;
81 import org.apache.hadoop.hbase.HConstants;
82 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
83 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
84 import org.apache.hadoop.hbase.HRegionInfo;
85 import org.apache.hadoop.hbase.HTableDescriptor;
86 import org.apache.hadoop.hbase.KeyValue;
87 import org.apache.hadoop.hbase.KeyValueUtil;
88 import org.apache.hadoop.hbase.NamespaceDescriptor;
89 import org.apache.hadoop.hbase.NotServingRegionException;
90 import org.apache.hadoop.hbase.RegionTooBusyException;
91 import org.apache.hadoop.hbase.ShareableMemory;
92 import org.apache.hadoop.hbase.TableName;
93 import org.apache.hadoop.hbase.Tag;
94 import org.apache.hadoop.hbase.TagRewriteCell;
95 import org.apache.hadoop.hbase.TagType;
96 import org.apache.hadoop.hbase.UnknownScannerException;
97 import org.apache.hadoop.hbase.backup.HFileArchiver;
98 import org.apache.hadoop.hbase.classification.InterfaceAudience;
99 import org.apache.hadoop.hbase.client.Append;
100 import org.apache.hadoop.hbase.client.Delete;
101 import org.apache.hadoop.hbase.client.Durability;
102 import org.apache.hadoop.hbase.client.Get;
103 import org.apache.hadoop.hbase.client.Increment;
104 import org.apache.hadoop.hbase.client.IsolationLevel;
105 import org.apache.hadoop.hbase.client.Mutation;
106 import org.apache.hadoop.hbase.client.Put;
107 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
108 import org.apache.hadoop.hbase.client.Result;
109 import org.apache.hadoop.hbase.client.RowMutations;
110 import org.apache.hadoop.hbase.client.Scan;
111 import org.apache.hadoop.hbase.conf.ConfigurationManager;
112 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
113 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
114 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
115 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
116 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
117 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
118 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
119 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
120 import org.apache.hadoop.hbase.filter.FilterWrapper;
121 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
122 import org.apache.hadoop.hbase.io.HeapSize;
123 import org.apache.hadoop.hbase.io.TimeRange;
124 import org.apache.hadoop.hbase.io.hfile.BlockCache;
125 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
126 import org.apache.hadoop.hbase.io.hfile.HFile;
127 import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
128 import org.apache.hadoop.hbase.ipc.RpcCallContext;
129 import org.apache.hadoop.hbase.ipc.RpcServer;
130 import org.apache.hadoop.hbase.mob.MobUtils;
131 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
132 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
133 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
134 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
135 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
136 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
137 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
138 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionLoad;
139 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
140 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
141 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
142 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
143 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
144 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
145 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor;
146 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
147 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
148 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
149 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
150 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
151 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
152 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
153 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
154 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
155 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
156 import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey;
157 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
158 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
159 import org.apache.hadoop.hbase.security.User;
160 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
161 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
162 import org.apache.hadoop.hbase.util.ByteStringer;
163 import org.apache.hadoop.hbase.util.Bytes;
164 import org.apache.hadoop.hbase.util.CancelableProgressable;
165 import org.apache.hadoop.hbase.util.ClassSize;
166 import org.apache.hadoop.hbase.util.CompressionTest;
167 import org.apache.hadoop.hbase.util.Counter;
168 import org.apache.hadoop.hbase.util.EncryptionTest;
169 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
170 import org.apache.hadoop.hbase.util.FSTableDescriptors;
171 import org.apache.hadoop.hbase.util.FSUtils;
172 import org.apache.hadoop.hbase.util.HashedBytes;
173 import org.apache.hadoop.hbase.util.Pair;
174 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
175 import org.apache.hadoop.hbase.util.Threads;
176 import org.apache.hadoop.hbase.wal.WAL;
177 import org.apache.hadoop.hbase.wal.WALFactory;
178 import org.apache.hadoop.hbase.wal.WALKey;
179 import org.apache.hadoop.hbase.wal.WALSplitter;
180 import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
181 import org.apache.hadoop.io.MultipleIOException;
182 import org.apache.hadoop.util.StringUtils;
183 import org.apache.htrace.Trace;
184 import org.apache.htrace.TraceScope;
185
186 import com.google.common.annotations.VisibleForTesting;
187 import com.google.common.base.Optional;
188 import com.google.common.base.Preconditions;
189 import com.google.common.collect.Lists;
190 import com.google.common.collect.Maps;
191 import com.google.common.io.Closeables;
192 import com.google.protobuf.ByteString;
193 import com.google.protobuf.Descriptors;
194 import com.google.protobuf.Message;
195 import com.google.protobuf.RpcCallback;
196 import com.google.protobuf.RpcController;
197 import com.google.protobuf.Service;
198 import com.google.protobuf.TextFormat;
199
200 @InterfaceAudience.Private
201 public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
private static final Log LOG = LogFactory.getLog(HRegion.class);

/**
 * Configuration key for the region-wide default of "load column families on demand".
 * The constructor reads it into {@code isLoadingCfsOnDemandDefault} with a default of true.
 */
public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
    "hbase.hregion.scan.loadColumnFamiliesOnDemand";

/**
 * Value of {@link #MAX_WAIT_FOR_SEQ_ID_KEY}, read once in the constructor
 * (default {@link #DEFAULT_MAX_WAIT_FOR_SEQ_ID}); the key suffix suggests milliseconds.
 */
private final int maxWaitForSeqId;
private static final String MAX_WAIT_FOR_SEQ_ID_KEY = "hbase.hregion.max.wait.for.sequenceid.ms";
private static final int DEFAULT_MAX_WAIT_FOR_SEQ_ID = 30000;

/** Durability used when the table descriptor says {@code Durability.USE_DEFAULT}. */
private static final Durability DEFAULT_DURABILITY = Durability.SYNC_WAL;

/** Closed flag; reset to false at the end of region initialization. */
final AtomicBoolean closed = new AtomicBoolean(false);

/** Closing-in-progress flag; reset to false at the end of region initialization. */
final AtomicBoolean closing = new AtomicBoolean(false);

// Presumably the highest sequence id known to be flushed; not assigned in this view — confirm.
private volatile long maxFlushedSeqId = HConstants.NO_SEQNUM;

// Presumably the sequence id of the last flush operation; not assigned in this view — confirm.
private volatile long lastFlushOpSeqId = HConstants.NO_SEQNUM;

// Set to the max sequence id (after any recovered-edits replay) during region initialization.
protected volatile long lastReplayedOpenRegionSeqId = -1L;
// NOTE(review): not assigned in this view; presumably tracks replayed compaction markers.
protected volatile long lastReplayedCompactionSeqId = -1L;

// Row-lock contexts keyed by row; usage is outside this view.
private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
    new ConcurrentHashMap<HashedBytes, RowLockContext>();

// Open stores keyed by column family name bytes (populated by initializeStores).
protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(
    Bytes.BYTES_RAWCOMPARATOR);

// Coprocessor endpoint services registered for this region, keyed by service name.
private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

// Region memstore size; adjusted by addAndGetGlobalMemstoreSize.
private final AtomicLong memstoreSize = new AtomicLong(0);

// Metrics for mutations that skipped the WAL.
final Counter numMutationsWithoutWAL = new Counter();
final Counter dataInMemoryWithoutWAL = new Counter();

// checkAndMutate outcome counters.
final Counter checkAndMutateChecksPassed = new Counter();
final Counter checkAndMutateChecksFailed = new Counter();

// Request counters exposed through getReadRequestsCount and friends.
final Counter readRequestsCount = new Counter();
final Counter writeRequestsCount = new Counter();

// Count of requests that were blocked; incremented outside this view.
private final Counter blockedRequestsCount = new Counter();

// Compaction statistics.
final AtomicLong compactionsFinished = new AtomicLong(0L);
final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L);
final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L);

private final WAL wal;
private final HRegionFileSystem fs;
// Compound view of baseConf layered with the table descriptor's configuration and values.
protected final Configuration conf;
// The configuration passed to the constructor (must not already be a CompoundConfiguration).
private final Configuration baseConf;
// From "hbase.rowlock.wait.duration" (default DEFAULT_ROWLOCK_WAIT_DURATION).
private final int rowLockWaitDuration;
static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;

// From "hbase.busy.wait.duration"; its product with maxBusyWaitMultiplier must be positive.
final long busyWaitDuration;
static final long DEFAULT_BUSY_WAIT_DURATION = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;

// From "hbase.busy.wait.multiplier.max" (default 2).
final int maxBusyWaitMultiplier;

// From "hbase.ipc.client.call.purge.timeout" (default 2 x DEFAULT_HBASE_RPC_TIMEOUT).
final long maxBusyWaitDuration;

// Default for "hbase.hregion.row.processor.timeout".
static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();

// Read point registered per open scanner; consulted (under its own monitor) by
// getSmallestReadPoint.
private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;

// Not assigned in this view; presumably the sequence id at which the region was opened.
private long openSeqNum = HConstants.NO_SEQNUM;

// Region default for on-demand column family loading (see LOAD_CFS_ON_DEMAND_CONFIG_KEY).
private boolean isLoadingCfsOnDemandDefault = false;

private final AtomicInteger majorInProgress = new AtomicInteger(0);
private final AtomicInteger minorInProgress = new AtomicInteger(0);

// Max sequence id per store, filled by initializeStores and passed to recovered-edits replay.
Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);

// Pending prepare-phase flush state; usage is outside this view.
private PrepareFlushResult prepareFlushResult = null;

// From HConstants.DISALLOW_WRITES_IN_RECOVERING, read in the constructor.
private boolean disallowWritesInRecovering = false;

// Set in the constructor when the region server lists this region as recovering.
private volatile boolean recovering = false;

// Absent after construction; presumably set when registered with a ConfigurationManager.
private volatile Optional<ConfigurationManager> configurationManager;
360
361
362
363
364
365
366 public long getSmallestReadPoint() {
367 long minimumReadPoint;
368
369
370
371 synchronized(scannerReadPoints) {
372 minimumReadPoint = mvcc.getReadPoint();
373
374 for (Long readPoint: this.scannerReadPoints.values()) {
375 if (readPoint < minimumReadPoint) {
376 minimumReadPoint = readPoint;
377 }
378 }
379 }
380 return minimumReadPoint;
381 }
382
383
384
385
386
387 static class WriteState {
388
389 volatile boolean flushing = false;
390
391 volatile boolean flushRequested = false;
392
393 volatile int compacting = 0;
394
395 volatile boolean writesEnabled = true;
396
397 volatile boolean readOnly = false;
398
399
400 volatile boolean readsEnabled = true;
401
402
403
404
405
406
407 synchronized void setReadOnly(final boolean onOff) {
408 this.writesEnabled = !onOff;
409 this.readOnly = onOff;
410 }
411
412 boolean isReadOnly() {
413 return this.readOnly;
414 }
415
416 boolean isFlushRequested() {
417 return this.flushRequested;
418 }
419
420 void setReadsEnabled(boolean readsEnabled) {
421 this.readsEnabled = readsEnabled;
422 }
423
424 static final long HEAP_SIZE = ClassSize.align(
425 ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
426 }
427
428
429
430
431
432
433
434 public static class FlushResultImpl implements FlushResult {
435 final Result result;
436 final String failureReason;
437 final long flushSequenceId;
438 final boolean wroteFlushWalMarker;
439
440
441
442
443
444
445
446
447 FlushResultImpl(Result result, long flushSequenceId) {
448 this(result, flushSequenceId, null, false);
449 assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
450 .FLUSHED_COMPACTION_NEEDED;
451 }
452
453
454
455
456
457
458 FlushResultImpl(Result result, String failureReason, boolean wroteFlushMarker) {
459 this(result, -1, failureReason, wroteFlushMarker);
460 assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
461 }
462
463
464
465
466
467
468
469 FlushResultImpl(Result result, long flushSequenceId, String failureReason,
470 boolean wroteFlushMarker) {
471 this.result = result;
472 this.flushSequenceId = flushSequenceId;
473 this.failureReason = failureReason;
474 this.wroteFlushWalMarker = wroteFlushMarker;
475 }
476
477
478
479
480
481
482 @Override
483 public boolean isFlushSucceeded() {
484 return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
485 .FLUSHED_COMPACTION_NEEDED;
486 }
487
488
489
490
491
492 @Override
493 public boolean isCompactionNeeded() {
494 return result == Result.FLUSHED_COMPACTION_NEEDED;
495 }
496
497 @Override
498 public String toString() {
499 return new StringBuilder()
500 .append("flush result:").append(result).append(", ")
501 .append("failureReason:").append(failureReason).append(",")
502 .append("flush seq id").append(flushSequenceId).toString();
503 }
504
505 @Override
506 public Result getResult() {
507 return result;
508 }
509 }
510
511
512 @VisibleForTesting
513 static class PrepareFlushResult {
514 final FlushResult result;
515 final TreeMap<byte[], StoreFlushContext> storeFlushCtxs;
516 final TreeMap<byte[], List<Path>> committedFiles;
517 final TreeMap<byte[], Long> storeFlushableSize;
518 final long startTime;
519 final long flushOpSeqId;
520 final long flushedSeqId;
521 final long totalFlushableSize;
522
523
524 PrepareFlushResult(FlushResult result, long flushSeqId) {
525 this(result, null, null, null, Math.max(0, flushSeqId), 0, 0, 0);
526 }
527
528
529 PrepareFlushResult(
530 TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
531 TreeMap<byte[], List<Path>> committedFiles,
532 TreeMap<byte[], Long> storeFlushableSize, long startTime, long flushSeqId,
533 long flushedSeqId, long totalFlushableSize) {
534 this(null, storeFlushCtxs, committedFiles, storeFlushableSize, startTime,
535 flushSeqId, flushedSeqId, totalFlushableSize);
536 }
537
538 private PrepareFlushResult(
539 FlushResult result,
540 TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
541 TreeMap<byte[], List<Path>> committedFiles,
542 TreeMap<byte[], Long> storeFlushableSize, long startTime, long flushSeqId,
543 long flushedSeqId, long totalFlushableSize) {
544 this.result = result;
545 this.storeFlushCtxs = storeFlushCtxs;
546 this.committedFiles = committedFiles;
547 this.storeFlushableSize = storeFlushableSize;
548 this.startTime = startTime;
549 this.flushOpSeqId = flushSeqId;
550 this.flushedSeqId = flushedSeqId;
551 this.totalFlushableSize = totalFlushableSize;
552 }
553
554 public FlushResult getResult() {
555 return this.result;
556 }
557 }
558
// Flush/compaction/read/write flags for this region.
final WriteState writestate = new WriteState();

// Set by setHTableSpecificConf from the table descriptor or HREGION_MEMSTORE_FLUSH_SIZE.
long memstoreFlushSize;
// From "hbase.hregion.keyvalue.timestamp.slop.millisecs" (default LATEST_TIMESTAMP).
final long timestampSlop;
// From "hbase.hregion.row.processor.timeout" (default DEFAULT_ROW_PROCESSOR_TIMEOUT).
final long rowProcessorTimeout;

// Last flush time per store; every store is seeded with the open time during initialization.
private final ConcurrentMap<Store, Long> lastStoreFlushTimeMap =
    new ConcurrentHashMap<Store, Long>();

// May be null (e.g. when the region is created outside a region server); several fields below
// are only set when it is non-null.
final RegionServerServices rsServices;
private RegionServerAccounting rsAccounting;
// From MEMSTORE_PERIODIC_FLUSH_INTERVAL (read via getInt).
private long flushCheckInterval;

// From MEMSTORE_FLUSH_PER_CHANGES; the constructor rejects values above MAX_FLUSH_PER_CHANGES.
private long flushPerChanges;
// memstoreFlushSize multiplied by HREGION_MEMSTORE_BLOCK_MULTIPLIER.
private long blockingMemStoreSize;
// From HConstants.THREAD_WAKE_FREQUENCY (default 10 * 1000).
final long threadWakeFrequency;

// Region-level lock; usage is outside this view.
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

// Presumably guards memstore updates against flushes — usage is outside this view.
private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();
private boolean splitRequest;
private byte[] explicitSplitPoint = null;

// Multi-version concurrency control; advanced to the max store sequence id at open.
private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();

// Null when rsServices is null.
private RegionCoprocessorHost coprocessorHost;

private HTableDescriptor htableDescriptor = null;
// Created during region initialization.
private RegionSplitPolicy splitPolicy;
private FlushPolicy flushPolicy;

// Both null when rsServices is null.
private final MetricsRegion metricsRegion;
private final MetricsRegionWrapperImpl metricsRegionWrapper;
// Table durability, with USE_DEFAULT resolved to DEFAULT_DURABILITY.
private final Durability durability;
// Always false for tables in the system namespace.
private final boolean regionStatsEnabled;
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
/**
 * Legacy constructor: wraps {@code fs}, {@code tableDir} and {@code regionInfo} in a new
 * {@link HRegionFileSystem} and delegates to the {@code HRegionFileSystem}-based constructor.
 *
 * @param tableDir directory of the table that owns the region
 * @param wal WAL the region writes to
 * @param fs underlying file system
 * @param confParam base configuration (must not be a {@link CompoundConfiguration})
 * @param regionInfo region described by this instance
 * @param htd table descriptor; must not be null
 * @param rsServices region server services, or null
 * @deprecated kept for existing callers; prefer the constructor taking an HRegionFileSystem,
 *     which this one delegates to
 */
@Deprecated
@VisibleForTesting
public HRegion(final Path tableDir, final WAL wal, final FileSystem fs,
    final Configuration confParam, final HRegionInfo regionInfo,
    final HTableDescriptor htd, final RegionServerServices rsServices) {
  this(
      new HRegionFileSystem(confParam, fs, tableDir, regionInfo),
      wal,
      confParam,
      htd,
      rsServices);
}
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
/**
 * Creates a region object from its file system, WAL, configuration and table descriptor.
 *
 * <p>Builds {@code conf} as a {@link CompoundConfiguration} layering {@code confParam}, the
 * table descriptor's string configuration, and the descriptor's byte values. It then reads
 * every tunable from that compound view.
 *
 * <p>NOTE(review): {@code this} escapes before construction finishes. It is passed to
 * RegionCoprocessorHost and MetricsRegionWrapperImpl, stored in the recovering-regions map,
 * and stringified by LOG.debug. Any of these can therefore observe fields assigned later
 * (e.g. disallowWritesInRecovering, configurationManager, regionStatsEnabled).
 *
 * @param fs region file system (also supplies the HRegionInfo)
 * @param wal WAL the region writes to
 * @param confParam base configuration; must not already be a CompoundConfiguration
 * @param htd table descriptor; must not be null
 * @param rsServices region server services, or null; coprocessor host, metrics and
 *     accounting are only set up when non-null
 * @throws IllegalArgumentException if htd is null, confParam is a CompoundConfiguration,
 *     flushPerChanges exceeds MAX_FLUSH_PER_CHANGES, or the busy-wait product is not positive
 */
public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration confParam,
    final HTableDescriptor htd, final RegionServerServices rsServices) {
  if (htd == null) {
    throw new IllegalArgumentException("Need table descriptor");
  }

  // The compound configuration is built here; callers must pass the plain base configuration.
  if (confParam instanceof CompoundConfiguration) {
    throw new IllegalArgumentException("Need original base configuration");
  }

  this.wal = wal;
  this.fs = fs;

  // Table-level settings override the base configuration in the compound view.
  this.baseConf = confParam;
  this.conf = new CompoundConfiguration()
      .add(confParam)
      .addStringMap(htd.getConfiguration())
      .addBytesMap(htd.getValues());
  this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
      DEFAULT_CACHE_FLUSH_INTERVAL);
  this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES);
  if (this.flushPerChanges > MAX_FLUSH_PER_CHANGES) {
    throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
        + MAX_FLUSH_PER_CHANGES);
  }
  this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
      DEFAULT_ROWLOCK_WAIT_DURATION);

  this.maxWaitForSeqId = conf.getInt(MAX_WAIT_FOR_SEQ_ID_KEY, DEFAULT_MAX_WAIT_FOR_SEQ_ID);
  this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
  this.htableDescriptor = htd;
  this.rsServices = rsServices;
  this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
  // Must run after htableDescriptor and conf are assigned: it reads both.
  setHTableSpecificConf();
  this.scannerReadPoints = new ConcurrentHashMap<RegionScanner, Long>();

  this.busyWaitDuration = conf.getLong(
      "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION);
  this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
  // A non-positive product would make any busy-wait bound derived from these meaningless.
  if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
    throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
        + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
        + maxBusyWaitMultiplier + "). Their product should be positive");
  }
  this.maxBusyWaitDuration = conf.getLong("hbase.ipc.client.call.purge.timeout",
      2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);

  // Default LATEST_TIMESTAMP; presumably means "no timestamp slop check" — confirm at use site.
  this.timestampSlop = conf.getLong(
      "hbase.hregion.keyvalue.timestamp.slop.millisecs",
      HConstants.LATEST_TIMESTAMP);

  this.rowProcessorTimeout = conf.getLong(
      "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
  this.durability = htd.getDurability() == Durability.USE_DEFAULT
      ? DEFAULT_DURABILITY
      : htd.getDurability();
  if (rsServices != null) {
    this.rsAccounting = this.rsServices.getRegionServerAccounting();

    // Coprocessor host and metrics receive the partially constructed region.
    this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
    this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
    this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);

    // If the server already tracks this region as recovering, mark it and register this
    // instance under its encoded name.
    Map<String, Region> recoveringRegions = rsServices.getRecoveringRegions();
    String encodedName = getRegionInfo().getEncodedName();
    if (recoveringRegions != null && recoveringRegions.containsKey(encodedName)) {
      this.recovering = true;
      recoveringRegions.put(encodedName, this);
    }
  } else {
    this.metricsRegionWrapper = null;
    this.metricsRegion = null;
  }
  if (LOG.isDebugEnabled()) {
    // toString() runs before the remaining fields below are assigned.
    LOG.debug("Instantiated " + this);
  }

  this.disallowWritesInRecovering =
      conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
          HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
  configurationManager = Optional.absent();

  // Client back-pressure stats are never enabled for system-namespace tables.
  this.regionStatsEnabled = htd.getTableName().getNamespaceAsString().equals(
      NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR) ?
      false :
      conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
          HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
}
747
748 void setHTableSpecificConf() {
749 if (this.htableDescriptor == null) return;
750 long flushSize = this.htableDescriptor.getMemStoreFlushSize();
751
752 if (flushSize <= 0) {
753 flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
754 HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
755 }
756 this.memstoreFlushSize = flushSize;
757 this.blockingMemStoreSize = this.memstoreFlushSize *
758 conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
759 HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
760 }
761
762
763
764
765
766
767
768
769
770 @Deprecated
771 public long initialize() throws IOException {
772 return initialize(null);
773 }
774
775
776
777
778
779
780
781
782 private long initialize(final CancelableProgressable reporter) throws IOException {
783 MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
784 long nextSeqId = -1;
785 try {
786 nextSeqId = initializeRegionInternals(reporter, status);
787 return nextSeqId;
788 } finally {
789
790
791 if (nextSeqId == -1) {
792 status.abort("Exception during region " + getRegionInfo().getRegionNameAsString() +
793 " initialization.");
794 }
795 }
796 }
797
/**
 * Performs the ordered steps of opening the region, reporting each step through
 * {@code status}:
 * <ol>
 * <li>the coprocessor pre-open hook;</li>
 * <li>the region-info file check;</li>
 * <li>store initialization;</li>
 * <li>optional recovered-edits replay;</li>
 * <li>write-state reset;</li>
 * <li>temp, split and merge cleanup (only when writes are enabled);</li>
 * <li>split/flush policy creation;</li>
 * <li>the sequence-id file write;</li>
 * <li>the post-open hook.</li>
 * </ol>
 *
 * @param reporter progress reporter forwarded to store init and edits replay; may be null
 * @param status task that receives progress messages and is marked complete on success
 * @return the next sequence id for the region
 * @throws IOException if any step fails
 */
private long initializeRegionInternals(final CancelableProgressable reporter,
    final MonitoredTask status) throws IOException {
  if (coprocessorHost != null) {
    status.setStatus("Running coprocessor pre-open hook");
    coprocessorHost.preOpen();
  }

  status.setStatus("Writing region info on filesystem");
  fs.checkRegionInfoOnFilesystem();

  status.setStatus("Initializing all the Stores");
  long maxSeqId = initializeStores(reporter, status);
  // Move MVCC past everything already persisted in the stores.
  this.mvcc.advanceTo(maxSeqId);
  if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
    // Replay may push the max sequence id further; advance MVCC again afterwards.
    maxSeqId = Math.max(maxSeqId,
        replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));

    this.mvcc.advanceTo(maxSeqId);
  }
  this.lastReplayedOpenRegionSeqId = maxSeqId;

  // Read-only is decided by the replica utility; this also sets writesEnabled to its inverse.
  this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
  this.writestate.flushRequested = false;
  this.writestate.compacting = 0;

  if (this.writestate.writesEnabled) {
    // Only a writable region touches its directory contents.
    status.setStatus("Cleaning up temporary data from old regions");
    fs.cleanupTempDir();
  }

  if (this.writestate.writesEnabled) {
    status.setStatus("Cleaning up detritus from prior splits");
    // Removes leftovers of earlier split and merge attempts.
    fs.cleanupAnySplitDetritus();
    fs.cleanupMergesDir();
  }

  // Policies are created after the stores exist.
  this.splitPolicy = RegionSplitPolicy.create(this, conf);

  this.flushPolicy = FlushPolicyFactory.create(this, conf);

  // Seed every store's last-flush time with the open time.
  long lastFlushTime = EnvironmentEdgeManager.currentTime();
  for (Store store: stores.values()) {
    this.lastStoreFlushTimeMap.put(store, lastFlushTime);
  }

  long nextSeqid = maxSeqId;

  // A writable region persists its next sequence id through WALSplitter with a safety bump;
  // a much larger bump is used while recovering (presumably to stay clear of edits replayed
  // during recovery — confirm). A read-only region just uses maxSeqId + 1.
  if (this.writestate.writesEnabled) {
    nextSeqid = WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs
        .getRegionDir(), nextSeqid, (this.recovering ? (this.flushPerChanges + 10000000) : 1));
  } else {
    nextSeqid++;
  }

  LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
      "; next sequenceid=" + nextSeqid);

  // The region is open: clear any closing/closed state.
  this.closing.set(false);
  this.closed.set(false);

  if (coprocessorHost != null) {
    status.setStatus("Running coprocessor post-open hooks");
    coprocessorHost.postOpen();
  }

  status.markComplete("Region opened successfully");
  return nextSeqid;
}
881
882
883
884
885
886
887
888
889 private long initializeStores(final CancelableProgressable reporter, MonitoredTask status)
890 throws IOException {
891
892
893 long maxSeqId = -1;
894
895 long maxMemstoreTS = -1;
896
897 if (!htableDescriptor.getFamilies().isEmpty()) {
898
899 ThreadPoolExecutor storeOpenerThreadPool =
900 getStoreOpenAndCloseThreadPool("StoreOpener-" + this.getRegionInfo().getShortNameToLog());
901 CompletionService<HStore> completionService =
902 new ExecutorCompletionService<HStore>(storeOpenerThreadPool);
903
904
905 for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
906 status.setStatus("Instantiating store for column family " + family);
907 completionService.submit(new Callable<HStore>() {
908 @Override
909 public HStore call() throws IOException {
910 return instantiateHStore(family);
911 }
912 });
913 }
914 boolean allStoresOpened = false;
915 try {
916 for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
917 Future<HStore> future = completionService.take();
918 HStore store = future.get();
919 this.stores.put(store.getFamily().getName(), store);
920
921 long storeMaxSequenceId = store.getMaxSequenceId();
922 maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
923 storeMaxSequenceId);
924 if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
925 maxSeqId = storeMaxSequenceId;
926 }
927 long maxStoreMemstoreTS = store.getMaxMemstoreTS();
928 if (maxStoreMemstoreTS > maxMemstoreTS) {
929 maxMemstoreTS = maxStoreMemstoreTS;
930 }
931 }
932 allStoresOpened = true;
933 } catch (InterruptedException e) {
934 throw (InterruptedIOException)new InterruptedIOException().initCause(e);
935 } catch (ExecutionException e) {
936 throw new IOException(e.getCause());
937 } finally {
938 storeOpenerThreadPool.shutdownNow();
939 if (!allStoresOpened) {
940
941 LOG.error("Could not initialize all stores for the region=" + this);
942 for (Store store : this.stores.values()) {
943 try {
944 store.close();
945 } catch (IOException e) {
946 LOG.warn(e.getMessage());
947 }
948 }
949 }
950 }
951 }
952 return Math.max(maxSeqId, maxMemstoreTS + 1);
953 }
954
955 private void initializeWarmup(final CancelableProgressable reporter) throws IOException {
956 MonitoredTask status = TaskMonitor.get().createStatus("Initializing region " + this);
957
958
959 status.setStatus("Warming up all the Stores");
960 initializeStores(reporter, status);
961 }
962
963 private void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException {
964 Map<byte[], List<Path>> storeFiles = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
965 for (Store store: getStores()) {
966 ArrayList<Path> storeFileNames = new ArrayList<Path>();
967 for (StoreFile storeFile: store.getStorefiles()) {
968 storeFileNames.add(storeFile.getPath());
969 }
970 storeFiles.put(store.getFamily().getName(), storeFileNames);
971 }
972
973 RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
974 RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
975 getRegionServerServices().getServerName(), storeFiles);
976 WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc, mvcc);
977 }
978
979 private void writeRegionCloseMarker(WAL wal) throws IOException {
980 Map<byte[], List<Path>> storeFiles = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
981 for (Store store: getStores()) {
982 ArrayList<Path> storeFileNames = new ArrayList<Path>();
983 for (StoreFile storeFile: store.getStorefiles()) {
984 storeFileNames.add(storeFile.getPath());
985 }
986 storeFiles.put(store.getFamily().getName(), storeFileNames);
987 }
988
989 RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
990 RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint(),
991 getRegionServerServices().getServerName(), storeFiles);
992 WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc, mvcc);
993
994
995
996
997 if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) {
998 WALSplitter.writeRegionSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(),
999 mvcc.getReadPoint(), 0);
1000 }
1001 }
1002
1003
1004
1005
1006 public boolean hasReferences() {
1007 for (Store store : this.stores.values()) {
1008 if (store.hasReferences()) return true;
1009 }
1010 return false;
1011 }
1012
1013 @Override
1014 public HDFSBlocksDistribution getHDFSBlocksDistribution() {
1015 HDFSBlocksDistribution hdfsBlocksDistribution =
1016 new HDFSBlocksDistribution();
1017 synchronized (this.stores) {
1018 for (Store store : this.stores.values()) {
1019 for (StoreFile sf : store.getStorefiles()) {
1020 HDFSBlocksDistribution storeFileBlocksDistribution =
1021 sf.getHDFSBlockDistribution();
1022 hdfsBlocksDistribution.add(storeFileBlocksDistribution);
1023 }
1024 }
1025 }
1026 return hdfsBlocksDistribution;
1027 }
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037 public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
1038 final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
1039 Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
1040 return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath);
1041 }
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052 public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
1053 final HTableDescriptor tableDescriptor, final HRegionInfo regionInfo, Path tablePath)
1054 throws IOException {
1055 HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
1056 FileSystem fs = tablePath.getFileSystem(conf);
1057
1058 HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tablePath, regionInfo);
1059 for (HColumnDescriptor family: tableDescriptor.getFamilies()) {
1060 Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family.getNameAsString());
1061 if (storeFiles == null) continue;
1062
1063 for (StoreFileInfo storeFileInfo : storeFiles) {
1064 try {
1065 hdfsBlocksDistribution.add(storeFileInfo.computeHDFSBlocksDistribution(fs));
1066 } catch (IOException ioe) {
1067 LOG.warn("Error getting hdfs block distribution for " + storeFileInfo);
1068 }
1069 }
1070 }
1071 return hdfsBlocksDistribution;
1072 }
1073
1074
1075
1076
1077
1078
1079 public long addAndGetGlobalMemstoreSize(long memStoreSize) {
1080 if (this.rsAccounting != null) {
1081 rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
1082 }
1083 return this.memstoreSize.addAndGet(memStoreSize);
1084 }
1085
1086 @Override
1087 public HRegionInfo getRegionInfo() {
1088 return this.fs.getRegionInfo();
1089 }
1090
1091
1092
1093
1094
1095 RegionServerServices getRegionServerServices() {
1096 return this.rsServices;
1097 }
1098
1099 @Override
1100 public long getReadRequestsCount() {
1101 return readRequestsCount.get();
1102 }
1103
1104 @Override
1105 public void updateReadRequestsCount(long i) {
1106 readRequestsCount.add(i);
1107 }
1108
1109 @Override
1110 public long getWriteRequestsCount() {
1111 return writeRequestsCount.get();
1112 }
1113
1114 @Override
1115 public void updateWriteRequestsCount(long i) {
1116 writeRequestsCount.add(i);
1117 }
1118
1119 @Override
1120 public long getMemstoreSize() {
1121 return memstoreSize.get();
1122 }
1123
1124 @Override
1125 public long getNumMutationsWithoutWAL() {
1126 return numMutationsWithoutWAL.get();
1127 }
1128
1129 @Override
1130 public long getDataInMemoryWithoutWAL() {
1131 return dataInMemoryWithoutWAL.get();
1132 }
1133
1134 @Override
1135 public long getBlockedRequestsCount() {
1136 return blockedRequestsCount.get();
1137 }
1138
1139 @Override
1140 public long getCheckAndMutateChecksPassed() {
1141 return checkAndMutateChecksPassed.get();
1142 }
1143
1144 @Override
1145 public long getCheckAndMutateChecksFailed() {
1146 return checkAndMutateChecksFailed.get();
1147 }
1148
1149 @Override
1150 public MetricsRegion getMetrics() {
1151 return metricsRegion;
1152 }
1153
1154 @Override
1155 public boolean isClosed() {
1156 return this.closed.get();
1157 }
1158
1159 @Override
1160 public boolean isClosing() {
1161 return this.closing.get();
1162 }
1163
1164 @Override
1165 public boolean isReadOnly() {
1166 return this.writestate.isReadOnly();
1167 }
1168
1169
1170
1171
1172 public void setRecovering(boolean newState) {
1173 boolean wasRecovering = this.recovering;
1174
1175
1176 if (wal != null && getRegionServerServices() != null && !writestate.readOnly
1177 && wasRecovering && !newState) {
1178
1179
1180 boolean forceFlush = getTableDesc().getRegionReplication() > 1;
1181
1182
1183 MonitoredTask status = TaskMonitor.get().createStatus(
1184 "Flushing region " + this + " because recovery is finished");
1185 try {
1186 if (forceFlush) {
1187 internalFlushcache(status);
1188 }
1189
1190 status.setStatus("Writing region open event marker to WAL because recovery is finished");
1191 try {
1192 long seqId = openSeqNum;
1193
1194 if (wal != null) {
1195 seqId = getNextSequenceId(wal);
1196 }
1197 writeRegionOpenMarker(wal, seqId);
1198 } catch (IOException e) {
1199
1200
1201 LOG.warn(getRegionInfo().getEncodedName() + " : was not able to write region opening "
1202 + "event to WAL, continueing", e);
1203 }
1204 } catch (IOException ioe) {
1205
1206
1207 LOG.warn(getRegionInfo().getEncodedName() + " : was not able to flush "
1208 + "event to WAL, continueing", ioe);
1209 } finally {
1210 status.cleanup();
1211 }
1212 }
1213
1214 this.recovering = newState;
1215 if (wasRecovering && !recovering) {
1216
1217 coprocessorHost.postLogReplay();
1218 }
1219 }
1220
1221 @Override
1222 public boolean isRecovering() {
1223 return this.recovering;
1224 }
1225
1226 @Override
1227 public boolean isAvailable() {
1228 return !isClosed() && !isClosing();
1229 }
1230
1231
1232 public boolean isSplittable() {
1233 return isAvailable() && !hasReferences();
1234 }
1235
1236
1237
1238
1239 public boolean isMergeable() {
1240 if (!isAvailable()) {
1241 LOG.debug("Region " + getRegionInfo().getRegionNameAsString()
1242 + " is not mergeable because it is closing or closed");
1243 return false;
1244 }
1245 if (hasReferences()) {
1246 LOG.debug("Region " + getRegionInfo().getRegionNameAsString()
1247 + " is not mergeable because it has references");
1248 return false;
1249 }
1250
1251 return true;
1252 }
1253
1254 public boolean areWritesEnabled() {
1255 synchronized(this.writestate) {
1256 return this.writestate.writesEnabled;
1257 }
1258 }
1259
1260 public MultiVersionConcurrencyControl getMVCC() {
1261 return mvcc;
1262 }
1263
1264 @Override
1265 public long getMaxFlushedSeqId() {
1266 return maxFlushedSeqId;
1267 }
1268
1269 @Override
1270 public long getReadpoint(IsolationLevel isolationLevel) {
1271 if (isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
1272
1273 return Long.MAX_VALUE;
1274 }
1275 return mvcc.getReadPoint();
1276 }
1277
1278 @Override
1279 public boolean isLoadingCfsOnDemandDefault() {
1280 return this.isLoadingCfsOnDemandDefault;
1281 }
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299 public Map<byte[], List<StoreFile>> close() throws IOException {
1300 return close(false);
1301 }
1302
1303 private final Object closeLock = new Object();
1304
1305
1306 public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL =
1307 "hbase.regionserver.optionalcacheflushinterval";
1308
1309 public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
1310
1311 public static final int SYSTEM_CACHE_FLUSH_INTERVAL = 300000;
1312
1313
1314 public static final String MEMSTORE_FLUSH_PER_CHANGES =
1315 "hbase.regionserver.flush.per.changes";
1316 public static final long DEFAULT_FLUSH_PER_CHANGES = 30000000;
1317
1318
1319
1320
1321 public static final long MAX_FLUSH_PER_CHANGES = 1000000000;
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340 public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException {
1341
1342
1343 MonitoredTask status = TaskMonitor.get().createStatus(
1344 "Closing region " + this +
1345 (abort ? " due to abort" : ""));
1346
1347 status.setStatus("Waiting for close lock");
1348 try {
1349 synchronized (closeLock) {
1350 return doClose(abort, status);
1351 }
1352 } finally {
1353 status.cleanup();
1354 }
1355 }
1356
1357
1358
1359
1360 @VisibleForTesting
1361 public void setClosing(boolean closing) {
1362 this.closing.set(closing);
1363 }
1364
/**
 * Performs the actual region close. The caller visible here, {@code close(boolean)}, invokes
 * it while holding {@code closeLock}.
 * <p>
 * Sequence:
 * <ol>
 * <li>run coprocessor pre-close hooks;</li>
 * <li>disable writes and wait for running flushes and compactions;</li>
 * <li>optionally pre-flush before taking the region write lock;</li>
 * <li>mark the region closing and take the region write lock;</li>
 * <li>flush until the memstore is empty (unless aborting or read-only);</li>
 * <li>close all stores in parallel;</li>
 * <li>write a REGION_CLOSE marker to the WAL;</li>
 * <li>mark the region closed, then run post-close hooks and metrics cleanup.</li>
 * </ol>
 *
 * @param abort true if closing because of an abort; skips flushing and the WAL close marker
 * @param status task status updated as the close progresses
 * @return map from column family name to the store files returned by each store's close(),
 *     or null if the region was already closed
 * @throws IOException if a flush or a store close fails; an interrupt while waiting for the
 *     store closes is rethrown as InterruptedIOException
 */
private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
    throws IOException {
  if (isClosed()) {
    LOG.warn("Region " + this + " already closed");
    return null;
  }

  if (coprocessorHost != null) {
    status.setStatus("Running coprocessor pre-close hooks");
    this.coprocessorHost.preClose(abort);
  }

  status.setStatus("Disabling compacts and flushes for region");
  boolean canFlush = true;
  synchronized (writestate) {
    // Only a writable region is flushed during close; remember that before disabling writes.
    canFlush = !writestate.readOnly;
    writestate.writesEnabled = false;
    LOG.debug("Closing " + this + ": disabling compactions & flushes");
    // compact() and flushcache() refuse to start once writesEnabled is false; wait here for
    // the ones already running.
    waitForFlushesAndCompactions();
  }

  // Pre-flush a large memstore before the region write lock is taken, presumably to shorten
  // the final flush performed under that lock below.
  if (!abort && worthPreFlushing() && canFlush) {
    status.setStatus("Pre-flushing region before close");
    LOG.info("Running close preflush of " + getRegionInfo().getRegionNameAsString());
    try {
      internalFlushcache(status);
    } catch (IOException ioe) {
      // Only recorded in the status; the flush loop below tries again.
      status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
    }
  }

  this.closing.set(true);
  status.setStatus("Disabling writes for close");
  // Exclusive region lock: waits for holders of the read lock (compact(), flushcache()) and
  // keeps new ones out.
  lock.writeLock().lock();
  try {
    if (this.isClosed()) {
      status.abort("Already got closed by another process");
      return null;
    }
    LOG.debug("Updates disabled for region " + this);
    // Flush until the memstore is empty. The first flush plus at most five extra attempts are
    // allowed; the next attempt throws DroppedSnapshotException instead.
    if (!abort && canFlush) {
      int flushCount = 0;
      while (this.memstoreSize.get() > 0) {
        try {
          if (flushCount++ > 0) {
            int actualFlushes = flushCount - 1;
            if (actualFlushes > 5) {
              // NOTE(review): if DroppedSnapshotException is an IOException (presumably), it is
              // handled by the catch below like any other flush failure.
              throw new DroppedSnapshotException("Failed clearing memory after " +
                  actualFlushes + " attempts on region: " +
                  Bytes.toStringBinary(getRegionInfo().getRegionName()));
            }
            LOG.info("Running extra flush, " + actualFlushes +
                " (carrying snapshot?) " + this);
          }
          internalFlushcache(status);
        } catch (IOException ioe) {
          // Re-enable writes ("putting online again") before propagating.
          // NOTE(review): closing stays true on this path.
          status.setStatus("Failed flush " + this + ", putting online again");
          synchronized (writestate) {
            writestate.writesEnabled = true;
          }
          throw ioe;
        }
      }
    }

    Map<byte[], List<StoreFile>> result =
        new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
    if (!stores.isEmpty()) {
      // Close each store on a pool thread and collect the results as they complete.
      ThreadPoolExecutor storeCloserThreadPool =
          getStoreOpenAndCloseThreadPool("StoreCloserThread-" +
              getRegionInfo().getRegionNameAsString());
      CompletionService<Pair<byte[], Collection<StoreFile>>> completionService =
          new ExecutorCompletionService<Pair<byte[], Collection<StoreFile>>>(storeCloserThreadPool);

      for (final Store store : stores.values()) {
        // Sanity check: for a writable, non-aborting close, nothing should remain to flush.
        // If something does, the whole region server is aborted (when services are present).
        long flushableSize = store.getFlushableSize();
        if (!(abort || flushableSize == 0 || writestate.readOnly)) {
          if (getRegionServerServices() != null) {
            getRegionServerServices().abort("Assertion failed while closing store "
                + getRegionInfo().getRegionNameAsString() + " " + store
                + ". flushableSize expected=0, actual= " + flushableSize
                + ". Current memstoreSize=" + getMemstoreSize() + ". Maybe a coprocessor "
                + "operation failed and left the memstore in a partially updated state.", null);
          }
        }
        completionService
            .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
              @Override
              public Pair<byte[], Collection<StoreFile>> call() throws IOException {
                return new Pair<byte[], Collection<StoreFile>>(
                    store.getFamily().getName(), store.close());
              }
            });
      }
      try {
        // One take() per submitted store; assumes stores did not change since the loop above.
        for (int i = 0; i < stores.size(); i++) {
          Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take();
          Pair<byte[], Collection<StoreFile>> storeFiles = future.get();
          List<StoreFile> familyFiles = result.get(storeFiles.getFirst());
          if (familyFiles == null) {
            familyFiles = new ArrayList<StoreFile>();
            result.put(storeFiles.getFirst(), familyFiles);
          }
          familyFiles.addAll(storeFiles.getSecond());
        }
      } catch (InterruptedException e) {
        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
      } catch (ExecutionException e) {
        // Unwrap the failure thrown by a store's close().
        throw new IOException(e.getCause());
      } finally {
        storeCloserThreadPool.shutdownNow();
      }
    }

    status.setStatus("Writing region close event to WAL");
    if (!abort && wal != null && getRegionServerServices() != null && !writestate.readOnly) {
      writeRegionCloseMarker(wal);
    }

    this.closed.set(true);
    // A read-only region was not flushed, so its remaining memstore size is removed from the
    // accounting. Otherwise any remaining size is only logged as an error; it can be non-zero,
    // e.g., when aborting, because the flush loop is skipped.
    if (!canFlush) {
      addAndGetGlobalMemstoreSize(-memstoreSize.get());
    } else if (memstoreSize.get() != 0) {
      LOG.error("Memstore size is " + memstoreSize.get());
    }
    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor post-close hooks");
      this.coprocessorHost.postClose(abort);
    }
    if (this.metricsRegion != null) {
      this.metricsRegion.close();
    }
    if (this.metricsRegionWrapper != null) {
      Closeables.closeQuietly(this.metricsRegionWrapper);
    }
    status.markComplete("Closed");
    LOG.info("Closed " + this);
    return result;
  } finally {
    lock.writeLock().unlock();
  }
}
1520
1521 @Override
1522 public void waitForFlushesAndCompactions() {
1523 synchronized (writestate) {
1524 if (this.writestate.readOnly) {
1525
1526
1527 return;
1528 }
1529 boolean interrupted = false;
1530 try {
1531 while (writestate.compacting > 0 || writestate.flushing) {
1532 LOG.debug("waiting for " + writestate.compacting + " compactions"
1533 + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
1534 try {
1535 writestate.wait();
1536 } catch (InterruptedException iex) {
1537
1538 LOG.warn("Interrupted while waiting");
1539 interrupted = true;
1540 }
1541 }
1542 } finally {
1543 if (interrupted) {
1544 Thread.currentThread().interrupt();
1545 }
1546 }
1547 }
1548 }
1549
1550 protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
1551 final String threadNamePrefix) {
1552 int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1553 int maxThreads = Math.min(numStores,
1554 conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1555 HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
1556 return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1557 }
1558
1559 protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
1560 final String threadNamePrefix) {
1561 int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
1562 int maxThreads = Math.max(1,
1563 conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
1564 HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
1565 / numStores);
1566 return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
1567 }
1568
1569 static ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
1570 final String threadNamePrefix) {
1571 return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
1572 new ThreadFactory() {
1573 private int count = 1;
1574
1575 @Override
1576 public Thread newThread(Runnable r) {
1577 return new Thread(r, threadNamePrefix + "-" + count++);
1578 }
1579 });
1580 }
1581
1582
1583
1584
1585 private boolean worthPreFlushing() {
1586 return this.memstoreSize.get() >
1587 this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
1588 }
1589
1590
1591
1592
1593
1594 @Override
1595 public HTableDescriptor getTableDesc() {
1596 return this.htableDescriptor;
1597 }
1598
1599
1600 public WAL getWAL() {
1601 return this.wal;
1602 }
1603
1604
1605
1606
1607 public RegionSplitPolicy getSplitPolicy() {
1608 return this.splitPolicy;
1609 }
1610
1611
1612
1613
1614
1615
1616
1617
1618 Configuration getBaseConf() {
1619 return this.baseConf;
1620 }
1621
1622
1623 public FileSystem getFilesystem() {
1624 return fs.getFileSystem();
1625 }
1626
1627
1628 public HRegionFileSystem getRegionFileSystem() {
1629 return this.fs;
1630 }
1631
1632 @Override
1633 public long getEarliestFlushTimeForAllStores() {
1634 return lastStoreFlushTimeMap.isEmpty() ? Long.MAX_VALUE : Collections.min(lastStoreFlushTimeMap
1635 .values());
1636 }
1637
1638 @Override
1639 public long getOldestHfileTs(boolean majorCompactioOnly) throws IOException {
1640 long result = Long.MAX_VALUE;
1641 for (Store store : getStores()) {
1642 for (StoreFile file : store.getStorefiles()) {
1643 HFile.Reader reader = file.getReader().getHFileReader();
1644 if (majorCompactioOnly) {
1645 byte[] val = reader.loadFileInfo().get(StoreFile.MAJOR_COMPACTION_KEY);
1646 if (val == null || !Bytes.toBoolean(val)) {
1647 continue;
1648 }
1649 }
1650 result = Math.min(result, reader.getFileContext().getFileCreateTime());
1651 }
1652 }
1653 return result == Long.MAX_VALUE ? 0 : result;
1654 }
1655
1656 RegionLoad.Builder setCompleteSequenceId(RegionLoad.Builder regionLoadBldr) {
1657 long lastFlushOpSeqIdLocal = this.lastFlushOpSeqId;
1658 byte[] encodedRegionName = this.getRegionInfo().getEncodedNameAsBytes();
1659 regionLoadBldr.clearStoreCompleteSequenceId();
1660 for (byte[] familyName : this.stores.keySet()) {
1661 long earliest = this.wal.getEarliestMemstoreSeqNum(encodedRegionName, familyName);
1662
1663
1664
1665 long csid = (earliest == HConstants.NO_SEQNUM)? lastFlushOpSeqIdLocal: earliest - 1;
1666 regionLoadBldr.addStoreCompleteSequenceId(StoreSequenceId.
1667 newBuilder().setFamilyName(ByteString.copyFrom(familyName)).setSequenceId(csid).build());
1668 }
1669 return regionLoadBldr.setCompleteSequenceId(getMaxFlushedSeqId());
1670 }
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680 public long getLargestHStoreSize() {
1681 long size = 0;
1682 for (Store h : stores.values()) {
1683 long storeSize = h.getSize();
1684 if (storeSize > size) {
1685 size = storeSize;
1686 }
1687 }
1688 return size;
1689 }
1690
1691
1692
1693
1694
1695 protected void doRegionCompactionPrep() throws IOException {
1696 }
1697
1698 @Override
1699 public void triggerMajorCompaction() throws IOException {
1700 for (Store s : getStores()) {
1701 s.triggerMajorCompaction();
1702 }
1703 }
1704
1705 @Override
1706 public void compact(final boolean majorCompaction) throws IOException {
1707 if (majorCompaction) {
1708 triggerMajorCompaction();
1709 }
1710 for (Store s : getStores()) {
1711 CompactionContext compaction = s.requestCompaction();
1712 if (compaction != null) {
1713 CompactionThroughputController controller = null;
1714 if (rsServices != null) {
1715 controller = CompactionThroughputControllerFactory.create(rsServices, conf);
1716 }
1717 if (controller == null) {
1718 controller = NoLimitCompactionThroughputController.INSTANCE;
1719 }
1720 compact(compaction, s, controller, null);
1721 }
1722 }
1723 }
1724
1725
1726
1727
1728
1729
1730
1731 public void compactStores() throws IOException {
1732 for (Store s : getStores()) {
1733 CompactionContext compaction = s.requestCompaction();
1734 if (compaction != null) {
1735 compact(compaction, s, NoLimitCompactionThroughputController.INSTANCE, null);
1736 }
1737 }
1738 }
1739
1740
1741
1742
1743
1744
1745
1746 @VisibleForTesting
1747 void compactStore(byte[] family, CompactionThroughputController throughputController)
1748 throws IOException {
1749 Store s = getStore(family);
1750 CompactionContext compaction = s.requestCompaction();
1751 if (compaction != null) {
1752 compact(compaction, s, throughputController, null);
1753 }
1754 }
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771 public boolean compact(CompactionContext compaction, Store store,
1772 CompactionThroughputController throughputController) throws IOException {
1773 return compact(compaction, store, throughputController, null);
1774 }
1775
/**
 * Runs a previously selected compaction on {@code store}.
 * <p>
 * Returns false without compacting in three cases: the region is closing or closed; the store
 * registered for this family is no longer {@code store} (it was re-instantiated); or writes
 * are disabled. In those cases the requested compaction is cancelled on the store. An
 * InterruptedIOException from the store compaction is logged and also reported as false.
 * While the store compaction runs, {@code writestate.compacting} is incremented so that
 * waitForFlushesAndCompactions() waits for it. The region read lock is held throughout.
 *
 * @param compaction the compaction context; assertions check that it has a selection with at
 *     least one file
 * @param store the store to compact
 * @param throughputController controller passed to the store compaction
 * @param user passed through to {@code Store.compact}; may be null
 * @return true if the compaction completed, false if it was skipped or interrupted
 * @throws IOException if the store compaction fails
 */
public boolean compact(CompactionContext compaction, Store store,
    CompactionThroughputController throughputController, User user) throws IOException {
  assert compaction != null && compaction.hasSelection();
  assert !compaction.getRequest().getFiles().isEmpty();
  if (this.closing.get() || this.closed.get()) {
    LOG.debug("Skipping compaction on " + this + " because closing/closed");
    store.cancelRequestedCompaction(compaction);
    return false;
  }
  MonitoredTask status = null;
  // Stays true until the request is handed to the store; the outer finally cancels it then.
  boolean requestNeedsCancellation = true;
  // The read lock keeps doClose (which takes the write lock) out while compacting.
  lock.readLock().lock();
  try {
    byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
    if (stores.get(cf) != store) {
      LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this
          + " has been re-instantiated, cancel this compaction request. "
          + " It may be caused by the roll back of split transaction");
      return false;
    }

    status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
    // Re-check under the lock: the region may have closed since the first check.
    if (this.closed.get()) {
      String msg = "Skipping compaction on " + this + " because closed";
      LOG.debug(msg);
      status.abort(msg);
      return false;
    }
    boolean wasStateSet = false;
    try {
      synchronized (writestate) {
        if (writestate.writesEnabled) {
          wasStateSet = true;
          ++writestate.compacting;
        } else {
          String msg = "NOT compacting region " + this + ". Writes disabled.";
          LOG.info(msg);
          status.abort(msg);
          return false;
        }
      }
      LOG.info("Starting compaction on " + store + " in region " + this
          + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":""));
      doRegionCompactionPrep();
      try {
        status.setStatus("Compacting store " + store);
        // From here on the request is handed to the store and is no longer cancelled below,
        // even if the compaction is interrupted.
        requestNeedsCancellation = false;
        store.compact(compaction, throughputController, user);
      } catch (InterruptedIOException iioe) {
        String msg = "compaction interrupted";
        LOG.info(msg, iioe);
        status.abort(msg);
        return false;
      }
    } finally {
      // Undo the compacting count and wake waiters such as waitForFlushesAndCompactions().
      if (wasStateSet) {
        synchronized (writestate) {
          --writestate.compacting;
          if (writestate.compacting <= 0) {
            writestate.notifyAll();
          }
        }
      }
    }
    status.markComplete("Compaction complete");
    return true;
  } finally {
    try {
      if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction);
      if (status != null) status.cleanup();
    } finally {
      lock.readLock().unlock();
    }
  }
}
1854
1855 @Override
1856 public FlushResult flush(boolean force) throws IOException {
1857 return flushcache(force, false);
1858 }
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
/**
 * Flushes this region's memstore: either all stores or those chosen by the flush policy.
 * <p>
 * The flush is skipped with result {@code CANNOT_FLUSH} in two cases: the region is closing
 * or closed, or another flush is running or writes are disabled. The region read lock is held
 * while flushing, and the coprocessor pre-flush and post-flush hooks run around the flush.
 *
 * @param forceFlushAllStores flush every store instead of the flush policy's selection
 * @param writeFlushRequestWalMarker passed down to internalPrepareFlushCache, which hands it
 *     to writeFlushRequestMarkerToWAL when the memstore is empty
 * @return the flush result
 * @throws IOException if a coprocessor hook or the flush itself fails
 */
public FlushResult flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker)
    throws IOException {
  // Early exit before creating a status or taking the lock.
  if (this.closing.get()) {
    String msg = "Skipping flush on " + this + " because closing";
    LOG.debug(msg);
    return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
  }
  MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
  status.setStatus("Acquiring readlock on region");
  // The read lock keeps doClose (which takes the write lock) out while flushing.
  lock.readLock().lock();
  try {
    if (this.closed.get()) {
      String msg = "Skipping flush on " + this + " because closed";
      LOG.debug(msg);
      status.abort(msg);
      return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
    }
    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor pre-flush hooks");
      coprocessorHost.preFlush();
    }
    // Reset the counters of mutations/data written without the WAL (presumably because this
    // flush persists that data).
    if (numMutationsWithoutWAL.get() > 0) {
      numMutationsWithoutWAL.set(0);
      dataInMemoryWithoutWAL.set(0);
    }
    // Only one flush at a time, and only while writes are enabled.
    synchronized (writestate) {
      if (!writestate.flushing && writestate.writesEnabled) {
        this.writestate.flushing = true;
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("NOT flushing memstore for region " + this
              + ", flushing=" + writestate.flushing + ", writesEnabled="
              + writestate.writesEnabled);
        }
        String msg = "Not flushing since "
            + (writestate.flushing ? "already flushing"
            : "writes not enabled");
        status.abort(msg);
        return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
      }
    }

    try {
      Collection<Store> specificStoresToFlush =
          forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush();
      FlushResult fs = internalFlushcache(specificStoresToFlush,
          status, writeFlushRequestWalMarker);

      if (coprocessorHost != null) {
        status.setStatus("Running post-flush coprocessor hooks");
        coprocessorHost.postFlush();
      }

      status.markComplete("Flush successful");
      return fs;
    } finally {
      // Clear the flushing flags and wake waiters such as waitForFlushesAndCompactions().
      synchronized (writestate) {
        writestate.flushing = false;
        this.writestate.flushRequested = false;
        writestate.notifyAll();
      }
    }
  } finally {
    lock.readLock().unlock();
    status.cleanup();
  }
}
1953
1954
1955
1956
1957
1958
1959
1960
1961 boolean shouldFlushStore(Store store) {
1962 long earliest = this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(),
1963 store.getFamily().getName()) - 1;
1964 if (earliest > 0 && earliest + flushPerChanges < mvcc.getReadPoint()) {
1965 if (LOG.isDebugEnabled()) {
1966 LOG.debug("Flush column family " + store.getColumnFamilyName() + " of " +
1967 getRegionInfo().getEncodedName() + " because unflushed sequenceid=" + earliest +
1968 " is > " + this.flushPerChanges + " from current=" + mvcc.getReadPoint());
1969 }
1970 return true;
1971 }
1972 if (this.flushCheckInterval <= 0) {
1973 return false;
1974 }
1975 long now = EnvironmentEdgeManager.currentTime();
1976 if (store.timeOfOldestEdit() < now - this.flushCheckInterval) {
1977 if (LOG.isDebugEnabled()) {
1978 LOG.debug("Flush column family: " + store.getColumnFamilyName() + " of " +
1979 getRegionInfo().getEncodedName() + " because time of oldest edit=" +
1980 store.timeOfOldestEdit() + " is > " + this.flushCheckInterval + " from now =" + now);
1981 }
1982 return true;
1983 }
1984 return false;
1985 }
1986
1987
1988
1989
1990 boolean shouldFlush(final StringBuffer whyFlush) {
1991 whyFlush.setLength(0);
1992
1993 if (this.maxFlushedSeqId > 0
1994 && (this.maxFlushedSeqId + this.flushPerChanges < this.mvcc.getReadPoint())) {
1995 whyFlush.append("more than max edits, " + this.flushPerChanges + ", since last flush");
1996 return true;
1997 }
1998 long modifiedFlushCheckInterval = flushCheckInterval;
1999 if (getRegionInfo().isSystemTable() &&
2000 getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
2001 modifiedFlushCheckInterval = SYSTEM_CACHE_FLUSH_INTERVAL;
2002 }
2003 if (modifiedFlushCheckInterval <= 0) {
2004 return false;
2005 }
2006 long now = EnvironmentEdgeManager.currentTime();
2007
2008 if ((now - getEarliestFlushTimeForAllStores() < modifiedFlushCheckInterval)) {
2009 return false;
2010 }
2011
2012
2013 for (Store s : getStores()) {
2014 if (s.timeOfOldestEdit() < now - modifiedFlushCheckInterval) {
2015
2016 whyFlush.append(s.toString() + " has an old edit so flush to free WALs");
2017 return true;
2018 }
2019 }
2020 return false;
2021 }
2022
2023
2024
2025
2026
2027
2028 private FlushResult internalFlushcache(MonitoredTask status)
2029 throws IOException {
2030 return internalFlushcache(stores.values(), status, false);
2031 }
2032
2033
2034
2035
2036
2037
2038 private FlushResult internalFlushcache(final Collection<Store> storesToFlush,
2039 MonitoredTask status, boolean writeFlushWalMarker) throws IOException {
2040 return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush,
2041 status, writeFlushWalMarker);
2042 }
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072 protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
2073 final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
2074 throws IOException {
2075 PrepareFlushResult result
2076 = internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker);
2077 if (result.result == null) {
2078 return internalFlushCacheAndCommit(wal, status, result, storesToFlush);
2079 } else {
2080 return result.result;
2081 }
2082 }
2083
2084 protected PrepareFlushResult internalPrepareFlushCache(final WAL wal, final long myseqid,
2085 final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
2086 throws IOException {
2087 if (this.rsServices != null && this.rsServices.isAborted()) {
2088
2089 throw new IOException("Aborting flush because server is aborted...");
2090 }
2091 final long startTime = EnvironmentEdgeManager.currentTime();
2092
2093 if (this.memstoreSize.get() <= 0) {
2094
2095
2096 MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
2097 this.updatesLock.writeLock().lock();
2098 try {
2099 if (this.memstoreSize.get() <= 0) {
2100
2101
2102
2103
2104
2105
2106 if (wal != null) {
2107 writeEntry = mvcc.begin();
2108 long flushOpSeqId = writeEntry.getWriteNumber();
2109 FlushResult flushResult = new FlushResultImpl(
2110 FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
2111 flushOpSeqId,
2112 "Nothing to flush",
2113 writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
2114
2115
2116 mvcc.completeAndWait(writeEntry);
2117 writeEntry = null;
2118 return new PrepareFlushResult(flushResult, myseqid);
2119 } else {
2120 return new PrepareFlushResult(
2121 new FlushResultImpl(
2122 FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
2123 "Nothing to flush",
2124 false),
2125 myseqid);
2126 }
2127 }
2128 } finally {
2129 this.updatesLock.writeLock().unlock();
2130 if (writeEntry != null) {
2131 mvcc.complete(writeEntry);
2132 }
2133 }
2134 }
2135
2136 if (LOG.isInfoEnabled()) {
2137
2138 StringBuilder perCfExtras = null;
2139 if (!isAllFamilies(storesToFlush)) {
2140 perCfExtras = new StringBuilder();
2141 for (Store store: storesToFlush) {
2142 perCfExtras.append("; ").append(store.getColumnFamilyName());
2143 perCfExtras.append("=").append(StringUtils.byteDesc(store.getMemStoreSize()));
2144 }
2145 }
2146 LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() +
2147 " column families, memstore=" + StringUtils.byteDesc(this.memstoreSize.get()) +
2148 ((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") +
2149 ((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + myseqid));
2150 }
2151
2152
2153
2154
2155
2156
2157
2158 status.setStatus("Obtaining lock to block concurrent updates");
2159
2160 this.updatesLock.writeLock().lock();
2161 status.setStatus("Preparing to flush by snapshotting stores in " +
2162 getRegionInfo().getEncodedName());
2163 long totalFlushableSizeOfFlushableStores = 0;
2164
2165 Set<byte[]> flushedFamilyNames = new HashSet<byte[]>();
2166 for (Store store: storesToFlush) {
2167 flushedFamilyNames.add(store.getFamily().getName());
2168 }
2169
2170 TreeMap<byte[], StoreFlushContext> storeFlushCtxs
2171 = new TreeMap<byte[], StoreFlushContext>(Bytes.BYTES_COMPARATOR);
2172 TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
2173 Bytes.BYTES_COMPARATOR);
2174 TreeMap<byte[], Long> storeFlushableSize
2175 = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
2176
2177
2178
2179 long flushOpSeqId = HConstants.NO_SEQNUM;
2180
2181
2182 long flushedSeqId = HConstants.NO_SEQNUM;
2183 byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes();
2184
2185 long trxId = 0;
2186 MultiVersionConcurrencyControl.WriteEntry writeEntry = mvcc.begin();
2187 try {
2188 try {
2189 if (wal != null) {
2190 Long earliestUnflushedSequenceIdForTheRegion =
2191 wal.startCacheFlush(encodedRegionName, flushedFamilyNames);
2192 if (earliestUnflushedSequenceIdForTheRegion == null) {
2193
2194 String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing.";
2195 status.setStatus(msg);
2196 return new PrepareFlushResult(
2197 new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false),
2198 myseqid);
2199 }
2200 flushOpSeqId = getNextSequenceId(wal);
2201
2202 flushedSeqId =
2203 earliestUnflushedSequenceIdForTheRegion.longValue() == HConstants.NO_SEQNUM?
2204 flushOpSeqId: earliestUnflushedSequenceIdForTheRegion.longValue() - 1;
2205 } else {
2206
2207 flushedSeqId = flushOpSeqId = myseqid;
2208 }
2209
2210 for (Store s : storesToFlush) {
2211 totalFlushableSizeOfFlushableStores += s.getFlushableSize();
2212 storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId));
2213 committedFiles.put(s.getFamily().getName(), null);
2214 storeFlushableSize.put(s.getFamily().getName(), s.getFlushableSize());
2215 }
2216
2217
2218 if (wal != null && !writestate.readOnly) {
2219 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
2220 getRegionInfo(), flushOpSeqId, committedFiles);
2221
2222 trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2223 desc, false, mvcc);
2224 }
2225
2226
2227 for (StoreFlushContext flush : storeFlushCtxs.values()) {
2228 flush.prepare();
2229 }
2230 } catch (IOException ex) {
2231 if (wal != null) {
2232 if (trxId > 0) {
2233 try {
2234 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
2235 getRegionInfo(), flushOpSeqId, committedFiles);
2236 WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2237 desc, false, mvcc);
2238 } catch (Throwable t) {
2239 LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
2240 StringUtils.stringifyException(t));
2241
2242 }
2243 }
2244
2245 wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2246 throw ex;
2247 }
2248 } finally {
2249 this.updatesLock.writeLock().unlock();
2250 }
2251 String s = "Finished memstore snapshotting " + this +
2252 ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSizeOfFlushableStores;
2253 status.setStatus(s);
2254 if (LOG.isTraceEnabled()) LOG.trace(s);
2255
2256
2257 if (wal != null) {
2258 try {
2259 wal.sync();
2260 } catch (IOException ioe) {
2261 wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2262 throw ioe;
2263 }
2264 }
2265
2266
2267
2268
2269
2270
2271 mvcc.completeAndWait(writeEntry);
2272
2273
2274 writeEntry = null;
2275 } finally {
2276 if (writeEntry != null) {
2277
2278 mvcc.complete(writeEntry);
2279 }
2280 }
2281 return new PrepareFlushResult(storeFlushCtxs, committedFiles, storeFlushableSize, startTime,
2282 flushOpSeqId, flushedSeqId, totalFlushableSizeOfFlushableStores);
2283 }
2284
2285
2286
2287
2288
2289 private boolean isAllFamilies(final Collection<Store> families) {
2290 return families == null || this.stores.size() == families.size();
2291 }
2292
2293
2294
2295
2296
2297
2298
2299 private boolean writeFlushRequestMarkerToWAL(WAL wal, boolean writeFlushWalMarker) {
2300 if (writeFlushWalMarker && wal != null && !writestate.readOnly) {
2301 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH,
2302 getRegionInfo(), -1, new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR));
2303 try {
2304 WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2305 desc, true, mvcc);
2306 return true;
2307 } catch (IOException e) {
2308 LOG.warn(getRegionInfo().getEncodedName() + " : "
2309 + "Received exception while trying to write the flush request to wal", e);
2310 }
2311 }
2312 return false;
2313 }
2314
2315 protected FlushResult internalFlushCacheAndCommit(
2316 final WAL wal, MonitoredTask status, final PrepareFlushResult prepareResult,
2317 final Collection<Store> storesToFlush)
2318 throws IOException {
2319
2320
2321 TreeMap<byte[], StoreFlushContext> storeFlushCtxs = prepareResult.storeFlushCtxs;
2322 TreeMap<byte[], List<Path>> committedFiles = prepareResult.committedFiles;
2323 long startTime = prepareResult.startTime;
2324 long flushOpSeqId = prepareResult.flushOpSeqId;
2325 long flushedSeqId = prepareResult.flushedSeqId;
2326 long totalFlushableSizeOfFlushableStores = prepareResult.totalFlushableSize;
2327
2328 String s = "Flushing stores of " + this;
2329 status.setStatus(s);
2330 if (LOG.isTraceEnabled()) LOG.trace(s);
2331
2332
2333
2334
2335
2336 boolean compactionRequested = false;
2337 try {
2338
2339
2340
2341
2342
2343 for (StoreFlushContext flush : storeFlushCtxs.values()) {
2344 flush.flushCache(status);
2345 }
2346
2347
2348
2349 Iterator<Store> it = storesToFlush.iterator();
2350
2351 for (StoreFlushContext flush : storeFlushCtxs.values()) {
2352 boolean needsCompaction = flush.commit(status);
2353 if (needsCompaction) {
2354 compactionRequested = true;
2355 }
2356 byte[] storeName = it.next().getFamily().getName();
2357 List<Path> storeCommittedFiles = flush.getCommittedFiles();
2358 committedFiles.put(storeName, storeCommittedFiles);
2359
2360 if (storeCommittedFiles == null || storeCommittedFiles.isEmpty()) {
2361 totalFlushableSizeOfFlushableStores -= prepareResult.storeFlushableSize.get(storeName);
2362 }
2363 }
2364 storeFlushCtxs.clear();
2365
2366
2367 this.addAndGetGlobalMemstoreSize(-totalFlushableSizeOfFlushableStores);
2368
2369 if (wal != null) {
2370
2371 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
2372 getRegionInfo(), flushOpSeqId, committedFiles);
2373 WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2374 desc, true, mvcc);
2375 }
2376 } catch (Throwable t) {
2377
2378
2379
2380
2381
2382
2383 if (wal != null) {
2384 try {
2385 FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
2386 getRegionInfo(), flushOpSeqId, committedFiles);
2387 WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(),
2388 desc, false, mvcc);
2389 } catch (Throwable ex) {
2390 LOG.warn(getRegionInfo().getEncodedName() + " : "
2391 + "failed writing ABORT_FLUSH marker to WAL", ex);
2392
2393 }
2394 wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2395 }
2396 DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
2397 Bytes.toStringBinary(getRegionInfo().getRegionName()));
2398 dse.initCause(t);
2399 status.abort("Flush failed: " + StringUtils.stringifyException(t));
2400
2401
2402
2403
2404
2405 this.closing.set(true);
2406
2407 if (rsServices != null) {
2408
2409 rsServices.abort("Replay of WAL required. Forcing server shutdown", dse);
2410 }
2411
2412 throw dse;
2413 }
2414
2415
2416 if (wal != null) {
2417 wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2418 }
2419
2420
2421 for (Store store: storesToFlush) {
2422 this.lastStoreFlushTimeMap.put(store, startTime);
2423 }
2424
2425 this.maxFlushedSeqId = flushedSeqId;
2426 this.lastFlushOpSeqId = flushOpSeqId;
2427
2428
2429
2430 synchronized (this) {
2431 notifyAll();
2432 }
2433
2434 long time = EnvironmentEdgeManager.currentTime() - startTime;
2435 long memstoresize = this.memstoreSize.get();
2436 String msg = "Finished memstore flush of ~"
2437 + StringUtils.byteDesc(totalFlushableSizeOfFlushableStores) + "/"
2438 + totalFlushableSizeOfFlushableStores + ", currentsize="
2439 + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
2440 + " for region " + this + " in " + time + "ms, sequenceid="
2441 + flushOpSeqId + ", compaction requested=" + compactionRequested
2442 + ((wal == null) ? "; wal=null" : "");
2443 LOG.info(msg);
2444 status.setStatus(msg);
2445
2446 return new FlushResultImpl(compactionRequested ?
2447 FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
2448 FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED,
2449 flushOpSeqId);
2450 }
2451
2452
2453
2454
2455
2456
2457 @VisibleForTesting
2458 protected long getNextSequenceId(final WAL wal) throws IOException {
2459
2460
2461
2462
2463
2464
2465 WALKey key = this.appendEmptyEdit(wal);
2466 mvcc.complete(key.getWriteEntry());
2467 return key.getSequenceId(this.maxWaitForSeqId);
2468 }
2469
2470
2471
2472
2473
2474 @Override
2475 public RegionScanner getScanner(Scan scan) throws IOException {
2476 return getScanner(scan, true);
2477 }
2478
2479 public RegionScanner getScanner(Scan scan, boolean copyCellsFromSharedMem) throws IOException {
2480 RegionScanner scanner = getScanner(scan, null, copyCellsFromSharedMem);
2481 return scanner;
2482 }
2483
2484 protected RegionScanner getScanner(Scan scan, List<KeyValueScanner> additionalScanners,
2485 boolean copyCellsFromSharedMem) throws IOException {
2486 startRegionOperation(Operation.SCAN);
2487 try {
2488
2489 if (!scan.hasFamilies()) {
2490
2491 for (byte[] family : this.htableDescriptor.getFamiliesKeys()) {
2492 scan.addFamily(family);
2493 }
2494 } else {
2495 for (byte[] family : scan.getFamilyMap().keySet()) {
2496 checkFamily(family);
2497 }
2498 }
2499 return instantiateRegionScanner(scan, additionalScanners, copyCellsFromSharedMem);
2500 } finally {
2501 closeRegionOperation(Operation.SCAN);
2502 }
2503 }
2504
2505 protected RegionScanner instantiateRegionScanner(Scan scan,
2506 List<KeyValueScanner> additionalScanners, boolean copyCellsFromSharedMem) throws IOException {
2507 if (scan.isReversed()) {
2508 if (scan.getFilter() != null) {
2509 scan.getFilter().setReversed(true);
2510 }
2511 return new ReversedRegionScannerImpl(scan, additionalScanners, this, copyCellsFromSharedMem);
2512 }
2513 return new RegionScannerImpl(scan, additionalScanners, this, copyCellsFromSharedMem);
2514 }
2515
2516 @Override
2517 public void prepareDelete(Delete delete) throws IOException {
2518
2519 if(delete.getFamilyCellMap().isEmpty()){
2520 for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
2521
2522 delete.addFamily(family, delete.getTimeStamp());
2523 }
2524 } else {
2525 for(byte [] family : delete.getFamilyCellMap().keySet()) {
2526 if(family == null) {
2527 throw new NoSuchColumnFamilyException("Empty family is invalid");
2528 }
2529 checkFamily(family);
2530 }
2531 }
2532 }
2533
2534 @Override
2535 public void delete(Delete delete) throws IOException {
2536 checkReadOnly();
2537 checkResources();
2538 startRegionOperation(Operation.DELETE);
2539 try {
2540 delete.getRow();
2541
2542 doBatchMutate(delete);
2543 } finally {
2544 closeRegionOperation(Operation.DELETE);
2545 }
2546 }
2547
2548
2549
2550
2551 private static final byte [] FOR_UNIT_TESTS_ONLY = Bytes.toBytes("ForUnitTestsOnly");
2552
2553
2554
2555
2556
2557
2558 void delete(NavigableMap<byte[], List<Cell>> familyMap,
2559 Durability durability) throws IOException {
2560 Delete delete = new Delete(FOR_UNIT_TESTS_ONLY);
2561 delete.setFamilyCellMap(familyMap);
2562 delete.setDurability(durability);
2563 doBatchMutate(delete);
2564 }
2565
2566 @Override
2567 public void prepareDeleteTimestamps(Mutation mutation, Map<byte[], List<Cell>> familyMap,
2568 byte[] byteNow) throws IOException {
2569 for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
2570
2571 byte[] family = e.getKey();
2572 List<Cell> cells = e.getValue();
2573 assert cells instanceof RandomAccess;
2574
2575 Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
2576 int listSize = cells.size();
2577 for (int i=0; i < listSize; i++) {
2578 Cell cell = cells.get(i);
2579
2580
2581 if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP && CellUtil.isDeleteType(cell)) {
2582 byte[] qual = CellUtil.cloneQualifier(cell);
2583 if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
2584
2585 Integer count = kvCount.get(qual);
2586 if (count == null) {
2587 kvCount.put(qual, 1);
2588 } else {
2589 kvCount.put(qual, count + 1);
2590 }
2591 count = kvCount.get(qual);
2592
2593 Get get = new Get(CellUtil.cloneRow(cell));
2594 get.setMaxVersions(count);
2595 get.addColumn(family, qual);
2596 if (coprocessorHost != null) {
2597 if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, cell,
2598 byteNow, get)) {
2599 updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow);
2600 }
2601 } else {
2602 updateDeleteLatestVersionTimeStamp(cell, get, count, byteNow);
2603 }
2604 } else {
2605 CellUtil.updateLatestStamp(cell, byteNow, 0);
2606 }
2607 }
2608 }
2609 }
2610
2611 void updateDeleteLatestVersionTimeStamp(Cell cell, Get get, int count, byte[] byteNow)
2612 throws IOException {
2613 List<Cell> result = get(get, false);
2614
2615 if (result.size() < count) {
2616
2617 CellUtil.updateLatestStamp(cell, byteNow, 0);
2618 return;
2619 }
2620 if (result.size() > count) {
2621 throw new RuntimeException("Unexpected size: " + result.size());
2622 }
2623 Cell getCell = result.get(count - 1);
2624 CellUtil.setTimestamp(cell, getCell.getTimestamp());
2625 }
2626
2627 @Override
2628 public void put(Put put) throws IOException {
2629 checkReadOnly();
2630
2631
2632
2633
2634
2635 checkResources();
2636 startRegionOperation(Operation.PUT);
2637 try {
2638
2639 doBatchMutate(put);
2640 } finally {
2641 closeRegionOperation(Operation.PUT);
2642 }
2643 }
2644
2645
2646
2647
2648
2649
2650 private abstract static class BatchOperationInProgress<T> {
2651 T[] operations;
2652 int nextIndexToProcess = 0;
2653 OperationStatus[] retCodeDetails;
2654 WALEdit[] walEditsFromCoprocessors;
2655
2656 public BatchOperationInProgress(T[] operations) {
2657 this.operations = operations;
2658 this.retCodeDetails = new OperationStatus[operations.length];
2659 this.walEditsFromCoprocessors = new WALEdit[operations.length];
2660 Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
2661 }
2662
2663 public abstract Mutation getMutation(int index);
2664 public abstract long getNonceGroup(int index);
2665 public abstract long getNonce(int index);
2666
2667 public abstract Mutation[] getMutationsForCoprocs();
2668 public abstract boolean isInReplay();
2669 public abstract long getReplaySequenceId();
2670
2671 public boolean isDone() {
2672 return nextIndexToProcess == operations.length;
2673 }
2674 }
2675
2676 private static class MutationBatch extends BatchOperationInProgress<Mutation> {
2677 private long nonceGroup;
2678 private long nonce;
2679 public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) {
2680 super(operations);
2681 this.nonceGroup = nonceGroup;
2682 this.nonce = nonce;
2683 }
2684
2685 @Override
2686 public Mutation getMutation(int index) {
2687 return this.operations[index];
2688 }
2689
2690 @Override
2691 public long getNonceGroup(int index) {
2692 return nonceGroup;
2693 }
2694
2695 @Override
2696 public long getNonce(int index) {
2697 return nonce;
2698 }
2699
2700 @Override
2701 public Mutation[] getMutationsForCoprocs() {
2702 return this.operations;
2703 }
2704
2705 @Override
2706 public boolean isInReplay() {
2707 return false;
2708 }
2709
2710 @Override
2711 public long getReplaySequenceId() {
2712 return 0;
2713 }
2714 }
2715
2716 private static class ReplayBatch extends BatchOperationInProgress<MutationReplay> {
2717 private long replaySeqId = 0;
2718 public ReplayBatch(MutationReplay[] operations, long seqId) {
2719 super(operations);
2720 this.replaySeqId = seqId;
2721 }
2722
2723 @Override
2724 public Mutation getMutation(int index) {
2725 return this.operations[index].mutation;
2726 }
2727
2728 @Override
2729 public long getNonceGroup(int index) {
2730 return this.operations[index].nonceGroup;
2731 }
2732
2733 @Override
2734 public long getNonce(int index) {
2735 return this.operations[index].nonce;
2736 }
2737
2738 @Override
2739 public Mutation[] getMutationsForCoprocs() {
2740 assert false;
2741 throw new RuntimeException("Should not be called for replay batch");
2742 }
2743
2744 @Override
2745 public boolean isInReplay() {
2746 return true;
2747 }
2748
2749 @Override
2750 public long getReplaySequenceId() {
2751 return this.replaySeqId;
2752 }
2753 }
2754
2755 @Override
2756 public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce)
2757 throws IOException {
2758
2759
2760
2761
2762 return batchMutate(new MutationBatch(mutations, nonceGroup, nonce));
2763 }
2764
2765 public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
2766 return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
2767 }
2768
2769 @Override
2770 public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId)
2771 throws IOException {
2772 if (!RegionReplicaUtil.isDefaultReplica(getRegionInfo())
2773 && replaySeqId < lastReplayedOpenRegionSeqId) {
2774
2775
2776 if (LOG.isTraceEnabled()) {
2777 LOG.trace(getRegionInfo().getEncodedName() + " : "
2778 + "Skipping " + mutations.length + " mutations with replaySeqId=" + replaySeqId
2779 + " which is < than lastReplayedOpenRegionSeqId=" + lastReplayedOpenRegionSeqId);
2780 for (MutationReplay mut : mutations) {
2781 LOG.trace(getRegionInfo().getEncodedName() + " : Skipping : " + mut.mutation);
2782 }
2783 }
2784
2785 OperationStatus[] statuses = new OperationStatus[mutations.length];
2786 for (int i = 0; i < statuses.length; i++) {
2787 statuses[i] = OperationStatus.SUCCESS;
2788 }
2789 return statuses;
2790 }
2791 return batchMutate(new ReplayBatch(mutations, replaySeqId));
2792 }
2793
2794
2795
2796
2797
2798
2799
2800
2801
2802 OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
2803 boolean initialized = false;
2804 Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
2805 startRegionOperation(op);
2806 try {
2807 while (!batchOp.isDone()) {
2808 if (!batchOp.isInReplay()) {
2809 checkReadOnly();
2810 }
2811 checkResources();
2812
2813 if (!initialized) {
2814 this.writeRequestsCount.add(batchOp.operations.length);
2815 if (!batchOp.isInReplay()) {
2816 doPreMutationHook(batchOp);
2817 }
2818 initialized = true;
2819 }
2820 long addedSize = doMiniBatchMutation(batchOp);
2821 long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
2822 if (isFlushSize(newSize)) {
2823 requestFlush();
2824 }
2825 }
2826 } finally {
2827 closeRegionOperation(op);
2828 }
2829 return batchOp.retCodeDetails;
2830 }
2831
2832
2833 private void doPreMutationHook(BatchOperationInProgress<?> batchOp)
2834 throws IOException {
2835
2836 WALEdit walEdit = new WALEdit();
2837 if (coprocessorHost != null) {
2838 for (int i = 0 ; i < batchOp.operations.length; i++) {
2839 Mutation m = batchOp.getMutation(i);
2840 if (m instanceof Put) {
2841 if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
2842
2843
2844 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2845 }
2846 } else if (m instanceof Delete) {
2847 Delete curDel = (Delete) m;
2848 if (curDel.getFamilyCellMap().isEmpty()) {
2849
2850 prepareDelete(curDel);
2851 }
2852 if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
2853
2854
2855 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
2856 }
2857 } else {
2858
2859
2860
2861 batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
2862 "Put/Delete mutations only supported in batchMutate() now");
2863 }
2864 if (!walEdit.isEmpty()) {
2865 batchOp.walEditsFromCoprocessors[i] = walEdit;
2866 walEdit = new WALEdit();
2867 }
2868 }
2869 }
2870 }
2871
2872 @SuppressWarnings("unchecked")
2873 private long doMiniBatchMutation(BatchOperationInProgress<?> batchOp) throws IOException {
2874 boolean isInReplay = batchOp.isInReplay();
2875
2876 boolean putsCfSetConsistent = true;
2877
2878 Set<byte[]> putsCfSet = null;
2879
2880 boolean deletesCfSetConsistent = true;
2881
2882 Set<byte[]> deletesCfSet = null;
2883
2884 long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
2885 WALEdit walEdit = new WALEdit(isInReplay);
2886 MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
2887 long txid = 0;
2888 boolean doRollBackMemstore = false;
2889 boolean locked = false;
2890
2891
2892 List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
2893
2894 Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
2895
2896 int firstIndex = batchOp.nextIndexToProcess;
2897 int lastIndexExclusive = firstIndex;
2898 boolean success = false;
2899 int noOfPuts = 0, noOfDeletes = 0;
2900 WALKey walKey = null;
2901 long mvccNum = 0;
2902 try {
2903
2904
2905
2906
2907 int numReadyToWrite = 0;
2908 long now = EnvironmentEdgeManager.currentTime();
2909 while (lastIndexExclusive < batchOp.operations.length) {
2910 Mutation mutation = batchOp.getMutation(lastIndexExclusive);
2911 boolean isPutMutation = mutation instanceof Put;
2912
2913 Map<byte[], List<Cell>> familyMap = mutation.getFamilyCellMap();
2914
2915 familyMaps[lastIndexExclusive] = familyMap;
2916
2917
2918 if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
2919 != OperationStatusCode.NOT_RUN) {
2920 lastIndexExclusive++;
2921 continue;
2922 }
2923
2924 try {
2925 if (isPutMutation) {
2926
2927 if (isInReplay) {
2928 removeNonExistentColumnFamilyForReplay(familyMap);
2929 } else {
2930 checkFamilies(familyMap.keySet());
2931 }
2932 checkTimestamps(mutation.getFamilyCellMap(), now);
2933 } else {
2934 prepareDelete((Delete) mutation);
2935 }
2936 checkRow(mutation.getRow(), "doMiniBatchMutation");
2937 } catch (NoSuchColumnFamilyException nscf) {
2938 LOG.warn("No such column family in batch mutation", nscf);
2939 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2940 OperationStatusCode.BAD_FAMILY, nscf.getMessage());
2941 lastIndexExclusive++;
2942 continue;
2943 } catch (FailedSanityCheckException fsce) {
2944 LOG.warn("Batch Mutation did not pass sanity check", fsce);
2945 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2946 OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
2947 lastIndexExclusive++;
2948 continue;
2949 } catch (WrongRegionException we) {
2950 LOG.warn("Batch mutation had a row that does not belong to this region", we);
2951 batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus(
2952 OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());
2953 lastIndexExclusive++;
2954 continue;
2955 }
2956
2957
2958
2959 RowLock rowLock = null;
2960 try {
2961 rowLock = getRowLock(mutation.getRow(), true);
2962 } catch (IOException ioe) {
2963 LOG.warn("Failed getting lock in batch put, row="
2964 + Bytes.toStringBinary(mutation.getRow()), ioe);
2965 throw ioe;
2966 }
2967 if (rowLock == null) {
2968
2969 throw new IOException("Failed getting lock in batch put, row=" +
2970 Bytes.toStringBinary(mutation.getRow()));
2971 } else {
2972 acquiredRowLocks.add(rowLock);
2973 }
2974
2975 lastIndexExclusive++;
2976 numReadyToWrite++;
2977
2978 if (isPutMutation) {
2979
2980
2981
2982 if (putsCfSet == null) {
2983 putsCfSet = mutation.getFamilyCellMap().keySet();
2984 } else {
2985 putsCfSetConsistent = putsCfSetConsistent
2986 && mutation.getFamilyCellMap().keySet().equals(putsCfSet);
2987 }
2988 } else {
2989 if (deletesCfSet == null) {
2990 deletesCfSet = mutation.getFamilyCellMap().keySet();
2991 } else {
2992 deletesCfSetConsistent = deletesCfSetConsistent
2993 && mutation.getFamilyCellMap().keySet().equals(deletesCfSet);
2994 }
2995 }
2996 }
2997
2998
2999
3000 now = EnvironmentEdgeManager.currentTime();
3001 byte[] byteNow = Bytes.toBytes(now);
3002
3003
3004 if (numReadyToWrite <= 0) return 0L;
3005
3006
3007
3008
3009
3010
3011 for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) {
3012
3013 if (batchOp.retCodeDetails[i].getOperationStatusCode()
3014 != OperationStatusCode.NOT_RUN) continue;
3015
3016 Mutation mutation = batchOp.getMutation(i);
3017 if (mutation instanceof Put) {
3018 updateCellTimestamps(familyMaps[i].values(), byteNow);
3019 noOfPuts++;
3020 } else {
3021 prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
3022 noOfDeletes++;
3023 }
3024 rewriteCellTags(familyMaps[i], mutation);
3025 }
3026
3027 lock(this.updatesLock.readLock(), numReadyToWrite);
3028 locked = true;
3029
3030
3031 if (!isInReplay && coprocessorHost != null) {
3032 MiniBatchOperationInProgress<Mutation> miniBatchOp =
3033 new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
3034 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
3035 if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L;
3036 }
3037
3038
3039
3040
3041 Durability durability = Durability.USE_DEFAULT;
3042 for (int i = firstIndex; i < lastIndexExclusive; i++) {
3043
3044 if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
3045 continue;
3046 }
3047
3048 Mutation m = batchOp.getMutation(i);
3049 Durability tmpDur = getEffectiveDurability(m.getDurability());
3050 if (tmpDur.ordinal() > durability.ordinal()) {
3051 durability = tmpDur;
3052 }
3053 if (tmpDur == Durability.SKIP_WAL) {
3054 recordMutationWithoutWal(m.getFamilyCellMap());
3055 continue;
3056 }
3057
3058 long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i);
3059
3060
3061
3062 if (nonceGroup != currentNonceGroup || nonce != currentNonce) {
3063 if (walEdit.size() > 0) {
3064 assert isInReplay;
3065 if (!isInReplay) {
3066 throw new IOException("Multiple nonces per batch and not in replay");
3067 }
3068
3069
3070 walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
3071 this.htableDescriptor.getTableName(), now, m.getClusterIds(),
3072 currentNonceGroup, currentNonce, mvcc);
3073 txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey,
3074 walEdit, true);
3075 walEdit = new WALEdit(isInReplay);
3076 walKey = null;
3077 }
3078 currentNonceGroup = nonceGroup;
3079 currentNonce = nonce;
3080 }
3081
3082
3083 WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
3084 if (fromCP != null) {
3085 for (Cell cell : fromCP.getCells()) {
3086 walEdit.add(cell);
3087 }
3088 }
3089 addFamilyMapToWALEdit(familyMaps[i], walEdit);
3090 }
3091
3092
3093
3094
3095 Mutation mutation = batchOp.getMutation(firstIndex);
3096 if (isInReplay) {
3097
3098 walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
3099 this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
3100 mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
3101 long replaySeqId = batchOp.getReplaySequenceId();
3102 walKey.setOrigLogSeqNum(replaySeqId);
3103 }
3104 if (walEdit.size() > 0) {
3105 if (!isInReplay) {
3106
3107 walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
3108 this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
3109 mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
3110 }
3111 txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
3112 }
3113
3114
3115
3116 if (walKey == null) {
3117
3118 walKey = this.appendEmptyEdit(this.wal);
3119 }
3120 if (!isInReplay) {
3121 writeEntry = walKey.getWriteEntry();
3122 mvccNum = writeEntry.getWriteNumber();
3123 } else {
3124 mvccNum = batchOp.getReplaySequenceId();
3125 }
3126
3127
3128
3129
3130
3131
3132
3133
3134
3135
3136 long addedSize = 0;
3137 for (int i = firstIndex; i < lastIndexExclusive; i++) {
3138 if (batchOp.retCodeDetails[i].getOperationStatusCode()
3139 != OperationStatusCode.NOT_RUN) {
3140 continue;
3141 }
3142 doRollBackMemstore = true;
3143 addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, isInReplay);
3144 }
3145
3146
3147
3148
3149 if (locked) {
3150 this.updatesLock.readLock().unlock();
3151 locked = false;
3152 }
3153 releaseRowLocks(acquiredRowLocks);
3154
3155
3156
3157
3158 if (txid != 0) {
3159 syncOrDefer(txid, durability);
3160 }
3161
3162 doRollBackMemstore = false;
3163
3164 if (!isInReplay && coprocessorHost != null) {
3165 MiniBatchOperationInProgress<Mutation> miniBatchOp =
3166 new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
3167 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
3168 coprocessorHost.postBatchMutate(miniBatchOp);
3169 }
3170
3171
3172
3173
3174 if (writeEntry != null) {
3175 mvcc.completeAndWait(writeEntry);
3176 writeEntry = null;
3177 } else if (isInReplay) {
3178
3179 mvcc.advanceTo(mvccNum);
3180 }
3181
3182 for (int i = firstIndex; i < lastIndexExclusive; i ++) {
3183 if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) {
3184 batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
3185 }
3186 }
3187
3188
3189
3190
3191
3192 if (!isInReplay && coprocessorHost != null) {
3193 for (int i = firstIndex; i < lastIndexExclusive; i++) {
3194
3195 if (batchOp.retCodeDetails[i].getOperationStatusCode()
3196 != OperationStatusCode.SUCCESS) {
3197 continue;
3198 }
3199 Mutation m = batchOp.getMutation(i);
3200 if (m instanceof Put) {
3201 coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
3202 } else {
3203 coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
3204 }
3205 }
3206 }
3207
3208 success = true;
3209 return addedSize;
3210 } finally {
3211
3212 if (doRollBackMemstore) {
3213 for (int j = 0; j < familyMaps.length; j++) {
3214 for(List<Cell> cells:familyMaps[j].values()) {
3215 rollbackMemstore(cells);
3216 }
3217 }
3218 if (writeEntry != null) mvcc.complete(writeEntry);
3219 } else if (writeEntry != null) {
3220 mvcc.completeAndWait(writeEntry);
3221 }
3222
3223 if (locked) {
3224 this.updatesLock.readLock().unlock();
3225 }
3226 releaseRowLocks(acquiredRowLocks);
3227
3228
3229
3230
3231
3232
3233
3234 if (noOfPuts > 0) {
3235
3236 if (this.metricsRegion != null) {
3237 this.metricsRegion.updatePut();
3238 }
3239 }
3240 if (noOfDeletes > 0) {
3241
3242 if (this.metricsRegion != null) {
3243 this.metricsRegion.updateDelete();
3244 }
3245 }
3246 if (!success) {
3247 for (int i = firstIndex; i < lastIndexExclusive; i++) {
3248 if (batchOp.retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
3249 batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
3250 }
3251 }
3252 }
3253 if (coprocessorHost != null && !batchOp.isInReplay()) {
3254
3255
3256 MiniBatchOperationInProgress<Mutation> miniBatchOp =
3257 new MiniBatchOperationInProgress<Mutation>(batchOp.getMutationsForCoprocs(),
3258 batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex,
3259 lastIndexExclusive);
3260 coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);
3261 }
3262
3263 batchOp.nextIndexToProcess = lastIndexExclusive;
3264 }
3265 }
3266
3267
3268
3269
3270
3271 protected Durability getEffectiveDurability(Durability d) {
3272 return d == Durability.USE_DEFAULT ? this.durability : d;
3273 }
3274
3275
3276
3277
3278
3279
3280 @Override
3281 public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
3282 CompareOp compareOp, ByteArrayComparable comparator, Mutation w,
3283 boolean writeToWAL)
3284 throws IOException{
3285 checkReadOnly();
3286
3287
3288 checkResources();
3289 boolean isPut = w instanceof Put;
3290 if (!isPut && !(w instanceof Delete))
3291 throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must " +
3292 "be Put or Delete");
3293 if (!Bytes.equals(row, w.getRow())) {
3294 throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " +
3295 "getRow must match the passed row");
3296 }
3297
3298 startRegionOperation();
3299 try {
3300 Get get = new Get(row);
3301 checkFamily(family);
3302 get.addColumn(family, qualifier);
3303
3304
3305 RowLock rowLock = getRowLock(get.getRow());
3306
3307 mvcc.await();
3308 try {
3309 if (this.getCoprocessorHost() != null) {
3310 Boolean processed = null;
3311 if (w instanceof Put) {
3312 processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family,
3313 qualifier, compareOp, comparator, (Put) w);
3314 } else if (w instanceof Delete) {
3315 processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family,
3316 qualifier, compareOp, comparator, (Delete) w);
3317 }
3318 if (processed != null) {
3319 return processed;
3320 }
3321 }
3322 List<Cell> result = get(get, false);
3323
3324 boolean valueIsNull = comparator.getValue() == null ||
3325 comparator.getValue().length == 0;
3326 boolean matches = false;
3327 long cellTs = 0;
3328 if (result.size() == 0 && valueIsNull) {
3329 matches = true;
3330 } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
3331 valueIsNull) {
3332 matches = true;
3333 cellTs = result.get(0).getTimestamp();
3334 } else if (result.size() == 1 && !valueIsNull) {
3335 Cell kv = result.get(0);
3336 cellTs = kv.getTimestamp();
3337 int compareResult = CellComparator.compareValue(kv, comparator);
3338 switch (compareOp) {
3339 case LESS:
3340 matches = compareResult < 0;
3341 break;
3342 case LESS_OR_EQUAL:
3343 matches = compareResult <= 0;
3344 break;
3345 case EQUAL:
3346 matches = compareResult == 0;
3347 break;
3348 case NOT_EQUAL:
3349 matches = compareResult != 0;
3350 break;
3351 case GREATER_OR_EQUAL:
3352 matches = compareResult >= 0;
3353 break;
3354 case GREATER:
3355 matches = compareResult > 0;
3356 break;
3357 default:
3358 throw new RuntimeException("Unknown Compare op " + compareOp.name());
3359 }
3360 }
3361
3362 if (matches) {
3363
3364
3365
3366
3367 long now = EnvironmentEdgeManager.currentTime();
3368 long ts = Math.max(now, cellTs);
3369 byte[] byteTs = Bytes.toBytes(ts);
3370
3371 if (w instanceof Put) {
3372 updateCellTimestamps(w.getFamilyCellMap().values(), byteTs);
3373 }
3374
3375
3376
3377
3378
3379 doBatchMutate(w);
3380 this.checkAndMutateChecksPassed.increment();
3381 return true;
3382 }
3383 this.checkAndMutateChecksFailed.increment();
3384 return false;
3385 } finally {
3386 rowLock.release();
3387 }
3388 } finally {
3389 closeRegionOperation();
3390 }
3391 }
3392
3393
3394
3395
3396
3397
3398 @Override
3399 public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier,
3400 CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm,
3401 boolean writeToWAL) throws IOException {
3402 checkReadOnly();
3403
3404
3405 checkResources();
3406
3407 startRegionOperation();
3408 try {
3409 Get get = new Get(row);
3410 checkFamily(family);
3411 get.addColumn(family, qualifier);
3412
3413
3414 RowLock rowLock = getRowLock(get.getRow());
3415
3416 mvcc.await();
3417 try {
3418 List<Cell> result = get(get, false);
3419
3420 boolean valueIsNull = comparator.getValue() == null ||
3421 comparator.getValue().length == 0;
3422 boolean matches = false;
3423 long cellTs = 0;
3424 if (result.size() == 0 && valueIsNull) {
3425 matches = true;
3426 } else if (result.size() > 0 && result.get(0).getValueLength() == 0 &&
3427 valueIsNull) {
3428 matches = true;
3429 cellTs = result.get(0).getTimestamp();
3430 } else if (result.size() == 1 && !valueIsNull) {
3431 Cell kv = result.get(0);
3432 cellTs = kv.getTimestamp();
3433 int compareResult = CellComparator.compareValue(kv, comparator);
3434 switch (compareOp) {
3435 case LESS:
3436 matches = compareResult < 0;
3437 break;
3438 case LESS_OR_EQUAL:
3439 matches = compareResult <= 0;
3440 break;
3441 case EQUAL:
3442 matches = compareResult == 0;
3443 break;
3444 case NOT_EQUAL:
3445 matches = compareResult != 0;
3446 break;
3447 case GREATER_OR_EQUAL:
3448 matches = compareResult >= 0;
3449 break;
3450 case GREATER:
3451 matches = compareResult > 0;
3452 break;
3453 default:
3454 throw new RuntimeException("Unknown Compare op " + compareOp.name());
3455 }
3456 }
3457
3458 if (matches) {
3459
3460
3461
3462
3463 long now = EnvironmentEdgeManager.currentTime();
3464 long ts = Math.max(now, cellTs);
3465 byte[] byteTs = Bytes.toBytes(ts);
3466
3467 for (Mutation w : rm.getMutations()) {
3468 if (w instanceof Put) {
3469 updateCellTimestamps(w.getFamilyCellMap().values(), byteTs);
3470 }
3471
3472
3473 }
3474
3475
3476
3477 mutateRow(rm);
3478 this.checkAndMutateChecksPassed.increment();
3479 return true;
3480 }
3481 this.checkAndMutateChecksFailed.increment();
3482 return false;
3483 } finally {
3484 rowLock.release();
3485 }
3486 } finally {
3487 closeRegionOperation();
3488 }
3489 }
3490
3491 private void doBatchMutate(Mutation mutation) throws IOException {
3492
3493 OperationStatus[] batchMutate = this.batchMutate(new Mutation[]{mutation});
3494 if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
3495 throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
3496 } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
3497 throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
3498 }
3499 }
3500
3501
3502
3503
3504
3505
3506
3507
3508
3509
3510
3511
3512
3513
3514 public void addRegionToSnapshot(SnapshotDescription desc,
3515 ForeignExceptionSnare exnSnare) throws IOException {
3516 Path rootDir = FSUtils.getRootDir(conf);
3517 Path snapshotDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(desc, rootDir);
3518
3519 SnapshotManifest manifest = SnapshotManifest.create(conf, getFilesystem(),
3520 snapshotDir, desc, exnSnare);
3521 manifest.addRegion(this);
3522
3523
3524
3525 if (!Bytes.equals(getRegionInfo().getStartKey(), HConstants.EMPTY_START_ROW))
3526 return;
3527
3528
3529 List<Store> stores = getStores();
3530 for (Store store : stores) {
3531 boolean hasMobStore = store.getFamily().isMobEnabled();
3532 if (hasMobStore) {
3533
3534 HRegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(this.getTableDesc().getTableName());
3535 mobRegionInfo.setOffline(true);
3536 manifest.addMobRegion(mobRegionInfo, this.getTableDesc().getColumnFamilies());
3537 return;
3538 }
3539 }
3540 }
3541
3542 @Override
3543 public void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final byte[] now)
3544 throws IOException {
3545 for (List<Cell> cells: cellItr) {
3546 if (cells == null) continue;
3547 assert cells instanceof RandomAccess;
3548 int listSize = cells.size();
3549 for (int i = 0; i < listSize; i++) {
3550 CellUtil.updateLatestStamp(cells.get(i), now, 0);
3551 }
3552 }
3553 }
3554
3555
3556
3557
3558 void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
3559
3560
3561
3562 if (m.getTTL() == Long.MAX_VALUE) {
3563 return;
3564 }
3565
3566
3567
3568 for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
3569 List<Cell> cells = e.getValue();
3570 assert cells instanceof RandomAccess;
3571 int listSize = cells.size();
3572 for (int i = 0; i < listSize; i++) {
3573 Cell cell = cells.get(i);
3574 List<Tag> newTags = new ArrayList<Tag>();
3575 Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
3576 cell.getTagsOffset(), cell.getTagsLength());
3577
3578
3579
3580 while (tagIterator.hasNext()) {
3581
3582
3583
3584 newTags.add(tagIterator.next());
3585 }
3586
3587
3588
3589
3590
3591 if (m.getTTL() != Long.MAX_VALUE) {
3592
3593 newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL())));
3594 }
3595
3596
3597 cells.set(i, new TagRewriteCell(cell, Tag.fromList(newTags)));
3598 }
3599 }
3600 }
3601
3602
3603
3604
3605
3606
3607
3608 private void checkResources() throws RegionTooBusyException {
3609
3610 if (this.getRegionInfo().isMetaRegion()) return;
3611
3612 if (this.memstoreSize.get() > this.blockingMemStoreSize) {
3613 blockedRequestsCount.increment();
3614 requestFlush();
3615 throw new RegionTooBusyException("Above memstore limit, " +
3616 "regionName=" + (this.getRegionInfo() == null ? "unknown" :
3617 this.getRegionInfo().getRegionNameAsString()) +
3618 ", server=" + (this.getRegionServerServices() == null ? "unknown" :
3619 this.getRegionServerServices().getServerName()) +
3620 ", memstoreSize=" + memstoreSize.get() +
3621 ", blockingMemStoreSize=" + blockingMemStoreSize);
3622 }
3623 }
3624
3625
3626
3627
3628 protected void checkReadOnly() throws IOException {
3629 if (isReadOnly()) {
3630 throw new DoNotRetryIOException("region is read only");
3631 }
3632 }
3633
3634 protected void checkReadsEnabled() throws IOException {
3635 if (!this.writestate.readsEnabled) {
3636 throw new IOException(getRegionInfo().getEncodedName()
3637 + ": The region's reads are disabled. Cannot serve the request");
3638 }
3639 }
3640
3641 public void setReadsEnabled(boolean readsEnabled) {
3642 if (readsEnabled && !this.writestate.readsEnabled) {
3643 LOG.info(getRegionInfo().getEncodedName() + " : Enabling reads for region.");
3644 }
3645 this.writestate.setReadsEnabled(readsEnabled);
3646 }
3647
3648
3649
3650
3651
3652
3653
3654 private void put(final byte [] row, byte [] family, List<Cell> edits)
3655 throws IOException {
3656 NavigableMap<byte[], List<Cell>> familyMap;
3657 familyMap = new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
3658
3659 familyMap.put(family, edits);
3660 Put p = new Put(row);
3661 p.setFamilyCellMap(familyMap);
3662 doBatchMutate(p);
3663 }
3664
3665
3666
3667
3668
3669
3670
3671
3672
3673
3674
3675
3676
3677 private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
3678 long mvccNum, boolean isInReplay) throws IOException {
3679 long size = 0;
3680
3681 for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
3682 byte[] family = e.getKey();
3683 List<Cell> cells = e.getValue();
3684 assert cells instanceof RandomAccess;
3685 Store store = getStore(family);
3686 int listSize = cells.size();
3687 for (int i=0; i < listSize; i++) {
3688 Cell cell = cells.get(i);
3689 if (cell.getSequenceId() == 0 || isInReplay) {
3690 CellUtil.setSequenceId(cell, mvccNum);
3691 }
3692 size += store.add(cell);
3693 }
3694 }
3695
3696 return size;
3697 }
3698
3699
3700
3701
3702
3703
3704 private void rollbackMemstore(List<Cell> memstoreCells) {
3705 int kvsRolledback = 0;
3706
3707 for (Cell cell : memstoreCells) {
3708 byte[] family = CellUtil.cloneFamily(cell);
3709 Store store = getStore(family);
3710 store.rollback(cell);
3711 kvsRolledback++;
3712 }
3713 LOG.debug("rollbackMemstore rolled back " + kvsRolledback);
3714 }
3715
3716 @Override
3717 public void checkFamilies(Collection<byte[]> families) throws NoSuchColumnFamilyException {
3718 for (byte[] family : families) {
3719 checkFamily(family);
3720 }
3721 }
3722
3723
3724
3725
3726
3727 private void removeNonExistentColumnFamilyForReplay(
3728 final Map<byte[], List<Cell>> familyMap) {
3729 List<byte[]> nonExistentList = null;
3730 for (byte[] family : familyMap.keySet()) {
3731 if (!this.htableDescriptor.hasFamily(family)) {
3732 if (nonExistentList == null) {
3733 nonExistentList = new ArrayList<byte[]>();
3734 }
3735 nonExistentList.add(family);
3736 }
3737 }
3738 if (nonExistentList != null) {
3739 for (byte[] family : nonExistentList) {
3740
3741 LOG.info("No family for " + Bytes.toString(family) + " omit from reply.");
3742 familyMap.remove(family);
3743 }
3744 }
3745 }
3746
3747 @Override
3748 public void checkTimestamps(final Map<byte[], List<Cell>> familyMap, long now)
3749 throws FailedSanityCheckException {
3750 if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
3751 return;
3752 }
3753 long maxTs = now + timestampSlop;
3754 for (List<Cell> kvs : familyMap.values()) {
3755 assert kvs instanceof RandomAccess;
3756 int listSize = kvs.size();
3757 for (int i=0; i < listSize; i++) {
3758 Cell cell = kvs.get(i);
3759
3760 long ts = cell.getTimestamp();
3761 if (ts != HConstants.LATEST_TIMESTAMP && ts > maxTs) {
3762 throw new FailedSanityCheckException("Timestamp for KV out of range "
3763 + cell + " (too.new=" + timestampSlop + ")");
3764 }
3765 }
3766 }
3767 }
3768
3769
3770
3771
3772
3773
3774
3775 private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
3776 WALEdit walEdit) {
3777 for (List<Cell> edits : familyMap.values()) {
3778 assert edits instanceof RandomAccess;
3779 int listSize = edits.size();
3780 for (int i=0; i < listSize; i++) {
3781 Cell cell = edits.get(i);
3782 walEdit.add(cell);
3783 }
3784 }
3785 }
3786
3787 private void requestFlush() {
3788 if (this.rsServices == null) {
3789 return;
3790 }
3791 synchronized (writestate) {
3792 if (this.writestate.isFlushRequested()) {
3793 return;
3794 }
3795 writestate.flushRequested = true;
3796 }
3797
3798 this.rsServices.getFlushRequester().requestFlush(this, false);
3799 if (LOG.isDebugEnabled()) {
3800 LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName());
3801 }
3802 }
3803
3804
3805
3806
3807
3808 private boolean isFlushSize(final long size) {
3809 return size > this.memstoreFlushSize;
3810 }
3811
3812
3813
3814
3815
3816
3817
3818
3819
3820
3821
3822
3823
3824
3825
3826
3827
3828
3829
3830
3831
3832
3833
3834
3835
3836
3837
3838
3839
3840
3841
3842
3843
3844
/**
 * Replays any recovered-edits files found under {@code regiondir} into this region's stores.
 * Flushes if anything newer than the stores' contents was replayed, then archives or deletes
 * the edits files.
 *
 * <p>Each file name is parsed as a long (absolute value taken) and treated as the file's
 * maximum sequence id. Files whose id is not above the smallest per-store max sequence id are
 * skipped, as are zero-length files ({@code isZeroLengthThenDelete}). If replaying a file
 * throws an {@link IOException}, the file is moved aside and replay continues only when
 * {@code HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS} is true. That key falls back to the
 * deprecated {@code hbase.skip.errors}. Otherwise the exception propagates.
 *
 * @param regiondir region directory to search for split edit files
 * @param maxSeqIdInStores per-family maximum sequence id already in each store
 * @param reporter progress reporter passed to {@code replayRecoveredEdits}; may be null
 * @param status task status passed to the post-replay flush
 * @return the highest sequence id replayed, or the minimum per-store max sequence id if
 *     nothing newer was replayed (-1 when {@code maxSeqIdInStores} is empty)
 * @throws IOException on file system errors, or on replay errors when skipping is disabled
 */
protected long replayRecoveredEditsIfAny(final Path regiondir,
    Map<byte[], Long> maxSeqIdInStores,
    final CancelableProgressable reporter, final MonitoredTask status)
    throws IOException {
  // Smallest of the per-store max sequence ids; -1 stays only when the map is empty.
  long minSeqIdForTheRegion = -1;
  for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
    if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
      minSeqIdForTheRegion = maxSeqIdInStore;
    }
  }
  long seqid = minSeqIdForTheRegion;

  FileSystem fs = this.fs.getFileSystem();
  NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Found " + (files == null ? 0 : files.size())
        + " recovered edits file(s) under " + regiondir);
  }

  if (files == null || files.isEmpty()) return seqid;

  for (Path edits: files) {
    if (edits == null || !fs.exists(edits)) {
      LOG.warn("Null or non-existent edits file: " + edits);
      continue;
    }
    if (isZeroLengthThenDelete(fs, edits)) continue;

    // The file name encodes the file's maximum sequence id. If that is not above the
    // region's minimum, every edit in the file is skipped without opening it.
    long maxSeqId;
    String fileName = edits.getName();
    maxSeqId = Math.abs(Long.parseLong(fileName));
    if (maxSeqId <= minSeqIdForTheRegion) {
      if (LOG.isDebugEnabled()) {
        String msg = "Maximum sequenceid for this wal is " + maxSeqId
            + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
            + ", skipped the whole file, path=" + edits;
        LOG.debug(msg);
      }
      continue;
    }

    try {
      // Replay the file and keep the highest sequence id seen across all files.
      seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
    } catch (IOException e) {
      // The new key takes precedence; the deprecated key only supplies the default.
      boolean skipErrors = conf.getBoolean(
          HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
          conf.getBoolean(
              "hbase.skip.errors",
              HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS));
      if (conf.get("hbase.skip.errors") != null) {
        LOG.warn(
            "The property 'hbase.skip.errors' has been deprecated. Please use " +
            HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
      }
      if (skipErrors) {
        Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
        LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
            + "=true so continuing. Renamed " + edits +
            " as " + p, e);
      } else {
        throw e;
      }
    }
  }

  // Replay is done; clear this region's replay-size accounting on the region server.
  if (this.rsAccounting != null) {
    this.rsAccounting.clearRegionReplayEditsSize(getRegionInfo().getRegionName());
  }
  if (seqid > minSeqIdForTheRegion) {
    // Something newer than the stores' contents was replayed: flush all stores at seqid.
    internalFlushcache(null, seqid, stores.values(), status, false);
  }

  // NOTE(review): files is known non-empty here because of the early return above.
  if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) {
    // Archive instead of deleting. The edits files are wrapped as StoreFiles of a fake
    // family named after the recovered-edits directory and removed via the region fs.
    String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName();
    Set<StoreFile> fakeStoreFiles = new HashSet<StoreFile>(files.size());
    for (Path file: files) {
      fakeStoreFiles.add(new StoreFile(getRegionFileSystem().getFileSystem(), file, this.conf,
          null, null));
    }
    getRegionFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
  } else {
    // Delete every listed file, including ones that were skipped above.
    for (Path file: files) {
      if (!fs.delete(file, false)) {
        LOG.error("Failed delete of " + file);
      } else {
        LOG.debug("Deleted recovered.edits file=" + file);
      }
    }
  }
  return seqid;
}
3943
3944
3945
3946
3947
3948
3949
3950
3951
3952
/**
 * Replays a single recovered-edits file into this region's stores.
 *
 * <p>The following are counted as skipped rather than applied:
 * <ul>
 *   <li>entries for another region</li>
 *   <li>cells in {@code WALEdit.METAFAMILY}</li>
 *   <li>cells for unknown families</li>
 *   <li>cells whose WAL sequence id is not above the store's max sequence id</li>
 * </ul>
 * Compaction markers found among the METAFAMILY cells are replayed through
 * {@code replayWALCompactionMarker}. When a coprocessor host exists, {@code preWALRestore}
 * can bypass an entry; {@code postWALRestore} runs after each applied entry.
 *
 * <p>An {@link EOFException}, or an {@link IOException} caused by a {@link ParseException},
 * moves the file aside and ends replay of this file, keeping the edits already applied. Any
 * other {@link IOException} is rethrown.
 *
 * @param edits path of the recovered-edits file
 * @param maxSeqIdInStores per-family maximum sequence id already in each store
 * @param reporter optional progress reporter; replay fails with an IOException if its
 *     {@code progress()} returns false
 * @return the highest WAL sequence id read from the file, or -1 if no entry was read
 * @throws IOException on errors other than the tolerated ones above
 */
private long replayRecoveredEdits(final Path edits,
    Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
    throws IOException {
  String msg = "Replaying edits from " + edits;
  LOG.info(msg);
  MonitoredTask status = TaskMonitor.get().createStatus(msg);
  FileSystem fs = this.fs.getFileSystem();

  status.setStatus("Opening recovered edits");
  WAL.Reader reader = null;
  try {
    reader = WALFactory.createReader(fs, edits, conf);
    long currentEditSeqId = -1;
    long currentReplaySeqId = -1;
    long firstSeqIdInLog = -1;
    long skippedEdits = 0;
    long editsCount = 0;
    long intervalEdits = 0;
    WAL.Entry entry;
    Store store = null;
    boolean reported_once = false;
    ServerNonceManager ng = this.rsServices == null ? null : this.rsServices.getNonceManager();

    try {
      // Number of edits between checks of the elapsed time.
      int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
      // Minimum time (ms) between progress reports.
      int period = this.conf.getInt("hbase.hstore.report.period", 300000);
      long lastReport = EnvironmentEdgeManager.currentTime();

      while ((entry = reader.next()) != null) {
        WALKey key = entry.getKey();
        WALEdit val = entry.getEdit();

        // Report each entry's nonce to the nonce manager when one exists.
        if (ng != null) {
          ng.reportOperationFromWal(key.getNonceGroup(), key.getNonce(), key.getWriteTime());
        }

        if (reporter != null) {
          intervalEdits += val.size();
          if (intervalEdits >= interval) {
            // Enough edits since the last time check.
            intervalEdits = 0;
            long cur = EnvironmentEdgeManager.currentTime();
            if (lastReport + period <= cur) {
              status.setStatus("Replaying edits..." +
                  " skipped=" + skippedEdits +
                  " edits=" + editsCount);
              // A false return from the reporter aborts the replay.
              if(!reporter.progress()) {
                msg = "Progressable reporter failed, stopping replay";
                LOG.warn(msg);
                status.abort(msg);
                throw new IOException(msg);
              }
              reported_once = true;
              lastReport = cur;
            }
          }
        }

        if (firstSeqIdInLog == -1) {
          firstSeqIdInLog = key.getLogSeqNum();
        }
        if (currentEditSeqId > key.getLogSeqNum()) {
          // Sequence ids went backwards: log it and keep currentEditSeqId at the max seen.
          LOG.error(getRegionInfo().getEncodedName() + " : "
              + "Found decreasing SeqId. PreId=" + currentEditSeqId + " key=" + key
              + "; edit=" + val);
        } else {
          currentEditSeqId = key.getLogSeqNum();
        }
        // Prefer the key's original sequence id when it carries one (> 0).
        currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
            key.getOrigLogSeqNum() : currentEditSeqId;

        // Coprocessor hooks run once per WAL entry, not per cell.
        if (coprocessorHost != null) {
          status.setStatus("Running pre-WAL-restore hook in coprocessors");
          if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
            // Bypassed by the coprocessor: skip the entry (postWALRestore is not called).
            continue;
          }
        }

        // Entries for other regions are skipped.
        if (!Bytes.equals(key.getEncodedRegionName(),
            this.getRegionInfo().getEncodedNameAsBytes())) {
          skippedEdits++;
          continue;
        }

        boolean flush = false;
        for (Cell cell: val.getCells()) {
          // Meta-family cells are never written to a store; compaction markers among them
          // are replayed instead.
          if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
            CompactionDescriptor compaction = WALEdit.getCompaction(cell);
            if (compaction != null) {
              replayWALCompactionMarker(compaction, false, true, Long.MAX_VALUE);
            }
            skippedEdits++;
            continue;
          }

          // Reuse the previous store when the family has not changed.
          if (store == null || !CellUtil.matchingFamily(cell, store.getFamily().getName())) {
            store = getStore(cell);
          }
          if (store == null) {
            // Family not in this region (e.g. dropped from the schema): skip the cell.
            LOG.warn("No family for " + cell);
            skippedEdits++;
            continue;
          }

          // Skip cells the store already has.
          // NOTE(review): unboxing throws NPE if the family is missing from
          // maxSeqIdInStores; confirm callers always include every store.
          if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
              .getName())) {
            skippedEdits++;
            continue;
          }
          CellUtil.setSequenceId(cell, currentReplaySeqId);

          // The flush request is accumulated across all cells of this entry; the flush
          // itself happens only after the whole entry has been applied.
          flush |= restoreEdit(store, cell);
          editsCount++;
        }
        if (flush) {
          internalFlushcache(null, currentEditSeqId, stores.values(), status, false);
        }

        if (coprocessorHost != null) {
          coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
        }
      }
    } catch (EOFException eof) {
      // Truncated file: move it aside and keep what was applied.
      Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
      msg = "Encountered EOF. Most likely due to Master failure during " +
          "wal splitting, so we have this data in another edit. " +
          "Continuing, but renaming " + edits + " as " + p;
      LOG.warn(msg, eof);
      status.abort(msg);
    } catch (IOException ioe) {
      // A ParseException cause means a corrupt file: move it aside and keep going.
      if (ioe.getCause() instanceof ParseException) {
        Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
        msg = "File corruption encountered! " +
            "Continuing, but renaming " + edits + " as " + p;
        LOG.warn(msg, ioe);
        status.setStatus(msg);
      } else {
        status.abort(StringUtils.stringifyException(ioe));
        // Any other IOException propagates to the caller.
        throw ioe;
      }
    }
    // Report progress at least once per file.
    if (reporter != null && !reported_once) {
      reporter.progress();
    }
    msg = "Applied " + editsCount + ", skipped " + skippedEdits +
        ", firstSequenceIdInLog=" + firstSeqIdInLog +
        ", maxSequenceIdInLog=" + currentEditSeqId + ", path=" + edits;
    status.markComplete(msg);
    LOG.debug(msg);
    return currentEditSeqId;
  } finally {
    status.cleanup();
    if (reader != null) {
      reader.close();
    }
  }
}
4131
4132
4133
4134
4135
4136
/**
 * Applies a compaction marker read from the WAL to the matching store.
 *
 * <p>While holding the {@code writestate} monitor, the marker is ignored with a warning if
 * {@code replaySeqId} is below {@code lastReplayedOpenRegionSeqId} or
 * {@code lastReplayedCompactionSeqId}. Otherwise {@code lastReplayedCompactionSeqId} is
 * advanced to it and {@code Store.replayCompactionMarker} is called. A family that no longer
 * exists is logged and ignored. A {@link FileNotFoundException} from the store is logged and
 * tolerated.
 *
 * @param compaction the compaction descriptor from the WAL
 * @param pickCompactionFiles passed through to {@code Store.replayCompactionMarker}
 * @param removeFiles passed through to {@code Store.replayCompactionMarker}
 * @param replaySeqId sequence id of the marker (recovered-edits replay passes Long.MAX_VALUE)
 * @throws IOException from {@code checkTargetRegion} or the store replay
 */
void replayWALCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles,
    boolean removeFiles, long replaySeqId)
    throws IOException {
  // NOTE(review): presumably throws if the marker's region is not this region; confirm.
  checkTargetRegion(compaction.getEncodedRegionName().toByteArray(),
      "Compaction marker from WAL ", compaction);

  // writestate serializes this with the other replay paths that synchronize on it.
  synchronized (writestate) {
    if (replaySeqId < lastReplayedOpenRegionSeqId) {
      LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
          + " because its sequence id " + replaySeqId + " is smaller than this regions "
          + "lastReplayedOpenRegionSeqId of " + lastReplayedOpenRegionSeqId);
      return;
    }
    if (replaySeqId < lastReplayedCompactionSeqId) {
      LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "Skipping replaying compaction event :" + TextFormat.shortDebugString(compaction)
          + " because its sequence id " + replaySeqId + " is smaller than this regions "
          + "lastReplayedCompactionSeqId of " + lastReplayedCompactionSeqId);
      return;
    } else {
      lastReplayedCompactionSeqId = replaySeqId;
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug(getRegionInfo().getEncodedName() + " : "
          + "Replaying compaction marker " + TextFormat.shortDebugString(compaction)
          + " with seqId=" + replaySeqId + " and lastReplayedOpenRegionSeqId="
          + lastReplayedOpenRegionSeqId);
    }

    startRegionOperation(Operation.REPLAY_EVENT);
    try {
      Store store = this.getStore(compaction.getFamilyName().toByteArray());
      if (store == null) {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Found Compaction WAL edit for deleted family:"
            + Bytes.toString(compaction.getFamilyName().toByteArray()));
        return;
      }
      store.replayCompactionMarker(compaction, pickCompactionFiles, removeFiles);
      logRegionFiles();
    } catch (FileNotFoundException ex) {
      // Some compaction input/output files are gone; log and carry on.
      LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "At least one of the store files in compaction: "
          + TextFormat.shortDebugString(compaction)
          + " doesn't exist any more. Skip loading the file(s)", ex);
    } finally {
      closeRegionOperation(Operation.REPLAY_EVENT);
    }
  }
}
4189
4190 void replayWALFlushMarker(FlushDescriptor flush, long replaySeqId) throws IOException {
4191 checkTargetRegion(flush.getEncodedRegionName().toByteArray(),
4192 "Flush marker from WAL ", flush);
4193
4194 if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
4195 return;
4196 }
4197
4198 if (LOG.isDebugEnabled()) {
4199 LOG.debug(getRegionInfo().getEncodedName() + " : "
4200 + "Replaying flush marker " + TextFormat.shortDebugString(flush));
4201 }
4202
4203 startRegionOperation(Operation.REPLAY_EVENT);
4204 try {
4205 FlushAction action = flush.getAction();
4206 switch (action) {
4207 case START_FLUSH:
4208 replayWALFlushStartMarker(flush);
4209 break;
4210 case COMMIT_FLUSH:
4211 replayWALFlushCommitMarker(flush);
4212 break;
4213 case ABORT_FLUSH:
4214 replayWALFlushAbortMarker(flush);
4215 break;
4216 case CANNOT_FLUSH:
4217 replayWALFlushCannotFlushMarker(flush, replaySeqId);
4218 break;
4219 default:
4220 LOG.warn(getRegionInfo().getEncodedName() + " : " +
4221 "Received a flush event with unknown action, ignoring. " +
4222 TextFormat.shortDebugString(flush));
4223 break;
4224 }
4225
4226 logRegionFiles();
4227 } finally {
4228 closeRegionOperation(Operation.REPLAY_EVENT);
4229 }
4230 }
4231
4232
4233
4234
4235
/**
 * Replays a START_FLUSH marker by preparing (snapshotting) a flush of the listed stores at
 * the marker's flush sequence id.
 *
 * <p>Families not present in this region are ignored. Under the {@code writestate} monitor:
 * <ul>
 *   <li>A marker below {@code lastReplayedOpenRegionSeqId} is skipped.</li>
 *   <li>Otherwise the counters of mutations and data without WAL are reset.</li>
 *   <li>If no flush is in progress, a flush is prepared. The result is saved in
 *       {@code prepareFlushResult} and {@code writestate.flushing} is set when preparation
 *       succeeded or the memstore was empty.</li>
 *   <li>If a flush is already in progress, the marker is logged and ignored whatever its
 *       sequence id.</li>
 * </ul>
 * Waiters on {@code writestate} are notified on exit.
 *
 * @param flush the flush descriptor
 * @return the prepare result when a new flush was prepared; null when the marker was
 *     skipped or ignored
 * @throws IOException if preparing the flush fails
 */
@VisibleForTesting
PrepareFlushResult replayWALFlushStartMarker(FlushDescriptor flush) throws IOException {
  long flushSeqId = flush.getFlushSequenceNumber();

  // Resolve the stores named in the marker; unknown families are ignored.
  HashSet<Store> storesToFlush = new HashSet<Store>();
  for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
    byte[] family = storeFlush.getFamilyName().toByteArray();
    Store store = getStore(family);
    if (store == null) {
      LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "Received a flush start marker from primary, but the family is not found. Ignoring"
          + " StoreFlushDescriptor:" + TextFormat.shortDebugString(storeFlush));
      continue;
    }
    storesToFlush.add(store);
  }

  MonitoredTask status = TaskMonitor.get().createStatus("Preparing flush " + this);

  // writestate serializes this with the other replay paths that synchronize on it.
  synchronized (writestate) {
    try {
      if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
            + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
            + " of " + lastReplayedOpenRegionSeqId);
        return null;
      }
      if (numMutationsWithoutWAL.get() > 0) {
        numMutationsWithoutWAL.set(0);
        dataInMemoryWithoutWAL.set(0);
      }

      if (!writestate.flushing) {
        // No flush in progress: snapshot the stores at the marker's sequence id.
        // NOTE(review): the first argument is null; presumably the WAL, so no flush marker
        // is written for a replayed flush. Confirm.
        PrepareFlushResult prepareResult = internalPrepareFlushCache(null,
            flushSeqId, storesToFlush, status, false);
        if (prepareResult.result == null) {
          // Prepared successfully: remember it for the matching commit marker.
          this.writestate.flushing = true;
          this.prepareFlushResult = prepareResult;
          status.markComplete("Flush prepare successful");
          if (LOG.isDebugEnabled()) {
            LOG.debug(getRegionInfo().getEncodedName() + " : "
                + " Prepared flush with seqId:" + flush.getFlushSequenceNumber());
          }
        } else {
          // An empty memstore is still recorded as a prepared flush, so the later commit
          // marker finds a matching prepare result.
          if (prepareResult.getResult().getResult() ==
              FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
            this.writestate.flushing = true;
            this.prepareFlushResult = prepareResult;
            if (LOG.isDebugEnabled()) {
              LOG.debug(getRegionInfo().getEncodedName() + " : "
                  + " Prepared empty flush with seqId:" + flush.getFlushSequenceNumber());
            }
          }
          status.abort("Flush prepare failed with " + prepareResult.result);
        }
        return prepareResult;
      } else {
        // A flush is already prepared. All three cases below only log and fall through to
        // return null. NOTE(review): the "+ \n +" in these messages is a harmless unary plus.
        if (flush.getFlushSequenceNumber() == this.prepareFlushResult.flushOpSeqId) {
          // Same flush as the one already prepared.
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a flush prepare marker with the same seqId: " +
              + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
              + prepareFlushResult.flushOpSeqId + ". Ignoring");
        } else if (flush.getFlushSequenceNumber() < this.prepareFlushResult.flushOpSeqId) {
          // Older than the prepared flush.
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a flush prepare marker with a smaller seqId: " +
              + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
              + prepareFlushResult.flushOpSeqId + ". Ignoring");
        } else {
          // Newer than the prepared flush; still ignored while the previous one is pending.
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a flush prepare marker with a larger seqId: " +
              + flush.getFlushSequenceNumber() + " before clearing the previous one with seqId: "
              + prepareFlushResult.flushOpSeqId + ". Ignoring");
        }
      }
    } finally {
      status.cleanup();
      writestate.notifyAll();
    }
  }
  return null;
}
4346
/**
 * Replays a COMMIT_FLUSH marker by picking up the flushed files in the stores.
 *
 * <p>Under the {@code writestate} monitor, a marker below {@code lastReplayedOpenRegionSeqId}
 * is skipped. Otherwise the behavior depends on the locally prepared flush:
 * <ul>
 *   <li>Equal seq id: commit the prepared snapshot, subtract its size from the global
 *       memstore size, and clear the prepared state.</li>
 *   <li>Smaller seq id: pick up the files but keep the prepared snapshot.</li>
 *   <li>Larger seq id: pick up the files, drop the prepared snapshot, call
 *       {@code dropMemstoreContentsForSeqId}, and clear the prepared state.</li>
 *   <li>Nothing prepared: pick up the files and call {@code dropMemstoreContentsForSeqId}.</li>
 * </ul>
 * Reads are re-enabled whenever a prepared flush existed. On success {@code maxFlushedSeqId}
 * is set and the mvcc is advanced to the flush seq id. A {@link FileNotFoundException} is
 * logged and tolerated. Waiters on {@code writestate} are always notified. Waiters on
 * {@code this} are notified unless the marker was skipped.
 *
 * @param flush the flush descriptor
 * @throws IOException if replaying the flush in a store fails
 */
@VisibleForTesting
void replayWALFlushCommitMarker(FlushDescriptor flush) throws IOException {
  MonitoredTask status = TaskMonitor.get().createStatus("Committing flush " + this);

  // writestate serializes this with the other replay paths that synchronize on it.
  synchronized (writestate) {
    try {
      if (flush.getFlushSequenceNumber() < lastReplayedOpenRegionSeqId) {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
            + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
            + " of " + lastReplayedOpenRegionSeqId);
        return;
      }

      if (writestate.flushing) {
        PrepareFlushResult prepareFlushResult = this.prepareFlushResult;
        if (flush.getFlushSequenceNumber() == prepareFlushResult.flushOpSeqId) {
          if (LOG.isDebugEnabled()) {
            LOG.debug(getRegionInfo().getEncodedName() + " : "
                + "Received a flush commit marker with seqId:" + flush.getFlushSequenceNumber()
                + " and a previous prepared snapshot was found");
          }
          // Commit matches the prepared flush: pick up the files and drop the snapshot.
          replayFlushInStores(flush, prepareFlushResult, true);

          // The snapshot is gone, so release its size from the global memstore accounting.
          this.addAndGetGlobalMemstoreSize(-prepareFlushResult.totalFlushableSize);

          this.prepareFlushResult = null;
          writestate.flushing = false;
        } else if (flush.getFlushSequenceNumber() < prepareFlushResult.flushOpSeqId) {
          // Commit for an older flush: pick up its files but keep the newer prepared
          // snapshot for its own commit marker.
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a flush commit marker with smaller seqId: "
              + flush.getFlushSequenceNumber() + " than what we have prepared with seqId: "
              + prepareFlushResult.flushOpSeqId + ". Picking up new file, but not dropping"
              +" prepared memstore snapshot");
          replayFlushInStores(flush, prepareFlushResult, false);
        } else {
          // Commit for a newer flush than the prepared one: pick up files, drop the
          // prepared snapshot, and drop memstore contents covered by the new seq id.
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a flush commit marker with larger seqId: "
              + flush.getFlushSequenceNumber() + " than what we have prepared with seqId: " +
              prepareFlushResult.flushOpSeqId + ". Picking up new file and dropping prepared"
              +" memstore snapshot");

          replayFlushInStores(flush, prepareFlushResult, true);

          this.addAndGetGlobalMemstoreSize(-prepareFlushResult.totalFlushableSize);

          dropMemstoreContentsForSeqId(flush.getFlushSequenceNumber(), null);

          this.prepareFlushResult = null;
          writestate.flushing = false;
        }
        // Reads are re-enabled in all three prepared-flush cases above.
        this.setReadsEnabled(true);
      } else {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Received a flush commit marker with seqId:" + flush.getFlushSequenceNumber()
            + ", but no previous prepared snapshot was found");
        // No prepared snapshot: pick up the files using fresh flush contexts, then drop
        // memstore contents covered by the flush seq id.
        replayFlushInStores(flush, null, false);

        dropMemstoreContentsForSeqId(flush.getFlushSequenceNumber(), null);
      }

      status.markComplete("Flush commit successful");

      // Record the flushed sequence id.
      this.maxFlushedSeqId = flush.getFlushSequenceNumber();

      // Advance the mvcc to the flush sequence id.
      mvcc.advanceTo(flush.getFlushSequenceNumber());

    } catch (FileNotFoundException ex) {
      LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "At least one of the store files in flush: " + TextFormat.shortDebugString(flush)
          + " doesn't exist any more. Skip loading the file(s)", ex);
    }
    finally {
      status.cleanup();
      writestate.notifyAll();
    }
  }

  // Wake up threads waiting on this region's monitor. Not reached when the marker was
  // skipped by the early return above.
  synchronized (this) {
    notifyAll();
  }
}
4464
4465
4466
4467
4468
4469
4470
4471
4472
/**
 * Calls {@code StoreFlushContext.replayFlush} with the flushed output files listed for each
 * family in {@code flush}.
 *
 * <p>Families unknown to this region, or without a flush context, are logged and skipped. For
 * each replayed store, {@code lastStoreFlushTimeMap} is updated with the prepared flush's start
 * time, or the current time when no prepared result is used.
 *
 * @param flush descriptor listing the flushed output files per family
 * @param prepareFlushResult locally prepared flush whose per-store contexts are used, or null
 *     (or without contexts) to create fresh contexts at the flush sequence id
 * @param dropMemstoreSnapshot passed through to {@code StoreFlushContext.replayFlush}
 * @throws IOException if a store fails to replay the flush
 */
private void replayFlushInStores(FlushDescriptor flush, PrepareFlushResult prepareFlushResult,
    boolean dropMemstoreSnapshot)
    throws IOException {
  for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
    byte[] family = storeFlush.getFamilyName().toByteArray();
    Store store = getStore(family);
    if (store == null) {
      LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "Received a flush commit marker from primary, but the family is not found."
          + "Ignoring StoreFlushDescriptor:" + storeFlush);
      continue;
    }
    List<String> flushFiles = storeFlush.getFlushOutputList();
    StoreFlushContext ctx = null;
    long startTime = EnvironmentEdgeManager.currentTime();
    // Use the prepared per-store context when available, otherwise create a new one.
    if (prepareFlushResult == null || prepareFlushResult.storeFlushCtxs == null) {
      ctx = store.createFlushContext(flush.getFlushSequenceNumber());
    } else {
      ctx = prepareFlushResult.storeFlushCtxs.get(family);
      startTime = prepareFlushResult.startTime;
    }

    if (ctx == null) {
      LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "Unexpected: flush commit marker received from store "
          + Bytes.toString(family) + " but no associated flush context. Ignoring");
      continue;
    }

    ctx.replayFlush(flushFiles, dropMemstoreSnapshot);

    // Record when this store's flush started.
    this.lastStoreFlushTimeMap.put(store, startTime);
  }
}
4508
4509
4510
4511
4512
4513
4514 private long dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException {
4515 long totalFreedSize = 0;
4516 this.updatesLock.writeLock().lock();
4517 try {
4518
4519 long currentSeqId = mvcc.getReadPoint();
4520 if (seqId >= currentSeqId) {
4521
4522 LOG.info(getRegionInfo().getEncodedName() + " : "
4523 + "Dropping memstore contents as well since replayed flush seqId: "
4524 + seqId + " is greater than current seqId:" + currentSeqId);
4525
4526
4527 if (store == null) {
4528 for (Store s : stores.values()) {
4529 totalFreedSize += doDropStoreMemstoreContentsForSeqId(s, currentSeqId);
4530 }
4531 } else {
4532 totalFreedSize += doDropStoreMemstoreContentsForSeqId(store, currentSeqId);
4533 }
4534 } else {
4535 LOG.info(getRegionInfo().getEncodedName() + " : "
4536 + "Not dropping memstore contents since replayed flush seqId: "
4537 + seqId + " is smaller than current seqId:" + currentSeqId);
4538 }
4539 } finally {
4540 this.updatesLock.writeLock().unlock();
4541 }
4542 return totalFreedSize;
4543 }
4544
4545 private long doDropStoreMemstoreContentsForSeqId(Store s, long currentSeqId) throws IOException {
4546 long snapshotSize = s.getFlushableSize();
4547 this.addAndGetGlobalMemstoreSize(-snapshotSize);
4548 StoreFlushContext ctx = s.createFlushContext(currentSeqId);
4549 ctx.prepare();
4550 ctx.abort();
4551 return snapshotSize;
4552 }
4553
4554 private void replayWALFlushAbortMarker(FlushDescriptor flush) {
4555
4556
4557
4558 }
4559
4560 private void replayWALFlushCannotFlushMarker(FlushDescriptor flush, long replaySeqId) {
4561 synchronized (writestate) {
4562 if (this.lastReplayedOpenRegionSeqId > replaySeqId) {
4563 LOG.warn(getRegionInfo().getEncodedName() + " : "
4564 + "Skipping replaying flush event :" + TextFormat.shortDebugString(flush)
4565 + " because its sequence id " + replaySeqId + " is smaller than this regions "
4566 + "lastReplayedOpenRegionSeqId of " + lastReplayedOpenRegionSeqId);
4567 return;
4568 }
4569
4570
4571
4572
4573
4574
4575 this.setReadsEnabled(true);
4576 }
4577 }
4578
4579 @VisibleForTesting
4580 PrepareFlushResult getPrepareFlushResult() {
4581 return prepareFlushResult;
4582 }
4583
/**
 * Replays a region event marker read from the WAL on a secondary region replica.
 *
 * <p>The default replica ignores these markers, REGION_CLOSE markers are ignored, and any event
 * type other than REGION_OPEN is logged and ignored. For a REGION_OPEN marker that is not older
 * than the last one replayed, each store's file list is replaced by the list recorded in the
 * marker, any matching prepared-flush snapshot is aborted, memstore contents covered by the
 * marker's sequence id are dropped, and reads are enabled.
 *
 * @param regionEvent the region event descriptor replayed from the WAL
 * @throws IOException if {@code checkTargetRegion} rejects the marker or a store operation fails
 */
void replayWALRegionEventMarker(RegionEventDescriptor regionEvent) throws IOException {
  checkTargetRegion(regionEvent.getEncodedRegionName().toByteArray(),
    "RegionEvent marker from WAL ", regionEvent);

  startRegionOperation(Operation.REPLAY_EVENT);
  try {
    // Only secondary replicas replay region events.
    if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
      return;
    }

    if (regionEvent.getEventType() == EventType.REGION_CLOSE) {
      // Close markers are not acted upon here.
      return;
    }
    if (regionEvent.getEventType() != EventType.REGION_OPEN) {
      LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "Unknown region event received, ignoring :"
          + TextFormat.shortDebugString(regionEvent));
      return;
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug(getRegionInfo().getEncodedName() + " : "
        + "Replaying region open event marker " + TextFormat.shortDebugString(regionEvent));
    }

    // All replay state transitions are serialized on writestate.
    synchronized (writestate) {
      // Accept only markers at or after the last replayed open event; older ones are stale.
      if (this.lastReplayedOpenRegionSeqId <= regionEvent.getLogSequenceNumber()) {
        this.lastReplayedOpenRegionSeqId = regionEvent.getLogSequenceNumber();
      } else {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
          + "Skipping replaying region event :" + TextFormat.shortDebugString(regionEvent)
          + " because its sequence id is smaller than this regions lastReplayedOpenRegionSeqId "
          + " of " + lastReplayedOpenRegionSeqId);
        return;
      }

      // Replace each store's files with the list recorded in the open marker.
      for (StoreDescriptor storeDescriptor : regionEvent.getStoresList()) {
        byte[] family = storeDescriptor.getFamilyName().toByteArray();
        Store store = getStore(family);
        if (store == null) {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
              + "Received a region open marker from primary, but the family is not found. "
              + "Ignoring. StoreDescriptor:" + storeDescriptor);
          continue;
        }

        // Captured BEFORE the refresh below.
        long storeSeqId = store.getMaxSequenceId();
        List<String> storeFiles = storeDescriptor.getStoreFileList();
        try {
          store.refreshStoreFiles(storeFiles);
        } catch (FileNotFoundException ex) {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
                  + "At least one of the store files: " + storeFiles
                  + " doesn't exist any more. Skip loading the file(s)", ex);
          continue;
        }
        // New files were picked up: record the flush time for this store.
        if (store.getMaxSequenceId() != storeSeqId) {
          lastStoreFlushTimeMap.put(store, EnvironmentEdgeManager.currentTime());
        }

        // If a replayed flush prepare is pending and not newer than this marker, abort this
        // store's prepared snapshot and release its size from the global accounting.
        if (writestate.flushing) {
          if (this.prepareFlushResult.flushOpSeqId <= regionEvent.getLogSequenceNumber()) {
            StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ?
                null : this.prepareFlushResult.storeFlushCtxs.get(family);
            if (ctx != null) {
              long snapshotSize = store.getFlushableSize();
              ctx.abort();
              this.addAndGetGlobalMemstoreSize(-snapshotSize);
              this.prepareFlushResult.storeFlushCtxs.remove(family);
            }
          }
        }

        // Drop memstore contents covered by the marker's sequence id.
        dropMemstoreContentsForSeqId(regionEvent.getLogSequenceNumber(), store);
        // NOTE(review): storeSeqId is the max sequence id from BEFORE refreshStoreFiles, so
        // maxFlushedSeqId is advanced with the pre-refresh value; confirm that is intended.
        if (storeSeqId > this.maxFlushedSeqId) {
          this.maxFlushedSeqId = storeSeqId;
        }
      }

      // Clear the pending flush prepare if no store still holds a snapshot.
      dropPrepareFlushIfPossible();

      mvcc.await();

      this.setReadsEnabled(true);

      // Wake any threads waiting on this region's monitor.
      synchronized (this) {
        notifyAll();
      }
    }
    logRegionFiles();
  } finally {
    closeRegionOperation(Operation.REPLAY_EVENT);
  }
}
4699
/**
 * Replays a bulk load marker read from the WAL on a secondary region replica by bulk loading
 * each recorded store file into its store. The default replica ignores these markers.
 *
 * <p>Markers with a non-negative bulkload sequence number not newer than
 * {@code lastReplayedOpenRegionSeqId} are skipped. Missing files are logged and skipped.
 *
 * @param bulkLoadEvent the bulk load descriptor replayed from the WAL
 * @throws IOException if {@code checkTargetRegion} rejects the marker or loading a file fails
 */
void replayWALBulkLoadEventMarker(WALProtos.BulkLoadDescriptor bulkLoadEvent) throws IOException {
  checkTargetRegion(bulkLoadEvent.getEncodedRegionName().toByteArray(),
    "BulkLoad marker from WAL ", bulkLoadEvent);

  if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
    return;
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug(getRegionInfo().getEncodedName() + " : "
      + "Replaying bulkload event marker " + TextFormat.shortDebugString(bulkLoadEvent));
  }

  // Determine whether the marker spans more than one column family; this decides the kind of
  // bulk region operation started below.
  boolean multipleFamilies = false;
  byte[] family = null;
  for (StoreDescriptor storeDescriptor : bulkLoadEvent.getStoresList()) {
    byte[] fam = storeDescriptor.getFamilyName().toByteArray();
    if (family == null) {
      family = fam;
    } else if (!Bytes.equals(family, fam)) {
      multipleFamilies = true;
      break;
    }
  }

  startBulkRegionOperation(multipleFamilies);
  try {
    synchronized (writestate) {
      // Skip markers already covered by the last replayed region-open event. A negative
      // bulkload sequence number bypasses this check.
      if (bulkLoadEvent.getBulkloadSeqNum() >= 0
          && this.lastReplayedOpenRegionSeqId >= bulkLoadEvent.getBulkloadSeqNum()) {
        LOG.warn(getRegionInfo().getEncodedName() + " : "
            + "Skipping replaying bulkload event :"
            + TextFormat.shortDebugString(bulkLoadEvent)
            + " because its sequence id is smaller than this region's lastReplayedOpenRegionSeqId"
            + " =" + lastReplayedOpenRegionSeqId);

        return;
      }

      for (StoreDescriptor storeDescriptor : bulkLoadEvent.getStoresList()) {
        // `family` is reused here as the per-iteration family name.
        family = storeDescriptor.getFamilyName().toByteArray();
        Store store = getStore(family);
        if (store == null) {
          LOG.warn(getRegionInfo().getEncodedName() + " : "
                  + "Received a bulk load marker from primary, but the family is not found. "
                  + "Ignoring. StoreDescriptor:" + storeDescriptor);
          continue;
        }

        List<String> storeFiles = storeDescriptor.getStoreFileList();
        for (String storeFile : storeFiles) {
          StoreFileInfo storeFileInfo = null;
          try {
            storeFileInfo = fs.getStoreFileInfo(Bytes.toString(family), storeFile);
            store.bulkLoadHFile(storeFileInfo);
          } catch(FileNotFoundException ex) {
            // The file may have been removed since the marker was written; skip it.
            LOG.warn(getRegionInfo().getEncodedName() + " : "
                    + ((storeFileInfo != null) ? storeFileInfo.toString() :
                          (new Path(Bytes.toString(family), storeFile)).toString())
                    + " doesn't exist any more. Skip loading the file");
          }
        }
      }
    }
    // Advance MVCC past the bulk load so the loaded data becomes readable.
    if (bulkLoadEvent.getBulkloadSeqNum() > 0) {
      mvcc.advanceTo(bulkLoadEvent.getBulkloadSeqNum());
    }
  } finally {
    closeBulkRegionOperation();
  }
}
4779
4780
4781
4782
4783 private void dropPrepareFlushIfPossible() {
4784 if (writestate.flushing) {
4785 boolean canDrop = true;
4786 if (prepareFlushResult.storeFlushCtxs != null) {
4787 for (Entry<byte[], StoreFlushContext> entry
4788 : prepareFlushResult.storeFlushCtxs.entrySet()) {
4789 Store store = getStore(entry.getKey());
4790 if (store == null) {
4791 continue;
4792 }
4793 if (store.getSnapshotSize() > 0) {
4794 canDrop = false;
4795 break;
4796 }
4797 }
4798 }
4799
4800
4801
4802 if (canDrop) {
4803 writestate.flushing = false;
4804 this.prepareFlushResult = null;
4805 }
4806 }
4807 }
4808
4809 @Override
4810 public boolean refreshStoreFiles() throws IOException {
4811 return refreshStoreFiles(false);
4812 }
4813
/**
 * Re-reads each store's file list and frees memstore memory that newly visible files cover.
 *
 * <p>Unless {@code force} is set, the default replica returns {@code false} immediately.
 * For every store whose max sequence id grew, any matching prepared-flush snapshot is aborted
 * and memstore contents up to the new sequence id are dropped. Afterwards MVCC is advanced to
 * each store's max memstore timestamp, and {@code lastReplayedOpenRegionSeqId} is raised to
 * the smallest max sequence id across stores.
 *
 * @param force refresh even on the default replica
 * @return {@code true} if any memstore memory was freed
 * @throws IOException if refreshing a store or dropping memstore contents fails
 */
protected boolean refreshStoreFiles(boolean force) throws IOException {
  if (!force && ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
    return false;
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug(getRegionInfo().getEncodedName() + " : "
        + "Refreshing store files to see whether we can free up memstore");
  }

  long totalFreedSize = 0;

  // Smallest max sequence id over all stores after the refresh.
  long smallestSeqIdInStores = Long.MAX_VALUE;

  startRegionOperation();
  try {
    synchronized (writestate) {
      for (Store store : getStores()) {
        long maxSeqIdBefore = store.getMaxSequenceId();

        store.refreshStoreFiles();

        long storeSeqId = store.getMaxSequenceId();
        if (storeSeqId < smallestSeqIdInStores) {
          smallestSeqIdInStores = storeSeqId;
        }

        // Only stores that picked up newer files can free memstore contents.
        if (storeSeqId > maxSeqIdBefore) {

          // Abort this store's pending prepared snapshot if the new files cover it.
          if (writestate.flushing) {
            if (this.prepareFlushResult.flushOpSeqId <= storeSeqId) {
              StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ?
                  null : this.prepareFlushResult.storeFlushCtxs.get(store.getFamily().getName());
              if (ctx != null) {
                long snapshotSize = store.getFlushableSize();
                ctx.abort();
                this.addAndGetGlobalMemstoreSize(-snapshotSize);
                this.prepareFlushResult.storeFlushCtxs.remove(store.getFamily().getName());
                totalFreedSize += snapshotSize;
              }
            }
          }

          totalFreedSize += dropMemstoreContentsForSeqId(storeSeqId, store);
        }
      }

      // Clear the pending flush prepare if no store still holds a snapshot.
      dropPrepareFlushIfPossible();

      // Make sure MVCC covers every edit still held in any memstore.
      for (Store s : getStores()) {
        mvcc.advanceTo(s.getMaxMemstoreTS());
      }

      // Raise lastReplayedOpenRegionSeqId to the smallest store seqId so that later replayed
      // markers at or below it are skipped by the seqId checks in the replay methods.
      if (this.lastReplayedOpenRegionSeqId < smallestSeqIdInStores) {
        this.lastReplayedOpenRegionSeqId = smallestSeqIdInStores;
      }
    }

    // Wake any threads waiting on this region's monitor.
    synchronized (this) {
      notifyAll();
    }
    return totalFreedSize > 0;
  } finally {
    closeRegionOperation();
  }
}
4896
4897 private void logRegionFiles() {
4898 if (LOG.isTraceEnabled()) {
4899 LOG.trace(getRegionInfo().getEncodedName() + " : Store files for region: ");
4900 for (Store s : stores.values()) {
4901 for (StoreFile sf : s.getStorefiles()) {
4902 LOG.trace(getRegionInfo().getEncodedName() + " : " + sf);
4903 }
4904 }
4905 }
4906 }
4907
4908
4909
4910
4911 private void checkTargetRegion(byte[] encodedRegionName, String exceptionMsg, Object payload)
4912 throws WrongRegionException {
4913 if (Bytes.equals(this.getRegionInfo().getEncodedNameAsBytes(), encodedRegionName)) {
4914 return;
4915 }
4916
4917 if (!RegionReplicaUtil.isDefaultReplica(this.getRegionInfo()) &&
4918 Bytes.equals(encodedRegionName,
4919 this.fs.getRegionInfoForFS().getEncodedNameAsBytes())) {
4920 return;
4921 }
4922
4923 throw new WrongRegionException(exceptionMsg + payload
4924 + " targetted for region " + Bytes.toStringBinary(encodedRegionName)
4925 + " does not match this region: " + this.getRegionInfo());
4926 }
4927
4928
4929
4930
4931
4932
4933
4934 protected boolean restoreEdit(final Store s, final Cell cell) {
4935 long kvSize = s.add(cell);
4936 if (this.rsAccounting != null) {
4937 rsAccounting.addAndGetRegionReplayEditsSize(getRegionInfo().getRegionName(), kvSize);
4938 }
4939 return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize));
4940 }
4941
4942
4943
4944
4945
4946
4947
4948 private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
4949 throws IOException {
4950 FileStatus stat = fs.getFileStatus(p);
4951 if (stat.getLen() > 0) return false;
4952 LOG.warn("File " + p + " is zero-length, deleting.");
4953 fs.delete(p, false);
4954 return true;
4955 }
4956
4957 protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
4958 if (family.isMobEnabled()) {
4959 if (HFile.getFormatVersion(this.conf) < HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
4960 throw new IOException("A minimum HFile version of "
4961 + HFile.MIN_FORMAT_VERSION_WITH_TAGS
4962 + " is required for MOB feature. Consider setting " + HFile.FORMAT_VERSION_KEY
4963 + " accordingly.");
4964 }
4965 return new HMobStore(this, family, this.conf);
4966 }
4967 return new HStore(this, family, this.conf);
4968 }
4969
4970 @Override
4971 public Store getStore(final byte[] column) {
4972 return this.stores.get(column);
4973 }
4974
4975
4976
4977
4978
4979 private Store getStore(Cell cell) {
4980 for (Map.Entry<byte[], Store> famStore : stores.entrySet()) {
4981 if (Bytes.equals(
4982 cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
4983 famStore.getKey(), 0, famStore.getKey().length)) {
4984 return famStore.getValue();
4985 }
4986 }
4987
4988 return null;
4989 }
4990
4991 @Override
4992 public List<Store> getStores() {
4993 List<Store> list = new ArrayList<Store>(stores.size());
4994 list.addAll(stores.values());
4995 return list;
4996 }
4997
4998 @Override
4999 public List<String> getStoreFileList(final byte [][] columns)
5000 throws IllegalArgumentException {
5001 List<String> storeFileNames = new ArrayList<String>();
5002 synchronized(closeLock) {
5003 for(byte[] column : columns) {
5004 Store store = this.stores.get(column);
5005 if (store == null) {
5006 throw new IllegalArgumentException("No column family : " +
5007 new String(column) + " available");
5008 }
5009 for (StoreFile storeFile: store.getStorefiles()) {
5010 storeFileNames.add(storeFile.getPath().toString());
5011 }
5012
5013 logRegionFiles();
5014 }
5015 }
5016 return storeFileNames;
5017 }
5018
5019
5020
5021
5022
5023
5024 void checkRow(final byte [] row, String op) throws IOException {
5025 if (!rowIsInRange(getRegionInfo(), row)) {
5026 throw new WrongRegionException("Requested row out of range for " +
5027 op + " on HRegion " + this + ", startKey='" +
5028 Bytes.toStringBinary(getRegionInfo().getStartKey()) + "', getEndKey()='" +
5029 Bytes.toStringBinary(getRegionInfo().getEndKey()) + "', row='" +
5030 Bytes.toStringBinary(row) + "'");
5031 }
5032 }
5033
5034
5035
5036
5037
5038
5039
5040
5041 public RowLock getRowLock(byte[] row) throws IOException {
5042 return getRowLock(row, false);
5043 }
5044
5045
5046
5047
5048
5049
5050
5051
5052
5053
5054
5055 @Override
5056 public RowLock getRowLock(byte[] row, boolean readLock) throws IOException {
5057
5058 checkRow(row, "row lock");
5059
5060 HashedBytes rowKey = new HashedBytes(row);
5061
5062 RowLockContext rowLockContext = null;
5063 RowLockImpl result = null;
5064 TraceScope traceScope = null;
5065
5066
5067 if (Trace.isTracing()) {
5068 traceScope = Trace.startSpan("HRegion.getRowLock");
5069 traceScope.getSpan().addTimelineAnnotation("Getting a " + (readLock?"readLock":"writeLock"));
5070 }
5071
5072 try {
5073
5074
5075 while (result == null) {
5076
5077
5078
5079 rowLockContext = new RowLockContext(rowKey);
5080 RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
5081
5082
5083 if (existingContext != null) {
5084 rowLockContext = existingContext;
5085 }
5086
5087
5088
5089
5090 if (readLock) {
5091 result = rowLockContext.newReadLock();
5092 } else {
5093 result = rowLockContext.newWriteLock();
5094 }
5095 }
5096 if (!result.getLock().tryLock(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
5097 if (traceScope != null) {
5098 traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
5099 }
5100 result = null;
5101
5102 rowLockContext.cleanUp();
5103 throw new IOException("Timed out waiting for lock for row: " + rowKey);
5104 }
5105 return result;
5106 } catch (InterruptedException ie) {
5107 LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
5108 InterruptedIOException iie = new InterruptedIOException();
5109 iie.initCause(ie);
5110 if (traceScope != null) {
5111 traceScope.getSpan().addTimelineAnnotation("Interrupted exception getting row lock");
5112 }
5113 Thread.currentThread().interrupt();
5114 throw iie;
5115 } finally {
5116 if (traceScope != null) {
5117 traceScope.close();
5118 }
5119 }
5120 }
5121
5122 @Override
5123 public void releaseRowLocks(List<RowLock> rowLocks) {
5124 if (rowLocks != null) {
5125 for (RowLock rowLock : rowLocks) {
5126 rowLock.release();
5127 }
5128 rowLocks.clear();
5129 }
5130 }
5131
/**
 * Lock state shared by all holders of locks on one row. Instances are stored in
 * {@code lockedRows} keyed by the row and removed once their reference count reaches zero.
 */
@VisibleForTesting
class RowLockContext {
  // Key under which this context is stored in lockedRows.
  private final HashedBytes row;
  // Fair read/write lock guarding the row.
  final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
  // Cleared when the context is retired; a retired context hands out no more handles.
  final AtomicBoolean usable = new AtomicBoolean(true);
  // Outstanding references: incremented per handle request, decremented by cleanUp().
  final AtomicInteger count = new AtomicInteger(0);
  // Monitor serializing handle creation against retirement.
  final Object lock = new Object();

  RowLockContext(HashedBytes row) {
    this.row = row;
  }

  /** Returns a write-lock handle (not yet locked), or {@code null} if this context is retired. */
  RowLockImpl newWriteLock() {
    Lock l = readWriteLock.writeLock();
    return getRowLock(l);
  }
  /** Returns a read-lock handle (not yet locked), or {@code null} if this context is retired. */
  RowLockImpl newReadLock() {
    Lock l = readWriteLock.readLock();
    return getRowLock(l);
  }

  private RowLockImpl getRowLock(Lock l) {
    // Count first, then check usability under the monitor: a concurrent cleanUp() re-reads
    // the count under the same monitor, so it cannot retire a context we are about to use.
    // If the context was already retired, null is returned and the caller retries elsewhere.
    count.incrementAndGet();
    synchronized (lock) {
      if (usable.get()) {
        return new RowLockImpl(this, l);
      } else {
        return null;
      }
    }
  }

  /** Drops one reference; when none remain, retires this context and removes it. */
  void cleanUp() {
    long c = count.decrementAndGet();
    if (c <= 0) {
      synchronized (lock) {
        // Re-check under the monitor: another thread may have taken a reference meanwhile.
        if (count.get() <= 0 ){
          usable.set(false);
          RowLockContext removed = lockedRows.remove(row);
          assert removed == this: "we should never remove a different context";
        }
      }
    }
  }

  @Override
  public String toString() {
    return "RowLockContext{" +
        "row=" + row +
        ", readWriteLock=" + readWriteLock +
        ", count=" + count +
        '}';
  }
}
5186
5187
5188
5189
5190 public static class RowLockImpl implements RowLock {
5191 private final RowLockContext context;
5192 private final Lock lock;
5193
5194 public RowLockImpl(RowLockContext context, Lock lock) {
5195 this.context = context;
5196 this.lock = lock;
5197 }
5198
5199 public Lock getLock() {
5200 return lock;
5201 }
5202
5203 @VisibleForTesting
5204 public RowLockContext getContext() {
5205 return context;
5206 }
5207
5208 @Override
5209 public void release() {
5210 lock.unlock();
5211 context.cleanUp();
5212 }
5213
5214 @Override
5215 public String toString() {
5216 return "RowLockImpl{" +
5217 "context=" + context +
5218 ", lock=" + lock +
5219 '}';
5220 }
5221 }
5222
5223
5224
5225
5226
5227
5228
5229 private static boolean hasMultipleColumnFamilies(Collection<Pair<byte[], String>> familyPaths) {
5230 boolean multipleFamilies = false;
5231 byte[] family = null;
5232 for (Pair<byte[], String> pair : familyPaths) {
5233 byte[] fam = pair.getFirst();
5234 if (family == null) {
5235 family = fam;
5236 } else if (!Bytes.equals(family, fam)) {
5237 multipleFamilies = true;
5238 break;
5239 }
5240 }
5241 return multipleFamilies;
5242 }
5243
5244 @Override
5245 public boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
5246 BulkLoadListener bulkLoadListener) throws IOException {
5247 long seqId = -1;
5248 Map<byte[], List<Path>> storeFiles = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
5249 Preconditions.checkNotNull(familyPaths);
5250
5251 startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
5252 try {
5253 this.writeRequestsCount.increment();
5254
5255
5256
5257
5258 List<IOException> ioes = new ArrayList<IOException>();
5259 List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
5260 for (Pair<byte[], String> p : familyPaths) {
5261 byte[] familyName = p.getFirst();
5262 String path = p.getSecond();
5263
5264 Store store = getStore(familyName);
5265 if (store == null) {
5266 IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(
5267 "No such column family " + Bytes.toStringBinary(familyName));
5268 ioes.add(ioe);
5269 } else {
5270 try {
5271 store.assertBulkLoadHFileOk(new Path(path));
5272 } catch (WrongRegionException wre) {
5273
5274 failures.add(p);
5275 } catch (IOException ioe) {
5276
5277 ioes.add(ioe);
5278 }
5279 }
5280 }
5281
5282
5283 if (ioes.size() != 0) {
5284 IOException e = MultipleIOException.createIOException(ioes);
5285 LOG.error("There were one or more IO errors when checking if the bulk load is ok.", e);
5286 throw e;
5287 }
5288
5289
5290 if (failures.size() != 0) {
5291 StringBuilder list = new StringBuilder();
5292 for (Pair<byte[], String> p : failures) {
5293 list.append("\n").append(Bytes.toString(p.getFirst())).append(" : ")
5294 .append(p.getSecond());
5295 }
5296
5297 LOG.warn("There was a recoverable bulk load failure likely due to a" +
5298 " split. These (family, HFile) pairs were not loaded: " + list);
5299 return false;
5300 }
5301
5302
5303
5304
5305
5306
5307 if (assignSeqId) {
5308 FlushResult fs = flushcache(true, false);
5309 if (fs.isFlushSucceeded()) {
5310 seqId = ((FlushResultImpl)fs).flushSequenceId;
5311 } else if (fs.getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
5312 seqId = ((FlushResultImpl)fs).flushSequenceId;
5313 } else {
5314 throw new IOException("Could not bulk load with an assigned sequential ID because the "+
5315 "flush didn't run. Reason for not flushing: " + ((FlushResultImpl)fs).failureReason);
5316 }
5317 }
5318
5319 for (Pair<byte[], String> p : familyPaths) {
5320 byte[] familyName = p.getFirst();
5321 String path = p.getSecond();
5322 Store store = getStore(familyName);
5323 try {
5324 String finalPath = path;
5325 if (bulkLoadListener != null) {
5326 finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
5327 }
5328 Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId);
5329
5330 if(storeFiles.containsKey(familyName)) {
5331 storeFiles.get(familyName).add(commitedStoreFile);
5332 } else {
5333 List<Path> storeFileNames = new ArrayList<Path>();
5334 storeFileNames.add(commitedStoreFile);
5335 storeFiles.put(familyName, storeFileNames);
5336 }
5337 if (bulkLoadListener != null) {
5338 bulkLoadListener.doneBulkLoad(familyName, path);
5339 }
5340 } catch (IOException ioe) {
5341
5342
5343
5344
5345 LOG.error("There was a partial failure due to IO when attempting to" +
5346 " load " + Bytes.toString(p.getFirst()) + " : " + p.getSecond(), ioe);
5347 if (bulkLoadListener != null) {
5348 try {
5349 bulkLoadListener.failedBulkLoad(familyName, path);
5350 } catch (Exception ex) {
5351 LOG.error("Error while calling failedBulkLoad for family " +
5352 Bytes.toString(familyName) + " with path " + path, ex);
5353 }
5354 }
5355 throw ioe;
5356 }
5357 }
5358
5359 return true;
5360 } finally {
5361 if (wal != null && !storeFiles.isEmpty()) {
5362
5363 try {
5364 WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(
5365 this.getRegionInfo().getTable(),
5366 ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId);
5367 WALUtil.writeBulkLoadMarkerAndSync(wal, this.htableDescriptor, getRegionInfo(),
5368 loadDescriptor, mvcc);
5369 } catch (IOException ioe) {
5370 if (this.rsServices != null) {
5371
5372
5373 this.rsServices.abort("Failed to write bulk load event into WAL.", ioe);
5374 }
5375 }
5376 }
5377
5378 closeBulkRegionOperation();
5379 }
5380 }
5381
5382 @Override
5383 public boolean equals(Object o) {
5384 return o instanceof HRegion && Bytes.equals(getRegionInfo().getRegionName(),
5385 ((HRegion) o).getRegionInfo().getRegionName());
5386 }
5387
5388 @Override
5389 public int hashCode() {
5390 return Bytes.hashCode(getRegionInfo().getRegionName());
5391 }
5392
5393 @Override
5394 public String toString() {
5395 return getRegionInfo().getRegionNameAsString();
5396 }
5397
5398
5399
5400
5401 class RegionScannerImpl implements RegionScanner, org.apache.hadoop.hbase.ipc.RpcCallback {
5402
// Heap over the additional scanners plus the scanners of families loaded eagerly.
KeyValueHeap storeHeap = null;
// Heap over the families deferred by on-demand column family loading; stays null when
// there are no such families.
KeyValueHeap joinedHeap = null;
// Row currently being populated from joinedHeap; reset to null once that row has been
// populated without hitting a between-cells limit.
protected Cell joinedContinuationRow = null;
private boolean filterClosed = false;

// 1 for a Get-style scan, 0 otherwise (assigned in the constructor).
protected final int isScan;
// Stop row, or null when the scan has an empty stop row and is not a Get.
protected final byte[] stopRow;
protected final HRegion region;
protected final CellComparator comparator;
// When set, shared-memory-backed cells are copied before nextRaw() returns them.
protected boolean copyCellsFromSharedMem = false;

// MVCC read point of this scanner; registered in scannerReadPoints by the constructor.
private final long readPt;
private final long maxResultSize;
// Context used by next()/nextRaw() overloads that take no ScannerContext.
private final ScannerContext defaultScannerContext;
// Wrapped scan filter, or null when the scan has no filter.
private final FilterWrapper filter;
5423
5424 @Override
5425 public HRegionInfo getRegionInfo() {
5426 return region.getRegionInfo();
5427 }
5428
5429 public void setCopyCellsFromSharedMem(boolean copyCells) {
5430 this.copyCellsFromSharedMem = copyCells;
5431 }
5432
5433 RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region,
5434 boolean copyCellsFromSharedMem)
5435 throws IOException {
5436 this.region = region;
5437 this.maxResultSize = scan.getMaxResultSize();
5438 if (scan.hasFilter()) {
5439 this.filter = new FilterWrapper(scan.getFilter());
5440 } else {
5441 this.filter = null;
5442 }
5443 this.comparator = region.getCellCompartor();
5444
5445
5446
5447
5448
5449 defaultScannerContext = ScannerContext.newBuilder()
5450 .setBatchLimit(scan.getBatch()).build();
5451
5452 if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
5453 this.stopRow = null;
5454 } else {
5455 this.stopRow = scan.getStopRow();
5456 }
5457
5458
5459 this.isScan = scan.isGetScan() ? 1 : 0;
5460
5461
5462
5463 IsolationLevel isolationLevel = scan.getIsolationLevel();
5464 synchronized(scannerReadPoints) {
5465 this.readPt = getReadpoint(isolationLevel);
5466 scannerReadPoints.put(this, this.readPt);
5467 }
5468
5469
5470
5471 List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
5472 List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>();
5473 if (additionalScanners != null) {
5474 scanners.addAll(additionalScanners);
5475 }
5476
5477 for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
5478 Store store = stores.get(entry.getKey());
5479 KeyValueScanner scanner;
5480 try {
5481 scanner = store.getScanner(scan, entry.getValue(), this.readPt);
5482 } catch (FileNotFoundException e) {
5483 throw handleFileNotFound(e);
5484 }
5485 if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
5486 || this.filter.isFamilyEssential(entry.getKey())) {
5487 scanners.add(scanner);
5488 } else {
5489 joinedScanners.add(scanner);
5490 }
5491 }
5492 this.copyCellsFromSharedMem = copyCellsFromSharedMem;
5493 initializeKVHeap(scanners, joinedScanners, region);
5494 }
5495
5496 protected void initializeKVHeap(List<KeyValueScanner> scanners,
5497 List<KeyValueScanner> joinedScanners, HRegion region)
5498 throws IOException {
5499 this.storeHeap = new KeyValueHeap(scanners, comparator);
5500 if (!joinedScanners.isEmpty()) {
5501 this.joinedHeap = new KeyValueHeap(joinedScanners, comparator);
5502 }
5503 }
5504
5505 @Override
5506 public long getMaxResultSize() {
5507 return maxResultSize;
5508 }
5509
5510 @Override
5511 public long getMvccReadPoint() {
5512 return this.readPt;
5513 }
5514
5515 @Override
5516 public int getBatch() {
5517 return this.defaultScannerContext.getBatchLimit();
5518 }
5519
5520
5521
5522
5523
5524
5525 protected void resetFilters() throws IOException {
5526 if (filter != null) {
5527 filter.reset();
5528 }
5529 }
5530
5531 @Override
5532 public boolean next(List<Cell> outResults)
5533 throws IOException {
5534
5535 return next(outResults, defaultScannerContext);
5536 }
5537
5538 @Override
5539 public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext)
5540 throws IOException {
5541 if (this.filterClosed) {
5542 throw new UnknownScannerException("Scanner was closed (timed out?) " +
5543 "after we renewed it. Could be caused by a very slow scanner " +
5544 "or a lengthy garbage collection");
5545 }
5546 startRegionOperation(Operation.SCAN);
5547 readRequestsCount.increment();
5548 try {
5549 return nextRaw(outResults, scannerContext);
5550 } finally {
5551 closeRegionOperation(Operation.SCAN);
5552 }
5553 }
5554
5555 @Override
5556 public boolean nextRaw(List<Cell> outResults) throws IOException {
5557
5558 return nextRaw(outResults, defaultScannerContext);
5559 }
5560
/**
 * Fetches the next row's cells without starting a region operation, appending them to
 * {@code outResults}.
 *
 * @return whether more values may follow
 * @throws UnknownScannerException if the scanner has been closed
 */
@Override
public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext)
    throws IOException {
  if (storeHeap == null) {
    // storeHeap is null once the scanner is closed.
    throw new UnknownScannerException("Scanner was closed");
  }
  boolean moreValues = false;
  try {
    if (outResults.isEmpty()) {
      // nextInternal requires an empty list, so we can pass outResults directly.
      moreValues = nextInternal(outResults, scannerContext);
    } else {
      List<Cell> tmpList = new ArrayList<Cell>();
      moreValues = nextInternal(tmpList, scannerContext);
      outResults.addAll(tmpList);
    }

    // Reset filters only at row boundaries; a partial result means the row continues.
    if (!scannerContext.partialResultFormed()) resetFilters();

    if (isFilterDoneInternal()) {
      moreValues = false;
    }

    // Replace shared-memory-backed cells with copies before handing them out.
    if (copyCellsFromSharedMem && !outResults.isEmpty()) {
      ListIterator<Cell> listItr = outResults.listIterator();
      Cell cell = null;
      while (listItr.hasNext()) {
        cell = listItr.next();
        if (cell instanceof ShareableMemory) {
          listItr.set(((ShareableMemory) cell).cloneToCell());
        }
      }
    }
  } finally {
    if (copyCellsFromSharedMem) {
      // Cells were copied above, so the shared blocks can be released now.
      this.shipped();
    }
  }
  return moreValues;
}
5613
5614
5615
5616
5617 private boolean populateFromJoinedHeap(List<Cell> results, ScannerContext scannerContext)
5618 throws IOException {
5619 assert joinedContinuationRow != null;
5620 boolean moreValues = populateResult(results, this.joinedHeap, scannerContext,
5621 joinedContinuationRow);
5622
5623 if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
5624
5625 joinedContinuationRow = null;
5626 }
5627
5628
5629 Collections.sort(results, comparator);
5630 return moreValues;
5631 }
5632
5633
5634
5635
5636
5637
5638
5639
5640
/**
 * Pulls cells of the row identified by {@code currentRowCell} from {@code heap} into
 * {@code results}, stopping early when a batch, size or time limit is reached.
 *
 * @param results list receiving the cells
 * @param heap the heap to read from
 * @param scannerContext tracks limits and progress; its scanner state is set on a limit
 * @param currentRowCell a cell of the row being populated
 * @return whether more values may follow
 * @throws IOException if reading from the heap fails
 */
private boolean populateResult(List<Cell> results, KeyValueHeap heap,
    ScannerContext scannerContext, Cell currentRowCell) throws IOException {
  Cell nextKv;
  boolean moreCellsInRow = false;
  boolean tmpKeepProgress = scannerContext.getKeepProgress();
  // Limits are checked between cells, so a row may be split across calls.
  LimitScope limitScope = LimitScope.BETWEEN_CELLS;
  try {
    do {
      // Keep accumulated progress across the heap call, then restore the caller's setting.
      scannerContext.setKeepProgress(true);
      heap.next(results, scannerContext);
      scannerContext.setKeepProgress(tmpKeepProgress);

      nextKv = heap.peek();
      moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
      if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);
      // On a limit, record whether we stopped mid-row so the caller can resume correctly.
      if (scannerContext.checkBatchLimit(limitScope)) {
        return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
      } else if (scannerContext.checkSizeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow? NextState.SIZE_LIMIT_REACHED_MID_ROW: NextState.SIZE_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      } else if (scannerContext.checkTimeLimit(limitScope)) {
        ScannerContext.NextState state =
          moreCellsInRow? NextState.TIME_LIMIT_REACHED_MID_ROW: NextState.TIME_LIMIT_REACHED;
        return scannerContext.setScannerState(state).hasMoreValues();
      }
    } while (moreCellsInRow);
  } catch (FileNotFoundException e) {
    throw handleFileNotFound(e);
  }
  return nextKv != null;
}
5677
5678
5679
5680
5681
5682
5683
5684
5685
5686 private boolean moreCellsInRow(final Cell nextKv, Cell currentRowCell) {
5687 return nextKv != null && CellUtil.matchingRow(nextKv, currentRowCell);
5688 }
5689
5690
5691
5692
5693 @Override
5694 public synchronized boolean isFilterDone() throws IOException {
5695 return isFilterDoneInternal();
5696 }
5697
5698 private boolean isFilterDoneInternal() throws IOException {
5699 return this.filter != null && this.filter.filterAllRemaining();
5700 }
5701
/**
 * Reads the next row (or the next chunk of a row when a limit stops mid-row) into
 * {@code results}.
 *
 * <p>Applies the row-key filter, row-cells filter and {@code filterRow()}, loads additional
 * cells from the joined heap when present, and honours the batch/size/time limits tracked by
 * {@code scannerContext}. Rows that end up empty or filtered out are skipped by looping.
 *
 * @param results must be empty on entry; receives the cells produced by this call
 * @param scannerContext limit/progress tracker; must not be null
 * @return either {@code true} (when returning early because a between-cells limit was reached),
 *     or {@code hasMoreValues()} of the scanner state set on {@code scannerContext}
 * @throws IllegalArgumentException if {@code results} is not empty or {@code scannerContext}
 *     is null
 * @throws CallerDisconnectedException if the current RPC caller has disconnected
 * @throws IncompatibleFilterException if a filter with {@code hasFilterRow()} would need a
 *     mid-row stop
 * @throws IOException on read failure
 */
private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
    throws IOException {
  if (!results.isEmpty()) {
    throw new IllegalArgumentException("First parameter should be an empty list");
  }
  if (scannerContext == null) {
    throw new IllegalArgumentException("Scanner context cannot be null");
  }
  RpcCallContext rpcCall = RpcServer.getCurrentCall();

  // Progress at entry. It is restored each time a row is dropped and the loop moves on,
  // so filtered-out rows do not count against the limits.
  int initialBatchProgress = scannerContext.getBatchProgress();
  long initialSizeProgress = scannerContext.getSizeProgress();
  long initialTimeProgress = scannerContext.getTimeProgress();

  // The loop only repeats when the current row was empty or filtered away (the `continue`
  // statements below). Every other path leaves through a return.
  while (true) {
    // Starting on a new row: restore entry progress or clear it, per keep-progress.
    if (scannerContext.getKeepProgress()) {
      scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
          initialTimeProgress);
    } else {
      scannerContext.clearProgress();
    }

    if (rpcCall != null) {
      // Abort the scan if the RPC caller has disconnected (disconnectSince() >= 0).
      long afterTime = rpcCall.disconnectSince();
      if (afterTime >= 0) {
        throw new CallerDisconnectedException(
            "Aborting on region " + getRegionInfo().getRegionNameAsString() + ", call " +
            this + " after " + afterTime + " ms, since " +
            "caller disconnected");
      }
    }

    // First cell of the next row in the main heap (null when exhausted).
    Cell current = this.storeHeap.peek();

    boolean stopRow = isStopRow(current);

    // A filter with hasFilterRow() needs the complete row before it can decide.
    boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();

    // Such a filter cannot evaluate partial rows, so widen the size and time limit scopes to
    // BETWEEN_ROWS so that those limits are not reached mid-row.
    if (hasFilterRow) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
            + " formed. Changing scope of limits that may create partials");
      }
      scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
      scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
    }

    // joinedContinuationRow == null: main path, read a new row from storeHeap.
    // Otherwise a previous call stopped mid-row while reading the joined heap; resume that.
    if (joinedContinuationRow == null) {
      // At (or past) the stop row: nothing more to return.
      if (stopRow) {
        if (hasFilterRow) {
          filter.filterRowCells(results);
        }
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
      }

      // The row-key filter rejected this row: skip it and try the next one.
      if (filterRowKey(current)) {
        incrementCountOfRowsFilteredMetric(scannerContext);
        // populateResult (which normally counts scanned rows) is not called for this row,
        // so count it here.
        incrementCountOfRowsScannedMetric(scannerContext);
        boolean moreRows = nextRow(scannerContext, current);
        if (!moreRows) {
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }
        results.clear();
        continue;
      }

      // Read the row's cells from the main heap.
      populateResult(results, this.storeHeap, scannerContext, current);

      if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
        if (hasFilterRow) {
          throw new IncompatibleFilterException(
              "Filter whose hasFilterRow() returns true is incompatible with scans that must "
                  + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
        }
        return true;
      }

      // Whether the row after this one is a stop row (or the heap is exhausted).
      Cell nextKv = this.storeHeap.peek();
      stopRow = nextKv == null || isStopRow(nextKv);
      // Record emptiness before any row filter runs.
      final boolean isEmptyRow = results.isEmpty();

      // Apply the whole-row filter to the collected cells.
      FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
      if (hasFilterRow) {
        ret = filter.filterRowCellsWithRet(results);

        // The filter may have changed `results`, so recompute batch/size progress from what is
        // left. Time progress is preserved as-is.
        long timeProgress = scannerContext.getTimeProgress();
        if (scannerContext.getKeepProgress()) {
          scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
              initialTimeProgress);
        } else {
          scannerContext.clearProgress();
        }
        scannerContext.setTimeProgress(timeProgress);
        scannerContext.incrementBatchProgress(results.size());
        for (Cell cell : results) {
          scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOf(cell));
        }
      }

      // Row empty or rejected by the filters: drop it and move on.
      if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
        incrementCountOfRowsFilteredMetric(scannerContext);
        results.clear();
        boolean moreRows = nextRow(scannerContext, current);
        if (!moreRows) {
          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
        }

        // Keep looking unless the following row is already a stop row.
        if (!stopRow) continue;
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
      }

      // The row passed the filters. If the joined heap may hold cells for it, fetch them now.
      if (this.joinedHeap != null) {
        boolean mayHaveData = joinedHeapMayHaveData(current);
        if (mayHaveData) {
          joinedContinuationRow = current;
          populateFromJoinedHeap(results, scannerContext);

          if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
            return true;
          }
        }
      }
    } else {
      // Resume the joined-heap read that was cut short by a limit on a previous call.
      populateFromJoinedHeap(results, scannerContext);
      if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
        return true;
      }
    }

    // populateFromJoinedHeap keeps joinedContinuationRow set when it stopped on a limit; the
    // next call must continue that row.
    if (joinedContinuationRow != null) {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }

    // Both heaps are done for this row. Guard against returning an empty row, e.g. when all
    // of its cells were excluded.
    if (results.isEmpty()) {
      incrementCountOfRowsFilteredMetric(scannerContext);
      boolean moreRows = nextRow(scannerContext, current);
      if (!moreRows) {
        return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
      }
      if (!stopRow) continue;
    }

    if (stopRow) {
      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    } else {
      return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
    }
  }
}
5900
5901 protected void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
5902 if (scannerContext == null || !scannerContext.isTrackingMetrics()) return;
5903
5904 scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();
5905 }
5906
5907 protected void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
5908 if (scannerContext == null || !scannerContext.isTrackingMetrics()) return;
5909
5910 scannerContext.getMetrics().countOfRowsScanned.incrementAndGet();
5911 }
5912
5913
5914
5915
5916
5917
5918 private boolean joinedHeapMayHaveData(Cell currentRowCell)
5919 throws IOException {
5920 Cell nextJoinedKv = joinedHeap.peek();
5921 boolean matchCurrentRow =
5922 nextJoinedKv != null && CellUtil.matchingRow(nextJoinedKv, currentRowCell);
5923 boolean matchAfterSeek = false;
5924
5925
5926
5927 if (!matchCurrentRow) {
5928 Cell firstOnCurrentRow = CellUtil.createFirstOnRow(currentRowCell);
5929 boolean seekSuccessful = this.joinedHeap.requestSeek(firstOnCurrentRow, true, true);
5930 matchAfterSeek =
5931 seekSuccessful && joinedHeap.peek() != null
5932 && CellUtil.matchingRow(joinedHeap.peek(), currentRowCell);
5933 }
5934
5935 return matchCurrentRow || matchAfterSeek;
5936 }
5937
5938
5939
5940
5941
5942
5943
5944
5945 private boolean filterRow() throws IOException {
5946
5947
5948 return filter != null && (!filter.hasFilterRow())
5949 && filter.filterRow();
5950 }
5951
5952 private boolean filterRowKey(Cell current) throws IOException {
5953 return filter != null && filter.filterRowKey(current);
5954 }
5955
5956 protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
5957 assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";
5958 Cell next;
5959 while ((next = this.storeHeap.peek()) != null &&
5960 CellUtil.matchingRow(next, curRowCell)) {
5961 this.storeHeap.next(MOCKED_LIST);
5962 }
5963 resetFilters();
5964
5965
5966 return this.region.getCoprocessorHost() == null
5967 || this.region.getCoprocessorHost()
5968 .postScannerFilterRow(this, curRowCell);
5969 }
5970
5971 protected boolean isStopRow(Cell currentRowCell) {
5972 return currentRowCell == null
5973 || (stopRow != null && comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length) >= isScan);
5974 }
5975
5976 @Override
5977 public synchronized void close() {
5978 if (storeHeap != null) {
5979 storeHeap.close();
5980 storeHeap = null;
5981 }
5982 if (joinedHeap != null) {
5983 joinedHeap.close();
5984 joinedHeap = null;
5985 }
5986
5987 scannerReadPoints.remove(this);
5988 this.filterClosed = true;
5989 }
5990
5991 KeyValueHeap getStoreHeapForTesting() {
5992 return storeHeap;
5993 }
5994
5995 @Override
5996 public synchronized boolean reseek(byte[] row) throws IOException {
5997 if (row == null) {
5998 throw new IllegalArgumentException("Row cannot be null.");
5999 }
6000 boolean result = false;
6001 startRegionOperation();
6002 KeyValue kv = KeyValueUtil.createFirstOnRow(row);
6003 try {
6004
6005 result = this.storeHeap.requestSeek(kv, true, true);
6006 if (this.joinedHeap != null) {
6007 result = this.joinedHeap.requestSeek(kv, true, true) || result;
6008 }
6009 } catch (FileNotFoundException e) {
6010 throw handleFileNotFound(e);
6011 } finally {
6012 closeRegionOperation();
6013 }
6014 return result;
6015 }
6016
6017 private IOException handleFileNotFound(FileNotFoundException fnfe) throws IOException {
6018
6019
6020 try {
6021 region.refreshStoreFiles(true);
6022 return new IOException("unable to read store file");
6023 } catch (IOException e) {
6024 String msg = "a store file got lost: " + fnfe.getMessage();
6025 LOG.error("unable to refresh store files", e);
6026 abortRegionServer(msg);
6027 return new NotServingRegionException(
6028 getRegionInfo().getRegionNameAsString() + " is closing");
6029 }
6030 }
6031
6032 private void abortRegionServer(String msg) throws IOException {
6033 if (rsServices instanceof HRegionServer) {
6034 ((HRegionServer)rsServices).abort(msg);
6035 }
6036 throw new UnsupportedOperationException("not able to abort RS after: " + msg);
6037 }
6038
6039 @Override
6040 public void shipped() throws IOException {
6041 if (storeHeap != null) {
6042 storeHeap.shipped();
6043 }
6044 if (joinedHeap != null) {
6045 joinedHeap.shipped();
6046 }
6047 }
6048
6049 @Override
6050 public void run() throws IOException {
6051
6052
6053 this.close();
6054 }
6055 }
6056
6057
6058
6059
6060
6061
6062
6063
6064
6065
6066
6067
6068
6069
6070
6071
6072
6073
6074
6075
6076 static HRegion newHRegion(Path tableDir, WAL wal, FileSystem fs,
6077 Configuration conf, HRegionInfo regionInfo, final HTableDescriptor htd,
6078 RegionServerServices rsServices) {
6079 try {
6080 @SuppressWarnings("unchecked")
6081 Class<? extends HRegion> regionClass =
6082 (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
6083
6084 Constructor<? extends HRegion> c =
6085 regionClass.getConstructor(Path.class, WAL.class, FileSystem.class,
6086 Configuration.class, HRegionInfo.class, HTableDescriptor.class,
6087 RegionServerServices.class);
6088
6089 return c.newInstance(tableDir, wal, fs, conf, regionInfo, htd, rsServices);
6090 } catch (Throwable e) {
6091
6092 throw new IllegalStateException("Could not instantiate a region instance.", e);
6093 }
6094 }
6095
6096
6097
6098
6099
6100
6101
6102
6103
6104
6105
6106 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
6107 final Configuration conf, final HTableDescriptor hTableDescriptor,
6108 final WAL wal, final boolean initialize)
6109 throws IOException {
6110 LOG.info("creating HRegion " + info.getTable().getNameAsString()
6111 + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
6112 " Table name == " + info.getTable().getNameAsString());
6113 FileSystem fs = FileSystem.get(conf);
6114 Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
6115 HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
6116 HRegion region = HRegion.newHRegion(tableDir, wal, fs, conf, info, hTableDescriptor, null);
6117 if (initialize) region.initialize(null);
6118 return region;
6119 }
6120
6121 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
6122 final Configuration conf,
6123 final HTableDescriptor hTableDescriptor,
6124 final WAL wal)
6125 throws IOException {
6126 return createHRegion(info, rootDir, conf, hTableDescriptor, wal, true);
6127 }
6128
6129
6130
6131
6132
6133
6134
6135
6136
6137
6138
6139
6140
6141 public static HRegion openHRegion(final HRegionInfo info,
6142 final HTableDescriptor htd, final WAL wal,
6143 final Configuration conf)
6144 throws IOException {
6145 return openHRegion(info, htd, wal, conf, null, null);
6146 }
6147
6148
6149
6150
6151
6152
6153
6154
6155
6156
6157
6158
6159
6160
6161
6162
6163 public static HRegion openHRegion(final HRegionInfo info,
6164 final HTableDescriptor htd, final WAL wal, final Configuration conf,
6165 final RegionServerServices rsServices,
6166 final CancelableProgressable reporter)
6167 throws IOException {
6168 return openHRegion(FSUtils.getRootDir(conf), info, htd, wal, conf, rsServices, reporter);
6169 }
6170
6171
6172
6173
6174
6175
6176
6177
6178
6179
6180
6181
6182
6183
6184 public static HRegion openHRegion(Path rootDir, final HRegionInfo info,
6185 final HTableDescriptor htd, final WAL wal, final Configuration conf)
6186 throws IOException {
6187 return openHRegion(rootDir, info, htd, wal, conf, null, null);
6188 }
6189
6190
6191
6192
6193
6194
6195
6196
6197
6198
6199
6200
6201
6202
6203
6204
6205 public static HRegion openHRegion(final Path rootDir, final HRegionInfo info,
6206 final HTableDescriptor htd, final WAL wal, final Configuration conf,
6207 final RegionServerServices rsServices,
6208 final CancelableProgressable reporter)
6209 throws IOException {
6210 FileSystem fs = null;
6211 if (rsServices != null) {
6212 fs = rsServices.getFileSystem();
6213 }
6214 if (fs == null) {
6215 fs = FileSystem.get(conf);
6216 }
6217 return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, reporter);
6218 }
6219
6220
6221
6222
6223
6224
6225
6226
6227
6228
6229
6230
6231
6232
6233
6234 public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
6235 final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final WAL wal)
6236 throws IOException {
6237 return openHRegion(conf, fs, rootDir, info, htd, wal, null, null);
6238 }
6239
6240
6241
6242
6243
6244
6245
6246
6247
6248
6249
6250
6251
6252
6253
6254
6255
6256 public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
6257 final Path rootDir, final HRegionInfo info, final HTableDescriptor htd, final WAL wal,
6258 final RegionServerServices rsServices, final CancelableProgressable reporter)
6259 throws IOException {
6260 Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
6261 return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter);
6262 }
6263
6264
6265
6266
6267
6268
6269
6270
6271
6272
6273
6274
6275
6276
6277
6278
6279
6280 public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
6281 final Path rootDir, final Path tableDir, final HRegionInfo info, final HTableDescriptor htd,
6282 final WAL wal, final RegionServerServices rsServices,
6283 final CancelableProgressable reporter)
6284 throws IOException {
6285 if (info == null) throw new NullPointerException("Passed region info is null");
6286 if (LOG.isDebugEnabled()) {
6287 LOG.debug("Opening region: " + info);
6288 }
6289 HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
6290 return r.openHRegion(reporter);
6291 }
6292
6293
6294
6295
6296
6297
6298
6299
6300
6301 public static HRegion openHRegion(final HRegion other, final CancelableProgressable reporter)
6302 throws IOException {
6303 HRegionFileSystem regionFs = other.getRegionFileSystem();
6304 HRegion r = newHRegion(regionFs.getTableDir(), other.getWAL(), regionFs.getFileSystem(),
6305 other.baseConf, other.getRegionInfo(), other.getTableDesc(), null);
6306 return r.openHRegion(reporter);
6307 }
6308
6309 public static Region openHRegion(final Region other, final CancelableProgressable reporter)
6310 throws IOException {
6311 return openHRegion((HRegion)other, reporter);
6312 }
6313
6314
6315
6316
6317
6318
6319
6320 protected HRegion openHRegion(final CancelableProgressable reporter)
6321 throws IOException {
6322
6323 checkCompressionCodecs();
6324
6325
6326 checkEncryption();
6327
6328 checkClassLoading();
6329 this.openSeqNum = initialize(reporter);
6330 this.mvcc.advanceTo(openSeqNum);
6331 if (wal != null && getRegionServerServices() != null && !writestate.readOnly
6332 && !recovering) {
6333
6334
6335
6336 writeRegionOpenMarker(wal, openSeqNum);
6337 }
6338 return this;
6339 }
6340
6341 public static void warmupHRegion(final HRegionInfo info,
6342 final HTableDescriptor htd, final WAL wal, final Configuration conf,
6343 final RegionServerServices rsServices,
6344 final CancelableProgressable reporter)
6345 throws IOException {
6346
6347 if (info == null) throw new NullPointerException("Passed region info is null");
6348
6349 if (LOG.isDebugEnabled()) {
6350 LOG.debug("HRegion.Warming up region: " + info);
6351 }
6352
6353 Path rootDir = FSUtils.getRootDir(conf);
6354 Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
6355
6356 FileSystem fs = null;
6357 if (rsServices != null) {
6358 fs = rsServices.getFileSystem();
6359 }
6360 if (fs == null) {
6361 fs = FileSystem.get(conf);
6362 }
6363
6364 HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
6365 r.initializeWarmup(reporter);
6366 r.close();
6367 }
6368
6369
6370 private void checkCompressionCodecs() throws IOException {
6371 for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
6372 CompressionTest.testCompression(fam.getCompressionType());
6373 CompressionTest.testCompression(fam.getCompactionCompressionType());
6374 }
6375 }
6376
6377 private void checkEncryption() throws IOException {
6378 for (HColumnDescriptor fam: this.htableDescriptor.getColumnFamilies()) {
6379 EncryptionTest.testEncryption(conf, fam.getEncryptionType(), fam.getEncryptionKey());
6380 }
6381 }
6382
6383 private void checkClassLoading() throws IOException {
6384 RegionSplitPolicy.getSplitPolicyClass(this.htableDescriptor, conf);
6385 RegionCoprocessorHost.testTableCoprocessorAttrs(conf, this.htableDescriptor);
6386 }
6387
6388
6389
6390
6391
6392
6393 HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
6394
6395 fs.commitDaughterRegion(hri);
6396
6397
6398 HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(), fs.getFileSystem(),
6399 this.getBaseConf(), hri, this.getTableDesc(), rsServices);
6400 r.readRequestsCount.set(this.getReadRequestsCount() / 2);
6401 r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
6402 return r;
6403 }
6404
6405
6406
6407
6408
6409
6410
6411 HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
6412 final HRegion region_b) throws IOException {
6413 HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(),
6414 fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
6415 this.getTableDesc(), this.rsServices);
6416 r.readRequestsCount.set(this.getReadRequestsCount()
6417 + region_b.getReadRequestsCount());
6418 r.writeRequestsCount.set(this.getWriteRequestsCount()
6419
6420 + region_b.getWriteRequestsCount());
6421 this.fs.commitMergedRegion(mergedRegionInfo);
6422 return r;
6423 }
6424
6425
6426
6427
6428
6429
6430
6431
6432
6433
6434
6435
6436 public static void addRegionToMETA(final HRegion meta, final HRegion r) throws IOException {
6437 meta.checkResources();
6438
6439 byte[] row = r.getRegionInfo().getRegionName();
6440 final long now = EnvironmentEdgeManager.currentTime();
6441 final List<Cell> cells = new ArrayList<Cell>(2);
6442 cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
6443 HConstants.REGIONINFO_QUALIFIER, now,
6444 r.getRegionInfo().toByteArray()));
6445
6446 cells.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
6447 HConstants.META_VERSION_QUALIFIER, now,
6448 Bytes.toBytes(HConstants.META_VERSION)));
6449 meta.put(row, HConstants.CATALOG_FAMILY, cells);
6450 }
6451
6452
6453
6454
6455
6456
6457
6458
6459
6460 @Deprecated
6461 public static Path getRegionDir(final Path tabledir, final String name) {
6462 return new Path(tabledir, name);
6463 }
6464
6465
6466
6467
6468
6469
6470
6471
6472
6473 @Deprecated
6474 @VisibleForTesting
6475 public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
6476 return new Path(
6477 FSUtils.getTableDir(rootdir, info.getTable()), info.getEncodedName());
6478 }
6479
6480
6481
6482
6483
6484
6485
6486
6487
6488 public static boolean rowIsInRange(HRegionInfo info, final byte [] row) {
6489 return ((info.getStartKey().length == 0) ||
6490 (Bytes.compareTo(info.getStartKey(), row) <= 0)) &&
6491 ((info.getEndKey().length == 0) ||
6492 (Bytes.compareTo(info.getEndKey(), row) > 0));
6493 }
6494
6495
6496
6497
6498
6499
6500
6501 public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
6502 throws IOException {
6503 HRegion a = srcA;
6504 HRegion b = srcB;
6505
6506
6507
6508 if (srcA.getRegionInfo().getStartKey() == null) {
6509 if (srcB.getRegionInfo().getStartKey() == null) {
6510 throw new IOException("Cannot merge two regions with null start key");
6511 }
6512
6513 } else if ((srcB.getRegionInfo().getStartKey() == null) ||
6514 (Bytes.compareTo(srcA.getRegionInfo().getStartKey(),
6515 srcB.getRegionInfo().getStartKey()) > 0)) {
6516 a = srcB;
6517 b = srcA;
6518 }
6519
6520 if (!(Bytes.compareTo(a.getRegionInfo().getEndKey(),
6521 b.getRegionInfo().getStartKey()) == 0)) {
6522 throw new IOException("Cannot merge non-adjacent regions");
6523 }
6524 return merge(a, b);
6525 }
6526
6527
6528
6529
6530
6531
6532
6533
6534
6535 public static HRegion merge(final HRegion a, final HRegion b) throws IOException {
6536 if (!a.getRegionInfo().getTable().equals(b.getRegionInfo().getTable())) {
6537 throw new IOException("Regions do not belong to the same table");
6538 }
6539
6540 FileSystem fs = a.getRegionFileSystem().getFileSystem();
6541
6542 a.flush(true);
6543 b.flush(true);
6544
6545
6546 a.compact(true);
6547 if (LOG.isDebugEnabled()) {
6548 LOG.debug("Files for region: " + a);
6549 a.getRegionFileSystem().logFileSystemState(LOG);
6550 }
6551 b.compact(true);
6552 if (LOG.isDebugEnabled()) {
6553 LOG.debug("Files for region: " + b);
6554 b.getRegionFileSystem().logFileSystemState(LOG);
6555 }
6556
6557 RegionMergeTransactionImpl rmt = new RegionMergeTransactionImpl(a, b, true);
6558 if (!rmt.prepare(null)) {
6559 throw new IOException("Unable to merge regions " + a + " and " + b);
6560 }
6561 HRegionInfo mergedRegionInfo = rmt.getMergedRegionInfo();
6562 LOG.info("starting merge of regions: " + a + " and " + b
6563 + " into new region " + mergedRegionInfo.getRegionNameAsString()
6564 + " with start key <"
6565 + Bytes.toStringBinary(mergedRegionInfo.getStartKey())
6566 + "> and end key <"
6567 + Bytes.toStringBinary(mergedRegionInfo.getEndKey()) + ">");
6568 HRegion dstRegion;
6569 try {
6570 dstRegion = (HRegion)rmt.execute(null, null);
6571 } catch (IOException ioe) {
6572 rmt.rollback(null, null);
6573 throw new IOException("Failed merging region " + a + " and " + b
6574 + ", and successfully rolled back");
6575 }
6576 dstRegion.compact(true);
6577
6578 if (LOG.isDebugEnabled()) {
6579 LOG.debug("Files for new region");
6580 dstRegion.getRegionFileSystem().logFileSystemState(LOG);
6581 }
6582
6583 if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
6584 throw new IOException("Merged region " + dstRegion
6585 + " still has references after the compaction, is compaction canceled?");
6586 }
6587
6588
6589 HFileArchiver.archiveRegion(a.getBaseConf(), fs, a.getRegionInfo());
6590
6591 HFileArchiver.archiveRegion(b.getBaseConf(), fs, b.getRegionInfo());
6592
6593 LOG.info("merge completed. New region is " + dstRegion);
6594 return dstRegion;
6595 }
6596
6597 @Override
6598 public Result get(final Get get) throws IOException {
6599 prepareGet(get);
6600 List<Cell> results = get(get, true);
6601 boolean stale = this.getRegionInfo().getReplicaId() != 0;
6602 return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
6603 }
6604
6605 void prepareGet(final Get get) throws IOException, NoSuchColumnFamilyException {
6606 checkRow(get.getRow(), "Get");
6607
6608 if (get.hasFamilies()) {
6609 for (byte [] family: get.familySet()) {
6610 checkFamily(family);
6611 }
6612 } else {
6613 for (byte[] family: this.htableDescriptor.getFamiliesKeys()) {
6614 get.addFamily(family);
6615 }
6616 }
6617 }
6618
6619 @Override
6620 public List<Cell> get(Get get, boolean withCoprocessor) throws IOException {
6621
6622 List<Cell> results = new ArrayList<Cell>();
6623
6624
6625 if (withCoprocessor && (coprocessorHost != null)) {
6626 if (coprocessorHost.preGet(get, results)) {
6627 return results;
6628 }
6629 }
6630
6631 Scan scan = new Scan(get);
6632
6633 RegionScanner scanner = null;
6634 try {
6635 scanner = getScanner(scan);
6636 scanner.next(results);
6637 } finally {
6638 if (scanner != null)
6639 scanner.close();
6640 }
6641
6642
6643 if (withCoprocessor && (coprocessorHost != null)) {
6644 coprocessorHost.postGet(get, results);
6645 }
6646
6647 metricsUpdateForGet(results);
6648
6649 return results;
6650 }
6651
6652 void metricsUpdateForGet(List<Cell> results) {
6653 if (this.metricsRegion != null) {
6654 long totalSize = 0L;
6655 for (Cell cell : results) {
6656
6657
6658 totalSize += CellUtil.estimatedSerializedSizeOf(cell);
6659 }
6660 this.metricsRegion.updateGet(totalSize);
6661 }
6662 }
6663
6664 @Override
6665 public void mutateRow(RowMutations rm) throws IOException {
6666
6667 mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
6668 }
6669
6670
6671
6672
6673
6674 public void mutateRowsWithLocks(Collection<Mutation> mutations,
6675 Collection<byte[]> rowsToLock) throws IOException {
6676 mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
6677 }
6678
6679
6680
6681
6682
6683
6684
6685
6686
6687
6688
6689
6690
6691 @Override
6692 public void mutateRowsWithLocks(Collection<Mutation> mutations,
6693 Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
6694 MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
6695 processRowsWithLocks(proc, -1, nonceGroup, nonce);
6696 }
6697
6698
6699
6700
6701 public ClientProtos.RegionLoadStats getRegionStats() {
6702 if (!regionStatsEnabled) {
6703 return null;
6704 }
6705 ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder();
6706 stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreSize.get() * 100) / this
6707 .memstoreFlushSize)));
6708 stats.setHeapOccupancy((int)rsServices.getHeapMemoryManager().getHeapOccupancyPercent()*100);
6709 return stats.build();
6710 }
6711
6712 @Override
6713 public void processRowsWithLocks(RowProcessor<?,?> processor) throws IOException {
6714 processRowsWithLocks(processor, rowProcessorTimeout, HConstants.NO_NONCE,
6715 HConstants.NO_NONCE);
6716 }
6717
6718 @Override
6719 public void processRowsWithLocks(RowProcessor<?,?> processor, long nonceGroup, long nonce)
6720 throws IOException {
6721 processRowsWithLocks(processor, rowProcessorTimeout, nonceGroup, nonce);
6722 }
6723
6724 @Override
6725 public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
6726 long nonceGroup, long nonce) throws IOException {
6727
6728 for (byte[] row : processor.getRowsToLock()) {
6729 checkRow(row, "processRowsWithLocks");
6730 }
6731 if (!processor.readOnly()) {
6732 checkReadOnly();
6733 }
6734 checkResources();
6735
6736 startRegionOperation();
6737 WALEdit walEdit = new WALEdit();
6738
6739
6740 try {
6741 processor.preProcess(this, walEdit);
6742 } catch (IOException e) {
6743 closeRegionOperation();
6744 throw e;
6745 }
6746
6747 if (processor.readOnly()) {
6748 try {
6749 long now = EnvironmentEdgeManager.currentTime();
6750 doProcessRowWithTimeout(
6751 processor, now, this, null, null, timeout);
6752 processor.postProcess(this, walEdit, true);
6753 } finally {
6754 closeRegionOperation();
6755 }
6756 return;
6757 }
6758
6759 MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
6760 boolean locked;
6761 boolean walSyncSuccessful = false;
6762 List<RowLock> acquiredRowLocks;
6763 long addedSize = 0;
6764 List<Mutation> mutations = new ArrayList<Mutation>();
6765 Collection<byte[]> rowsToLock = processor.getRowsToLock();
6766 long mvccNum = 0;
6767 WALKey walKey = null;
6768 try {
6769
6770 acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size());
6771 for (byte[] row : rowsToLock) {
6772
6773
6774 acquiredRowLocks.add(getRowLock(row));
6775 }
6776
6777 lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size());
6778 locked = true;
6779
6780 long now = EnvironmentEdgeManager.currentTime();
6781 try {
6782
6783
6784 doProcessRowWithTimeout(
6785 processor, now, this, mutations, walEdit, timeout);
6786
6787 if (!mutations.isEmpty()) {
6788
6789
6790 processor.preBatchMutate(this, walEdit);
6791
6792 long txid = 0;
6793
6794 if (!walEdit.isEmpty()) {
6795
6796 walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
6797 this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
6798 processor.getClusterIds(), nonceGroup, nonce, mvcc);
6799 txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
6800 walKey, walEdit, false);
6801 }
6802 if(walKey == null){
6803
6804
6805 walKey = this.appendEmptyEdit(this.wal);
6806 }
6807
6808
6809 writeEntry = walKey.getWriteEntry();
6810 mvccNum = walKey.getSequenceId();
6811
6812
6813
6814
6815 for (Mutation m : mutations) {
6816
6817 rewriteCellTags(m.getFamilyCellMap(), m);
6818
6819 for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
6820 Cell cell = cellScanner.current();
6821 CellUtil.setSequenceId(cell, mvccNum);
6822 Store store = getStore(cell);
6823 if (store == null) {
6824 checkFamily(CellUtil.cloneFamily(cell));
6825
6826 }
6827 addedSize += store.add(cell);
6828 }
6829 }
6830
6831
6832 if (locked) {
6833 this.updatesLock.readLock().unlock();
6834 locked = false;
6835 }
6836
6837
6838 releaseRowLocks(acquiredRowLocks);
6839
6840
6841 if (txid != 0) {
6842 syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
6843 }
6844 walSyncSuccessful = true;
6845
6846 processor.postBatchMutate(this);
6847 }
6848 } finally {
6849
6850
6851
6852 if (!mutations.isEmpty() && !walSyncSuccessful) {
6853 LOG.warn("Wal sync failed. Roll back " + mutations.size() +
6854 " memstore keyvalues for row(s):" + StringUtils.byteToHexString(
6855 processor.getRowsToLock().iterator().next()) + "...");
6856 for (Mutation m : mutations) {
6857 for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
6858 Cell cell = cellScanner.current();
6859 getStore(cell).rollback(cell);
6860 }
6861 }
6862 if (writeEntry != null) {
6863 mvcc.complete(writeEntry);
6864 writeEntry = null;
6865 }
6866 }
6867
6868 if (writeEntry != null) {
6869 mvcc.completeAndWait(writeEntry);
6870 }
6871 if (locked) {
6872 this.updatesLock.readLock().unlock();
6873 }
6874
6875 releaseRowLocks(acquiredRowLocks);
6876 }
6877
6878
6879 processor.postProcess(this, walEdit, walSyncSuccessful);
6880
6881 } finally {
6882 closeRegionOperation();
6883 if (!mutations.isEmpty() &&
6884 isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
6885 requestFlush();
6886 }
6887 }
6888 }
6889
6890 private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
6891 final long now,
6892 final HRegion region,
6893 final List<Mutation> mutations,
6894 final WALEdit walEdit,
6895 final long timeout) throws IOException {
6896
6897 if (timeout < 0) {
6898 try {
6899 processor.process(now, region, mutations, walEdit);
6900 } catch (IOException e) {
6901 LOG.warn("RowProcessor:" + processor.getClass().getName() +
6902 " throws Exception on row(s):" +
6903 Bytes.toStringBinary(
6904 processor.getRowsToLock().iterator().next()) + "...", e);
6905 throw e;
6906 }
6907 return;
6908 }
6909
6910
6911 FutureTask<Void> task =
6912 new FutureTask<Void>(new Callable<Void>() {
6913 @Override
6914 public Void call() throws IOException {
6915 try {
6916 processor.process(now, region, mutations, walEdit);
6917 return null;
6918 } catch (IOException e) {
6919 LOG.warn("RowProcessor:" + processor.getClass().getName() +
6920 " throws Exception on row(s):" +
6921 Bytes.toStringBinary(
6922 processor.getRowsToLock().iterator().next()) + "...", e);
6923 throw e;
6924 }
6925 }
6926 });
6927 rowProcessorExecutor.execute(task);
6928 try {
6929 task.get(timeout, TimeUnit.MILLISECONDS);
6930 } catch (TimeoutException te) {
6931 LOG.error("RowProcessor timeout:" + timeout + " ms on row(s):" +
6932 Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) +
6933 "...");
6934 throw new IOException(te);
6935 } catch (Exception e) {
6936 throw new IOException(e);
6937 }
6938 }
6939
6940
6941
6942
6943
6944
6945 private static List<Tag> carryForwardTags(final Cell cell, final List<Tag> tags) {
6946 if (cell.getTagsLength() <= 0) return tags;
6947 List<Tag> newTags = tags == null? new ArrayList<Tag>():
6948 Iterator<Tag> i =
6949 CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
6950 while (i.hasNext()) newTags.add(i.next());
6951 return newTags;
6952 }
6953
6954
6955
6956
6957
6958
6959
6960
6961
6962
6963 private List<Cell> doGet(final Store store, final byte [] row,
6964 final Map.Entry<byte[], List<Cell>> family, final TimeRange tr)
6965 throws IOException {
6966
6967
6968
6969
6970 Collections.sort(family.getValue(), store.getComparator());
6971
6972 Get get = new Get(row);
6973 for (Cell cell : family.getValue()) {
6974 get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
6975 }
6976 if (tr != null) get.setTimeRange(tr.getMin(), tr.getMax());
6977 return get(get, false);
6978 }
6979
6980 public Result append(Append append) throws IOException {
6981 return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
6982 }
6983
6984
6985
6986
6987
/**
 * Appends the values in {@code mutate} to the current values of the addressed columns of one
 * row. The work runs under the row lock and the region's updates read lock.
 *
 * <p>For each cell of the mutation:
 * <ul>
 * <li>If a current value for the same qualifier is found (via doGet), the new cell gets:
 *   <ul>
 *   <li>value = old value followed by the appended value;</li>
 *   <li>timestamp = max(now, old timestamp);</li>
 *   <li>tags = the old cell's tags followed by the mutation cell's tags.</li>
 *   </ul></li>
 * <li>Otherwise the mutation cell itself is used, after CellUtil.updateLatestStamp(cell, now).</li>
 * </ul>
 * In both cases a TTL tag is added when {@code mutate.getTTL() != Long.MAX_VALUE}.
 *
 * @param mutate the append to apply
 * @param nonceGroup nonce group recorded in the WAL key when the WAL is written
 * @param nonce nonce recorded in the WAL key when the WAL is written
 * @return the coprocessor's Result if preAppendAfterRowLock returns non-null. Otherwise
 *     {@code Result.create(allKVs)} when {@code mutate.isReturnResults()}, else null.
 * @throws IOException on validation failure, lock timeout, WAL append/sync failure, etc.
 */
@Override
public Result append(Append mutate, long nonceGroup, long nonce) throws IOException {
  Operation op = Operation.APPEND;
  byte[] row = mutate.getRow();
  checkRow(row, op.toString());
  checkFamilies(mutate.getFamilyCellMap().keySet());
  boolean flush = false;
  Durability durability = getEffectiveDurability(mutate.getDurability());
  // SKIP_WAL is the only durability that suppresses building the WALEdit below.
  boolean writeToWAL = durability != Durability.SKIP_WAL;
  WALEdit walEdits = null;
  List<Cell> allKVs = new ArrayList<Cell>(mutate.size());
  // New cells per store, built before the WAL write and applied to the memstore after it.
  Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
  long size = 0;
  long txid = 0;
  checkReadOnly();
  checkResources();

  startRegionOperation(op);
  this.writeRequestsCount.increment();
  RowLock rowLock = null;
  WALKey walKey = null;
  MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
  // Set once cells have been store.add()-ed; cleared after a successful sync.
  boolean doRollBackMemstore = false;
  try {
    rowLock = getRowLock(row);
    assert rowLock != null;
    try {
      lock(this.updatesLock.readLock());
      try {
        // NOTE(review): presumably waits for earlier MVCC writes to become visible so that
        // doGet below reads the latest values -- confirm against MultiVersionConcurrencyControl.
        mvcc.await();
        if (this.coprocessorHost != null) {
          Result r = this.coprocessorHost.preAppendAfterRowLock(mutate);
          if (r!= null) {
            // Coprocessor short-circuit: the finally blocks still release locks, but the
            // metricsRegion.updateAppend() call after them is skipped.
            return r;
          }
        }
        long now = EnvironmentEdgeManager.currentTime();

        for (Map.Entry<byte[], List<Cell>> family : mutate.getFamilyCellMap().entrySet()) {
          Store store = stores.get(family.getKey());
          List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());

          // doGet sorts family.getValue() in place, so both lists are walked in the same order.
          List<Cell> results = doGet(store, row, family, null);

          // idx walks the sorted results in step with the sorted mutation cells.
          int idx = 0;
          for (Cell cell : family.getValue()) {
            Cell newCell;
            Cell oldCell = null;
            if (idx < results.size()
                && CellUtil.matchingQualifier(results.get(idx), cell)) {
              // Existing value: build a single KeyValue holding old value + appended value.
              oldCell = results.get(idx);
              long ts = Math.max(now, oldCell.getTimestamp());

              // Old cell's tags first, then the mutation cell's tags.
              List<Tag> newTags = carryForwardTags(oldCell, new ArrayList<Tag>());
              newTags = carryForwardTags(cell, newTags);

              if (mutate.getTTL() != Long.MAX_VALUE) {
                newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutate.getTTL())));
              }

              byte[] tagBytes = Tag.fromList(newTags);

              // Allocate the full-size cell once, then copy each component into it.
              newCell = new KeyValue(row.length, cell.getFamilyLength(),
                  cell.getQualifierLength(), ts, KeyValue.Type.Put,
                  oldCell.getValueLength() + cell.getValueLength(),
                  tagBytes.length);
              // Row, family and qualifier come from the mutation cell.
              System.arraycopy(cell.getRowArray(), cell.getRowOffset(),
                  newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
              System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(),
                  newCell.getFamilyArray(), newCell.getFamilyOffset(),
                  cell.getFamilyLength());
              System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(),
                  newCell.getQualifierArray(), newCell.getQualifierOffset(),
                  cell.getQualifierLength());
              // Value = old value followed by the appended bytes.
              CellUtil.copyValueTo(oldCell, newCell.getValueArray(), newCell.getValueOffset());
              System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
                  newCell.getValueArray(),
                  newCell.getValueOffset() + oldCell.getValueLength(),
                  cell.getValueLength());

              System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(),
                  tagBytes.length);
              idx++;
            } else {
              // No existing value: the mutation cell becomes the new value.
              CellUtil.updateLatestStamp(cell, now);

              if (mutate.getTTL() != Long.MAX_VALUE) {
                List<Tag> newTags = new ArrayList<Tag>(1);
                newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutate.getTTL())));

                newCell = new TagRewriteCell(cell, Tag.fromList(newTags));
              } else {
                newCell = cell;
              }
            }

            // Coprocessors may replace the cell before it is logged and stored.
            if (coprocessorHost != null) {
              newCell = coprocessorHost.postMutationBeforeWAL(RegionObserver.MutationType.APPEND,
                  mutate, oldCell, newCell);
            }
            kvs.add(newCell);

            // walEdits is only ever created here, i.e. only when writeToWAL is true.
            if (writeToWAL) {
              if (walEdits == null) {
                walEdits = new WALEdit();
              }
              walEdits.add(newCell);
            }
          }

          tempMemstore.put(store, kvs);
        }

        if (walEdits != null && !walEdits.isEmpty()) {
          if (writeToWAL) {
            walKey = new HLogKey(
                getRegionInfo().getEncodedNameAsBytes(),
                this.htableDescriptor.getTableName(),
                WALKey.NO_SEQUENCE_ID,
                nonceGroup,
                nonce,
                mvcc);
            txid =
                this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits, true);
          } else {
            // NOTE(review): unreachable -- walEdits is non-null only when writeToWAL is true,
            // so SKIP_WAL appends are never counted by recordMutationWithoutWal.
            recordMutationWithoutWal(mutate.getFamilyCellMap());
          }
        }
        if (walKey == null) {
          // Nothing logged (e.g. SKIP_WAL): append an empty edit to obtain an MVCC write entry.
          walKey = this.appendEmptyEdit(this.wal);
        }

        writeEntry = walKey.getWriteEntry();

        if (!tempMemstore.isEmpty()) {
          for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
            Store store = entry.getKey();
            if (store.getFamily().getMaxVersions() == 1) {
              // Single-version family: upsert. NOTE(review): unlike the branch below, this path
              // does not set the cells' sequence id and does not enable memstore rollback.
              size += store.upsert(entry.getValue(), getSmallestReadPoint());
            } else {
              // Multi-version family: stamp each cell with this write's number and add it.
              for (Cell cell: entry.getValue()) {
                CellUtil.setSequenceId(cell, writeEntry.getWriteNumber());
                size += store.add(cell);
                doRollBackMemstore = true;
              }
            }

            allKVs.addAll(entry.getValue());
          }

          // size now holds addAndGetGlobalMemstoreSize's return value, which isFlushSize tests.
          size = this.addAndGetGlobalMemstoreSize(size);
          flush = isFlushSize(size);
        }
      } finally {
        this.updatesLock.readLock().unlock();
      }

    } finally {
      rowLock.release();
      // Nulled so the outer finally does not release it a second time.
      rowLock = null;
    }

    // The WAL sync happens after the row lock has been released.
    if(txid != 0){
      syncOrDefer(txid, durability);
    }
    doRollBackMemstore = false;
  } finally {
    if (rowLock != null) {
      rowLock.release();
    }

    // Reached with doRollBackMemstore still true only if an exception followed store.add().
    if (doRollBackMemstore) {
      rollbackMemstore(allKVs);
      if (writeEntry != null) mvcc.complete(writeEntry);
    } else if (writeEntry != null) {
      mvcc.completeAndWait(writeEntry);
    }

    closeRegionOperation(op);
  }

  if (this.metricsRegion != null) {
    this.metricsRegion.updateAppend();
  }

  if (flush) {
    // Requested only after all locks have been released.
    requestFlush();
  }

  return mutate.isReturnResults() ? Result.create(allKVs) : null;
}
7216
7217 public Result increment(Increment increment) throws IOException {
7218 return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
7219 }
7220
7221
7222
7223
7224
7225
7226
7227
7228
/**
 * Adds the 64-bit amounts in {@code mutation} to the current values of the addressed columns of
 * one row. The work runs under the row lock and the region's updates read lock.
 *
 * <p>Each new cell has:
 * <ul>
 * <li>value = amount + the current value, where a current value exists; it must be exactly 8
 * bytes wide, otherwise DoNotRetryIOException is thrown;</li>
 * <li>timestamp = max(now, current timestamp), or now when there is no current value;</li>
 * <li>tags = the mutation cell's tags, then the current cell's tags, plus a TTL tag when
 * {@code mutation.getTTL() != Long.MAX_VALUE}.</li>
 * </ul>
 * Zero increments are included in the returned cells but are not written to the WAL or memstore.
 *
 * @param mutation the increment to apply
 * @param nonceGroup nonce group recorded in the WAL key when the WAL is written
 * @param nonce nonce recorded in the WAL key when the WAL is written
 * @return the coprocessor's Result if preIncrementAfterRowLock returns non-null. Otherwise
 *     {@code Result.create(allKVs)} when {@code mutation.isReturnResults()}, else null.
 * @throws IOException on validation failure, a non-64-bit current value, lock timeout, WAL
 *     append/sync failure, etc.
 */
@Override
public Result increment(Increment mutation, long nonceGroup, long nonce)
    throws IOException {
  Operation op = Operation.INCREMENT;
  byte [] row = mutation.getRow();
  checkRow(row, op.toString());
  checkFamilies(mutation.getFamilyCellMap().keySet());
  boolean flush = false;
  Durability durability = getEffectiveDurability(mutation.getDurability());
  // SKIP_WAL is the only durability that suppresses building the WALEdit below.
  boolean writeToWAL = durability != Durability.SKIP_WAL;
  WALEdit walEdits = null;
  // Every computed cell, including zero increments; this is what the caller gets back.
  List<Cell> allKVs = new ArrayList<Cell>(mutation.size());

  // Cells to store per family (non-zero increments only), applied after the WAL append.
  Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
  long size = 0;
  long txid = 0;
  checkReadOnly();
  checkResources();

  startRegionOperation(op);
  this.writeRequestsCount.increment();
  RowLock rowLock = null;
  WALKey walKey = null;
  MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
  // Set once cells have been store.add()-ed; cleared after a successful sync.
  boolean doRollBackMemstore = false;
  TimeRange tr = mutation.getTimeRange();
  try {
    rowLock = getRowLock(row);
    assert rowLock != null;
    try {
      lock(this.updatesLock.readLock());
      try {
        // NOTE(review): presumably waits for earlier MVCC writes to become visible so that
        // doGet below reads the latest values -- confirm against MultiVersionConcurrencyControl.
        mvcc.await();
        if (this.coprocessorHost != null) {
          Result r = this.coprocessorHost.preIncrementAfterRowLock(mutation);
          if (r != null) {
            // Coprocessor short-circuit; the finally block still runs (including the metrics
            // update, which for increment lives inside the finally).
            return r;
          }
        }
        long now = EnvironmentEdgeManager.currentTime();

        for (Map.Entry<byte [], List<Cell>> family: mutation.getFamilyCellMap().entrySet()) {
          Store store = stores.get(family.getKey());
          List<Cell> kvs = new ArrayList<Cell>(family.getValue().size());

          // doGet sorts family.getValue() in place, so `edits` below is in store order.
          List<Cell> results = doGet(store, row, family, tr);

          // idx walks the sorted results in step with the sorted edits.
          int idx = 0;

          List<Cell> edits = family.getValue();
          for (int i = 0; i < edits.size(); i++) {
            Cell cell = edits.get(i);
            long amount = Bytes.toLong(CellUtil.cloneValue(cell));
            // Decided on the requested delta, before the current value is added in.
            boolean noWriteBack = (amount == 0);

            List<Tag> newTags = carryForwardTags(cell, new ArrayList<Tag>());

            Cell c = null;
            long ts = now;
            if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) {
              c = results.get(idx);
              ts = Math.max(now, c.getTimestamp());
              if(c.getValueLength() == Bytes.SIZEOF_LONG) {
                amount += CellUtil.getValueAsLong(c);
              } else {
                // Current value is not 8 bytes wide, so it cannot be treated as a long counter.
                throw new org.apache.hadoop.hbase.DoNotRetryIOException(
                    "Attempted to increment field that isn't 64 bits wide");
              }

              newTags = carryForwardTags(c, newTags);
              // Advance only when the next edit targets a different qualifier. NOTE(review):
              // consecutive edits on the same qualifier therefore each add to the same stored
              // value rather than to each other -- confirm this is intended.
              if (i < (edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1))) {
                idx++;
              }
            }

            byte[] q = CellUtil.cloneQualifier(cell);
            byte[] val = Bytes.toBytes(amount);

            if (mutation.getTTL() != Long.MAX_VALUE) {
              newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutation.getTTL())));
            }

            Cell newKV = new KeyValue(row, 0, row.length,
                family.getKey(), 0, family.getKey().length,
                q, 0, q.length,
                ts,
                KeyValue.Type.Put,
                val, 0, val.length,
                newTags);

            // Coprocessors may replace the cell before it is logged and stored.
            if (coprocessorHost != null) {
              newKV = coprocessorHost.postMutationBeforeWAL(
                  RegionObserver.MutationType.INCREMENT, mutation, c, newKV);
            }
            allKVs.add(newKV);

            if (!noWriteBack) {
              kvs.add(newKV);

              // walEdits is only ever created here, i.e. only when writeToWAL is true.
              if (writeToWAL) {
                if (walEdits == null) {
                  walEdits = new WALEdit();
                }
                walEdits.add(newKV);
              }
            }
          }

          if (!kvs.isEmpty()) {
            tempMemstore.put(store, kvs);
          }
        }

        if (walEdits != null && !walEdits.isEmpty()) {
          if (writeToWAL) {
            walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
                this.htableDescriptor.getTableName(),
                WALKey.NO_SEQUENCE_ID,
                nonceGroup,
                nonce,
                mvcc);
            txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(),
                walKey, walEdits, true);
          } else {
            // NOTE(review): unreachable -- walEdits is non-null only when writeToWAL is true,
            // so SKIP_WAL increments are never counted by recordMutationWithoutWal.
            recordMutationWithoutWal(mutation.getFamilyCellMap());
          }
        }
        if (walKey == null) {
          // Nothing logged (SKIP_WAL or all-zero increments): append an empty edit to obtain
          // an MVCC write entry.
          walKey = this.appendEmptyEdit(this.wal);
        }

        writeEntry = walKey.getWriteEntry();

        if (!tempMemstore.isEmpty()) {
          for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
            Store store = entry.getKey();
            if (store.getFamily().getMaxVersions() == 1) {
              // Single-version family: upsert. NOTE(review): this path neither sets the cells'
              // sequence id nor enables memstore rollback, unlike the branch below.
              size += store.upsert(entry.getValue(), getSmallestReadPoint());
            } else {
              // Multi-version family: stamp each cell with this write's number and add it.
              for (Cell cell : entry.getValue()) {
                CellUtil.setSequenceId(cell, writeEntry.getWriteNumber());
                size += store.add(cell);
                doRollBackMemstore = true;
              }
            }
          }
          size = this.addAndGetGlobalMemstoreSize(size);
          flush = isFlushSize(size);
        }
      } finally {
        this.updatesLock.readLock().unlock();
      }
    } finally {
      rowLock.release();
      // Nulled so the outer finally does not release it a second time.
      rowLock = null;
    }

    // The WAL sync happens after the row lock has been released.
    if(txid != 0){
      syncOrDefer(txid, durability);
    }
    doRollBackMemstore = false;
  } finally {
    if (rowLock != null) {
      rowLock.release();
    }

    // Reached with doRollBackMemstore still true only if an exception followed store.add().
    if (doRollBackMemstore) {
      for(List<Cell> cells: tempMemstore.values()) {
        rollbackMemstore(cells);
      }
      if (writeEntry != null) mvcc.complete(writeEntry);
    } else if (writeEntry != null) {
      mvcc.completeAndWait(writeEntry);
    }
    closeRegionOperation(Operation.INCREMENT);
    // Unlike append(), the metric is updated here in finally, i.e. also on failure.
    if (this.metricsRegion != null) {
      this.metricsRegion.updateIncrement();
    }
  }

  if (flush) {
    // Requested only after all locks have been released.
    requestFlush();
  }
  return mutation.isReturnResults() ? Result.create(allKVs) : null;
}
7440
7441
7442
7443
7444
7445 void checkFamily(final byte [] family)
7446 throws NoSuchColumnFamilyException {
7447 if (!this.htableDescriptor.hasFamily(family)) {
7448 throw new NoSuchColumnFamilyException("Column family " +
7449 Bytes.toString(family) + " does not exist in region " + this
7450 + " in table " + this.htableDescriptor);
7451 }
7452 }
7453
/**
 * Aligned shallow size estimate of an HRegion instance: object header, an array header, and
 * fixed counts of references, ints, longs and booleans.
 *
 * <p>It is the base of DEEP_OVERHEAD, which heapSize() starts from. NOTE(review): the counts
 * (43 references, 3 ints, 14 longs, 5 booleans) presumably tally this class's instance fields.
 * The field declarations are outside this view, so keep the counts in sync by hand when fields
 * are added or removed.
 */
public static final long FIXED_OVERHEAD = ClassSize.align(
    ClassSize.OBJECT +
    ClassSize.ARRAY +
    43 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT +
    (14 * Bytes.SIZEOF_LONG) +
    5 * Bytes.SIZEOF_BOOLEAN);
7460
7461
7462
7463
7464
7465
7466
7467
7468
7469
7470
/**
 * FIXED_OVERHEAD plus estimated sizes of the helper objects an HRegion owns. heapSize() starts
 * from this value and adds each store's heapSize().
 *
 * <p>NOTE(review): each term presumably corresponds to a member object declared outside this
 * view (atomic flags/counters, concurrent maps, WriteState, locks, MVCC, a TreeMap). Keep it in
 * sync with those fields.
 */
public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
    ClassSize.OBJECT +
    (2 * ClassSize.ATOMIC_BOOLEAN) +
    (3 * ClassSize.ATOMIC_LONG) +
    (2 * ClassSize.CONCURRENT_HASHMAP) +
    WriteState.HEAP_SIZE +
    ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY +
    (2 * ClassSize.REENTRANT_LOCK) +
    MultiVersionConcurrencyControl.FIXED_SIZE
    + ClassSize.TREEMAP
    + 2 * ClassSize.ATOMIC_INTEGER
    ;
7483
7484 @Override
7485 public long heapSize() {
7486 long heapSize = DEEP_OVERHEAD;
7487 for (Store store : this.stores.values()) {
7488 heapSize += store.heapSize();
7489 }
7490
7491 return heapSize;
7492 }
7493
7494
7495
7496
7497
7498 private static void printUsageAndExit(final String message) {
7499 if (message != null && message.length() > 0) System.out.println(message);
7500 System.out.println("Usage: HRegion CATALOG_TABLE_DIR [major_compact]");
7501 System.out.println("Options:");
7502 System.out.println(" major_compact Pass this option to major compact " +
7503 "passed region.");
7504 System.out.println("Default outputs scan of passed region.");
7505 System.exit(1);
7506 }
7507
7508 @Override
7509 public boolean registerService(Service instance) {
7510
7511
7512
7513 Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
7514 if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
7515 LOG.error("Coprocessor service " + serviceDesc.getFullName() +
7516 " already registered, rejecting request from " + instance
7517 );
7518 return false;
7519 }
7520
7521 coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
7522 if (LOG.isDebugEnabled()) {
7523 LOG.debug("Registered coprocessor service: region=" +
7524 Bytes.toStringBinary(getRegionInfo().getRegionName()) +
7525 " service=" + serviceDesc.getFullName());
7526 }
7527 return true;
7528 }
7529
7530 @Override
7531 public Message execService(RpcController controller, CoprocessorServiceCall call)
7532 throws IOException {
7533 String serviceName = call.getServiceName();
7534 String methodName = call.getMethodName();
7535 if (!coprocessorServiceHandlers.containsKey(serviceName)) {
7536 throw new UnknownProtocolException(null,
7537 "No registered coprocessor service found for name "+serviceName+
7538 " in region "+Bytes.toStringBinary(getRegionInfo().getRegionName()));
7539 }
7540
7541 Service service = coprocessorServiceHandlers.get(serviceName);
7542 Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
7543 Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
7544 if (methodDesc == null) {
7545 throw new UnknownProtocolException(service.getClass(),
7546 "Unknown method "+methodName+" called on service "+serviceName+
7547 " in region "+Bytes.toStringBinary(getRegionInfo().getRegionName()));
7548 }
7549
7550 Message.Builder builder = service.getRequestPrototype(methodDesc).newBuilderForType();
7551 ProtobufUtil.mergeFrom(builder, call.getRequest());
7552 Message request = builder.build();
7553
7554 if (coprocessorHost != null) {
7555 request = coprocessorHost.preEndpointInvocation(service, methodName, request);
7556 }
7557
7558 final Message.Builder responseBuilder =
7559 service.getResponsePrototype(methodDesc).newBuilderForType();
7560 service.callMethod(methodDesc, controller, request, new RpcCallback<Message>() {
7561 @Override
7562 public void run(Message message) {
7563 if (message != null) {
7564 responseBuilder.mergeFrom(message);
7565 }
7566 }
7567 });
7568
7569 if (coprocessorHost != null) {
7570 coprocessorHost.postEndpointInvocation(service, methodName, request, responseBuilder);
7571 }
7572
7573 IOException exception = ResponseConverter.getControllerException(controller);
7574 if (exception != null) {
7575 throw exception;
7576 }
7577
7578 return responseBuilder.build();
7579 }
7580
7581
7582
7583
7584
7585
7586 private static void processTable(final FileSystem fs, final Path p,
7587 final WALFactory walFactory, final Configuration c,
7588 final boolean majorCompact)
7589 throws IOException {
7590 HRegion region;
7591 FSTableDescriptors fst = new FSTableDescriptors(c);
7592
7593 if (FSUtils.getTableName(p).equals(TableName.META_TABLE_NAME)) {
7594 final WAL wal = walFactory.getMetaWAL(
7595 HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes());
7596 region = HRegion.newHRegion(p, wal, fs, c,
7597 HRegionInfo.FIRST_META_REGIONINFO,
7598 fst.get(TableName.META_TABLE_NAME), null);
7599 } else {
7600 throw new IOException("Not a known catalog table: " + p.toString());
7601 }
7602 try {
7603 region.mvcc.advanceTo(region.initialize(null));
7604 if (majorCompact) {
7605 region.compact(true);
7606 } else {
7607
7608 Scan scan = new Scan();
7609
7610 RegionScanner scanner = region.getScanner(scan);
7611 try {
7612 List<Cell> kvs = new ArrayList<Cell>();
7613 boolean done;
7614 do {
7615 kvs.clear();
7616 done = scanner.next(kvs);
7617 if (kvs.size() > 0) LOG.info(kvs);
7618 } while (done);
7619 } finally {
7620 scanner.close();
7621 }
7622 }
7623 } finally {
7624 region.close();
7625 }
7626 }
7627
7628 boolean shouldForceSplit() {
7629 return this.splitRequest;
7630 }
7631
7632 byte[] getExplicitSplitPoint() {
7633 return this.explicitSplitPoint;
7634 }
7635
7636 void forceSplit(byte[] sp) {
7637
7638
7639 this.splitRequest = true;
7640 if (sp != null) {
7641 this.explicitSplitPoint = sp;
7642 }
7643 }
7644
7645 void clearSplit() {
7646 this.splitRequest = false;
7647 this.explicitSplitPoint = null;
7648 }
7649
7650
7651
7652
7653 protected void prepareToSplit() {
7654
7655 }
7656
7657
7658
7659
7660
7661
7662
7663 public byte[] checkSplit() {
7664
7665 if (this.getRegionInfo().isMetaTable() ||
7666 TableName.NAMESPACE_TABLE_NAME.equals(this.getRegionInfo().getTable())) {
7667 if (shouldForceSplit()) {
7668 LOG.warn("Cannot split meta region in HBase 0.20 and above");
7669 }
7670 return null;
7671 }
7672
7673
7674 if (this.isRecovering()) {
7675 LOG.info("Cannot split region " + this.getRegionInfo().getEncodedName() + " in recovery.");
7676 return null;
7677 }
7678
7679 if (!splitPolicy.shouldSplit()) {
7680 return null;
7681 }
7682
7683 byte[] ret = splitPolicy.getSplitPoint();
7684
7685 if (ret != null) {
7686 try {
7687 checkRow(ret, "calculated split");
7688 } catch (IOException e) {
7689 LOG.error("Ignoring invalid split", e);
7690 return null;
7691 }
7692 }
7693 return ret;
7694 }
7695
7696
7697
7698
7699 public int getCompactPriority() {
7700 int count = Integer.MAX_VALUE;
7701 for (Store store : stores.values()) {
7702 count = Math.min(count, store.getCompactPriority());
7703 }
7704 return count;
7705 }
7706
7707
7708
7709 @Override
7710 public RegionCoprocessorHost getCoprocessorHost() {
7711 return coprocessorHost;
7712 }
7713
7714
7715 public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
7716 this.coprocessorHost = coprocessorHost;
7717 }
7718
7719 @Override
7720 public void startRegionOperation() throws IOException {
7721 startRegionOperation(Operation.ANY);
7722 }
7723
/**
 * Validates that {@code op} may run now and, for most operations, takes the region read lock.
 * closeRegionOperation(Operation) releases that lock.
 *
 * <p>MERGE_REGION, SPLIT_REGION and COMPACT_REGION return right after the recovery check,
 * without checking closing/closed and without taking the lock.
 *
 * @param op the operation about to run
 * @throws IOException any of:
 *     <ul>
 *     <li>RegionInRecoveryException when the region is recovering and the op is not allowed;</li>
 *     <li>NotServingRegionException when closing or closed;</li>
 *     <li>the lock-acquisition exceptions from lock();</li>
 *     <li>an IOException wrapping any coprocessor failure (the read lock is released first).</li>
 *     </ul>
 */
@Override
public void startRegionOperation(Operation op) throws IOException {
  switch (op) {
    case GET:
    case SCAN:
      checkReadsEnabled();
      // No break: reads also go through the recovery check below.
    case INCREMENT:
    case APPEND:
    case SPLIT_REGION:
    case MERGE_REGION:
    case PUT:
    case DELETE:
    case BATCH_MUTATE:
    case COMPACT_REGION:
      // While recovering: PUT/DELETE/BATCH_MUTATE are allowed unless
      // disallowWritesInRecovering is set; every other op listed here is rejected.
      if (isRecovering() && (this.disallowWritesInRecovering ||
          (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) {
        throw new RegionInRecoveryException(getRegionInfo().getRegionNameAsString() +
            " is recovering; cannot take reads");
      }
      break;
    default:
      break;
  }
  if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION
      || op == Operation.COMPACT_REGION) {
    // These skip the closing/closed checks and the region lock entirely.
    return;
  }
  if (this.closing.get()) {
    throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closing");
  }
  lock(lock.readLock());
  // Re-check under the lock: the region may have closed while we waited.
  if (this.closed.get()) {
    lock.readLock().unlock();
    throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed");
  }
  try {
    if (coprocessorHost != null) {
      coprocessorHost.postStartRegionOperation(op);
    }
  } catch (Exception e) {
    // Do not leak the read lock if the coprocessor hook fails.
    lock.readLock().unlock();
    throw new IOException(e);
  }
}
7771
7772 @Override
7773 public void closeRegionOperation() throws IOException {
7774 closeRegionOperation(Operation.ANY);
7775 }
7776
7777
7778
7779
7780
7781
7782 public void closeRegionOperation(Operation operation) throws IOException {
7783 lock.readLock().unlock();
7784 if (coprocessorHost != null) {
7785 coprocessorHost.postCloseRegionOperation(operation);
7786 }
7787 }
7788
7789
7790
7791
7792
7793
7794
7795
7796
7797
7798 private void startBulkRegionOperation(boolean writeLockNeeded)
7799 throws NotServingRegionException, RegionTooBusyException, InterruptedIOException {
7800 if (this.closing.get()) {
7801 throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closing");
7802 }
7803 if (writeLockNeeded) lock(lock.writeLock());
7804 else lock(lock.readLock());
7805 if (this.closed.get()) {
7806 if (writeLockNeeded) lock.writeLock().unlock();
7807 else lock.readLock().unlock();
7808 throw new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " is closed");
7809 }
7810 }
7811
7812
7813
7814
7815
7816 private void closeBulkRegionOperation(){
7817 if (lock.writeLock().isHeldByCurrentThread()) lock.writeLock().unlock();
7818 else lock.readLock().unlock();
7819 }
7820
7821
7822
7823
7824
7825 private void recordMutationWithoutWal(final Map<byte [], List<Cell>> familyMap) {
7826 numMutationsWithoutWAL.increment();
7827 if (numMutationsWithoutWAL.get() <= 1) {
7828 LOG.info("writing data to region " + this +
7829 " with WAL disabled. Data may be lost in the event of a crash.");
7830 }
7831
7832 long mutationSize = 0;
7833 for (List<Cell> cells: familyMap.values()) {
7834 assert cells instanceof RandomAccess;
7835 int listSize = cells.size();
7836 for (int i=0; i < listSize; i++) {
7837 Cell cell = cells.get(i);
7838
7839 mutationSize += KeyValueUtil.keyLength(cell) + cell.getValueLength();
7840 }
7841 }
7842
7843 dataInMemoryWithoutWAL.add(mutationSize);
7844 }
7845
7846 private void lock(final Lock lock)
7847 throws RegionTooBusyException, InterruptedIOException {
7848 lock(lock, 1);
7849 }
7850
7851
7852
7853
7854
7855
7856 private void lock(final Lock lock, final int multiplier)
7857 throws RegionTooBusyException, InterruptedIOException {
7858 try {
7859 final long waitTime = Math.min(maxBusyWaitDuration,
7860 busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
7861 if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
7862 throw new RegionTooBusyException(
7863 "failed to get a lock in " + waitTime + " ms. " +
7864 "regionName=" + (this.getRegionInfo() == null ? "unknown" :
7865 this.getRegionInfo().getRegionNameAsString()) +
7866 ", server=" + (this.getRegionServerServices() == null ? "unknown" :
7867 this.getRegionServerServices().getServerName()));
7868 }
7869 } catch (InterruptedException ie) {
7870 LOG.info("Interrupted while waiting for a lock");
7871 InterruptedIOException iie = new InterruptedIOException();
7872 iie.initCause(ie);
7873 throw iie;
7874 }
7875 }
7876
7877
7878
7879
7880
7881
7882
7883 private void syncOrDefer(long txid, Durability durability) throws IOException {
7884 if (this.getRegionInfo().isMetaRegion()) {
7885 this.wal.sync(txid);
7886 } else {
7887 switch(durability) {
7888 case USE_DEFAULT:
7889
7890 if (shouldSyncWAL()) {
7891 this.wal.sync(txid);
7892 }
7893 break;
7894 case SKIP_WAL:
7895
7896 break;
7897 case ASYNC_WAL:
7898
7899 break;
7900 case SYNC_WAL:
7901 case FSYNC_WAL:
7902
7903 this.wal.sync(txid);
7904 break;
7905 default:
7906 throw new RuntimeException("Unknown durability " + durability);
7907 }
7908 }
7909 }
7910
7911
7912
7913
7914 private boolean shouldSyncWAL() {
7915 return durability.ordinal() > Durability.ASYNC_WAL.ordinal();
7916 }
7917
7918
7919
7920
7921 private static final List<Cell> MOCKED_LIST = new AbstractList<Cell>() {
7922
7923 @Override
7924 public void add(int index, Cell element) {
7925
7926 }
7927
7928 @Override
7929 public boolean addAll(int index, Collection<? extends Cell> c) {
7930 return false;
7931 }
7932
7933 @Override
7934 public KeyValue get(int index) {
7935 throw new UnsupportedOperationException();
7936 }
7937
7938 @Override
7939 public int size() {
7940 return 0;
7941 }
7942 };
7943
7944
7945
7946
7947
7948
7949
7950
7951
7952
7953 public static void main(String[] args) throws IOException {
7954 if (args.length < 1) {
7955 printUsageAndExit(null);
7956 }
7957 boolean majorCompact = false;
7958 if (args.length > 1) {
7959 if (!args[1].toLowerCase().startsWith("major")) {
7960 printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
7961 }
7962 majorCompact = true;
7963 }
7964 final Path tableDir = new Path(args[0]);
7965 final Configuration c = HBaseConfiguration.create();
7966 final FileSystem fs = FileSystem.get(c);
7967 final Path logdir = new Path(c.get("hbase.tmp.dir"));
7968 final String logname = "wal" + FSUtils.getTableName(tableDir) + System.currentTimeMillis();
7969
7970 final Configuration walConf = new Configuration(c);
7971 FSUtils.setRootDir(walConf, logdir);
7972 final WALFactory wals = new WALFactory(walConf, null, logname);
7973 try {
7974 processTable(fs, tableDir, wals, c, majorCompact);
7975 } finally {
7976 wals.close();
7977
7978 BlockCache bc = new CacheConfig(c).getBlockCache();
7979 if (bc != null) bc.shutdown();
7980 }
7981 }
7982
7983 @Override
7984 public long getOpenSeqNum() {
7985 return this.openSeqNum;
7986 }
7987
7988 @Override
7989 public Map<byte[], Long> getMaxStoreSeqId() {
7990 return this.maxSeqIdInStores;
7991 }
7992
7993 @Override
7994 public long getOldestSeqIdOfStore(byte[] familyName) {
7995 return wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), familyName);
7996 }
7997
7998 @Override
7999 public CompactionState getCompactionState() {
8000 boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;
8001 return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)
8002 : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
8003 }
8004
8005 public void reportCompactionRequestStart(boolean isMajor){
8006 (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
8007 }
8008
8009 public void reportCompactionRequestEnd(boolean isMajor, int numFiles, long filesSizeCompacted) {
8010 int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
8011
8012
8013 compactionsFinished.incrementAndGet();
8014 compactionNumFilesCompacted.addAndGet(numFiles);
8015 compactionNumBytesCompacted.addAndGet(filesSizeCompacted);
8016
8017 assert newValue >= 0;
8018 }
8019
8020
8021
8022
8023
8024 @VisibleForTesting
8025 public long getSequenceId() {
8026 return this.mvcc.getReadPoint();
8027 }
8028
8029
8030
8031
8032
8033
8034
8035
8036
8037 private WALKey appendEmptyEdit(final WAL wal) throws IOException {
8038
8039 @SuppressWarnings("deprecation")
8040 WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
8041 getRegionInfo().getTable(), WALKey.NO_SEQUENCE_ID, 0, null,
8042 HConstants.NO_NONCE, HConstants.NO_NONCE, getMVCC());
8043
8044
8045
8046 try {
8047 wal.append(getTableDesc(), getRegionInfo(), key, WALEdit.EMPTY_WALEDIT, false);
8048 } catch (Throwable t) {
8049
8050 getMVCC().complete(key.getWriteEntry());
8051 }
8052 return key;
8053 }
8054
8055
8056
8057
8058 @Override
8059 public void onConfigurationChange(Configuration conf) {
8060
8061 }
8062
8063
8064
8065
8066 @Override
8067 public void registerChildren(ConfigurationManager manager) {
8068 configurationManager = Optional.of(manager);
8069 for (Store s : this.stores.values()) {
8070 configurationManager.get().registerObserver(s);
8071 }
8072 }
8073
8074
8075
8076
8077 @Override
8078 public void deregisterChildren(ConfigurationManager manager) {
8079 for (Store s : this.stores.values()) {
8080 configurationManager.get().deregisterObserver(s);
8081 }
8082 }
8083
8084 @Override
8085 public CellComparator getCellCompartor() {
8086 return this.getRegionInfo().isMetaRegion() ? CellComparator.META_COMPARATOR
8087 : CellComparator.COMPARATOR;
8088 }
8089 }