View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.wal;
20  
21  import java.io.EOFException;
22  import java.io.FileNotFoundException;
23  import java.io.IOException;
24  import java.io.InterruptedIOException;
25  import java.text.ParseException;
26  import java.util.ArrayList;
27  import java.util.Collections;
28  import java.util.HashSet;
29  import java.util.LinkedList;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.NavigableSet;
33  import java.util.Set;
34  import java.util.TreeMap;
35  import java.util.TreeSet;
36  import java.util.UUID;
37  import java.util.concurrent.Callable;
38  import java.util.concurrent.CompletionService;
39  import java.util.concurrent.ConcurrentHashMap;
40  import java.util.concurrent.ExecutionException;
41  import java.util.concurrent.ExecutorCompletionService;
42  import java.util.concurrent.Future;
43  import java.util.concurrent.ThreadFactory;
44  import java.util.concurrent.ThreadPoolExecutor;
45  import java.util.concurrent.TimeUnit;
46  import java.util.concurrent.atomic.AtomicBoolean;
47  import java.util.concurrent.atomic.AtomicLong;
48  import java.util.concurrent.atomic.AtomicReference;
49  import java.util.regex.Matcher;
50  import java.util.regex.Pattern;
51  
52  import org.apache.commons.logging.Log;
53  import org.apache.commons.logging.LogFactory;
54  import org.apache.hadoop.conf.Configuration;
55  import org.apache.hadoop.fs.FileAlreadyExistsException;
56  import org.apache.hadoop.fs.FileStatus;
57  import org.apache.hadoop.fs.FileSystem;
58  import org.apache.hadoop.fs.Path;
59  import org.apache.hadoop.fs.PathFilter;
60  import org.apache.hadoop.hbase.Cell;
61  import org.apache.hadoop.hbase.CellScanner;
62  import org.apache.hadoop.hbase.CellUtil;
63  import org.apache.hadoop.hbase.CoordinatedStateManager;
64  import org.apache.hadoop.hbase.HBaseConfiguration;
65  import org.apache.hadoop.hbase.HConstants;
66  import org.apache.hadoop.hbase.HRegionInfo;
67  import org.apache.hadoop.hbase.HRegionLocation;
68  import org.apache.hadoop.hbase.MetaTableAccessor;
69  import org.apache.hadoop.hbase.ServerName;
70  import org.apache.hadoop.hbase.TableName;
71  import org.apache.hadoop.hbase.TableNotFoundException;
72  import org.apache.hadoop.hbase.classification.InterfaceAudience;
73  import org.apache.hadoop.hbase.client.ConnectionFactory;
74  import org.apache.hadoop.hbase.client.ConnectionUtils;
75  import org.apache.hadoop.hbase.client.Delete;
76  import org.apache.hadoop.hbase.client.Durability;
77  import org.apache.hadoop.hbase.client.HConnection;
78  import org.apache.hadoop.hbase.client.Mutation;
79  import org.apache.hadoop.hbase.client.Put;
80  import org.apache.hadoop.hbase.client.TableState;
81  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
82  import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
83  import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
84  import org.apache.hadoop.hbase.io.HeapSize;
85  import org.apache.hadoop.hbase.master.SplitLogManager;
86  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
87  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
88  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
89  import org.apache.hadoop.hbase.protobuf.RequestConverter;
90  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
91  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
92  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
93  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
94  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
95  import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
96  import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
97  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
98  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
99  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
100 import org.apache.hadoop.hbase.regionserver.HRegion;
101 import org.apache.hadoop.hbase.regionserver.LastSequenceId;
102 // imports for things that haven't moved from regionserver.wal yet.
103 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
104 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
105 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
106 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
107 import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink;
108 import org.apache.hadoop.hbase.util.Bytes;
109 import org.apache.hadoop.hbase.util.CancelableProgressable;
110 import org.apache.hadoop.hbase.util.ClassSize;
111 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
112 import org.apache.hadoop.hbase.util.FSUtils;
113 import org.apache.hadoop.hbase.util.Pair;
114 import org.apache.hadoop.hbase.util.Threads;
115 import org.apache.hadoop.hbase.wal.WAL.Entry;
116 import org.apache.hadoop.hbase.wal.WAL.Reader;
117 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
118 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
119 import org.apache.hadoop.io.MultipleIOException;
120 import org.apache.hadoop.ipc.RemoteException;
121 
122 import com.google.common.annotations.VisibleForTesting;
123 import com.google.common.base.Preconditions;
124 import com.google.common.collect.Lists;
125 import com.google.protobuf.ServiceException;
126 import com.google.protobuf.TextFormat;
127 
128 /**
129  * This class is responsible for splitting up a bunch of regionserver commit log
130  * files that are no longer being written to, into new files, one per region for
131  * region to replay on startup. Delete the old log files when finished.
132  */
133 @InterfaceAudience.Private
134 public class WALSplitter {
  private static final Log LOG = LogFactory.getLog(WALSplitter.class);

  /** By default we retry errors in splitting, rather than skipping. */
  public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;

  // Parameters for split process
  // HBase root directory the region directories are resolved against.
  protected final Path rootDir;
  // Filesystem used for all reads/writes performed by the split.
  protected final FileSystem fs;
  // Private copy of the caller's configuration (created in the constructor).
  protected final Configuration conf;

  // Major subcomponents of the split process.
  // These are separated into inner classes to make testing easier.
  PipelineController controller;
  OutputSink outputSink;
  EntryBuffers entryBuffers;

  // Table states keyed by table name.
  private Map<TableName, TableState> tableStatesCache =
      new ConcurrentHashMap<>();
  // Coordinated state manager; may be null (e.g. when constructed from split()).
  private BaseCoordinatedStateManager csm;
  private final WALFactory walFactory;

  // Task-monitor status of the split in progress; null until splitLogFile() is called.
  private MonitoredTask status;

  // For checking the latest flushed sequence id
  protected final LastSequenceId sequenceIdChecker;

  // True only when recovery mode is LOG_REPLAY and a coordinated state manager was supplied.
  protected boolean distributedLogReplay;

  // Map encodedRegionName -> lastFlushedSequenceId
  protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<String, Long>();

  // Map encodedRegionName -> maxSeqIdInStores
  protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores =
      new ConcurrentHashMap<String, Map<byte[], Long>>();

  // Failed region server that the wal file being split belongs to
  protected String failedServerName = "";

  // Number of writer threads
  private final int numWriterThreads;

  // Min batch size when replay WAL edits
  private final int minBatchSize;
178 
179   WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
180       FileSystem fs, LastSequenceId idChecker,
181       CoordinatedStateManager csm, RecoveryMode mode) {
182     this.conf = HBaseConfiguration.create(conf);
183     String codecClassName = conf
184         .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
185     this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
186     this.rootDir = rootDir;
187     this.fs = fs;
188     this.sequenceIdChecker = idChecker;
189     this.csm = (BaseCoordinatedStateManager)csm;
190     this.walFactory = factory;
191     this.controller = new PipelineController();
192 
193     entryBuffers = new EntryBuffers(controller,
194         this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
195             128*1024*1024));
196 
197     // a larger minBatchSize may slow down recovery because replay writer has to wait for
198     // enough edits before replaying them
199     this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
200     this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);
201 
202     this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
203     if (csm != null && this.distributedLogReplay) {
204       outputSink = new LogReplayOutputSink(controller, entryBuffers, numWriterThreads);
205     } else {
206       if (this.distributedLogReplay) {
207         LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly.");
208       }
209       this.distributedLogReplay = false;
210       outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
211     }
212 
213   }
214 
215   /**
216    * Splits a WAL file into region's recovered-edits directory.
217    * This is the main entry point for distributed log splitting from SplitLogWorker.
218    * <p>
219    * If the log file has N regions then N recovered.edits files will be produced.
220    * <p>
221    * @param rootDir
222    * @param logfile
223    * @param fs
224    * @param conf
225    * @param reporter
226    * @param idChecker
227    * @param cp coordination state manager
228    * @return false if it is interrupted by the progress-able.
229    * @throws IOException
230    */
231   public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
232       Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
233       CoordinatedStateManager cp, RecoveryMode mode, final WALFactory factory) throws IOException {
234     WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, idChecker, cp, mode);
235     return s.splitLogFile(logfile, reporter);
236   }
237 
238   // A wrapper to split one log folder using the method used by distributed
239   // log splitting. Used by tools and unit tests. It should be package private.
240   // It is public only because TestWALObserver is in a different package,
241   // which uses this method to do log splitting.
242   @VisibleForTesting
243   public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
244       FileSystem fs, Configuration conf, final WALFactory factory) throws IOException {
245     final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
246         Collections.singletonList(logDir), null);
247     List<Path> splits = new ArrayList<Path>();
248     if (logfiles != null && logfiles.length > 0) {
249       for (FileStatus logfile: logfiles) {
250         WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null,
251             RecoveryMode.LOG_SPLITTING);
252         if (s.splitLogFile(logfile, null)) {
253           finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
254           if (s.outputSink.splits != null) {
255             splits.addAll(s.outputSink.splits);
256           }
257         }
258       }
259     }
260     if (!fs.delete(logDir, true)) {
261       throw new IOException("Unable to delete src dir: " + logDir);
262     }
263     return splits;
264   }
265 
266   /**
267    * log splitting implementation, splits one log file.
268    * @param logfile should be an actual log file.
269    */
270   boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
271     Preconditions.checkState(status == null);
272     Preconditions.checkArgument(logfile.isFile(),
273         "passed in file status is for something other than a regular file.");
274     boolean isCorrupted = false;
275     boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
276       SPLIT_SKIP_ERRORS_DEFAULT);
277     int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
278     Path logPath = logfile.getPath();
279     boolean outputSinkStarted = false;
280     boolean progress_failed = false;
281     int editsCount = 0;
282     int editsSkipped = 0;
283 
284     status =
285         TaskMonitor.get().createStatus(
286           "Splitting log file " + logfile.getPath() + "into a temporary staging area.");
287     Reader in = null;
288     try {
289       long logLength = logfile.getLen();
290       LOG.info("Splitting wal: " + logPath + ", length=" + logLength);
291       LOG.info("DistributedLogReplay = " + this.distributedLogReplay);
292       status.setStatus("Opening log file");
293       if (reporter != null && !reporter.progress()) {
294         progress_failed = true;
295         return false;
296       }
297       try {
298         in = getReader(logfile, skipErrors, reporter);
299       } catch (CorruptedLogFileException e) {
300         LOG.warn("Could not get reader, corrupted log file " + logPath, e);
301         ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
302         isCorrupted = true;
303       }
304       if (in == null) {
305         LOG.warn("Nothing to split in log file " + logPath);
306         return true;
307       }
308       int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
309       int numOpenedFilesLastCheck = 0;
310       outputSink.setReporter(reporter);
311       outputSink.startWriterThreads();
312       outputSinkStarted = true;
313       Entry entry;
314       Long lastFlushedSequenceId = -1L;
315       ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(logPath);
316       failedServerName = (serverName == null) ? "" : serverName.getServerName();
317       while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
318         byte[] region = entry.getKey().getEncodedRegionName();
319         String encodedRegionNameAsStr = Bytes.toString(region);
320         lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
321         if (lastFlushedSequenceId == null) {
322           if (this.distributedLogReplay) {
323             RegionStoreSequenceIds ids =
324                 csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
325                   encodedRegionNameAsStr);
326             if (ids != null) {
327               lastFlushedSequenceId = ids.getLastFlushedSequenceId();
328               if (LOG.isDebugEnabled()) {
329                 LOG.debug("DLR Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
330                   TextFormat.shortDebugString(ids));
331               }
332             }
333           } else if (sequenceIdChecker != null) {
334             RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
335             Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
336             for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
337               maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
338                 storeSeqId.getSequenceId());
339             }
340             regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
341             lastFlushedSequenceId = ids.getLastFlushedSequenceId();
342             if (LOG.isDebugEnabled()) {
343               LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
344                   TextFormat.shortDebugString(ids));
345             }
346           }
347           if (lastFlushedSequenceId == null) {
348             lastFlushedSequenceId = -1L;
349           }
350           lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
351         }
352         if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
353           editsSkipped++;
354           continue;
355         }
356         entryBuffers.appendEntry(entry);
357         editsCount++;
358         int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
359         // If sufficient edits have passed, check if we should report progress.
360         if (editsCount % interval == 0
361             || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
362           numOpenedFilesLastCheck = this.getNumOpenWriters();
363           String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
364               + " edits, skipped " + editsSkipped + " edits.";
365           status.setStatus("Split " + countsStr);
366           if (reporter != null && !reporter.progress()) {
367             progress_failed = true;
368             return false;
369           }
370         }
371       }
372     } catch (InterruptedException ie) {
373       IOException iie = new InterruptedIOException();
374       iie.initCause(ie);
375       throw iie;
376     } catch (CorruptedLogFileException e) {
377       LOG.warn("Could not parse, corrupted log file " + logPath, e);
378       csm.getSplitLogWorkerCoordination().markCorrupted(rootDir,
379         logfile.getPath().getName(), fs);
380       isCorrupted = true;
381     } catch (IOException e) {
382       e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
383       throw e;
384     } finally {
385       LOG.debug("Finishing writing output logs and closing down.");
386       try {
387         if (null != in) {
388           in.close();
389         }
390       } catch (IOException exception) {
391         LOG.warn("Could not close wal reader: " + exception.getMessage());
392         LOG.debug("exception details", exception);
393       }
394       try {
395         if (outputSinkStarted) {
396           // Set progress_failed to true as the immediate following statement will reset its value
397           // when finishWritingAndClose() throws exception, progress_failed has the right value
398           progress_failed = true;
399           progress_failed = outputSink.finishWritingAndClose() == null;
400         }
401       } finally {
402         String msg =
403             "Processed " + editsCount + " edits across " + outputSink.getNumberOfRecoveredRegions()
404                 + " regions; edits skipped=" + editsSkipped + "; log file=" + logPath +
405                 ", length=" + logfile.getLen() + // See if length got updated post lease recovery
406                 ", corrupted=" + isCorrupted + ", progress failed=" + progress_failed;
407         LOG.info(msg);
408         status.markComplete(msg);
409       }
410     }
411     return !progress_failed;
412   }
413 
414   /**
415    * Completes the work done by splitLogFile by archiving logs
416    * <p>
417    * It is invoked by SplitLogManager once it knows that one of the
418    * SplitLogWorkers have completed the splitLogFile() part. If the master
419    * crashes then this function might get called multiple times.
420    * <p>
421    * @param logfile
422    * @param conf
423    * @throws IOException
424    */
425   public static void finishSplitLogFile(String logfile,
426       Configuration conf)  throws IOException {
427     Path rootdir = FSUtils.getRootDir(conf);
428     Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
429     Path logPath;
430     if (FSUtils.isStartingWithPath(rootdir, logfile)) {
431       logPath = new Path(logfile);
432     } else {
433       logPath = new Path(rootdir, logfile);
434     }
435     finishSplitLogFile(rootdir, oldLogDir, logPath, conf);
436   }
437 
438   static void finishSplitLogFile(Path rootdir, Path oldLogDir,
439       Path logPath, Configuration conf) throws IOException {
440     List<Path> processedLogs = new ArrayList<Path>();
441     List<Path> corruptedLogs = new ArrayList<Path>();
442     FileSystem fs;
443     fs = rootdir.getFileSystem(conf);
444     if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
445       corruptedLogs.add(logPath);
446     } else {
447       processedLogs.add(logPath);
448     }
449     archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
450     Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
451     fs.delete(stagingDir, true);
452   }
453 
454   /**
455    * Moves processed logs to a oldLogDir after successful processing Moves
456    * corrupted logs (any log that couldn't be successfully parsed to corruptDir
457    * (.corrupt) for later investigation
458    *
459    * @param corruptedLogs
460    * @param processedLogs
461    * @param oldLogDir
462    * @param fs
463    * @param conf
464    * @throws IOException
465    */
466   private static void archiveLogs(
467       final List<Path> corruptedLogs,
468       final List<Path> processedLogs, final Path oldLogDir,
469       final FileSystem fs, final Configuration conf) throws IOException {
470     final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
471         "hbase.regionserver.hlog.splitlog.corrupt.dir",  HConstants.CORRUPT_DIR_NAME));
472 
473     if (!fs.mkdirs(corruptDir)) {
474       LOG.info("Unable to mkdir " + corruptDir);
475     }
476     fs.mkdirs(oldLogDir);
477 
478     // this method can get restarted or called multiple times for archiving
479     // the same log files.
480     for (Path corrupted : corruptedLogs) {
481       Path p = new Path(corruptDir, corrupted.getName());
482       if (fs.exists(corrupted)) {
483         if (!fs.rename(corrupted, p)) {
484           LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
485         } else {
486           LOG.warn("Moved corrupted log " + corrupted + " to " + p);
487         }
488       }
489     }
490 
491     for (Path p : processedLogs) {
492       Path newPath = FSHLog.getWALArchivePath(oldLogDir, p);
493       if (fs.exists(p)) {
494         if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) {
495           LOG.warn("Unable to move  " + p + " to " + newPath);
496         } else {
497           LOG.info("Archived processed log " + p + " to " + newPath);
498         }
499       }
500     }
501   }
502 
503   /**
504    * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
505    * <code>logEntry</code> named for the sequenceid in the passed
506    * <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
507    * This method also ensures existence of RECOVERED_EDITS_DIR under the region
508    * creating it if necessary.
509    * @param fs
510    * @param logEntry
511    * @param rootDir HBase root dir.
512    * @return Path to file into which to dump split log edits.
513    * @throws IOException
514    */
515   @SuppressWarnings("deprecation")
516   static Path getRegionSplitEditsPath(final FileSystem fs,
517       final Entry logEntry, final Path rootDir, boolean isCreate)
518   throws IOException {
519     Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTablename());
520     String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
521     Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
522     Path dir = getRegionDirRecoveredEditsDir(regiondir);
523 
524     if (!fs.exists(regiondir)) {
525       LOG.info("This region's directory doesn't exist: "
526           + regiondir.toString() + ". It is very likely that it was" +
527           " already split so it's safe to discard those edits.");
528       return null;
529     }
530     if (fs.exists(dir) && fs.isFile(dir)) {
531       Path tmp = new Path("/tmp");
532       if (!fs.exists(tmp)) {
533         fs.mkdirs(tmp);
534       }
535       tmp = new Path(tmp,
536         HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
537       LOG.warn("Found existing old file: " + dir + ". It could be some "
538         + "leftover of an old installation. It should be a folder instead. "
539         + "So moving it to " + tmp);
540       if (!fs.rename(dir, tmp)) {
541         LOG.warn("Failed to sideline old file " + dir);
542       }
543     }
544 
545     if (isCreate && !fs.exists(dir)) {
546       if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
547     }
548     // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
549     // region's replayRecoveredEdits will not delete it
550     String fileName = formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum());
551     fileName = getTmpRecoveredEditsFileName(fileName);
552     return new Path(dir, fileName);
553   }
554 
555   static String getTmpRecoveredEditsFileName(String fileName) {
556     return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
557   }
558 
559   /**
560    * Get the completed recovered edits file path, renaming it to be by last edit
561    * in the file from its first edit. Then we could use the name to skip
562    * recovered edits when doing {@link HRegion#replayRecoveredEditsIfAny}.
563    * @param srcPath
564    * @param maximumEditLogSeqNum
565    * @return dstPath take file's last edit log seq num as the name
566    */
567   static Path getCompletedRecoveredEditsFilePath(Path srcPath,
568       Long maximumEditLogSeqNum) {
569     String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
570     return new Path(srcPath.getParent(), fileName);
571   }
572 
573   static String formatRecoveredEditsFileName(final long seqid) {
574     return String.format("%019d", seqid);
575   }
576 
  // Names of completed recovered-edits files: an optionally negative decimal number.
  private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
  // Suffix carried by a recovered-edits file while the splitter is still writing it.
  private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
579 
580   /**
581    * @param regiondir
582    *          This regions directory in the filesystem.
583    * @return The directory that holds recovered edits files for the region
584    *         <code>regiondir</code>
585    */
586   public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
587     return new Path(regiondir, HConstants.RECOVERED_EDITS_DIR);
588   }
589 
590   /**
591    * Returns sorted set of edit files made by splitter, excluding files
592    * with '.temp' suffix.
593    *
594    * @param fs
595    * @param regiondir
596    * @return Files in passed <code>regiondir</code> as a sorted set.
597    * @throws IOException
598    */
599   public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
600       final Path regiondir) throws IOException {
601     NavigableSet<Path> filesSorted = new TreeSet<Path>();
602     Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
603     if (!fs.exists(editsdir))
604       return filesSorted;
605     FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
606       @Override
607       public boolean accept(Path p) {
608         boolean result = false;
609         try {
610           // Return files and only files that match the editfile names pattern.
611           // There can be other files in this directory other than edit files.
612           // In particular, on error, we'll move aside the bad edit file giving
613           // it a timestamp suffix. See moveAsideBadEditsFile.
614           Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
615           result = fs.isFile(p) && m.matches();
616           // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
617           // because it means splitwal thread is writting this file.
618           if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
619             result = false;
620           }
621           // Skip SeqId Files
622           if (isSequenceIdFile(p)) {
623             result = false;
624           }
625         } catch (IOException e) {
626           LOG.warn("Failed isFile check on " + p);
627         }
628         return result;
629       }
630     });
631     if (files == null) {
632       return filesSorted;
633     }
634     for (FileStatus status : files) {
635       filesSorted.add(status.getPath());
636     }
637     return filesSorted;
638   }
639 
640   /**
641    * Move aside a bad edits file.
642    *
643    * @param fs
644    * @param edits
645    *          Edits file to move aside.
646    * @return The name of the moved aside file.
647    * @throws IOException
648    */
649   public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits)
650       throws IOException {
651     Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
652         + System.currentTimeMillis());
653     if (!fs.rename(edits, moveAsideName)) {
654       LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
655     }
656     return moveAsideName;
657   }
658 
659   private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid";
660   private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid";
661   private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length();
662 
663   /**
664    * Is the given file a region open sequence id file.
665    */
666   @VisibleForTesting
667   public static boolean isSequenceIdFile(final Path file) {
668     return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)
669         || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
670   }
671 
672   /**
673    * Create a file with name as region open sequence id
674    * @param fs
675    * @param regiondir
676    * @param newSeqId
677    * @param saftyBumper
678    * @return long new sequence Id value
679    * @throws IOException
680    */
681   public static long writeRegionSequenceIdFile(final FileSystem fs, final Path regiondir,
682       long newSeqId, long saftyBumper) throws IOException {
683 
684     Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
685     long maxSeqId = 0;
686     FileStatus[] files = null;
687     if (fs.exists(editsdir)) {
688       files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
689         @Override
690         public boolean accept(Path p) {
691           return isSequenceIdFile(p);
692         }
693       });
694       if (files != null) {
695         for (FileStatus status : files) {
696           String fileName = status.getPath().getName();
697           try {
698             Long tmpSeqId = Long.parseLong(fileName.substring(0, fileName.length()
699                 - SEQUENCE_ID_FILE_SUFFIX_LENGTH));
700             maxSeqId = Math.max(tmpSeqId, maxSeqId);
701           } catch (NumberFormatException ex) {
702             LOG.warn("Invalid SeqId File Name=" + fileName);
703           }
704         }
705       }
706     }
707     if (maxSeqId > newSeqId) {
708       newSeqId = maxSeqId;
709     }
710     newSeqId += saftyBumper; // bump up SeqId
711 
712     // write a new seqId file
713     Path newSeqIdFile = new Path(editsdir, newSeqId + SEQUENCE_ID_FILE_SUFFIX);
714     if (newSeqId != maxSeqId) {
715       try {
716         if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) {
717           throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
718         }
719         if (LOG.isDebugEnabled()) {
720           LOG.debug("Wrote region seqId=" + newSeqIdFile + " to file, newSeqId=" + newSeqId
721               + ", maxSeqId=" + maxSeqId);
722         }
723       } catch (FileAlreadyExistsException ignored) {
724         // latest hdfs throws this exception. it's all right if newSeqIdFile already exists
725       }
726     }
727     // remove old ones
728     if (files != null) {
729       for (FileStatus status : files) {
730         if (newSeqIdFile.equals(status.getPath())) {
731           continue;
732         }
733         fs.delete(status.getPath(), false);
734       }
735     }
736     return newSeqId;
737   }
738 
739   /**
740    * Create a new {@link Reader} for reading logs to split.
741    *
742    * @param file
743    * @return A new Reader instance, caller should close
744    * @throws IOException
745    * @throws CorruptedLogFileException
746    */
747   protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
748       throws IOException, CorruptedLogFileException {
749     Path path = file.getPath();
750     long length = file.getLen();
751     Reader in;
752 
753     // Check for possibly empty file. With appends, currently Hadoop reports a
754     // zero length even if the file has been sync'd. Revisit if HDFS-376 or
755     // HDFS-878 is committed.
756     if (length <= 0) {
757       LOG.warn("File " + path + " might be still open, length is 0");
758     }
759 
760     try {
761       FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
762       try {
763         in = getReader(path, reporter);
764       } catch (EOFException e) {
765         if (length <= 0) {
766           // TODO should we ignore an empty, not-last log file if skip.errors
767           // is false? Either way, the caller should decide what to do. E.g.
768           // ignore if this is the last log in sequence.
769           // TODO is this scenario still possible if the log has been
770           // recovered (i.e. closed)
771           LOG.warn("Could not open " + path + " for reading. File is empty", e);
772           return null;
773         } else {
774           // EOFException being ignored
775           return null;
776         }
777       }
778     } catch (IOException e) {
779       if (e instanceof FileNotFoundException) {
780         // A wal file may not exist anymore. Nothing can be recovered so move on
781         LOG.warn("File " + path + " doesn't exist anymore.", e);
782         return null;
783       }
784       if (!skipErrors || e instanceof InterruptedIOException) {
785         throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
786       }
787       CorruptedLogFileException t =
788         new CorruptedLogFileException("skipErrors=true Could not open wal " +
789             path + " ignoring");
790       t.initCause(e);
791       throw t;
792     }
793     return in;
794   }
795 
796   static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
797   throws CorruptedLogFileException, IOException {
798     try {
799       return in.next();
800     } catch (EOFException eof) {
801       // truncated files are expected if a RS crashes (see HBASE-2643)
802       LOG.info("EOF from wal " + path + ".  continuing");
803       return null;
804     } catch (IOException e) {
805       // If the IOE resulted from bad file format,
806       // then this problem is idempotent and retrying won't help
807       if (e.getCause() != null &&
808           (e.getCause() instanceof ParseException ||
809            e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
810         LOG.warn("Parse exception " + e.getCause().toString() + " from wal "
811            + path + ".  continuing");
812         return null;
813       }
814       if (!skipErrors) {
815         throw e;
816       }
817       CorruptedLogFileException t =
818         new CorruptedLogFileException("skipErrors=true Ignoring exception" +
819             " while parsing wal " + path + ". Marking as corrupted");
820       t.initCause(e);
821       throw t;
822     }
823   }
824 
825   /**
826    * Create a new {@link Writer} for writing log splits.
827    * @return a new Writer instance, caller should close
828    */
829   protected Writer createWriter(Path logfile)
830       throws IOException {
831     return walFactory.createRecoveredEditsWriter(fs, logfile);
832   }
833 
834   /**
835    * Create a new {@link Reader} for reading logs to split.
836    * @return new Reader instance, caller should close
837    */
838   protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
839     return walFactory.createReader(fs, curLogFile, reporter);
840   }
841 
842   /**
843    * Get current open writers
844    */
845   private int getNumOpenWriters() {
846     int result = 0;
847     if (this.outputSink != null) {
848       result += this.outputSink.getNumOpenWriters();
849     }
850     return result;
851   }
852 
853   /**
854    * Contains some methods to control WAL-entries producer / consumer interactions
855    */
856   public static class PipelineController {
857     // If an exception is thrown by one of the other threads, it will be
858     // stored here.
859     AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
860 
861     // Wait/notify for when data has been produced by the writer thread,
862     // consumed by the reader thread, or an exception occurred
863     public final Object dataAvailable = new Object();
864 
865     void writerThreadError(Throwable t) {
866       thrown.compareAndSet(null, t);
867     }
868 
869     /**
870      * Check for errors in the writer threads. If any is found, rethrow it.
871      */
872     void checkForErrors() throws IOException {
873       Throwable thrown = this.thrown.get();
874       if (thrown == null) return;
875       if (thrown instanceof IOException) {
876         throw new IOException(thrown);
877       } else {
878         throw new RuntimeException(thrown);
879       }
880     }
881   }
882 
883   /**
884    * Class which accumulates edits and separates them into a buffer per region
885    * while simultaneously accounting RAM usage. Blocks if the RAM usage crosses
886    * a predefined threshold.
887    *
888    * Writer threads then pull region-specific buffers from this class.
889    */
890   public static class EntryBuffers {
891     PipelineController controller;
892 
893     Map<byte[], RegionEntryBuffer> buffers =
894       new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);
895 
896     /* Track which regions are currently in the middle of writing. We don't allow
897        an IO thread to pick up bytes from a region if we're already writing
898        data for that region in a different IO thread. */
899     Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
900 
901     long totalBuffered = 0;
902     long maxHeapUsage;
903 
904     public EntryBuffers(PipelineController controller, long maxHeapUsage) {
905       this.controller = controller;
906       this.maxHeapUsage = maxHeapUsage;
907     }
908 
909     /**
910      * Append a log entry into the corresponding region buffer.
911      * Blocks if the total heap usage has crossed the specified threshold.
912      *
913      * @throws InterruptedException
914      * @throws IOException
915      */
916     public void appendEntry(Entry entry) throws InterruptedException, IOException {
917       WALKey key = entry.getKey();
918 
919       RegionEntryBuffer buffer;
920       long incrHeap;
921       synchronized (this) {
922         buffer = buffers.get(key.getEncodedRegionName());
923         if (buffer == null) {
924           buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
925           buffers.put(key.getEncodedRegionName(), buffer);
926         }
927         incrHeap= buffer.appendEntry(entry);
928       }
929 
930       // If we crossed the chunk threshold, wait for more space to be available
931       synchronized (controller.dataAvailable) {
932         totalBuffered += incrHeap;
933         while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
934           LOG.debug("Used " + totalBuffered +
935               " bytes of buffered edits, waiting for IO threads...");
936           controller.dataAvailable.wait(2000);
937         }
938         controller.dataAvailable.notifyAll();
939       }
940       controller.checkForErrors();
941     }
942 
943     /**
944      * @return RegionEntryBuffer a buffer of edits to be written or replayed.
945      */
946     synchronized RegionEntryBuffer getChunkToWrite() {
947       long biggestSize = 0;
948       byte[] biggestBufferKey = null;
949 
950       for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
951         long size = entry.getValue().heapSize();
952         if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
953           biggestSize = size;
954           biggestBufferKey = entry.getKey();
955         }
956       }
957       if (biggestBufferKey == null) {
958         return null;
959       }
960 
961       RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
962       currentlyWriting.add(biggestBufferKey);
963       return buffer;
964     }
965 
966     void doneWriting(RegionEntryBuffer buffer) {
967       synchronized (this) {
968         boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
969         assert removed;
970       }
971       long size = buffer.heapSize();
972 
973       synchronized (controller.dataAvailable) {
974         totalBuffered -= size;
975         // We may unblock writers
976         controller.dataAvailable.notifyAll();
977       }
978     }
979 
980     synchronized boolean isRegionCurrentlyWriting(byte[] region) {
981       return currentlyWriting.contains(region);
982     }
983 
984     public void waitUntilDrained() {
985       synchronized (controller.dataAvailable) {
986         while (totalBuffered > 0) {
987           try {
988             controller.dataAvailable.wait(2000);
989           } catch (InterruptedException e) {
990             LOG.warn("Got intrerrupted while waiting for EntryBuffers is drained");
991             Thread.interrupted();
992             break;
993           }
994         }
995       }
996     }
997   }
998 
999   /**
1000    * A buffer of some number of edits for a given region.
1001    * This accumulates edits and also provides a memory optimization in order to
1002    * share a single byte array instance for the table and region name.
1003    * Also tracks memory usage of the accumulated edits.
1004    */
1005   public static class RegionEntryBuffer implements HeapSize {
1006     long heapInBuffer = 0;
1007     List<Entry> entryBuffer;
1008     TableName tableName;
1009     byte[] encodedRegionName;
1010 
1011     RegionEntryBuffer(TableName tableName, byte[] region) {
1012       this.tableName = tableName;
1013       this.encodedRegionName = region;
1014       this.entryBuffer = new LinkedList<Entry>();
1015     }
1016 
1017     long appendEntry(Entry entry) {
1018       internify(entry);
1019       entryBuffer.add(entry);
1020       long incrHeap = entry.getEdit().heapSize() +
1021         ClassSize.align(2 * ClassSize.REFERENCE) + // WALKey pointers
1022         0; // TODO linkedlist entry
1023       heapInBuffer += incrHeap;
1024       return incrHeap;
1025     }
1026 
1027     private void internify(Entry entry) {
1028       WALKey k = entry.getKey();
1029       k.internTableName(this.tableName);
1030       k.internEncodedRegionName(this.encodedRegionName);
1031     }
1032 
1033     @Override
1034     public long heapSize() {
1035       return heapInBuffer;
1036     }
1037 
1038     public byte[] getEncodedRegionName() {
1039       return encodedRegionName;
1040     }
1041 
1042     public List<Entry> getEntryBuffer() {
1043       return entryBuffer;
1044     }
1045 
1046     public TableName getTableName() {
1047       return tableName;
1048     }
1049   }
1050 
1051   public static class WriterThread extends Thread {
1052     private volatile boolean shouldStop = false;
1053     private PipelineController controller;
1054     private EntryBuffers entryBuffers;
1055     private OutputSink outputSink = null;
1056 
1057     WriterThread(PipelineController controller, EntryBuffers entryBuffers, OutputSink sink, int i){
1058       super(Thread.currentThread().getName() + "-Writer-" + i);
1059       this.controller = controller;
1060       this.entryBuffers = entryBuffers;
1061       outputSink = sink;
1062     }
1063 
1064     @Override
1065     public void run()  {
1066       try {
1067         doRun();
1068       } catch (Throwable t) {
1069         LOG.error("Exiting thread", t);
1070         controller.writerThreadError(t);
1071       }
1072     }
1073 
1074     private void doRun() throws IOException {
1075       if (LOG.isTraceEnabled()) LOG.trace("Writer thread starting");
1076       while (true) {
1077         RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
1078         if (buffer == null) {
1079           // No data currently available, wait on some more to show up
1080           synchronized (controller.dataAvailable) {
1081             if (shouldStop && !this.outputSink.flush()) {
1082               return;
1083             }
1084             try {
1085               controller.dataAvailable.wait(500);
1086             } catch (InterruptedException ie) {
1087               if (!shouldStop) {
1088                 throw new RuntimeException(ie);
1089               }
1090             }
1091           }
1092           continue;
1093         }
1094 
1095         assert buffer != null;
1096         try {
1097           writeBuffer(buffer);
1098         } finally {
1099           entryBuffers.doneWriting(buffer);
1100         }
1101       }
1102     }
1103 
1104     private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
1105       outputSink.append(buffer);
1106     }
1107 
1108     void finish() {
1109       synchronized (controller.dataAvailable) {
1110         shouldStop = true;
1111         controller.dataAvailable.notifyAll();
1112       }
1113     }
1114   }
1115 
1116   /**
1117    * The following class is an abstraction class to provide a common interface to support both
1118    * existing recovered edits file sink and region server WAL edits replay sink
1119    */
1120   public static abstract class OutputSink {
1121 
1122     protected PipelineController controller;
1123     protected EntryBuffers entryBuffers;
1124 
1125     protected Map<byte[], SinkWriter> writers = Collections
1126         .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));;
1127 
1128     protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
1129         .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
1130 
1131     protected final List<WriterThread> writerThreads = Lists.newArrayList();
1132 
1133     /* Set of regions which we've decided should not output edits */
1134     protected final Set<byte[]> blacklistedRegions = Collections
1135         .synchronizedSet(new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
1136 
1137     protected boolean closeAndCleanCompleted = false;
1138 
1139     protected boolean writersClosed = false;
1140 
1141     protected final int numThreads;
1142 
1143     protected CancelableProgressable reporter = null;
1144 
1145     protected AtomicLong skippedEdits = new AtomicLong();
1146 
1147     protected List<Path> splits = null;
1148 
1149     public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
1150       numThreads = numWriters;
1151       this.controller = controller;
1152       this.entryBuffers = entryBuffers;
1153     }
1154 
1155     void setReporter(CancelableProgressable reporter) {
1156       this.reporter = reporter;
1157     }
1158 
1159     /**
1160      * Start the threads that will pump data from the entryBuffers to the output files.
1161      */
1162     public synchronized void startWriterThreads() {
1163       for (int i = 0; i < numThreads; i++) {
1164         WriterThread t = new WriterThread(controller, entryBuffers, this, i);
1165         t.start();
1166         writerThreads.add(t);
1167       }
1168     }
1169 
1170     /**
1171      *
1172      * Update region's maximum edit log SeqNum.
1173      */
1174     void updateRegionMaximumEditLogSeqNum(Entry entry) {
1175       synchronized (regionMaximumEditLogSeqNum) {
1176         Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
1177             .getEncodedRegionName());
1178         if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
1179           regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
1180               .getLogSeqNum());
1181         }
1182       }
1183     }
1184 
1185     Long getRegionMaximumEditLogSeqNum(byte[] region) {
1186       return regionMaximumEditLogSeqNum.get(region);
1187     }
1188 
1189     /**
1190      * @return the number of currently opened writers
1191      */
1192     int getNumOpenWriters() {
1193       return this.writers.size();
1194     }
1195 
1196     long getSkippedEdits() {
1197       return this.skippedEdits.get();
1198     }
1199 
1200     /**
1201      * Wait for writer threads to dump all info to the sink
1202      * @return true when there is no error
1203      * @throws IOException
1204      */
1205     protected boolean finishWriting(boolean interrupt) throws IOException {
1206       LOG.debug("Waiting for split writer threads to finish");
1207       boolean progress_failed = false;
1208       for (WriterThread t : writerThreads) {
1209         t.finish();
1210       }
1211       if (interrupt) {
1212         for (WriterThread t : writerThreads) {
1213           t.interrupt(); // interrupt the writer threads. We are stopping now.
1214         }
1215       }
1216 
1217       for (WriterThread t : writerThreads) {
1218         if (!progress_failed && reporter != null && !reporter.progress()) {
1219           progress_failed = true;
1220         }
1221         try {
1222           t.join();
1223         } catch (InterruptedException ie) {
1224           IOException iie = new InterruptedIOException();
1225           iie.initCause(ie);
1226           throw iie;
1227         }
1228       }
1229       controller.checkForErrors();
1230       LOG.info(this.writerThreads.size() + " split writers finished; closing...");
1231       return (!progress_failed);
1232     }
1233 
1234     public abstract List<Path> finishWritingAndClose() throws IOException;
1235 
1236     /**
1237      * @return a map from encoded region ID to the number of edits written out for that region.
1238      */
1239     public abstract Map<byte[], Long> getOutputCounts();
1240 
1241     /**
1242      * @return number of regions we've recovered
1243      */
1244     public abstract int getNumberOfRecoveredRegions();
1245 
1246     /**
1247      * @param buffer A WAL Edit Entry
1248      * @throws IOException
1249      */
1250     public abstract void append(RegionEntryBuffer buffer) throws IOException;
1251 
1252     /**
1253      * WriterThread call this function to help flush internal remaining edits in buffer before close
1254      * @return true when underlying sink has something to flush
1255      */
1256     public boolean flush() throws IOException {
1257       return false;
1258     }
1259   }
1260 
1261   /**
1262    * Class that manages the output streams from the log splitting process.
1263    */
1264   class LogRecoveredEditsOutputSink extends OutputSink {
1265 
1266     public LogRecoveredEditsOutputSink(PipelineController controller, EntryBuffers entryBuffers,
1267         int numWriters) {
1268       // More threads could potentially write faster at the expense
1269       // of causing more disk seeks as the logs are split.
1270       // 3. After a certain setting (probably around 3) the
1271       // process will be bound on the reader in the current
1272       // implementation anyway.
1273       super(controller, entryBuffers, numWriters);
1274     }
1275 
1276     /**
1277      * @return null if failed to report progress
1278      * @throws IOException
1279      */
1280     @Override
1281     public List<Path> finishWritingAndClose() throws IOException {
1282       boolean isSuccessful = false;
1283       List<Path> result = null;
1284       try {
1285         isSuccessful = finishWriting(false);
1286       } finally {
1287         result = close();
1288         List<IOException> thrown = closeLogWriters(null);
1289         if (thrown != null && !thrown.isEmpty()) {
1290           throw MultipleIOException.createIOException(thrown);
1291         }
1292       }
1293       if (isSuccessful) {
1294         splits = result;
1295       }
1296       return splits;
1297     }
1298 
1299     /**
1300      * Close all of the output streams.
1301      * @return the list of paths written.
1302      */
1303     private List<Path> close() throws IOException {
1304       Preconditions.checkState(!closeAndCleanCompleted);
1305 
1306       final List<Path> paths = new ArrayList<Path>();
1307       final List<IOException> thrown = Lists.newArrayList();
1308       ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L,
1309         TimeUnit.SECONDS, new ThreadFactory() {
1310           private int count = 1;
1311 
1312           @Override
1313           public Thread newThread(Runnable r) {
1314             Thread t = new Thread(r, "split-log-closeStream-" + count++);
1315             return t;
1316           }
1317         });
1318       CompletionService<Void> completionService =
1319         new ExecutorCompletionService<Void>(closeThreadPool);
1320       for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
1321         if (LOG.isTraceEnabled()) {
1322           LOG.trace("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
1323         }
1324         completionService.submit(new Callable<Void>() {
1325           @Override
1326           public Void call() throws Exception {
1327             WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
1328             if (LOG.isTraceEnabled()) LOG.trace("Closing " + wap.p);
1329             try {
1330               wap.w.close();
1331             } catch (IOException ioe) {
1332               LOG.error("Couldn't close log at " + wap.p, ioe);
1333               thrown.add(ioe);
1334               return null;
1335             }
1336             if (LOG.isDebugEnabled()) {
1337               LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
1338                 + " edits, skipped " + wap.editsSkipped + " edits in "
1339                 + (wap.nanosSpent / 1000 / 1000) + "ms");
1340             }
1341             if (wap.editsWritten == 0) {
1342               // just remove the empty recovered.edits file
1343               if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
1344                 LOG.warn("Failed deleting empty " + wap.p);
1345                 throw new IOException("Failed deleting empty  " + wap.p);
1346               }
1347               return null;
1348             }
1349 
1350             Path dst = getCompletedRecoveredEditsFilePath(wap.p,
1351               regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
1352             try {
1353               if (!dst.equals(wap.p) && fs.exists(dst)) {
1354                 LOG.warn("Found existing old edits file. It could be the "
1355                     + "result of a previous failed split attempt. Deleting " + dst + ", length="
1356                     + fs.getFileStatus(dst).getLen());
1357                 if (!fs.delete(dst, false)) {
1358                   LOG.warn("Failed deleting of old " + dst);
1359                   throw new IOException("Failed deleting of old " + dst);
1360                 }
1361               }
1362               // Skip the unit tests which create a splitter that reads and
1363               // writes the data without touching disk.
1364               // TestHLogSplit#testThreading is an example.
1365               if (fs.exists(wap.p)) {
1366                 if (!fs.rename(wap.p, dst)) {
1367                   throw new IOException("Failed renaming " + wap.p + " to " + dst);
1368                 }
1369                 LOG.info("Rename " + wap.p + " to " + dst);
1370               }
1371             } catch (IOException ioe) {
1372               LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
1373               thrown.add(ioe);
1374               return null;
1375             }
1376             paths.add(dst);
1377             return null;
1378           }
1379         });
1380       }
1381 
1382       boolean progress_failed = false;
1383       try {
1384         for (int i = 0, n = this.writers.size(); i < n; i++) {
1385           Future<Void> future = completionService.take();
1386           future.get();
1387           if (!progress_failed && reporter != null && !reporter.progress()) {
1388             progress_failed = true;
1389           }
1390         }
1391       } catch (InterruptedException e) {
1392         IOException iie = new InterruptedIOException();
1393         iie.initCause(e);
1394         throw iie;
1395       } catch (ExecutionException e) {
1396         throw new IOException(e.getCause());
1397       } finally {
1398         closeThreadPool.shutdownNow();
1399       }
1400 
1401       if (!thrown.isEmpty()) {
1402         throw MultipleIOException.createIOException(thrown);
1403       }
1404       writersClosed = true;
1405       closeAndCleanCompleted = true;
1406       if (progress_failed) {
1407         return null;
1408       }
1409       return paths;
1410     }
1411 
1412     private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
1413       if (writersClosed) {
1414         return thrown;
1415       }
1416 
1417       if (thrown == null) {
1418         thrown = Lists.newArrayList();
1419       }
1420       try {
1421         for (WriterThread t : writerThreads) {
1422           while (t.isAlive()) {
1423             t.shouldStop = true;
1424             t.interrupt();
1425             try {
1426               t.join(10);
1427             } catch (InterruptedException e) {
1428               IOException iie = new InterruptedIOException();
1429               iie.initCause(e);
1430               throw iie;
1431             }
1432           }
1433         }
1434       } finally {
1435         synchronized (writers) {
1436           WriterAndPath wap = null;
1437           for (SinkWriter tmpWAP : writers.values()) {
1438             try {
1439               wap = (WriterAndPath) tmpWAP;
1440               wap.w.close();
1441             } catch (IOException ioe) {
1442               LOG.error("Couldn't close log at " + wap.p, ioe);
1443               thrown.add(ioe);
1444               continue;
1445             }
1446             LOG.info("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in "
1447                 + (wap.nanosSpent / 1000 / 1000) + "ms)");
1448           }
1449         }
1450         writersClosed = true;
1451       }
1452 
1453       return thrown;
1454     }
1455 
1456     /**
1457      * Get a writer and path for a log starting at the given entry. This function is threadsafe so
1458      * long as multiple threads are always acting on different regions.
1459      * @return null if this region shouldn't output any logs
1460      */
1461     private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
1462       byte region[] = entry.getKey().getEncodedRegionName();
1463       WriterAndPath ret = (WriterAndPath) writers.get(region);
1464       if (ret != null) {
1465         return ret;
1466       }
1467       // If we already decided that this region doesn't get any output
1468       // we don't need to check again.
1469       if (blacklistedRegions.contains(region)) {
1470         return null;
1471       }
1472       ret = createWAP(region, entry, rootDir);
1473       if (ret == null) {
1474         blacklistedRegions.add(region);
1475         return null;
1476       }
1477       writers.put(region, ret);
1478       return ret;
1479     }
1480 
1481     /**
1482      * @return a path with a write for that path. caller should close.
1483      */
1484     private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
1485       Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
1486       if (regionedits == null) {
1487         return null;
1488       }
1489       if (fs.exists(regionedits)) {
1490         LOG.warn("Found old edits file. It could be the "
1491             + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
1492             + fs.getFileStatus(regionedits).getLen());
1493         if (!fs.delete(regionedits, false)) {
1494           LOG.warn("Failed delete of old " + regionedits);
1495         }
1496       }
1497       Writer w = createWriter(regionedits);
1498       LOG.debug("Creating writer path=" + regionedits);
1499       return new WriterAndPath(regionedits, w);
1500     }
1501 
1502     private void filterCellByStore(Entry logEntry) {
1503       Map<byte[], Long> maxSeqIdInStores =
1504           regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
1505       if (maxSeqIdInStores == null || maxSeqIdInStores.isEmpty()) {
1506         return;
1507       }
1508       List<Cell> skippedCells = new ArrayList<Cell>();
1509       for (Cell cell : logEntry.getEdit().getCells()) {
1510         if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
1511           byte[] family = CellUtil.cloneFamily(cell);
1512           Long maxSeqId = maxSeqIdInStores.get(family);
1513           // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
1514           // or the master was crashed before and we can not get the information.
1515           if (maxSeqId != null && maxSeqId.longValue() >= logEntry.getKey().getLogSeqNum()) {
1516             skippedCells.add(cell);
1517           }
1518         }
1519       }
1520       if (!skippedCells.isEmpty()) {
1521         logEntry.getEdit().getCells().removeAll(skippedCells);
1522       }
1523     }
1524 
1525     /**
1526      * Writes one region's buffered entries to its recovered-edits writer; entries whose edit is
1527      * empty after filterCellByStore are counted as skipped rather than appended.
1528      * @throws IOException on write failure (a RemoteException is unwrapped before rethrow)
1529      */
1530     @Override
1531     public void append(RegionEntryBuffer buffer) throws IOException {
1532       final List<Entry> pending = buffer.entryBuffer;
1533       if (pending.isEmpty()) {
1534         LOG.warn("got an empty buffer, skipping");
1535         return;
1536       }
1537       final long begin = System.nanoTime();
1538       try {
1539         final Entry first = pending.get(0);
1540         final WriterAndPath target = getWriterAndPath(first);
1541         if (target == null) {
1542           if (LOG.isDebugEnabled()) {
1543             LOG.debug("getWriterAndPath decided we don't need to write edits for " + first);
1544           }
1545           return;
1546         }
1547         int appended = 0;
1548         for (Entry current : pending) {
1549           filterCellByStore(current);
1550           if (current.getEdit().isEmpty()) {
1551             target.incrementSkippedEdits(1);
1552             continue;
1553           }
1554           target.w.append(current);
1555           this.updateRegionMaximumEditLogSeqNum(current);
1556           appended++;
1557         }
1558         // Pass along summary statistics
1559         target.incrementEdits(appended);
1560         target.incrementNanoTime(System.nanoTime() - begin);
1561       } catch (IOException e) {
1562         final IOException unwrapped =
1563             e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1564         LOG.fatal(" Got while writing log entry to log", unwrapped);
1565         throw unwrapped;
1566       }
1567     }
1568 
1569     /**
1570      * @return sorted snapshot mapping each writer's key (encoded region ID) to its edit count
1571      */
1572     @Override
1573     public Map<byte[], Long> getOutputCounts() {
1574       final Map<byte[], Long> counts = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1575       synchronized (writers) {
1576         for (Map.Entry<byte[], SinkWriter> writerEntry : writers.entrySet()) {
1577           counts.put(writerEntry.getKey(), writerEntry.getValue().editsWritten);
1578         }
1579       }
1580       return counts;
1581     }
1582 
1583     @Override
1584     public int getNumberOfRecoveredRegions() {
           // The sink keeps its writers in a map; the map's size is what gets reported.
           final int writerCount = writers.size();
1585       return writerCount;
1586     }
1587   }
1588 
1589   /**
1590    * Plain, unsynchronized counters (edits written/skipped, nanos spent) kept by sink writers.
1591    */
1592   public abstract static class SinkWriter {
1593     /* Running total of edits written */
1594     long editsWritten;
1595     /* Running total of edits skipped */
1596     long editsSkipped;
1597     /* Running total of nanoseconds spent writing */
1598     long nanosSpent;
1599 
1600     void incrementEdits(int edits) {
1601       this.editsWritten = this.editsWritten + edits;
1602     }
1603 
1604     void incrementSkippedEdits(int skipped) {
1605       this.editsSkipped = this.editsSkipped + skipped;
1606     }
1607 
1608     void incrementNanoTime(long nanos) {
1609       this.nanosSpent = this.nanosSpent + nanos;
1610     }
1611   }
1612 
1613   /**
1614    * Pairs a {@code Writer} with the {@code Path} it belongs to; the statistics about the data
1615    * written to this output are the counters inherited from {@link SinkWriter}.
1616    */
1617   private static final class WriterAndPath extends SinkWriter {
1618     final Writer w;
1619     final Path p;
1620 
1621     WriterAndPath(final Path p, final Writer w) {
1622       this.w = w;
1623       this.p = p;
1624     }
1625   }
1626 
1627   /**
1628    * Class that manages to replay edits from WAL files directly to assigned fail over region servers
1629    */
1630   class LogReplayOutputSink extends OutputSink {
1631     private static final double BUFFER_THRESHOLD = 0.35;
1632     private static final String KEY_DELIMITER = "#";
1633 
1634     private long waitRegionOnlineTimeOut;
1635     private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>());
1636     private final Map<String, RegionServerWriter> writers =
1637         new ConcurrentHashMap<String, RegionServerWriter>();
1638     // online encoded region name -> region location map
1639     private final Map<String, HRegionLocation> onlineRegions =
1640         new ConcurrentHashMap<String, HRegionLocation>();
1641 
1642     private Map<TableName, HConnection> tableNameToHConnectionMap = Collections
1643         .synchronizedMap(new TreeMap<TableName, HConnection>());
1644     /**
1645      * Map key -> value layout
1646      * <hostname:port>#<table name> -> List<Pair<HRegionLocation, Entry>>
1647      */
1648     private Map<String, List<Pair<HRegionLocation, Entry>>> serverToBufferQueueMap =
1649         new ConcurrentHashMap<String, List<Pair<HRegionLocation, Entry>>>();
1650     private List<Throwable> thrown = new ArrayList<Throwable>();
1651 
1652     // The following sink is used in distributedLogReplay mode for entries of regions in a
1653     // disabling table. It's a limitation of distributedLogReplay, because log replay needs a
1654     // region to be assigned and online before it can replay WAL edits, while regions of a
1655     // disabling/disabled table won't be assigned by AM. We can retire this code after HBASE-8234.
1656     private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
1657     private boolean hasEditsInDisablingOrDisabledTables = false;
1658 
1659     public LogReplayOutputSink(PipelineController controller, EntryBuffers entryBuffers,
1660         int numWriters) {
1661       super(controller, entryBuffers, numWriters);
1662       this.waitRegionOnlineTimeOut =
1663           conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
1664             ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT);
1665       this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(controller,
1666         entryBuffers, numWriters);
1667       this.logRecoveredEditsOutputSink.setReporter(reporter);
1668     }
1669 
1670     @Override
1671     public void append(RegionEntryBuffer buffer) throws IOException {
           // Takes one region's buffered entries. Entries of a disabled/disabling table are handed
           // to the recovered-edits sink; everything else is grouped per destination server, and at
           // most one server queue (the largest one) is replayed per call.
1672       List<Entry> entries = buffer.entryBuffer;
1673       if (entries.isEmpty()) {
1674         LOG.warn("got an empty buffer, skipping");
1675         return;
1676       }
1677 
1678       // check if current region in a disabling or disabled table
1679       if (isTableDisabledOrDisabling(buffer.tableName)) {
1680         // need fall back to old way
1681         logRecoveredEditsOutputSink.append(buffer);
1682         hasEditsInDisablingOrDisabledTables = true;
1683         // store regions we have recovered so far
1684         addToRecoveredRegions(Bytes.toString(buffer.encodedRegionName));
1685         return;
1686       }
1687 
1688       // group entries by region servers
1689       groupEditsByServer(entries);
1690 
1691       // process workitems
1692       String maxLocKey = null;
1693       int maxSize = 0;
1694       List<Pair<HRegionLocation, Entry>> maxQueue = null;
1695       synchronized (this.serverToBufferQueueMap) {
             // pick the longest pending queue and remember its key
1696         for (Map.Entry<String, List<Pair<HRegionLocation, Entry>>> entry :
1697             this.serverToBufferQueueMap.entrySet()) {
1698           List<Pair<HRegionLocation, Entry>> curQueue = entry.getValue();
1699           if (curQueue.size() > maxSize) {
1700             maxSize = curQueue.size();
1701             maxQueue = curQueue;
1702             maxLocKey = entry.getKey();
1703           }
1704         }
             // Keep buffering while the longest queue is below minBatchSize and the buffered total
             // is under BUFFER_THRESHOLD (35%) of entryBuffers.maxHeapUsage.
1705         if (maxSize < minBatchSize
1706             && entryBuffers.totalBuffered < BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) {
1707           // buffer more to process
1708           return;
1709         } else if (maxSize > 0) {
               // detach the queue while holding the lock; it is replayed after the lock is released
1710           this.serverToBufferQueueMap.remove(maxLocKey);
1711         }
1712       }
1713 
1714       if (maxSize > 0) {
1715         processWorkItems(maxLocKey, maxQueue);
1716       }
1717     }
1718 
1719     private void addToRecoveredRegions(String encodedRegionName) {
           // Set.add leaves the set unchanged when the name is already present.
1720       recoveredRegions.add(encodedRegionName);
1721     }
1724 
1725     /**
1726      * Helper function to group WALEntries to individual region servers
          * <p>
          * For each entry the replication scopes are cleared and every cell is resolved to a region
          * location. An entry is dropped when its table no longer exists, when it is a compaction
          * marker of a region that is gone, or when the region's last flushed sequence id already
          * covers it. Individual cells are dropped when they are non-compaction meta-family cells
          * or when the per-store sequence ids say the store is gone or already flushed. Surviving
          * entries are queued in serverToBufferQueueMap under "hostname:port#table".
          * @param entries WAL entries to group; the cell list of an entry may be trimmed in place
1727      * @throws IOException if a region location cannot be resolved
1728      */
1729     private void groupEditsByServer(List<Entry> entries) throws IOException {
1730       Set<TableName> nonExistentTables = null;
           // NOTE(review): the initial -1 is never read; the variable is assigned before each use.
1731       Long cachedLastFlushedSequenceId = -1l;
1732       for (Entry entry : entries) {
1733         WALEdit edit = entry.getEdit();
1734         TableName table = entry.getKey().getTablename();
1735         // clear scopes which isn't needed for recovery
1736         entry.getKey().setScopes(null);
1737         String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
1738         // skip edits of non-existent tables
1739         if (nonExistentTables != null && nonExistentTables.contains(table)) {
1740           this.skippedEdits.incrementAndGet();
1741           continue;
1742         }
1743 
1744         Map<byte[], Long> maxStoreSequenceIds = null;
1745         boolean needSkip = false;
1746         HRegionLocation loc = null;
1747         String locKey = null;
1748         List<Cell> cells = edit.getCells();
1749         List<Cell> skippedCells = new ArrayList<Cell>();
1750         HConnection hconn = this.getConnectionByTableName(table);
1751 
1752         for (Cell cell : cells) {
1753           byte[] row = CellUtil.cloneRow(cell);
1754           byte[] family = CellUtil.cloneFamily(cell);
1755           boolean isCompactionEntry = false;
               // Meta-family cells are kept only when they are compaction markers naming a region;
               // for those, the region's start key and the compacted family stand in for row/family.
1756           if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
1757             CompactionDescriptor compaction = WALEdit.getCompaction(cell);
1758             if (compaction != null && compaction.hasRegionName()) {
1759               try {
1760                 byte[][] regionName = HRegionInfo.parseRegionName(compaction.getRegionName()
1761                   .toByteArray());
1762                 row = regionName[1]; // startKey of the region
1763                 family = compaction.getFamilyName().toByteArray();
1764                 isCompactionEntry = true;
1765               } catch (Exception ex) {
1766                 LOG.warn("Unexpected exception received, ignoring " + ex);
1767                 skippedCells.add(cell);
1768                 continue;
1769               }
1770             } else {
1771               skippedCells.add(cell);
1772               continue;
1773             }
1774           }
1775 
1776           try {
1777             loc =
1778                 locateRegionAndRefreshLastFlushedSequenceId(hconn, table, row,
1779                   encodeRegionNameStr);
1780             // skip replaying the compaction if the region is gone
1781             if (isCompactionEntry && !encodeRegionNameStr.equalsIgnoreCase(
1782               loc.getRegionInfo().getEncodedName())) {
1783               LOG.info("Not replaying a compaction marker for an older region: "
1784                   + encodeRegionNameStr);
1785               needSkip = true;
1786             }
1787           } catch (TableNotFoundException ex) {
1788             // table has been deleted so skip edits of the table
1789             LOG.info("Table " + table + " doesn't exist. Skip log replay for region "
1790                 + encodeRegionNameStr);
1791             lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE);
1792             if (nonExistentTables == null) {
1793               nonExistentTables = new TreeSet<TableName>();
1794             }
1795             nonExistentTables.add(table);
1796             this.skippedEdits.incrementAndGet();
1797             needSkip = true;
1798             break;
1799           }
1800 
1801           cachedLastFlushedSequenceId =
1802               lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
1803           if (cachedLastFlushedSequenceId != null
1804               && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
1805             // skip the whole WAL entry
1806             this.skippedEdits.incrementAndGet();
1807             needSkip = true;
1808             break;
1809           } else {
                 // the per-store map is fetched once per entry, from the first region located
1810             if (maxStoreSequenceIds == null) {
1811               maxStoreSequenceIds =
1812                   regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName());
1813             }
1814             if (maxStoreSequenceIds != null) {
1815               Long maxStoreSeqId = maxStoreSequenceIds.get(family);
1816               if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) {
1817                 // skip current kv if column family doesn't exist anymore or already flushed
1818                 skippedCells.add(cell);
1819                 continue;
1820               }
1821             }
1822           }
1823         }
1824 
1825         // skip the edit
             // (loc stays null when every cell was skipped before a location lookup)
1826         if (loc == null || needSkip) continue;
1827 
1828         if (!skippedCells.isEmpty()) {
1829           cells.removeAll(skippedCells);
1830         }
1831 
             // The entry is queued under the location resolved for the last cell examined.
1832         synchronized (serverToBufferQueueMap) {
1833           locKey = loc.getHostnamePort() + KEY_DELIMITER + table;
1834           List<Pair<HRegionLocation, Entry>> queue = serverToBufferQueueMap.get(locKey);
1835           if (queue == null) {
1836             queue =
1837                 Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, Entry>>());
1838             serverToBufferQueueMap.put(locKey, queue);
1839           }
1840           queue.add(new Pair<HRegionLocation, Entry>(loc, entry));
1841         }
1842         // store regions we have recovered so far
1843         addToRecoveredRegions(loc.getRegionInfo().getEncodedName());
1844       }
1845     }
1846 
1847     /**
1848      * Locate destination region based on table name & row. This function also makes sure the
1849      * destination region is online for replay.
          * <p>
          * Locations are cached in onlineRegions by encoded region name. On a cache miss the
          * location is re-read without the client cache, the method waits for the region to come
          * online, and lastFlushedSequenceIds / regionMaxSeqIdInStores are refreshed for it.
          * @param hconn connection used for the location lookup
          * @param table table the row belongs to
          * @param row row used to find the hosting region
          * @param originalEncodedRegionName encoded name of the region recorded in the WAL entry
          * @return location of the region that currently holds {@code row}
1850      * @throws IOException if no location is found or the region does not come online in time
1851      */
1852     private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn,
1853         TableName table, byte[] row, String originalEncodedRegionName) throws IOException {
1854       // fetch location from cache
1855       HRegionLocation loc = onlineRegions.get(originalEncodedRegionName);
1856       if(loc != null) return loc;
1857       // fetch location from hbase:meta directly without using cache to avoid hit old dead server
1858       loc = hconn.getRegionLocation(table, row, true);
1859       if (loc == null) {
1860         throw new IOException("Can't locate location for row:" + Bytes.toString(row)
1861             + " of table:" + table);
1862       }
1863       // check if current row moves to a different region due to region merge/split
1864       if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) {
1865         // originalEncodedRegionName should have already flushed
1866         lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE);
             // the region now holding the row may already be known to be online
1867         HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName());
1868         if (tmpLoc != null) return tmpLoc;
1869       }
1870 
           // -1 is what gets recorded below when no flushed sequence ids are returned
1871       Long lastFlushedSequenceId = -1l;
1872       AtomicBoolean isRecovering = new AtomicBoolean(true);
1873       loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering);
1874       if (!isRecovering.get()) {
1875         // region isn't in recovering at all because WAL file may contain a region that has
1876         // been moved to somewhere before hosting RS fails
1877         lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
1878         LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
1879             + " because it's not in recovering.");
1880       } else {
1881         Long cachedLastFlushedSequenceId =
1882             lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
1883 
1884         // retrieve last flushed sequence Id from ZK. Because region postOpenDeployTasks will
1885         // update the value for the region
1886         RegionStoreSequenceIds ids =
1887             csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
1888               loc.getRegionInfo().getEncodedName());
1889         if (ids != null) {
1890           lastFlushedSequenceId = ids.getLastFlushedSequenceId();
               // rebuild the family -> max sequence id map for this region
1891           Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
1892           List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList();
1893           for (StoreSequenceId id : maxSeqIdInStores) {
1894             storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId());
1895           }
1896           regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds);
1897         }
1898 
             // only ever move the cached value forward
1899         if (cachedLastFlushedSequenceId == null
1900             || lastFlushedSequenceId > cachedLastFlushedSequenceId) {
1901           lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId);
1902         }
1903       }
1904 
1905       onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc);
1906       return loc;
1907     }
1908 
1909     private void processWorkItems(String key, List<Pair<HRegionLocation, Entry>> actions)
1910         throws IOException {
1911       RegionServerWriter rsw = null;
1912 
1913       long startTime = System.nanoTime();
1914       try {
1915         rsw = getRegionServerWriter(key);
1916         rsw.sink.replayEntries(actions);
1917 
1918         // Pass along summary statistics
1919         rsw.incrementEdits(actions.size());
1920         rsw.incrementNanoTime(System.nanoTime() - startTime);
1921       } catch (IOException e) {
1922         e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
1923         LOG.fatal(" Got while writing log entry to log", e);
1924         throw e;
1925       }
1926     }
1927 
1928     /**
1929      * Wait until region is online on the destination region server
          * <p>
          * The hosting server is asked for the region's info repeatedly, sleeping between attempts
          * for the back-off pause computed from the client pause setting, until it answers or the
          * timeout elapses. After a failure whose cause is not a RegionOpeningException the
          * location is looked up again before the next attempt.
1930      * @param loc last known location of the region
1931      * @param row row used to look the location up again after a failed attempt
1932      * @param timeout How long to wait, in milliseconds
1933      * @param isRecovering Recovering state of the region interested on destination region server.
          *          Set from the server's response; true when the response does not carry the flag.
1934      * @return the location at which the region answered the region-info request
1935      * @throws IOException if the region is not online before the timeout; an
          *           InterruptedIOException if the waiting thread is interrupted
1936      */
1937     private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row,
1938         final long timeout, AtomicBoolean isRecovering)
1939         throws IOException {
1940       final long endTime = EnvironmentEdgeManager.currentTime() + timeout;
1941       final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
1942         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
1943       boolean reloadLocation = false;
1944       TableName tableName = loc.getRegionInfo().getTable();
1945       int tries = 0;
1946       Throwable cause = null;
1947       while (endTime > EnvironmentEdgeManager.currentTime()) {
1948         try {
1949           // Try and get regioninfo from the hosting server.
1950           HConnection hconn = getConnectionByTableName(tableName);
1951           if(reloadLocation) {
1952             loc = hconn.getRegionLocation(tableName, row, true);
1953           }
1954           BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName());
1955           HRegionInfo region = loc.getRegionInfo();
1956           try {
1957             GetRegionInfoRequest request =
1958                 RequestConverter.buildGetRegionInfoRequest(region.getRegionName());
1959             GetRegionInfoResponse response = remoteSvr.getRegionInfo(null, request);
1960             if (HRegionInfo.convert(response.getRegionInfo()) != null) {
1961               isRecovering.set((response.hasIsRecovering()) ? response.getIsRecovering() : true);
1962               return loc;
1963             }
1964           } catch (ServiceException se) {
1965             throw ProtobufUtil.getRemoteException(se);
1966           }
1967         } catch (IOException e) {
               // remembered so the timeout exception below can report the last failure's cause
1968           cause = e.getCause();
1969           if(!(cause instanceof RegionOpeningException)) {
1970             reloadLocation = true;
1971           }
1972         }
1973         long expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
1974         try {
1975           Thread.sleep(expectedSleep);
1976         } catch (InterruptedException e) {
               // Surface interruption as InterruptedIOException (still an IOException), the same
               // way closeRegionServerWriters does, so it is not mistaken for a lookup failure.
1977           InterruptedIOException iioe = new InterruptedIOException(
1978               "Interrupted when waiting region " + loc.getRegionInfo().getEncodedName()
                   + " online.");
               iioe.initCause(e);
               throw iioe;
1979         }
1980         tries++;
1981       }
1982 
1983       throw new IOException("Timeout when waiting region " + loc.getRegionInfo().getEncodedName() +
1984         " online for " + timeout + " milliseconds.", cause);
1985     }
1986 
1987     @Override
1988     public boolean flush() throws IOException {
           // Replays the first non-empty pending server queue, if any. Returns true when a queue was
           // replayed and false when nothing was pending.
1989       String curLoc = null;
1990       int curSize = 0;
1991       List<Pair<HRegionLocation, Entry>> curQueue = null;
1992       synchronized (this.serverToBufferQueueMap) {
1993         for (Map.Entry<String, List<Pair<HRegionLocation, Entry>>> entry :
1994                 this.serverToBufferQueueMap.entrySet()) {
1995           curQueue = entry.getValue();
1996           if (!curQueue.isEmpty()) {
1997             curSize = curQueue.size();
1998             curLoc = entry.getKey();
1999             break;
2000           }
2001         }
             // detach the chosen queue under the lock; it is replayed after the lock is released
2002         if (curSize > 0) {
2003           this.serverToBufferQueueMap.remove(curLoc);
2004         }
2005       }
2006 
2007       if (curSize > 0) {
2008         this.processWorkItems(curLoc, curQueue);
2009         // We should already have control of the monitor; ensure this is the case.
2010         synchronized(controller.dataAvailable) {
               // wake every thread waiting on controller.dataAvailable
2011           controller.dataAvailable.notifyAll();
2012         }
2013         return true;
2014       }
2015       return false;
2016     }
2017 
2018     void addWriterError(Throwable t) {
           // Recorded in this sink's own 'thrown' list; nothing in this method rethrows it.
2019       this.thrown.add(t);
2020     }
2021 
2022     @Override
2023     public List<Path> finishWritingAndClose() throws IOException {
           // Finishes writing and returns the split paths: those produced by the recovered-edits
           // sink when edits for disabled/disabling tables were seen, otherwise an empty list.
           // Returns null when finishWriting(false) reports false.
2024       try {
2025         if (!finishWriting(false)) {
2026           return null;
2027         }
2028         if (hasEditsInDisablingOrDisabledTables) {
2029           splits = logRecoveredEditsOutputSink.finishWritingAndClose();
2030         } else {
2031           splits = new ArrayList<Path>();
2032         }
2033         // returns an empty array in order to keep interface same as old way
2034         return splits;
2035       } finally {
             // Writers are always closed. This local 'thrown' shadows the field of the same name.
             // Throwing from finally replaces any result or exception coming out of the try block.
2036         List<IOException> thrown = closeRegionServerWriters();
2037         if (thrown != null && !thrown.isEmpty()) {
2038           throw MultipleIOException.createIOException(thrown);
2039         }
2040       }
2041     }
2042 
2043     @Override
2044     int getNumOpenWriters() {
2045       return this.writers.size() + this.logRecoveredEditsOutputSink.getNumOpenWriters();
2046     }
2047 
2048     private List<IOException> closeRegionServerWriters() throws IOException {
           // Stops the writer threads, then closes every RegionServerWriter and every cached
           // connection. Returns the IOExceptions collected while closing, or null when the writers
           // had already been closed. An interrupt while joining is rethrown as
           // InterruptedIOException.
2049       List<IOException> result = null;
2050       if (!writersClosed) {
2051         result = Lists.newArrayList();
2052         try {
2053           for (WriterThread t : writerThreads) {
                 // keep signalling and interrupting until the thread has actually exited
2054             while (t.isAlive()) {
2055               t.shouldStop = true;
2056               t.interrupt();
2057               try {
2058                 t.join(10);
2059               } catch (InterruptedException e) {
2060                 IOException iie = new InterruptedIOException();
2061                 iie.initCause(e);
2062                 throw iie;
2063               }
2064             }
2065           }
2066         } finally {
               // runs even when stopping the threads was interrupted
2067           synchronized (writers) {
2068             for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
2069               RegionServerWriter tmpW = entry.getValue();
2070               try {
2071                 tmpW.close();
2072               } catch (IOException ioe) {
2073                 LOG.error("Couldn't close writer for region server:" + entry.getKey(), ioe);
2074                 result.add(ioe);
2075               }
2076             }
2077           }
2078 
2079           // close connections
2080           synchronized (this.tableNameToHConnectionMap) {
2081             for (Map.Entry<TableName,HConnection> entry :
2082                     this.tableNameToHConnectionMap.entrySet()) {
2083               HConnection hconn = entry.getValue();
2084               try {
2085                 hconn.clearRegionCache();
2086                 hconn.close();
2087               } catch (IOException ioe) {
2088                 result.add(ioe);
2089               }
2090             }
2091           }
               // later calls become no-ops that return null
2092           writersClosed = true;
2093         }
2094       }
2095       return result;
2096     }
2097 
2098     @Override
2099     public Map<byte[], Long> getOutputCounts() {
           // Keys are the writers' location strings converted to bytes; values are edits written.
2100       final Map<byte[], Long> counts = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
2101       synchronized (writers) {
2102         for (Map.Entry<String, RegionServerWriter> writerEntry : writers.entrySet()) {
2103           counts.put(Bytes.toBytes(writerEntry.getKey()), writerEntry.getValue().editsWritten);
2104         }
2105       }
2106       return counts;
2107     }
2108 
2109     @Override
2110     public int getNumberOfRecoveredRegions() {
           // recoveredRegions collects the encoded names of the regions this sink has handled.
           final int regionCount = this.recoveredRegions.size();
2111       return regionCount;
2112     }
2113 
2114     private boolean isTableDisabledOrDisabling(TableName tableName) {
2115       // No CoordinatedStateManager means no state; system tables are assumed never disabled.
2116       if (csm == null || tableName.isSystemTable()) {
2117         return false;
2118       }
2119       TableState cached = tableStatesCache.get(tableName);
2120       if (cached == null) {
2121         try {
2122           cached = MetaTableAccessor.getTableState(csm.getServer().getConnection(), tableName);
2123           if (cached != null) {
2124             tableStatesCache.put(tableName, cached);
2125           }
2126         } catch (IOException e) {
2127           LOG.warn("State is not accessible for table " + tableName, e);
2128         }
2129       }
2130       return cached != null
2131           && cached.inStates(TableState.State.DISABLED, TableState.State.DISABLING);
2132     }
2133 
2134     /**
2135      * Get the RegionServerWriter registered for the given location key, creating and caching one
2136      * when none exists yet. Creation happens inside a synchronized block on the writers map.
          * @param loc location key in the format "server name:port#table name"
2137      * @return the cached or newly created writer
          * @throws IOException if no table name can be parsed from {@code loc}, or if obtaining the
          *           connection or constructing the writer fails
2138      */
2139     private RegionServerWriter getRegionServerWriter(String loc) throws IOException {
           // fast path: writer already registered
2140       RegionServerWriter ret = writers.get(loc);
2141       if (ret != null) {
2142         return ret;
2143       }
2144 
2145       TableName tableName = getTableFromLocationStr(loc);
2146       if(tableName == null){
2147         throw new IOException("Invalid location string:" + loc + " found. Replay aborted.");
2148       }
2149 
2150       HConnection hconn = getConnectionByTableName(tableName);
2151       synchronized (writers) {
             // re-check under the lock so only one writer is created per key
2152         ret = writers.get(loc);
2153         if (ret == null) {
2154           ret = new RegionServerWriter(conf, tableName, hconn);
2155           writers.put(loc, ret);
2156         }
2157       }
2158       return ret;
2159     }
2160 
2161     private HConnection getConnectionByTableName(final TableName tableName) throws IOException {
           // Returns the connection cached for the table, creating and caching one on first use.
2162       HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
2163       if (hconn == null) {
2164         synchronized (this.tableNameToHConnectionMap) {
               // look again while holding the map's monitor before creating a connection
2165           hconn = this.tableNameToHConnectionMap.get(tableName);
2166           if (hconn == null) {
2167             hconn = (HConnection) ConnectionFactory.createConnection(conf);
2168             this.tableNameToHConnectionMap.put(tableName, hconn);
2169           }
2170         }
2171       }
2172       return hconn;
2173     }
2174     private TableName getTableFromLocationStr(String loc) {
2175       /**
2176        * location key is in format <server name:port>#<table name>
2177        */
2178       String[] splits = loc.split(KEY_DELIMITER);
2179       if (splits.length != 2) {
2180         return null;
2181       }
2182       return TableName.valueOf(splits[1]);
2183     }
2184   }
2185 
2186   /**
2187    * Per-destination replay writer: owns the {@code WALEditsReplaySink} built for one table and
2188    * connection, and inherits the write statistics from {@link SinkWriter}.
2189    */
2190   private static final class RegionServerWriter extends SinkWriter {
2191     final WALEditsReplaySink sink;
2192 
2193     RegionServerWriter(final Configuration conf, final TableName tableName, final HConnection conn)
2194         throws IOException {
2195       sink = new WALEditsReplaySink(conf, tableName, conn);
2196     }
2197 
2198     void close() throws IOException {
           // Intentionally empty: this class holds nothing that it closes itself.
2199     }
2200   }
2201 
2202   /** Checked exception that carries only a message; judging by its name, for corrupted logs. */
2203   static class CorruptedLogFileException extends Exception {
2204     private static final long serialVersionUID = 1L;

2205     CorruptedLogFileException(String message) {
2206       super(message);
2207     }
2208   }
2209 
2210   /** A struct used by getMutationsFromWALEntry */
2211   public static class MutationReplay {
2212     public final MutationType type;
2213     public final Mutation mutation;
2214     public final long nonceGroup;
2215     public final long nonce;
2216 
2217     public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
2218       this.type = type;
2219       this.nonceGroup = nonceGroup;
2220       this.nonce = nonce;
2221       this.mutation = mutation;
2222       // Replayed mutations use ASYNC_WAL unless they were marked SKIP_WAL.
2223       if (mutation.getDurability() != Durability.SKIP_WAL) {
2224         mutation.setDurability(Durability.ASYNC_WAL);
2225       }
2226     }
2227   }
2228 
2229   /**
2230    * This function is used to construct mutations from a WALEntry. It also
2231    * reconstructs WALKey &amp; WALEdit from the passed in WALEntry
        * <p>
        * Consecutive cells that share a row and a type byte are gathered into one Put or Delete.
2232    * @param entry the WALEntry to convert; when null an empty list is returned
2233    * @param cells scanner over the entry's cells; it is advanced once per associated cell
2234    * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances
2235    *          extracted from the passed in WALEntry.
        *          May be null, in which case no key or edit is rebuilt.
        * @param durability durability set on every mutation that is built
2236    * @return list of MutationReplay (type, mutation and nonces) to be replayed
2237    * @throws IOException propagated from the calls made while reading cells and building mutations
2238    */
2239   public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
2240       Pair<WALKey, WALEdit> logEntry, Durability durability)
2241           throws IOException {
2242 
2243     if (entry == null) {
2244       // return an empty array
2245       return new ArrayList<MutationReplay>();
2246     }
2247 
         // prefer the original sequence number when the key carries one
2248     long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
2249       entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
2250     int count = entry.getAssociatedCellCount();
2251     List<MutationReplay> mutations = new ArrayList<MutationReplay>();
2252     Cell previousCell = null;
2253     Mutation m = null;
2254     WALKey key = null;
2255     WALEdit val = null;
         // the WALEdit is only rebuilt when the caller supplied a pair to receive it
2256     if (logEntry != null) val = new WALEdit();
2257 
2258     for (int i = 0; i < count; i++) {
2259       // Throw index out of bounds if our cell count is off
2260       if (!cells.advance()) {
2261         throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
2262       }
2263       Cell cell = cells.current();
2264       if (val != null) val.add(cell);
2265 
           // a new mutation starts at the first cell and whenever the type byte or the row changes
2266       boolean isNewRowOrType =
2267           previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
2268               || !CellUtil.matchingRow(previousCell, cell);
2269       if (isNewRowOrType) {
2270         // Create new mutation
2271         if (CellUtil.isDelete(cell)) {
2272           m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
2273           // Deletes don't have nonces.
2274           mutations.add(new MutationReplay(
2275               MutationType.DELETE, m, HConstants.NO_NONCE, HConstants.NO_NONCE));
2276         } else {
2277           m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
2278           // Puts might come from increment or append, thus we need nonces.
2279           long nonceGroup = entry.getKey().hasNonceGroup()
2280               ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
2281           long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
2282           mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce));
2283         }
2284       }
           // add the cell to the mutation currently being built
2285       if (CellUtil.isDelete(cell)) {
2286         ((Delete) m).addDeleteMarker(cell);
2287       } else {
2288         ((Put) m).add(cell);
2289       }
           // runs for every cell and overrides the durability chosen in the MutationReplay constructor
2290       m.setDurability(durability);
2291       previousCell = cell;
2292     }
2293 
2294     // reconstruct WALKey
2295     if (logEntry != null) {
2296       org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKeyProto = entry.getKey();
2297       List<UUID> clusterIds = new ArrayList<UUID>(walKeyProto.getClusterIdsCount());
2298       for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
2299         clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
2300       }
2301       // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
2302       key = new HLogKey(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf(
2303               walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(),
2304               clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce(), null);
2305       logEntry.setFirst(key);
2306       logEntry.setSecond(val);
2307     }
2308 
2309     return mutations;
2310   }
2311 }