1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.wal;
20
21 import java.io.EOFException;
22 import java.io.FileNotFoundException;
23 import java.io.IOException;
24 import java.io.InterruptedIOException;
25 import java.text.ParseException;
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.HashSet;
29 import java.util.LinkedList;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.NavigableSet;
33 import java.util.Set;
34 import java.util.TreeMap;
35 import java.util.TreeSet;
36 import java.util.UUID;
37 import java.util.concurrent.Callable;
38 import java.util.concurrent.CompletionService;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.ExecutionException;
41 import java.util.concurrent.ExecutorCompletionService;
42 import java.util.concurrent.Future;
43 import java.util.concurrent.ThreadFactory;
44 import java.util.concurrent.ThreadPoolExecutor;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicBoolean;
47 import java.util.concurrent.atomic.AtomicLong;
48 import java.util.concurrent.atomic.AtomicReference;
49 import java.util.regex.Matcher;
50 import java.util.regex.Pattern;
51
52 import org.apache.commons.logging.Log;
53 import org.apache.commons.logging.LogFactory;
54 import org.apache.hadoop.conf.Configuration;
55 import org.apache.hadoop.fs.FileAlreadyExistsException;
56 import org.apache.hadoop.fs.FileStatus;
57 import org.apache.hadoop.fs.FileSystem;
58 import org.apache.hadoop.fs.Path;
59 import org.apache.hadoop.fs.PathFilter;
60 import org.apache.hadoop.hbase.Cell;
61 import org.apache.hadoop.hbase.CellScanner;
62 import org.apache.hadoop.hbase.CellUtil;
63 import org.apache.hadoop.hbase.CoordinatedStateManager;
64 import org.apache.hadoop.hbase.HBaseConfiguration;
65 import org.apache.hadoop.hbase.HConstants;
66 import org.apache.hadoop.hbase.HRegionInfo;
67 import org.apache.hadoop.hbase.HRegionLocation;
68 import org.apache.hadoop.hbase.MetaTableAccessor;
69 import org.apache.hadoop.hbase.ServerName;
70 import org.apache.hadoop.hbase.TableName;
71 import org.apache.hadoop.hbase.TableNotFoundException;
72 import org.apache.hadoop.hbase.classification.InterfaceAudience;
73 import org.apache.hadoop.hbase.client.ConnectionFactory;
74 import org.apache.hadoop.hbase.client.ConnectionUtils;
75 import org.apache.hadoop.hbase.client.Delete;
76 import org.apache.hadoop.hbase.client.Durability;
77 import org.apache.hadoop.hbase.client.HConnection;
78 import org.apache.hadoop.hbase.client.Mutation;
79 import org.apache.hadoop.hbase.client.Put;
80 import org.apache.hadoop.hbase.client.TableState;
81 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
82 import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
83 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
84 import org.apache.hadoop.hbase.io.HeapSize;
85 import org.apache.hadoop.hbase.master.SplitLogManager;
86 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
87 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
88 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
89 import org.apache.hadoop.hbase.protobuf.RequestConverter;
90 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
91 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
92 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
93 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
94 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
95 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
96 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
97 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
98 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
99 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
100 import org.apache.hadoop.hbase.regionserver.HRegion;
101 import org.apache.hadoop.hbase.regionserver.LastSequenceId;
102
103 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
104 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
105 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
106 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
107 import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink;
108 import org.apache.hadoop.hbase.util.Bytes;
109 import org.apache.hadoop.hbase.util.CancelableProgressable;
110 import org.apache.hadoop.hbase.util.ClassSize;
111 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
112 import org.apache.hadoop.hbase.util.FSUtils;
113 import org.apache.hadoop.hbase.util.Pair;
114 import org.apache.hadoop.hbase.util.Threads;
115 import org.apache.hadoop.hbase.wal.WAL.Entry;
116 import org.apache.hadoop.hbase.wal.WAL.Reader;
117 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
118 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
119 import org.apache.hadoop.io.MultipleIOException;
120 import org.apache.hadoop.ipc.RemoteException;
121
122 import com.google.common.annotations.VisibleForTesting;
123 import com.google.common.base.Preconditions;
124 import com.google.common.collect.Lists;
125 import com.google.protobuf.ServiceException;
126 import com.google.protobuf.TextFormat;
127
128
129
130
131
132
133 @InterfaceAudience.Private
134 public class WALSplitter {
/** Logger used by the splitter and by the nested helper classes in this file. */
private static final Log LOG = LogFactory.getLog(WALSplitter.class);

/** Default for the "hbase.hlog.split.skip.errors" setting read by splitLogFile: errors are not skipped. */
public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;

// Root directory, filesystem and configuration handed to the constructor.
// conf is a private copy made with HBaseConfiguration.create(conf).
protected final Path rootDir;
protected final FileSystem fs;
protected final Configuration conf;

// The three pieces of the split pipeline, all created in the constructor:
// the controller shared between reader and writers, the sink the writer threads
// append to, and the per-region buffers that sit between them.
PipelineController controller;
OutputSink outputSink;
EntryBuffers entryBuffers;

// NOTE(review): not referenced in the visible part of this file; presumably a cache of
// table states used by code further down - confirm.
private Map<TableName, TableState> tableStatesCache =
new ConcurrentHashMap<>();
// Constructor argument cast to BaseCoordinatedStateManager; null when no manager was supplied.
private BaseCoordinatedStateManager csm;
// Factory used to create WAL readers and recovered-edits writers.
private final WALFactory walFactory;

// Monitored task created by splitLogFile; splitLogFile requires it to still be null on entry.
private MonitoredTask status;

// Source of last-flushed sequence ids consulted by splitLogFile when distributed log replay
// is off; may be null, in which case no lookup is made.
protected final LastSequenceId sequenceIdChecker;

// True only when the recovery mode is LOG_REPLAY and a coordinated state manager was supplied.
protected boolean distributedLogReplay;

// Encoded region name -> last flushed sequence id, filled lazily by splitLogFile
// (-1 is stored when no id could be obtained).
protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<String, Long>();

// Encoded region name -> (family name -> sequence id), filled by splitLogFile from the
// store sequence ids returned by sequenceIdChecker.
protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores =
new ConcurrentHashMap<String, Map<byte[], Long>>();

// Server name derived from the WAL path in splitLogFile; stays "" when none can be derived.
protected String failedServerName = "";

// Number of writer threads, from "hbase.regionserver.hlog.splitlog.writer.threads" (default 3).
private final int numWriterThreads;

// Value of "hbase.regionserver.wal.logreplay.batch.size" (default 64).
private final int minBatchSize;
178
179 WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
180 FileSystem fs, LastSequenceId idChecker,
181 CoordinatedStateManager csm, RecoveryMode mode) {
182 this.conf = HBaseConfiguration.create(conf);
183 String codecClassName = conf
184 .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
185 this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
186 this.rootDir = rootDir;
187 this.fs = fs;
188 this.sequenceIdChecker = idChecker;
189 this.csm = (BaseCoordinatedStateManager)csm;
190 this.walFactory = factory;
191 this.controller = new PipelineController();
192
193 entryBuffers = new EntryBuffers(controller,
194 this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
195 128*1024*1024));
196
197
198
199 this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
200 this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);
201
202 this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
203 if (csm != null && this.distributedLogReplay) {
204 outputSink = new LogReplayOutputSink(controller, entryBuffers, numWriterThreads);
205 } else {
206 if (this.distributedLogReplay) {
207 LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly.");
208 }
209 this.distributedLogReplay = false;
210 outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
211 }
212
213 }
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231 public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
232 Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
233 CoordinatedStateManager cp, RecoveryMode mode, final WALFactory factory) throws IOException {
234 WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, idChecker, cp, mode);
235 return s.splitLogFile(logfile, reporter);
236 }
237
238
239
240
241
242 @VisibleForTesting
243 public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
244 FileSystem fs, Configuration conf, final WALFactory factory) throws IOException {
245 final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
246 Collections.singletonList(logDir), null);
247 List<Path> splits = new ArrayList<Path>();
248 if (logfiles != null && logfiles.length > 0) {
249 for (FileStatus logfile: logfiles) {
250 WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null,
251 RecoveryMode.LOG_SPLITTING);
252 if (s.splitLogFile(logfile, null)) {
253 finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
254 if (s.outputSink.splits != null) {
255 splits.addAll(s.outputSink.splits);
256 }
257 }
258 }
259 }
260 if (!fs.delete(logDir, true)) {
261 throw new IOException("Unable to delete src dir: " + logDir);
262 }
263 return splits;
264 }
265
266
267
268
269
270 boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
271 Preconditions.checkState(status == null);
272 Preconditions.checkArgument(logfile.isFile(),
273 "passed in file status is for something other than a regular file.");
274 boolean isCorrupted = false;
275 boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
276 SPLIT_SKIP_ERRORS_DEFAULT);
277 int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
278 Path logPath = logfile.getPath();
279 boolean outputSinkStarted = false;
280 boolean progress_failed = false;
281 int editsCount = 0;
282 int editsSkipped = 0;
283
284 status =
285 TaskMonitor.get().createStatus(
286 "Splitting log file " + logfile.getPath() + "into a temporary staging area.");
287 Reader in = null;
288 try {
289 long logLength = logfile.getLen();
290 LOG.info("Splitting wal: " + logPath + ", length=" + logLength);
291 LOG.info("DistributedLogReplay = " + this.distributedLogReplay);
292 status.setStatus("Opening log file");
293 if (reporter != null && !reporter.progress()) {
294 progress_failed = true;
295 return false;
296 }
297 try {
298 in = getReader(logfile, skipErrors, reporter);
299 } catch (CorruptedLogFileException e) {
300 LOG.warn("Could not get reader, corrupted log file " + logPath, e);
301 ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
302 isCorrupted = true;
303 }
304 if (in == null) {
305 LOG.warn("Nothing to split in log file " + logPath);
306 return true;
307 }
308 int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
309 int numOpenedFilesLastCheck = 0;
310 outputSink.setReporter(reporter);
311 outputSink.startWriterThreads();
312 outputSinkStarted = true;
313 Entry entry;
314 Long lastFlushedSequenceId = -1L;
315 ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(logPath);
316 failedServerName = (serverName == null) ? "" : serverName.getServerName();
317 while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
318 byte[] region = entry.getKey().getEncodedRegionName();
319 String encodedRegionNameAsStr = Bytes.toString(region);
320 lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
321 if (lastFlushedSequenceId == null) {
322 if (this.distributedLogReplay) {
323 RegionStoreSequenceIds ids =
324 csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
325 encodedRegionNameAsStr);
326 if (ids != null) {
327 lastFlushedSequenceId = ids.getLastFlushedSequenceId();
328 if (LOG.isDebugEnabled()) {
329 LOG.debug("DLR Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
330 TextFormat.shortDebugString(ids));
331 }
332 }
333 } else if (sequenceIdChecker != null) {
334 RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
335 Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
336 for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
337 maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
338 storeSeqId.getSequenceId());
339 }
340 regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
341 lastFlushedSequenceId = ids.getLastFlushedSequenceId();
342 if (LOG.isDebugEnabled()) {
343 LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
344 TextFormat.shortDebugString(ids));
345 }
346 }
347 if (lastFlushedSequenceId == null) {
348 lastFlushedSequenceId = -1L;
349 }
350 lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
351 }
352 if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
353 editsSkipped++;
354 continue;
355 }
356 entryBuffers.appendEntry(entry);
357 editsCount++;
358 int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
359
360 if (editsCount % interval == 0
361 || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
362 numOpenedFilesLastCheck = this.getNumOpenWriters();
363 String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
364 + " edits, skipped " + editsSkipped + " edits.";
365 status.setStatus("Split " + countsStr);
366 if (reporter != null && !reporter.progress()) {
367 progress_failed = true;
368 return false;
369 }
370 }
371 }
372 } catch (InterruptedException ie) {
373 IOException iie = new InterruptedIOException();
374 iie.initCause(ie);
375 throw iie;
376 } catch (CorruptedLogFileException e) {
377 LOG.warn("Could not parse, corrupted log file " + logPath, e);
378 csm.getSplitLogWorkerCoordination().markCorrupted(rootDir,
379 logfile.getPath().getName(), fs);
380 isCorrupted = true;
381 } catch (IOException e) {
382 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
383 throw e;
384 } finally {
385 LOG.debug("Finishing writing output logs and closing down.");
386 try {
387 if (null != in) {
388 in.close();
389 }
390 } catch (IOException exception) {
391 LOG.warn("Could not close wal reader: " + exception.getMessage());
392 LOG.debug("exception details", exception);
393 }
394 try {
395 if (outputSinkStarted) {
396
397
398 progress_failed = true;
399 progress_failed = outputSink.finishWritingAndClose() == null;
400 }
401 } finally {
402 String msg =
403 "Processed " + editsCount + " edits across " + outputSink.getNumberOfRecoveredRegions()
404 + " regions; edits skipped=" + editsSkipped + "; log file=" + logPath +
405 ", length=" + logfile.getLen() +
406 ", corrupted=" + isCorrupted + ", progress failed=" + progress_failed;
407 LOG.info(msg);
408 status.markComplete(msg);
409 }
410 }
411 return !progress_failed;
412 }
413
414
415
416
417
418
419
420
421
422
423
424
425 public static void finishSplitLogFile(String logfile,
426 Configuration conf) throws IOException {
427 Path rootdir = FSUtils.getRootDir(conf);
428 Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
429 Path logPath;
430 if (FSUtils.isStartingWithPath(rootdir, logfile)) {
431 logPath = new Path(logfile);
432 } else {
433 logPath = new Path(rootdir, logfile);
434 }
435 finishSplitLogFile(rootdir, oldLogDir, logPath, conf);
436 }
437
438 static void finishSplitLogFile(Path rootdir, Path oldLogDir,
439 Path logPath, Configuration conf) throws IOException {
440 List<Path> processedLogs = new ArrayList<Path>();
441 List<Path> corruptedLogs = new ArrayList<Path>();
442 FileSystem fs;
443 fs = rootdir.getFileSystem(conf);
444 if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
445 corruptedLogs.add(logPath);
446 } else {
447 processedLogs.add(logPath);
448 }
449 archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
450 Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
451 fs.delete(stagingDir, true);
452 }
453
454
455
456
457
458
459
460
461
462
463
464
465
466 private static void archiveLogs(
467 final List<Path> corruptedLogs,
468 final List<Path> processedLogs, final Path oldLogDir,
469 final FileSystem fs, final Configuration conf) throws IOException {
470 final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
471 "hbase.regionserver.hlog.splitlog.corrupt.dir", HConstants.CORRUPT_DIR_NAME));
472
473 if (!fs.mkdirs(corruptDir)) {
474 LOG.info("Unable to mkdir " + corruptDir);
475 }
476 fs.mkdirs(oldLogDir);
477
478
479
480 for (Path corrupted : corruptedLogs) {
481 Path p = new Path(corruptDir, corrupted.getName());
482 if (fs.exists(corrupted)) {
483 if (!fs.rename(corrupted, p)) {
484 LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
485 } else {
486 LOG.warn("Moved corrupted log " + corrupted + " to " + p);
487 }
488 }
489 }
490
491 for (Path p : processedLogs) {
492 Path newPath = FSHLog.getWALArchivePath(oldLogDir, p);
493 if (fs.exists(p)) {
494 if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) {
495 LOG.warn("Unable to move " + p + " to " + newPath);
496 } else {
497 LOG.info("Archived processed log " + p + " to " + newPath);
498 }
499 }
500 }
501 }
502
503
504
505
506
507
508
509
510
511
512
513
514
515 @SuppressWarnings("deprecation")
516 static Path getRegionSplitEditsPath(final FileSystem fs,
517 final Entry logEntry, final Path rootDir, boolean isCreate)
518 throws IOException {
519 Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTablename());
520 String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
521 Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
522 Path dir = getRegionDirRecoveredEditsDir(regiondir);
523
524 if (!fs.exists(regiondir)) {
525 LOG.info("This region's directory doesn't exist: "
526 + regiondir.toString() + ". It is very likely that it was" +
527 " already split so it's safe to discard those edits.");
528 return null;
529 }
530 if (fs.exists(dir) && fs.isFile(dir)) {
531 Path tmp = new Path("/tmp");
532 if (!fs.exists(tmp)) {
533 fs.mkdirs(tmp);
534 }
535 tmp = new Path(tmp,
536 HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
537 LOG.warn("Found existing old file: " + dir + ". It could be some "
538 + "leftover of an old installation. It should be a folder instead. "
539 + "So moving it to " + tmp);
540 if (!fs.rename(dir, tmp)) {
541 LOG.warn("Failed to sideline old file " + dir);
542 }
543 }
544
545 if (isCreate && !fs.exists(dir)) {
546 if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
547 }
548
549
550 String fileName = formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum());
551 fileName = getTmpRecoveredEditsFileName(fileName);
552 return new Path(dir, fileName);
553 }
554
555 static String getTmpRecoveredEditsFileName(String fileName) {
556 return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
557 }
558
559
560
561
562
563
564
565
566
567 static Path getCompletedRecoveredEditsFilePath(Path srcPath,
568 Long maximumEditLogSeqNum) {
569 String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
570 return new Path(srcPath.getParent(), fileName);
571 }
572
573 static String formatRecoveredEditsFileName(final long seqid) {
574 return String.format("%019d", seqid);
575 }
576
// Matches recovered-edits file names: an optional leading minus sign followed by decimal digits.
private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
// Suffix carried by recovered-edits files that are still being written.
private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
579
580
581
582
583
584
585
586 public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
587 return new Path(regiondir, HConstants.RECOVERED_EDITS_DIR);
588 }
589
590
591
592
593
594
595
596
597
598
599 public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
600 final Path regiondir) throws IOException {
601 NavigableSet<Path> filesSorted = new TreeSet<Path>();
602 Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
603 if (!fs.exists(editsdir))
604 return filesSorted;
605 FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
606 @Override
607 public boolean accept(Path p) {
608 boolean result = false;
609 try {
610
611
612
613
614 Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
615 result = fs.isFile(p) && m.matches();
616
617
618 if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
619 result = false;
620 }
621
622 if (isSequenceIdFile(p)) {
623 result = false;
624 }
625 } catch (IOException e) {
626 LOG.warn("Failed isFile check on " + p);
627 }
628 return result;
629 }
630 });
631 if (files == null) {
632 return filesSorted;
633 }
634 for (FileStatus status : files) {
635 filesSorted.add(status.getPath());
636 }
637 return filesSorted;
638 }
639
640
641
642
643
644
645
646
647
648
649 public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits)
650 throws IOException {
651 Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
652 + System.currentTimeMillis());
653 if (!fs.rename(edits, moveAsideName)) {
654 LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
655 }
656 return moveAsideName;
657 }
658
// Suffix of the sequence id files written by writeRegionSequenceIdFile.
private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid";
// Alternative suffix that isSequenceIdFile also accepts; it has the same length as the one above.
private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid";
// Number of trailing characters stripped from a sequence id file name to get the number.
private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length();
662
663
664
665
666 @VisibleForTesting
667 public static boolean isSequenceIdFile(final Path file) {
668 return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)
669 || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
670 }
671
672
673
674
675
676
677
678
679
680
681 public static long writeRegionSequenceIdFile(final FileSystem fs, final Path regiondir,
682 long newSeqId, long saftyBumper) throws IOException {
683
684 Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
685 long maxSeqId = 0;
686 FileStatus[] files = null;
687 if (fs.exists(editsdir)) {
688 files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
689 @Override
690 public boolean accept(Path p) {
691 return isSequenceIdFile(p);
692 }
693 });
694 if (files != null) {
695 for (FileStatus status : files) {
696 String fileName = status.getPath().getName();
697 try {
698 Long tmpSeqId = Long.parseLong(fileName.substring(0, fileName.length()
699 - SEQUENCE_ID_FILE_SUFFIX_LENGTH));
700 maxSeqId = Math.max(tmpSeqId, maxSeqId);
701 } catch (NumberFormatException ex) {
702 LOG.warn("Invalid SeqId File Name=" + fileName);
703 }
704 }
705 }
706 }
707 if (maxSeqId > newSeqId) {
708 newSeqId = maxSeqId;
709 }
710 newSeqId += saftyBumper;
711
712
713 Path newSeqIdFile = new Path(editsdir, newSeqId + SEQUENCE_ID_FILE_SUFFIX);
714 if (newSeqId != maxSeqId) {
715 try {
716 if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) {
717 throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
718 }
719 if (LOG.isDebugEnabled()) {
720 LOG.debug("Wrote region seqId=" + newSeqIdFile + " to file, newSeqId=" + newSeqId
721 + ", maxSeqId=" + maxSeqId);
722 }
723 } catch (FileAlreadyExistsException ignored) {
724
725 }
726 }
727
728 if (files != null) {
729 for (FileStatus status : files) {
730 if (newSeqIdFile.equals(status.getPath())) {
731 continue;
732 }
733 fs.delete(status.getPath(), false);
734 }
735 }
736 return newSeqId;
737 }
738
739
740
741
742
743
744
745
746
747 protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
748 throws IOException, CorruptedLogFileException {
749 Path path = file.getPath();
750 long length = file.getLen();
751 Reader in;
752
753
754
755
756 if (length <= 0) {
757 LOG.warn("File " + path + " might be still open, length is 0");
758 }
759
760 try {
761 FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
762 try {
763 in = getReader(path, reporter);
764 } catch (EOFException e) {
765 if (length <= 0) {
766
767
768
769
770
771 LOG.warn("Could not open " + path + " for reading. File is empty", e);
772 return null;
773 } else {
774
775 return null;
776 }
777 }
778 } catch (IOException e) {
779 if (e instanceof FileNotFoundException) {
780
781 LOG.warn("File " + path + " doesn't exist anymore.", e);
782 return null;
783 }
784 if (!skipErrors || e instanceof InterruptedIOException) {
785 throw e;
786 }
787 CorruptedLogFileException t =
788 new CorruptedLogFileException("skipErrors=true Could not open wal " +
789 path + " ignoring");
790 t.initCause(e);
791 throw t;
792 }
793 return in;
794 }
795
796 static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
797 throws CorruptedLogFileException, IOException {
798 try {
799 return in.next();
800 } catch (EOFException eof) {
801
802 LOG.info("EOF from wal " + path + ". continuing");
803 return null;
804 } catch (IOException e) {
805
806
807 if (e.getCause() != null &&
808 (e.getCause() instanceof ParseException ||
809 e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
810 LOG.warn("Parse exception " + e.getCause().toString() + " from wal "
811 + path + ". continuing");
812 return null;
813 }
814 if (!skipErrors) {
815 throw e;
816 }
817 CorruptedLogFileException t =
818 new CorruptedLogFileException("skipErrors=true Ignoring exception" +
819 " while parsing wal " + path + ". Marking as corrupted");
820 t.initCause(e);
821 throw t;
822 }
823 }
824
825
826
827
828
829 protected Writer createWriter(Path logfile)
830 throws IOException {
831 return walFactory.createRecoveredEditsWriter(fs, logfile);
832 }
833
834
835
836
837
838 protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
839 return walFactory.createReader(fs, curLogFile, reporter);
840 }
841
842
843
844
845 private int getNumOpenWriters() {
846 int result = 0;
847 if (this.outputSink != null) {
848 result += this.outputSink.getNumOpenWriters();
849 }
850 return result;
851 }
852
853
854
855
856 public static class PipelineController {
857
858
859 AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
860
861
862
863 public final Object dataAvailable = new Object();
864
865 void writerThreadError(Throwable t) {
866 thrown.compareAndSet(null, t);
867 }
868
869
870
871
872 void checkForErrors() throws IOException {
873 Throwable thrown = this.thrown.get();
874 if (thrown == null) return;
875 if (thrown instanceof IOException) {
876 throw new IOException(thrown);
877 } else {
878 throw new RuntimeException(thrown);
879 }
880 }
881 }
882
883
884
885
886
887
888
889
890 public static class EntryBuffers {
891 PipelineController controller;
892
893 Map<byte[], RegionEntryBuffer> buffers =
894 new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);
895
896
897
898
899 Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
900
901 long totalBuffered = 0;
902 long maxHeapUsage;
903
904 public EntryBuffers(PipelineController controller, long maxHeapUsage) {
905 this.controller = controller;
906 this.maxHeapUsage = maxHeapUsage;
907 }
908
909
910
911
912
913
914
915
916 public void appendEntry(Entry entry) throws InterruptedException, IOException {
917 WALKey key = entry.getKey();
918
919 RegionEntryBuffer buffer;
920 long incrHeap;
921 synchronized (this) {
922 buffer = buffers.get(key.getEncodedRegionName());
923 if (buffer == null) {
924 buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
925 buffers.put(key.getEncodedRegionName(), buffer);
926 }
927 incrHeap= buffer.appendEntry(entry);
928 }
929
930
931 synchronized (controller.dataAvailable) {
932 totalBuffered += incrHeap;
933 while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
934 LOG.debug("Used " + totalBuffered +
935 " bytes of buffered edits, waiting for IO threads...");
936 controller.dataAvailable.wait(2000);
937 }
938 controller.dataAvailable.notifyAll();
939 }
940 controller.checkForErrors();
941 }
942
943
944
945
946 synchronized RegionEntryBuffer getChunkToWrite() {
947 long biggestSize = 0;
948 byte[] biggestBufferKey = null;
949
950 for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
951 long size = entry.getValue().heapSize();
952 if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
953 biggestSize = size;
954 biggestBufferKey = entry.getKey();
955 }
956 }
957 if (biggestBufferKey == null) {
958 return null;
959 }
960
961 RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
962 currentlyWriting.add(biggestBufferKey);
963 return buffer;
964 }
965
966 void doneWriting(RegionEntryBuffer buffer) {
967 synchronized (this) {
968 boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
969 assert removed;
970 }
971 long size = buffer.heapSize();
972
973 synchronized (controller.dataAvailable) {
974 totalBuffered -= size;
975
976 controller.dataAvailable.notifyAll();
977 }
978 }
979
980 synchronized boolean isRegionCurrentlyWriting(byte[] region) {
981 return currentlyWriting.contains(region);
982 }
983
984 public void waitUntilDrained() {
985 synchronized (controller.dataAvailable) {
986 while (totalBuffered > 0) {
987 try {
988 controller.dataAvailable.wait(2000);
989 } catch (InterruptedException e) {
990 LOG.warn("Got intrerrupted while waiting for EntryBuffers is drained");
991 Thread.interrupted();
992 break;
993 }
994 }
995 }
996 }
997 }
998
999
1000
1001
1002
1003
1004
1005 public static class RegionEntryBuffer implements HeapSize {
1006 long heapInBuffer = 0;
1007 List<Entry> entryBuffer;
1008 TableName tableName;
1009 byte[] encodedRegionName;
1010
1011 RegionEntryBuffer(TableName tableName, byte[] region) {
1012 this.tableName = tableName;
1013 this.encodedRegionName = region;
1014 this.entryBuffer = new LinkedList<Entry>();
1015 }
1016
1017 long appendEntry(Entry entry) {
1018 internify(entry);
1019 entryBuffer.add(entry);
1020 long incrHeap = entry.getEdit().heapSize() +
1021 ClassSize.align(2 * ClassSize.REFERENCE) +
1022 0;
1023 heapInBuffer += incrHeap;
1024 return incrHeap;
1025 }
1026
1027 private void internify(Entry entry) {
1028 WALKey k = entry.getKey();
1029 k.internTableName(this.tableName);
1030 k.internEncodedRegionName(this.encodedRegionName);
1031 }
1032
1033 @Override
1034 public long heapSize() {
1035 return heapInBuffer;
1036 }
1037
1038 public byte[] getEncodedRegionName() {
1039 return encodedRegionName;
1040 }
1041
1042 public List<Entry> getEntryBuffer() {
1043 return entryBuffer;
1044 }
1045
1046 public TableName getTableName() {
1047 return tableName;
1048 }
1049 }
1050
1051 public static class WriterThread extends Thread {
1052 private volatile boolean shouldStop = false;
1053 private PipelineController controller;
1054 private EntryBuffers entryBuffers;
1055 private OutputSink outputSink = null;
1056
1057 WriterThread(PipelineController controller, EntryBuffers entryBuffers, OutputSink sink, int i){
1058 super(Thread.currentThread().getName() + "-Writer-" + i);
1059 this.controller = controller;
1060 this.entryBuffers = entryBuffers;
1061 outputSink = sink;
1062 }
1063
1064 @Override
1065 public void run() {
1066 try {
1067 doRun();
1068 } catch (Throwable t) {
1069 LOG.error("Exiting thread", t);
1070 controller.writerThreadError(t);
1071 }
1072 }
1073
1074 private void doRun() throws IOException {
1075 if (LOG.isTraceEnabled()) LOG.trace("Writer thread starting");
1076 while (true) {
1077 RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
1078 if (buffer == null) {
1079
1080 synchronized (controller.dataAvailable) {
1081 if (shouldStop && !this.outputSink.flush()) {
1082 return;
1083 }
1084 try {
1085 controller.dataAvailable.wait(500);
1086 } catch (InterruptedException ie) {
1087 if (!shouldStop) {
1088 throw new RuntimeException(ie);
1089 }
1090 }
1091 }
1092 continue;
1093 }
1094
1095 assert buffer != null;
1096 try {
1097 writeBuffer(buffer);
1098 } finally {
1099 entryBuffers.doneWriting(buffer);
1100 }
1101 }
1102 }
1103
1104 private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
1105 outputSink.append(buffer);
1106 }
1107
1108 void finish() {
1109 synchronized (controller.dataAvailable) {
1110 shouldStop = true;
1111 controller.dataAvailable.notifyAll();
1112 }
1113 }
1114 }
1115
1116
1117
1118
1119
1120 public static abstract class OutputSink {
1121
1122 protected PipelineController controller;
1123 protected EntryBuffers entryBuffers;
1124
1125 protected Map<byte[], SinkWriter> writers = Collections
1126 .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));;
1127
1128 protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
1129 .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
1130
1131 protected final List<WriterThread> writerThreads = Lists.newArrayList();
1132
1133
1134 protected final Set<byte[]> blacklistedRegions = Collections
1135 .synchronizedSet(new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
1136
1137 protected boolean closeAndCleanCompleted = false;
1138
1139 protected boolean writersClosed = false;
1140
1141 protected final int numThreads;
1142
1143 protected CancelableProgressable reporter = null;
1144
1145 protected AtomicLong skippedEdits = new AtomicLong();
1146
1147 protected List<Path> splits = null;
1148
1149 public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
1150 numThreads = numWriters;
1151 this.controller = controller;
1152 this.entryBuffers = entryBuffers;
1153 }
1154
1155 void setReporter(CancelableProgressable reporter) {
1156 this.reporter = reporter;
1157 }
1158
1159
1160
1161
1162 public synchronized void startWriterThreads() {
1163 for (int i = 0; i < numThreads; i++) {
1164 WriterThread t = new WriterThread(controller, entryBuffers, this, i);
1165 t.start();
1166 writerThreads.add(t);
1167 }
1168 }
1169
1170
1171
1172
1173
1174 void updateRegionMaximumEditLogSeqNum(Entry entry) {
1175 synchronized (regionMaximumEditLogSeqNum) {
1176 Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
1177 .getEncodedRegionName());
1178 if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
1179 regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
1180 .getLogSeqNum());
1181 }
1182 }
1183 }
1184
1185 Long getRegionMaximumEditLogSeqNum(byte[] region) {
1186 return regionMaximumEditLogSeqNum.get(region);
1187 }
1188
1189
1190
1191
1192 int getNumOpenWriters() {
1193 return this.writers.size();
1194 }
1195
1196 long getSkippedEdits() {
1197 return this.skippedEdits.get();
1198 }
1199
1200
1201
1202
1203
1204
1205 protected boolean finishWriting(boolean interrupt) throws IOException {
1206 LOG.debug("Waiting for split writer threads to finish");
1207 boolean progress_failed = false;
1208 for (WriterThread t : writerThreads) {
1209 t.finish();
1210 }
1211 if (interrupt) {
1212 for (WriterThread t : writerThreads) {
1213 t.interrupt();
1214 }
1215 }
1216
1217 for (WriterThread t : writerThreads) {
1218 if (!progress_failed && reporter != null && !reporter.progress()) {
1219 progress_failed = true;
1220 }
1221 try {
1222 t.join();
1223 } catch (InterruptedException ie) {
1224 IOException iie = new InterruptedIOException();
1225 iie.initCause(ie);
1226 throw iie;
1227 }
1228 }
1229 controller.checkForErrors();
1230 LOG.info(this.writerThreads.size() + " split writers finished; closing...");
1231 return (!progress_failed);
1232 }
1233
1234 public abstract List<Path> finishWritingAndClose() throws IOException;
1235
1236
1237
1238
1239 public abstract Map<byte[], Long> getOutputCounts();
1240
1241
1242
1243
1244 public abstract int getNumberOfRecoveredRegions();
1245
1246
1247
1248
1249
1250 public abstract void append(RegionEntryBuffer buffer) throws IOException;
1251
1252
1253
1254
1255
1256 public boolean flush() throws IOException {
1257 return false;
1258 }
1259 }
1260
1261
1262
1263
1264 class LogRecoveredEditsOutputSink extends OutputSink {
1265
/**
 * Creates a sink that forwards all three arguments unchanged to {@link OutputSink}.
 *
 * @param controller   pipeline controller shared with the writer threads
 * @param entryBuffers buffers the writer threads read from
 * @param numWriters   number of writer threads
 */
public LogRecoveredEditsOutputSink(PipelineController controller, EntryBuffers entryBuffers,
    int numWriters) {
  super(controller, entryBuffers, numWriters);
}
1275
1276
1277
1278
1279
1280 @Override
1281 public List<Path> finishWritingAndClose() throws IOException {
1282 boolean isSuccessful = false;
1283 List<Path> result = null;
1284 try {
1285 isSuccessful = finishWriting(false);
1286 } finally {
1287 result = close();
1288 List<IOException> thrown = closeLogWriters(null);
1289 if (thrown != null && !thrown.isEmpty()) {
1290 throw MultipleIOException.createIOException(thrown);
1291 }
1292 }
1293 if (isSuccessful) {
1294 splits = result;
1295 }
1296 return splits;
1297 }
1298
1299
1300
1301
1302
1303 private List<Path> close() throws IOException {
1304 Preconditions.checkState(!closeAndCleanCompleted);
1305
1306 final List<Path> paths = new ArrayList<Path>();
1307 final List<IOException> thrown = Lists.newArrayList();
1308 ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L,
1309 TimeUnit.SECONDS, new ThreadFactory() {
1310 private int count = 1;
1311
1312 @Override
1313 public Thread newThread(Runnable r) {
1314 Thread t = new Thread(r, "split-log-closeStream-" + count++);
1315 return t;
1316 }
1317 });
1318 CompletionService<Void> completionService =
1319 new ExecutorCompletionService<Void>(closeThreadPool);
1320 for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
1321 if (LOG.isTraceEnabled()) {
1322 LOG.trace("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
1323 }
1324 completionService.submit(new Callable<Void>() {
1325 @Override
1326 public Void call() throws Exception {
1327 WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
1328 if (LOG.isTraceEnabled()) LOG.trace("Closing " + wap.p);
1329 try {
1330 wap.w.close();
1331 } catch (IOException ioe) {
1332 LOG.error("Couldn't close log at " + wap.p, ioe);
1333 thrown.add(ioe);
1334 return null;
1335 }
1336 if (LOG.isDebugEnabled()) {
1337 LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
1338 + " edits, skipped " + wap.editsSkipped + " edits in "
1339 + (wap.nanosSpent / 1000 / 1000) + "ms");
1340 }
1341 if (wap.editsWritten == 0) {
1342
1343 if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
1344 LOG.warn("Failed deleting empty " + wap.p);
1345 throw new IOException("Failed deleting empty " + wap.p);
1346 }
1347 return null;
1348 }
1349
1350 Path dst = getCompletedRecoveredEditsFilePath(wap.p,
1351 regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
1352 try {
1353 if (!dst.equals(wap.p) && fs.exists(dst)) {
1354 LOG.warn("Found existing old edits file. It could be the "
1355 + "result of a previous failed split attempt. Deleting " + dst + ", length="
1356 + fs.getFileStatus(dst).getLen());
1357 if (!fs.delete(dst, false)) {
1358 LOG.warn("Failed deleting of old " + dst);
1359 throw new IOException("Failed deleting of old " + dst);
1360 }
1361 }
1362
1363
1364
1365 if (fs.exists(wap.p)) {
1366 if (!fs.rename(wap.p, dst)) {
1367 throw new IOException("Failed renaming " + wap.p + " to " + dst);
1368 }
1369 LOG.info("Rename " + wap.p + " to " + dst);
1370 }
1371 } catch (IOException ioe) {
1372 LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
1373 thrown.add(ioe);
1374 return null;
1375 }
1376 paths.add(dst);
1377 return null;
1378 }
1379 });
1380 }
1381
1382 boolean progress_failed = false;
1383 try {
1384 for (int i = 0, n = this.writers.size(); i < n; i++) {
1385 Future<Void> future = completionService.take();
1386 future.get();
1387 if (!progress_failed && reporter != null && !reporter.progress()) {
1388 progress_failed = true;
1389 }
1390 }
1391 } catch (InterruptedException e) {
1392 IOException iie = new InterruptedIOException();
1393 iie.initCause(e);
1394 throw iie;
1395 } catch (ExecutionException e) {
1396 throw new IOException(e.getCause());
1397 } finally {
1398 closeThreadPool.shutdownNow();
1399 }
1400
1401 if (!thrown.isEmpty()) {
1402 throw MultipleIOException.createIOException(thrown);
1403 }
1404 writersClosed = true;
1405 closeAndCleanCompleted = true;
1406 if (progress_failed) {
1407 return null;
1408 }
1409 return paths;
1410 }
1411
1412 private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
1413 if (writersClosed) {
1414 return thrown;
1415 }
1416
1417 if (thrown == null) {
1418 thrown = Lists.newArrayList();
1419 }
1420 try {
1421 for (WriterThread t : writerThreads) {
1422 while (t.isAlive()) {
1423 t.shouldStop = true;
1424 t.interrupt();
1425 try {
1426 t.join(10);
1427 } catch (InterruptedException e) {
1428 IOException iie = new InterruptedIOException();
1429 iie.initCause(e);
1430 throw iie;
1431 }
1432 }
1433 }
1434 } finally {
1435 synchronized (writers) {
1436 WriterAndPath wap = null;
1437 for (SinkWriter tmpWAP : writers.values()) {
1438 try {
1439 wap = (WriterAndPath) tmpWAP;
1440 wap.w.close();
1441 } catch (IOException ioe) {
1442 LOG.error("Couldn't close log at " + wap.p, ioe);
1443 thrown.add(ioe);
1444 continue;
1445 }
1446 LOG.info("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in "
1447 + (wap.nanosSpent / 1000 / 1000) + "ms)");
1448 }
1449 }
1450 writersClosed = true;
1451 }
1452
1453 return thrown;
1454 }
1455
1456
1457
1458
1459
1460
1461 private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
1462 byte region[] = entry.getKey().getEncodedRegionName();
1463 WriterAndPath ret = (WriterAndPath) writers.get(region);
1464 if (ret != null) {
1465 return ret;
1466 }
1467
1468
1469 if (blacklistedRegions.contains(region)) {
1470 return null;
1471 }
1472 ret = createWAP(region, entry, rootDir);
1473 if (ret == null) {
1474 blacklistedRegions.add(region);
1475 return null;
1476 }
1477 writers.put(region, ret);
1478 return ret;
1479 }
1480
1481
1482
1483
1484 private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
1485 Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
1486 if (regionedits == null) {
1487 return null;
1488 }
1489 if (fs.exists(regionedits)) {
1490 LOG.warn("Found old edits file. It could be the "
1491 + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
1492 + fs.getFileStatus(regionedits).getLen());
1493 if (!fs.delete(regionedits, false)) {
1494 LOG.warn("Failed delete of old " + regionedits);
1495 }
1496 }
1497 Writer w = createWriter(regionedits);
1498 LOG.debug("Creating writer path=" + regionedits);
1499 return new WriterAndPath(regionedits, w);
1500 }
1501
1502 private void filterCellByStore(Entry logEntry) {
1503 Map<byte[], Long> maxSeqIdInStores =
1504 regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
1505 if (maxSeqIdInStores == null || maxSeqIdInStores.isEmpty()) {
1506 return;
1507 }
1508 List<Cell> skippedCells = new ArrayList<Cell>();
1509 for (Cell cell : logEntry.getEdit().getCells()) {
1510 if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
1511 byte[] family = CellUtil.cloneFamily(cell);
1512 Long maxSeqId = maxSeqIdInStores.get(family);
1513
1514
1515 if (maxSeqId != null && maxSeqId.longValue() >= logEntry.getKey().getLogSeqNum()) {
1516 skippedCells.add(cell);
1517 }
1518 }
1519 }
1520 if (!skippedCells.isEmpty()) {
1521 logEntry.getEdit().getCells().removeAll(skippedCells);
1522 }
1523 }
1524
1525 @Override
1526 public void append(RegionEntryBuffer buffer) throws IOException {
1527 List<Entry> entries = buffer.entryBuffer;
1528 if (entries.isEmpty()) {
1529 LOG.warn("got an empty buffer, skipping");
1530 return;
1531 }
1532
1533 WriterAndPath wap = null;
1534
1535 long startTime = System.nanoTime();
1536 try {
1537 int editsCount = 0;
1538
1539 for (Entry logEntry : entries) {
1540 if (wap == null) {
1541 wap = getWriterAndPath(logEntry);
1542 if (wap == null) {
1543 if (LOG.isDebugEnabled()) {
1544 LOG.debug("getWriterAndPath decided we don't need to write edits for " + logEntry);
1545 }
1546 return;
1547 }
1548 }
1549 filterCellByStore(logEntry);
1550 if (!logEntry.getEdit().isEmpty()) {
1551 wap.w.append(logEntry);
1552 this.updateRegionMaximumEditLogSeqNum(logEntry);
1553 editsCount++;
1554 } else {
1555 wap.incrementSkippedEdits(1);
1556 }
1557 }
1558
1559 wap.incrementEdits(editsCount);
1560 wap.incrementNanoTime(System.nanoTime() - startTime);
1561 } catch (IOException e) {
1562 e = e instanceof RemoteException ?
1563 ((RemoteException)e).unwrapRemoteException() : e;
1564 LOG.fatal(" Got while writing log entry to log", e);
1565 throw e;
1566 }
1567 }
1568
1569
1570
1571
1572 @Override
1573 public Map<byte[], Long> getOutputCounts() {
1574 TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1575 synchronized (writers) {
1576 for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) {
1577 ret.put(entry.getKey(), entry.getValue().editsWritten);
1578 }
1579 }
1580 return ret;
1581 }
1582
1583 @Override
1584 public int getNumberOfRecoveredRegions() {
1585 return writers.size();
1586 }
1587 }
1588
1589
1590
1591
1592 public abstract static class SinkWriter {
1593
1594 long editsWritten = 0;
1595
1596 long editsSkipped = 0;
1597
1598 long nanosSpent = 0;
1599
1600 void incrementEdits(int edits) {
1601 editsWritten += edits;
1602 }
1603
1604 void incrementSkippedEdits(int skipped) {
1605 editsSkipped += skipped;
1606 }
1607
1608 void incrementNanoTime(long nanos) {
1609 nanosSpent += nanos;
1610 }
1611 }
1612
1613
1614
1615
1616
1617 private final static class WriterAndPath extends SinkWriter {
1618 final Path p;
1619 final Writer w;
1620
1621 WriterAndPath(final Path p, final Writer w) {
1622 this.p = p;
1623 this.w = w;
1624 }
1625 }
1626
1627
1628
1629
1630 class LogReplayOutputSink extends OutputSink {
1631 private static final double BUFFER_THRESHOLD = 0.35;
1632 private static final String KEY_DELIMITER = "#";
1633
1634 private long waitRegionOnlineTimeOut;
1635 private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>());
1636 private final Map<String, RegionServerWriter> writers =
1637 new ConcurrentHashMap<String, RegionServerWriter>();
1638
1639 private final Map<String, HRegionLocation> onlineRegions =
1640 new ConcurrentHashMap<String, HRegionLocation>();
1641
1642 private Map<TableName, HConnection> tableNameToHConnectionMap = Collections
1643 .synchronizedMap(new TreeMap<TableName, HConnection>());
1644
1645
1646
1647
1648 private Map<String, List<Pair<HRegionLocation, Entry>>> serverToBufferQueueMap =
1649 new ConcurrentHashMap<String, List<Pair<HRegionLocation, Entry>>>();
1650 private List<Throwable> thrown = new ArrayList<Throwable>();
1651
1652
1653
1654
1655
1656 private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
1657 private boolean hasEditsInDisablingOrDisabledTables = false;
1658
1659 public LogReplayOutputSink(PipelineController controller, EntryBuffers entryBuffers,
1660 int numWriters) {
1661 super(controller, entryBuffers, numWriters);
1662 this.waitRegionOnlineTimeOut =
1663 conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
1664 ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT);
1665 this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(controller,
1666 entryBuffers, numWriters);
1667 this.logRecoveredEditsOutputSink.setReporter(reporter);
1668 }
1669
1670 @Override
1671 public void append(RegionEntryBuffer buffer) throws IOException {
1672 List<Entry> entries = buffer.entryBuffer;
1673 if (entries.isEmpty()) {
1674 LOG.warn("got an empty buffer, skipping");
1675 return;
1676 }
1677
1678
1679 if (isTableDisabledOrDisabling(buffer.tableName)) {
1680
1681 logRecoveredEditsOutputSink.append(buffer);
1682 hasEditsInDisablingOrDisabledTables = true;
1683
1684 addToRecoveredRegions(Bytes.toString(buffer.encodedRegionName));
1685 return;
1686 }
1687
1688
1689 groupEditsByServer(entries);
1690
1691
1692 String maxLocKey = null;
1693 int maxSize = 0;
1694 List<Pair<HRegionLocation, Entry>> maxQueue = null;
1695 synchronized (this.serverToBufferQueueMap) {
1696 for (Map.Entry<String, List<Pair<HRegionLocation, Entry>>> entry :
1697 this.serverToBufferQueueMap.entrySet()) {
1698 List<Pair<HRegionLocation, Entry>> curQueue = entry.getValue();
1699 if (curQueue.size() > maxSize) {
1700 maxSize = curQueue.size();
1701 maxQueue = curQueue;
1702 maxLocKey = entry.getKey();
1703 }
1704 }
1705 if (maxSize < minBatchSize
1706 && entryBuffers.totalBuffered < BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) {
1707
1708 return;
1709 } else if (maxSize > 0) {
1710 this.serverToBufferQueueMap.remove(maxLocKey);
1711 }
1712 }
1713
1714 if (maxSize > 0) {
1715 processWorkItems(maxLocKey, maxQueue);
1716 }
1717 }
1718
1719 private void addToRecoveredRegions(String encodedRegionName) {
1720 if (!recoveredRegions.contains(encodedRegionName)) {
1721 recoveredRegions.add(encodedRegionName);
1722 }
1723 }
1724
1725
1726
1727
1728
1729 private void groupEditsByServer(List<Entry> entries) throws IOException {
1730 Set<TableName> nonExistentTables = null;
1731 Long cachedLastFlushedSequenceId = -1l;
1732 for (Entry entry : entries) {
1733 WALEdit edit = entry.getEdit();
1734 TableName table = entry.getKey().getTablename();
1735
1736 entry.getKey().setScopes(null);
1737 String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
1738
1739 if (nonExistentTables != null && nonExistentTables.contains(table)) {
1740 this.skippedEdits.incrementAndGet();
1741 continue;
1742 }
1743
1744 Map<byte[], Long> maxStoreSequenceIds = null;
1745 boolean needSkip = false;
1746 HRegionLocation loc = null;
1747 String locKey = null;
1748 List<Cell> cells = edit.getCells();
1749 List<Cell> skippedCells = new ArrayList<Cell>();
1750 HConnection hconn = this.getConnectionByTableName(table);
1751
1752 for (Cell cell : cells) {
1753 byte[] row = CellUtil.cloneRow(cell);
1754 byte[] family = CellUtil.cloneFamily(cell);
1755 boolean isCompactionEntry = false;
1756 if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
1757 CompactionDescriptor compaction = WALEdit.getCompaction(cell);
1758 if (compaction != null && compaction.hasRegionName()) {
1759 try {
1760 byte[][] regionName = HRegionInfo.parseRegionName(compaction.getRegionName()
1761 .toByteArray());
1762 row = regionName[1];
1763 family = compaction.getFamilyName().toByteArray();
1764 isCompactionEntry = true;
1765 } catch (Exception ex) {
1766 LOG.warn("Unexpected exception received, ignoring " + ex);
1767 skippedCells.add(cell);
1768 continue;
1769 }
1770 } else {
1771 skippedCells.add(cell);
1772 continue;
1773 }
1774 }
1775
1776 try {
1777 loc =
1778 locateRegionAndRefreshLastFlushedSequenceId(hconn, table, row,
1779 encodeRegionNameStr);
1780
1781 if (isCompactionEntry && !encodeRegionNameStr.equalsIgnoreCase(
1782 loc.getRegionInfo().getEncodedName())) {
1783 LOG.info("Not replaying a compaction marker for an older region: "
1784 + encodeRegionNameStr);
1785 needSkip = true;
1786 }
1787 } catch (TableNotFoundException ex) {
1788
1789 LOG.info("Table " + table + " doesn't exist. Skip log replay for region "
1790 + encodeRegionNameStr);
1791 lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE);
1792 if (nonExistentTables == null) {
1793 nonExistentTables = new TreeSet<TableName>();
1794 }
1795 nonExistentTables.add(table);
1796 this.skippedEdits.incrementAndGet();
1797 needSkip = true;
1798 break;
1799 }
1800
1801 cachedLastFlushedSequenceId =
1802 lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
1803 if (cachedLastFlushedSequenceId != null
1804 && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
1805
1806 this.skippedEdits.incrementAndGet();
1807 needSkip = true;
1808 break;
1809 } else {
1810 if (maxStoreSequenceIds == null) {
1811 maxStoreSequenceIds =
1812 regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName());
1813 }
1814 if (maxStoreSequenceIds != null) {
1815 Long maxStoreSeqId = maxStoreSequenceIds.get(family);
1816 if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) {
1817
1818 skippedCells.add(cell);
1819 continue;
1820 }
1821 }
1822 }
1823 }
1824
1825
1826 if (loc == null || needSkip) continue;
1827
1828 if (!skippedCells.isEmpty()) {
1829 cells.removeAll(skippedCells);
1830 }
1831
1832 synchronized (serverToBufferQueueMap) {
1833 locKey = loc.getHostnamePort() + KEY_DELIMITER + table;
1834 List<Pair<HRegionLocation, Entry>> queue = serverToBufferQueueMap.get(locKey);
1835 if (queue == null) {
1836 queue =
1837 Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, Entry>>());
1838 serverToBufferQueueMap.put(locKey, queue);
1839 }
1840 queue.add(new Pair<HRegionLocation, Entry>(loc, entry));
1841 }
1842
1843 addToRecoveredRegions(loc.getRegionInfo().getEncodedName());
1844 }
1845 }
1846
1847
1848
1849
1850
1851
1852 private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn,
1853 TableName table, byte[] row, String originalEncodedRegionName) throws IOException {
1854
1855 HRegionLocation loc = onlineRegions.get(originalEncodedRegionName);
1856 if(loc != null) return loc;
1857
1858 loc = hconn.getRegionLocation(table, row, true);
1859 if (loc == null) {
1860 throw new IOException("Can't locate location for row:" + Bytes.toString(row)
1861 + " of table:" + table);
1862 }
1863
1864 if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) {
1865
1866 lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE);
1867 HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName());
1868 if (tmpLoc != null) return tmpLoc;
1869 }
1870
1871 Long lastFlushedSequenceId = -1l;
1872 AtomicBoolean isRecovering = new AtomicBoolean(true);
1873 loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering);
1874 if (!isRecovering.get()) {
1875
1876
1877 lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
1878 LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
1879 + " because it's not in recovering.");
1880 } else {
1881 Long cachedLastFlushedSequenceId =
1882 lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
1883
1884
1885
1886 RegionStoreSequenceIds ids =
1887 csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
1888 loc.getRegionInfo().getEncodedName());
1889 if (ids != null) {
1890 lastFlushedSequenceId = ids.getLastFlushedSequenceId();
1891 Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1892 List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList();
1893 for (StoreSequenceId id : maxSeqIdInStores) {
1894 storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId());
1895 }
1896 regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds);
1897 }
1898
1899 if (cachedLastFlushedSequenceId == null
1900 || lastFlushedSequenceId > cachedLastFlushedSequenceId) {
1901 lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId);
1902 }
1903 }
1904
1905 onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc);
1906 return loc;
1907 }
1908
1909 private void processWorkItems(String key, List<Pair<HRegionLocation, Entry>> actions)
1910 throws IOException {
1911 RegionServerWriter rsw = null;
1912
1913 long startTime = System.nanoTime();
1914 try {
1915 rsw = getRegionServerWriter(key);
1916 rsw.sink.replayEntries(actions);
1917
1918
1919 rsw.incrementEdits(actions.size());
1920 rsw.incrementNanoTime(System.nanoTime() - startTime);
1921 } catch (IOException e) {
1922 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1923 LOG.fatal(" Got while writing log entry to log", e);
1924 throw e;
1925 }
1926 }
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937 private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row,
1938 final long timeout, AtomicBoolean isRecovering)
1939 throws IOException {
1940 final long endTime = EnvironmentEdgeManager.currentTime() + timeout;
1941 final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
1942 HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
1943 boolean reloadLocation = false;
1944 TableName tableName = loc.getRegionInfo().getTable();
1945 int tries = 0;
1946 Throwable cause = null;
1947 while (endTime > EnvironmentEdgeManager.currentTime()) {
1948 try {
1949
1950 HConnection hconn = getConnectionByTableName(tableName);
1951 if(reloadLocation) {
1952 loc = hconn.getRegionLocation(tableName, row, true);
1953 }
1954 BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName());
1955 HRegionInfo region = loc.getRegionInfo();
1956 try {
1957 GetRegionInfoRequest request =
1958 RequestConverter.buildGetRegionInfoRequest(region.getRegionName());
1959 GetRegionInfoResponse response = remoteSvr.getRegionInfo(null, request);
1960 if (HRegionInfo.convert(response.getRegionInfo()) != null) {
1961 isRecovering.set((response.hasIsRecovering()) ? response.getIsRecovering() : true);
1962 return loc;
1963 }
1964 } catch (ServiceException se) {
1965 throw ProtobufUtil.getRemoteException(se);
1966 }
1967 } catch (IOException e) {
1968 cause = e.getCause();
1969 if(!(cause instanceof RegionOpeningException)) {
1970 reloadLocation = true;
1971 }
1972 }
1973 long expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
1974 try {
1975 Thread.sleep(expectedSleep);
1976 } catch (InterruptedException e) {
1977 throw new IOException("Interrupted when waiting region " +
1978 loc.getRegionInfo().getEncodedName() + " online.", e);
1979 }
1980 tries++;
1981 }
1982
1983 throw new IOException("Timeout when waiting region " + loc.getRegionInfo().getEncodedName() +
1984 " online for " + timeout + " milliseconds.", cause);
1985 }
1986
1987 @Override
1988 public boolean flush() throws IOException {
1989 String curLoc = null;
1990 int curSize = 0;
1991 List<Pair<HRegionLocation, Entry>> curQueue = null;
1992 synchronized (this.serverToBufferQueueMap) {
1993 for (Map.Entry<String, List<Pair<HRegionLocation, Entry>>> entry :
1994 this.serverToBufferQueueMap.entrySet()) {
1995 curQueue = entry.getValue();
1996 if (!curQueue.isEmpty()) {
1997 curSize = curQueue.size();
1998 curLoc = entry.getKey();
1999 break;
2000 }
2001 }
2002 if (curSize > 0) {
2003 this.serverToBufferQueueMap.remove(curLoc);
2004 }
2005 }
2006
2007 if (curSize > 0) {
2008 this.processWorkItems(curLoc, curQueue);
2009
2010 synchronized(controller.dataAvailable) {
2011 controller.dataAvailable.notifyAll();
2012 }
2013 return true;
2014 }
2015 return false;
2016 }
2017
2018 void addWriterError(Throwable t) {
2019 thrown.add(t);
2020 }
2021
2022 @Override
2023 public List<Path> finishWritingAndClose() throws IOException {
2024 try {
2025 if (!finishWriting(false)) {
2026 return null;
2027 }
2028 if (hasEditsInDisablingOrDisabledTables) {
2029 splits = logRecoveredEditsOutputSink.finishWritingAndClose();
2030 } else {
2031 splits = new ArrayList<Path>();
2032 }
2033
2034 return splits;
2035 } finally {
2036 List<IOException> thrown = closeRegionServerWriters();
2037 if (thrown != null && !thrown.isEmpty()) {
2038 throw MultipleIOException.createIOException(thrown);
2039 }
2040 }
2041 }
2042
2043 @Override
2044 int getNumOpenWriters() {
2045 return this.writers.size() + this.logRecoveredEditsOutputSink.getNumOpenWriters();
2046 }
2047
2048 private List<IOException> closeRegionServerWriters() throws IOException {
2049 List<IOException> result = null;
2050 if (!writersClosed) {
2051 result = Lists.newArrayList();
2052 try {
2053 for (WriterThread t : writerThreads) {
2054 while (t.isAlive()) {
2055 t.shouldStop = true;
2056 t.interrupt();
2057 try {
2058 t.join(10);
2059 } catch (InterruptedException e) {
2060 IOException iie = new InterruptedIOException();
2061 iie.initCause(e);
2062 throw iie;
2063 }
2064 }
2065 }
2066 } finally {
2067 synchronized (writers) {
2068 for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
2069 RegionServerWriter tmpW = entry.getValue();
2070 try {
2071 tmpW.close();
2072 } catch (IOException ioe) {
2073 LOG.error("Couldn't close writer for region server:" + entry.getKey(), ioe);
2074 result.add(ioe);
2075 }
2076 }
2077 }
2078
2079
2080 synchronized (this.tableNameToHConnectionMap) {
2081 for (Map.Entry<TableName,HConnection> entry :
2082 this.tableNameToHConnectionMap.entrySet()) {
2083 HConnection hconn = entry.getValue();
2084 try {
2085 hconn.clearRegionCache();
2086 hconn.close();
2087 } catch (IOException ioe) {
2088 result.add(ioe);
2089 }
2090 }
2091 }
2092 writersClosed = true;
2093 }
2094 }
2095 return result;
2096 }
2097
2098 @Override
2099 public Map<byte[], Long> getOutputCounts() {
2100 TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
2101 synchronized (writers) {
2102 for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
2103 ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
2104 }
2105 }
2106 return ret;
2107 }
2108
2109 @Override
2110 public int getNumberOfRecoveredRegions() {
2111 return this.recoveredRegions.size();
2112 }
2113
2114 private boolean isTableDisabledOrDisabling(TableName tableName) {
2115 if (csm == null)
2116 return false;
2117 if (tableName.isSystemTable())
2118 return false;
2119 TableState tableState = tableStatesCache.get(tableName);
2120 if (tableState == null) {
2121 try {
2122 tableState =
2123 MetaTableAccessor.getTableState(csm.getServer().getConnection(), tableName);
2124 if (tableState != null)
2125 tableStatesCache.put(tableName, tableState);
2126 } catch (IOException e) {
2127 LOG.warn("State is not accessible for table " + tableName, e);
2128 }
2129 }
2130 return tableState != null && tableState
2131 .inStates(TableState.State.DISABLED, TableState.State.DISABLING);
2132 }
2133
2134
2135
2136
2137
2138
2139 private RegionServerWriter getRegionServerWriter(String loc) throws IOException {
2140 RegionServerWriter ret = writers.get(loc);
2141 if (ret != null) {
2142 return ret;
2143 }
2144
2145 TableName tableName = getTableFromLocationStr(loc);
2146 if(tableName == null){
2147 throw new IOException("Invalid location string:" + loc + " found. Replay aborted.");
2148 }
2149
2150 HConnection hconn = getConnectionByTableName(tableName);
2151 synchronized (writers) {
2152 ret = writers.get(loc);
2153 if (ret == null) {
2154 ret = new RegionServerWriter(conf, tableName, hconn);
2155 writers.put(loc, ret);
2156 }
2157 }
2158 return ret;
2159 }
2160
2161 private HConnection getConnectionByTableName(final TableName tableName) throws IOException {
2162 HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
2163 if (hconn == null) {
2164 synchronized (this.tableNameToHConnectionMap) {
2165 hconn = this.tableNameToHConnectionMap.get(tableName);
2166 if (hconn == null) {
2167 hconn = (HConnection) ConnectionFactory.createConnection(conf);
2168 this.tableNameToHConnectionMap.put(tableName, hconn);
2169 }
2170 }
2171 }
2172 return hconn;
2173 }
2174 private TableName getTableFromLocationStr(String loc) {
2175
2176
2177
2178 String[] splits = loc.split(KEY_DELIMITER);
2179 if (splits.length != 2) {
2180 return null;
2181 }
2182 return TableName.valueOf(splits[1]);
2183 }
2184 }
2185
2186
2187
2188
2189
2190 private final static class RegionServerWriter extends SinkWriter {
2191 final WALEditsReplaySink sink;
2192
2193 RegionServerWriter(final Configuration conf, final TableName tableName, final HConnection conn)
2194 throws IOException {
2195 this.sink = new WALEditsReplaySink(conf, tableName, conn);
2196 }
2197
2198 void close() throws IOException {
2199 }
2200 }
2201
2202 static class CorruptedLogFileException extends Exception {
2203 private static final long serialVersionUID = 1L;
2204
2205 CorruptedLogFileException(String s) {
2206 super(s);
2207 }
2208 }
2209
2210
2211 public static class MutationReplay {
2212 public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
2213 this.type = type;
2214 this.mutation = mutation;
2215 if(this.mutation.getDurability() != Durability.SKIP_WAL) {
2216
2217 this.mutation.setDurability(Durability.ASYNC_WAL);
2218 }
2219 this.nonceGroup = nonceGroup;
2220 this.nonce = nonce;
2221 }
2222
2223 public final MutationType type;
2224 public final Mutation mutation;
2225 public final long nonceGroup;
2226 public final long nonce;
2227 }
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239 public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
2240 Pair<WALKey, WALEdit> logEntry, Durability durability)
2241 throws IOException {
2242
2243 if (entry == null) {
2244
2245 return new ArrayList<MutationReplay>();
2246 }
2247
2248 long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
2249 entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
2250 int count = entry.getAssociatedCellCount();
2251 List<MutationReplay> mutations = new ArrayList<MutationReplay>();
2252 Cell previousCell = null;
2253 Mutation m = null;
2254 WALKey key = null;
2255 WALEdit val = null;
2256 if (logEntry != null) val = new WALEdit();
2257
2258 for (int i = 0; i < count; i++) {
2259
2260 if (!cells.advance()) {
2261 throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
2262 }
2263 Cell cell = cells.current();
2264 if (val != null) val.add(cell);
2265
2266 boolean isNewRowOrType =
2267 previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
2268 || !CellUtil.matchingRow(previousCell, cell);
2269 if (isNewRowOrType) {
2270
2271 if (CellUtil.isDelete(cell)) {
2272 m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
2273
2274 mutations.add(new MutationReplay(
2275 MutationType.DELETE, m, HConstants.NO_NONCE, HConstants.NO_NONCE));
2276 } else {
2277 m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
2278
2279 long nonceGroup = entry.getKey().hasNonceGroup()
2280 ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
2281 long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
2282 mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce));
2283 }
2284 }
2285 if (CellUtil.isDelete(cell)) {
2286 ((Delete) m).addDeleteMarker(cell);
2287 } else {
2288 ((Put) m).add(cell);
2289 }
2290 m.setDurability(durability);
2291 previousCell = cell;
2292 }
2293
2294
2295 if (logEntry != null) {
2296 org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKeyProto = entry.getKey();
2297 List<UUID> clusterIds = new ArrayList<UUID>(walKeyProto.getClusterIdsCount());
2298 for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
2299 clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
2300 }
2301
2302 key = new HLogKey(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf(
2303 walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(),
2304 clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce(), null);
2305 logEntry.setFirst(key);
2306 logEntry.setSecond(val);
2307 }
2308
2309 return mutations;
2310 }
2311 }