View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.security.PrivilegedExceptionAction;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.ListIterator;
27  import java.util.Map;
28  import java.util.concurrent.Callable;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.Executors;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.ThreadFactory;
33  import java.util.concurrent.ThreadPoolExecutor;
34  import java.util.concurrent.TimeUnit;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.hbase.classification.InterfaceAudience;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.HRegionInfo;
42  import org.apache.hadoop.hbase.Server;
43  import org.apache.hadoop.hbase.ServerName;
44  import org.apache.hadoop.hbase.client.Mutation;
45  import org.apache.hadoop.hbase.client.Put;
46  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
47  import org.apache.hadoop.hbase.security.User;
48  import org.apache.hadoop.hbase.util.Bytes;
49  import org.apache.hadoop.hbase.util.CancelableProgressable;
50  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
51  import org.apache.hadoop.hbase.util.FSUtils;
52  import org.apache.hadoop.hbase.util.HasThread;
53  import org.apache.hadoop.hbase.util.Pair;
54  import org.apache.hadoop.hbase.util.PairOfSameType;
55  import org.apache.zookeeper.KeeperException;
56  
57  import com.google.common.annotations.VisibleForTesting;
58  import com.google.common.util.concurrent.ThreadFactoryBuilder;
59  
60  @InterfaceAudience.Private
61  public class SplitTransactionImpl implements SplitTransaction {
62    private static final Log LOG = LogFactory.getLog(SplitTransactionImpl.class);
63  
64    /*
65     * Region to split
66     */
67    private final HRegion parent;
68    private HRegionInfo hri_a;
69    private HRegionInfo hri_b;
70    private long fileSplitTimeout = 30000;
71  
72    /*
73     * Row to split around
74     */
75    private final byte [] splitrow;
76  
77    /*
78     * Transaction state for listener, only valid during execute and
79     * rollback
80     */
81    private SplitTransactionPhase currentPhase = SplitTransactionPhase.STARTED;
82    private Server server;
83    private RegionServerServices rsServices;
84  
85    public static class JournalEntryImpl implements JournalEntry {
86      private SplitTransactionPhase type;
87      private long timestamp;
88  
89      public JournalEntryImpl(SplitTransactionPhase type) {
90        this(type, EnvironmentEdgeManager.currentTime());
91      }
92  
93      public JournalEntryImpl(SplitTransactionPhase type, long timestamp) {
94        this.type = type;
95        this.timestamp = timestamp;
96      }
97  
98      @Override
99      public String toString() {
100       StringBuilder sb = new StringBuilder();
101       sb.append(type);
102       sb.append(" at ");
103       sb.append(timestamp);
104       return sb.toString();
105     }
106 
107     @Override
108     public SplitTransactionPhase getPhase() {
109       return type;
110     }
111 
112     @Override
113     public long getTimeStamp() {
114       return timestamp;
115     }
116   }
117 
118   /*
119    * Journal of how far the split transaction has progressed.
120    */
121   private final ArrayList<JournalEntry> journal = new ArrayList<JournalEntry>();
122 
123   /**
124    * Listeners
125    */
126   private final ArrayList<TransactionListener> listeners = new ArrayList<TransactionListener>();
127 
128   /**
129    * Constructor
130    * @param r Region to split
131    * @param splitrow Row to split around
132    */
133   public SplitTransactionImpl(final Region r, final byte [] splitrow) {
134     this.parent = (HRegion)r;
135     this.splitrow = splitrow;
136     this.journal.add(new JournalEntryImpl(SplitTransactionPhase.STARTED));
137   }
138 
139   private void transition(SplitTransactionPhase nextPhase) throws IOException {
140     transition(nextPhase, false);
141   }
142 
143   private void transition(SplitTransactionPhase nextPhase, boolean isRollback)
144       throws IOException {
145     if (!isRollback) {
146       // Add to the journal first, because if the listener throws an exception
147       // we need to roll back starting at 'nextPhase'
148       this.journal.add(new JournalEntryImpl(nextPhase));
149     }
150     for (int i = 0; i < listeners.size(); i++) {
151       TransactionListener listener = listeners.get(i);
152       if (!isRollback) {
153         listener.transition(this, currentPhase, nextPhase);
154       } else {
155         listener.rollback(this, currentPhase, nextPhase);
156       }
157     }
158     currentPhase = nextPhase;
159   }
160 
161   @Override
162   public boolean prepare() throws IOException {
163     if (!this.parent.isSplittable()) return false;
164     // Split key can be null if this region is unsplittable; i.e. has refs.
165     if (this.splitrow == null) return false;
166     HRegionInfo hri = this.parent.getRegionInfo();
167     parent.prepareToSplit();
168     // Check splitrow.
169     byte [] startKey = hri.getStartKey();
170     byte [] endKey = hri.getEndKey();
171     if (Bytes.equals(startKey, splitrow) ||
172         !this.parent.getRegionInfo().containsRow(splitrow)) {
173       LOG.info("Split row is not inside region key range or is equal to " +
174           "startkey: " + Bytes.toStringBinary(this.splitrow));
175       return false;
176     }
177     long rid = getDaughterRegionIdTimestamp(hri);
178     this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
179     this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);
180 
181     transition(SplitTransactionPhase.PREPARED);
182 
183     return true;
184   }
185 
186   /**
187    * Calculate daughter regionid to use.
188    * @param hri Parent {@link HRegionInfo}
189    * @return Daughter region id (timestamp) to use.
190    */
191   private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
192     long rid = EnvironmentEdgeManager.currentTime();
193     // Regionid is timestamp.  Can't be less than that of parent else will insert
194     // at wrong location in hbase:meta (See HBASE-710).
195     if (rid < hri.getRegionId()) {
196       LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() +
197         " but current time here is " + rid);
198       rid = hri.getRegionId() + 1;
199     }
200     return rid;
201   }
202 
203   private static IOException closedByOtherException = new IOException(
204       "Failed to close region: already closed by another thread");
205 
206   /**
207    * Prepare the regions and region files.
208    * @param server Hosting server instance.  Can be null when testing (won't try
209    * and update in zk if a null server)
210    * @param services Used to online/offline regions.
211    * @param user
212    * @throws IOException If thrown, transaction failed.
213    *    Call {@link #rollback(Server, RegionServerServices)}
214    * @return Regions created
215    */
216   @VisibleForTesting
217   PairOfSameType<Region> createDaughters(final Server server,
218       final RegionServerServices services, User user) throws IOException {
219     LOG.info("Starting split of region " + this.parent);
220     if ((server != null && server.isStopped()) ||
221         (services != null && services.isStopping())) {
222       throw new IOException("Server is stopped or stopping");
223     }
224     assert !this.parent.lock.writeLock().isHeldByCurrentThread():
225       "Unsafe to hold write lock while performing RPCs";
226 
227     transition(SplitTransactionPhase.BEFORE_PRE_SPLIT_HOOK);
228 
229     // Coprocessor callback
230     if (this.parent.getCoprocessorHost() != null) {
231       if (user == null) {
232         // TODO: Remove one of these
233         parent.getCoprocessorHost().preSplit();
234         parent.getCoprocessorHost().preSplit(splitrow);
235       } else {
236         try {
237           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
238             @Override
239             public Void run() throws Exception {
240               parent.getCoprocessorHost().preSplit();
241               parent.getCoprocessorHost().preSplit(splitrow);
242               return null;
243             }
244           });
245         } catch (InterruptedException ie) {
246           InterruptedIOException iioe = new InterruptedIOException();
247           iioe.initCause(ie);
248           throw iioe;
249         }
250       }
251     }
252 
253     transition(SplitTransactionPhase.AFTER_PRE_SPLIT_HOOK);
254 
255     // If true, no cluster to write meta edits to or to update znodes in.
256     boolean testing = server == null? true:
257         server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
258     this.fileSplitTimeout = testing ? this.fileSplitTimeout :
259         server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
260           this.fileSplitTimeout);
261 
262     PairOfSameType<Region> daughterRegions = stepsBeforePONR(server, services, testing);
263 
264     final List<Mutation> metaEntries = new ArrayList<Mutation>();
265     boolean ret = false;
266     if (this.parent.getCoprocessorHost() != null) {
267       if (user == null) {
268         ret = parent.getCoprocessorHost().preSplitBeforePONR(splitrow, metaEntries);
269       } else {
270         try {
271           ret = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
272             @Override
273             public Boolean run() throws Exception {
274               return parent.getCoprocessorHost().preSplitBeforePONR(splitrow, metaEntries);
275             }
276           });
277         } catch (InterruptedException ie) {
278           InterruptedIOException iioe = new InterruptedIOException();
279           iioe.initCause(ie);
280           throw iioe;
281         }
282       }
283       if (ret) {
284           throw new IOException("Coprocessor bypassing region "
285             + parent.getRegionInfo().getRegionNameAsString() + " split.");
286       }
287       try {
288         for (Mutation p : metaEntries) {
289           HRegionInfo.parseRegionName(p.getRow());
290         }
291       } catch (IOException e) {
292         LOG.error("Row key of mutation from coprossor is not parsable as region name."
293             + "Mutations from coprocessor should only for hbase:meta table.");
294         throw e;
295       }
296     }
297 
298     // This is the point of no return.  Adding subsequent edits to .META. as we
299     // do below when we do the daughter opens adding each to .META. can fail in
300     // various interesting ways the most interesting of which is a timeout
301     // BUT the edits all go through (See HBASE-3872).  IF we reach the PONR
302     // then subsequent failures need to crash out this regionserver; the
303     // server shutdown processing should be able to fix-up the incomplete split.
304     // The offlined parent will have the daughters as extra columns.  If
305     // we leave the daughter regions in place and do not remove them when we
306     // crash out, then they will have their references to the parent in place
307     // still and the server shutdown fixup of .META. will point to these
308     // regions.
309     // We should add PONR JournalEntry before offlineParentInMeta,so even if
310     // OfflineParentInMeta timeout,this will cause regionserver exit,and then
311     // master ServerShutdownHandler will fix daughter & avoid data loss. (See
312     // HBase-4562).
313 
314     transition(SplitTransactionPhase.PONR);
315 
316     // Edit parent in meta.  Offlines parent region and adds splita and splitb
317     // as an atomic update. See HBASE-7721. This update to META makes the region
318     // will determine whether the region is split or not in case of failures.
319     // If it is successful, master will roll-forward, if not, master will rollback
320     // and assign the parent region.
321     if (services != null && !services.reportRegionStateTransition(TransitionCode.SPLIT_PONR,
322         parent.getRegionInfo(), hri_a, hri_b)) {
323       // Passed PONR, let SSH clean it up
324       throw new IOException("Failed to notify master that split passed PONR: "
325         + parent.getRegionInfo().getRegionNameAsString());
326     }
327     return daughterRegions;
328   }
329 
330   @VisibleForTesting
331   Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
332     p.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes
333             .toBytes(sn.getHostAndPort()));
334     p.addColumn(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER, Bytes.toBytes(sn
335             .getStartcode()));
336     p.addColumn(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER, Bytes.toBytes(openSeqNum));
337     return p;
338   }
339 
340   @VisibleForTesting
341   public PairOfSameType<Region> stepsBeforePONR(final Server server,
342       final RegionServerServices services, boolean testing) throws IOException {
343     if (services != null && !services.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT,
344         parent.getRegionInfo(), hri_a, hri_b)) {
345       throw new IOException("Failed to get ok from master to split "
346         + parent.getRegionInfo().getRegionNameAsString());
347     }
348 
349     transition(SplitTransactionPhase.SET_SPLITTING);
350 
351     this.parent.getRegionFileSystem().createSplitsDir();
352 
353     transition(SplitTransactionPhase.CREATE_SPLIT_DIR);
354 
355     Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
356     Exception exceptionToThrow = null;
357     try{
358       hstoreFilesToSplit = this.parent.close(false);
359     } catch (Exception e) {
360       exceptionToThrow = e;
361     }
362     if (exceptionToThrow == null && hstoreFilesToSplit == null) {
363       // The region was closed by a concurrent thread.  We can't continue
364       // with the split, instead we must just abandon the split.  If we
365       // reopen or split this could cause problems because the region has
366       // probably already been moved to a different server, or is in the
367       // process of moving to a different server.
368       exceptionToThrow = closedByOtherException;
369     }
370     if (exceptionToThrow != closedByOtherException) {
371       transition(SplitTransactionPhase.CLOSED_PARENT_REGION);
372     }
373     if (exceptionToThrow != null) {
374       if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
375       throw new IOException(exceptionToThrow);
376     }
377     if (!testing) {
378       services.removeFromOnlineRegions(this.parent, null);
379     }
380 
381     transition(SplitTransactionPhase.OFFLINED_PARENT);
382 
383     // TODO: If splitStoreFiles were multithreaded would we complete steps in
384     // less elapsed time?  St.Ack 20100920
385     //
386     // splitStoreFiles creates daughter region dirs under the parent splits dir
387     // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
388     // clean this up.
389     Pair<Integer, Integer> expectedReferences = splitStoreFiles(hstoreFilesToSplit);
390 
391     // Log to the journal that we are creating region A, the first daughter
392     // region.  We could fail halfway through.  If we do, we could have left
393     // stuff in fs that needs cleanup -- a storefile or two.  Thats why we
394     // add entry to journal BEFORE rather than AFTER the change.
395 
396     transition(SplitTransactionPhase.STARTED_REGION_A_CREATION);
397 
398     assertReferenceFileCount(expectedReferences.getFirst(),
399         this.parent.getRegionFileSystem().getSplitsDir(this.hri_a));
400     HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a);
401     assertReferenceFileCount(expectedReferences.getFirst(),
402         new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_a.getEncodedName()));
403 
404     // Ditto
405 
406     transition(SplitTransactionPhase.STARTED_REGION_B_CREATION);
407 
408     assertReferenceFileCount(expectedReferences.getSecond(),
409         this.parent.getRegionFileSystem().getSplitsDir(this.hri_b));
410     HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
411     assertReferenceFileCount(expectedReferences.getSecond(),
412         new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_b.getEncodedName()));
413 
414     return new PairOfSameType<Region>(a, b);
415   }
416 
417   @VisibleForTesting
418   void assertReferenceFileCount(int expectedReferenceFileCount, Path dir)
419       throws IOException {
420     if (expectedReferenceFileCount != 0 &&
421         expectedReferenceFileCount != FSUtils.getRegionReferenceFileCount(parent.getFilesystem(),
422           dir)) {
423       throw new IOException("Failing split. Expected reference file count isn't equal.");
424     }
425   }
426 
427   /**
428    * Perform time consuming opening of the daughter regions.
429    * @param server Hosting server instance.  Can be null when testing
430    * @param services Used to online/offline regions.
431    * @param a first daughter region
432    * @param a second daughter region
433    * @throws IOException If thrown, transaction failed.
434    *          Call {@link #rollback(Server, RegionServerServices)}
435    */
436   @VisibleForTesting
437   void openDaughters(final Server server, final RegionServerServices services, Region a,
438       Region b) throws IOException {
439     boolean stopped = server != null && server.isStopped();
440     boolean stopping = services != null && services.isStopping();
441     // TODO: Is this check needed here?
442     if (stopped || stopping) {
443       LOG.info("Not opening daughters " +
444           b.getRegionInfo().getRegionNameAsString() +
445           " and " +
446           a.getRegionInfo().getRegionNameAsString() +
447           " because stopping=" + stopping + ", stopped=" + stopped);
448     } else {
449       // Open daughters in parallel.
450       DaughterOpener aOpener = new DaughterOpener(server, a);
451       DaughterOpener bOpener = new DaughterOpener(server, b);
452       aOpener.start();
453       bOpener.start();
454       try {
455         aOpener.join();
456         if (aOpener.getException() == null) {
457           transition(SplitTransactionPhase.OPENED_REGION_A);
458         }
459         bOpener.join();
460         if (bOpener.getException() == null) {
461           transition(SplitTransactionPhase.OPENED_REGION_B);
462         }
463       } catch (InterruptedException e) {
464         throw (InterruptedIOException)new InterruptedIOException().initCause(e);
465       }
466       if (aOpener.getException() != null) {
467         throw new IOException("Failed " +
468           aOpener.getName(), aOpener.getException());
469       }
470       if (bOpener.getException() != null) {
471         throw new IOException("Failed " +
472           bOpener.getName(), bOpener.getException());
473       }
474       if (services != null) {
475         if (!services.reportRegionStateTransition(TransitionCode.SPLIT,
476             parent.getRegionInfo(), hri_a, hri_b)) {
477           throw new IOException("Failed to report split region to master: "
478             + parent.getRegionInfo().getShortNameToLog());
479         }
480         // Should add it to OnlineRegions
481         services.addToOnlineRegions(b);
482         services.addToOnlineRegions(a);
483       }
484     }
485   }
486 
487   @Override
488   public PairOfSameType<Region> execute(final Server server,
489     final RegionServerServices services)
490         throws IOException {
491     if (User.isHBaseSecurityEnabled(parent.getBaseConf())) {
492       LOG.warn("Should use execute(Server, RegionServerServices, User)");
493     }
494     return execute(server, services, null);
495   }
496 
497   @Override
498   public PairOfSameType<Region> execute(final Server server, final RegionServerServices services,
499     User user) throws IOException {
500     this.server = server;
501     this.rsServices = services;
502     PairOfSameType<Region> regions = createDaughters(server, services, user);
503     stepsAfterPONR(server, services, regions, user);
504     transition(SplitTransactionPhase.COMPLETED);
505     return regions;
506   }
507 
508   @VisibleForTesting
509   void stepsAfterPONR(final Server server,
510       final RegionServerServices services, final PairOfSameType<Region> regions, User user)
511       throws IOException {
512     if (this.parent.getCoprocessorHost() != null) {
513       if (user == null) {
514         parent.getCoprocessorHost().preSplitAfterPONR();
515       } else {
516         try {
517           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
518             @Override
519             public Void run() throws Exception {
520               parent.getCoprocessorHost().preSplitAfterPONR();
521               return null;
522             }
523           });
524         } catch (InterruptedException ie) {
525           InterruptedIOException iioe = new InterruptedIOException();
526           iioe.initCause(ie);
527           throw iioe;
528         }
529       }
530     }
531 
532     openDaughters(server, services, regions.getFirst(), regions.getSecond());
533 
534     transition(SplitTransactionPhase.BEFORE_POST_SPLIT_HOOK);
535 
536     // Coprocessor callback
537     if (parent.getCoprocessorHost() != null) {
538       if (user == null) {
539         this.parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
540       } else {
541         try {
542           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
543             @Override
544             public Void run() throws Exception {
545               parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
546               return null;
547             }
548           });
549         } catch (InterruptedException ie) {
550           InterruptedIOException iioe = new InterruptedIOException();
551           iioe.initCause(ie);
552           throw iioe;
553         }
554       }
555     }
556 
557     transition(SplitTransactionPhase.AFTER_POST_SPLIT_HOOK);
558   }
559 
560   /*
561    * Open daughter region in its own thread.
562    * If we fail, abort this hosting server.
563    */
564   private class DaughterOpener extends HasThread {
565     private final Server server;
566     private final Region r;
567     private Throwable t = null;
568 
569     DaughterOpener(final Server s, final Region r) {
570       super((s == null? "null-services": s.getServerName()) +
571         "-daughterOpener=" + r.getRegionInfo().getEncodedName());
572       setDaemon(true);
573       this.server = s;
574       this.r = r;
575     }
576 
577     /**
578      * @return Null if open succeeded else exception that causes us fail open.
579      * Call it after this thread exits else you may get wrong view on result.
580      */
581     Throwable getException() {
582       return this.t;
583     }
584 
585     @Override
586     public void run() {
587       try {
588         openDaughterRegion(this.server, r);
589       } catch (Throwable t) {
590         this.t = t;
591       }
592     }
593   }
594 
595   /**
596    * Open daughter regions, add them to online list and update meta.
597    * @param server
598    * @param daughter
599    * @throws IOException
600    * @throws KeeperException
601    */
602   @VisibleForTesting
603   void openDaughterRegion(final Server server, final Region daughter)
604       throws IOException, KeeperException {
605     HRegionInfo hri = daughter.getRegionInfo();
606     LoggingProgressable reporter = server == null ? null
607         : new LoggingProgressable(hri, server.getConfiguration().getLong(
608             "hbase.regionserver.split.daughter.open.log.interval", 10000));
609     ((HRegion)daughter).openHRegion(reporter);
610   }
611 
612   static class LoggingProgressable implements CancelableProgressable {
613     private final HRegionInfo hri;
614     private long lastLog = -1;
615     private final long interval;
616 
617     LoggingProgressable(final HRegionInfo hri, final long interval) {
618       this.hri = hri;
619       this.interval = interval;
620     }
621 
622     @Override
623     public boolean progress() {
624       long now = EnvironmentEdgeManager.currentTime();
625       if (now - lastLog > this.interval) {
626         LOG.info("Opening " + this.hri.getRegionNameAsString());
627         this.lastLog = now;
628       }
629       return true;
630     }
631   }
632 
633   /**
634    * Creates reference files for top and bottom half of the
635    * @param hstoreFilesToSplit map of store files to create half file references for.
636    * @return the number of reference files that were created.
637    * @throws IOException
638    */
639   private Pair<Integer, Integer> splitStoreFiles(
640       final Map<byte[], List<StoreFile>> hstoreFilesToSplit)
641       throws IOException {
642     if (hstoreFilesToSplit == null) {
643       // Could be null because close didn't succeed -- for now consider it fatal
644       throw new IOException("Close returned empty list of StoreFiles");
645     }
646     // The following code sets up a thread pool executor with as many slots as
647     // there's files to split. It then fires up everything, waits for
648     // completion and finally checks for any exception
649     int nbFiles = 0;
650     for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
651         nbFiles += entry.getValue().size();
652     }
653     if (nbFiles == 0) {
654       // no file needs to be splitted.
655       return new Pair<Integer, Integer>(0,0);
656     }
657     // Default max #threads to use is the smaller of table's configured number of blocking store
658     // files or the available number of logical cores.
659     int defMaxThreads = Math.min(parent.conf.getInt(HStore.BLOCKING_STOREFILES_KEY,
660                 HStore.DEFAULT_BLOCKING_STOREFILE_COUNT),
661             Runtime.getRuntime().availableProcessors());
662     // Max #threads is the smaller of the number of storefiles or the default max determined above.
663     int maxThreads = Math.min(parent.conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX,
664                 defMaxThreads), nbFiles);
665     LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent +
666             " using " + maxThreads + " threads");
667     ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
668     builder.setNameFormat("StoreFileSplitter-%1$d");
669     ThreadFactory factory = builder.build();
670     ThreadPoolExecutor threadPool =
671       (ThreadPoolExecutor) Executors.newFixedThreadPool(maxThreads, factory);
672     List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>> (nbFiles);
673 
674     // Split each store file.
675     for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
676       for (StoreFile sf: entry.getValue()) {
677         StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf);
678         futures.add(threadPool.submit(sfs));
679       }
680     }
681     // Shutdown the pool
682     threadPool.shutdown();
683 
684     // Wait for all the tasks to finish
685     try {
686       boolean stillRunning = !threadPool.awaitTermination(
687           this.fileSplitTimeout, TimeUnit.MILLISECONDS);
688       if (stillRunning) {
689         threadPool.shutdownNow();
690         // wait for the thread to shutdown completely.
691         while (!threadPool.isTerminated()) {
692           Thread.sleep(50);
693         }
694         throw new IOException("Took too long to split the" +
695             " files and create the references, aborting split");
696       }
697     } catch (InterruptedException e) {
698       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
699     }
700 
701     int created_a = 0;
702     int created_b = 0;
703     // Look for any exception
704     for (Future<Pair<Path, Path>> future : futures) {
705       try {
706         Pair<Path, Path> p = future.get();
707         created_a += p.getFirst() != null ? 1 : 0;
708         created_b += p.getSecond() != null ? 1 : 0;
709       } catch (InterruptedException e) {
710         throw (InterruptedIOException) new InterruptedIOException().initCause(e);
711       } catch (ExecutionException e) {
712         throw new IOException(e);
713       }
714     }
715 
716     if (LOG.isDebugEnabled()) {
717       LOG.debug("Split storefiles for region " + this.parent + " Daughter A: " + created_a
718           + " storefiles, Daughter B: " + created_b + " storefiles.");
719     }
720     return new Pair<Integer, Integer>(created_a, created_b);
721   }
722 
723   private Pair<Path, Path> splitStoreFile(final byte[] family, final StoreFile sf)
724       throws IOException {
725     if (LOG.isDebugEnabled()) {
726         LOG.debug("Splitting started for store file: " + sf.getPath() + " for region: " +
727                   this.parent);
728     }
729     HRegionFileSystem fs = this.parent.getRegionFileSystem();
730     String familyName = Bytes.toString(family);
731     Path path_a =
732         fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false,
733           this.parent.getSplitPolicy());
734     Path path_b =
735         fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true,
736           this.parent.getSplitPolicy());
737     if (LOG.isDebugEnabled()) {
738         LOG.debug("Splitting complete for store file: " + sf.getPath() + " for region: " +
739                   this.parent);
740     }
741     return new Pair<Path,Path>(path_a, path_b);
742   }
743 
744   /**
745    * Utility class used to do the file splitting / reference writing
746    * in parallel instead of sequentially.
747    */
748   private class StoreFileSplitter implements Callable<Pair<Path,Path>> {
749     private final byte[] family;
750     private final StoreFile sf;
751 
752     /**
753      * Constructor that takes what it needs to split
754      * @param family Family that contains the store file
755      * @param sf which file
756      */
757     public StoreFileSplitter(final byte[] family, final StoreFile sf) {
758       this.sf = sf;
759       this.family = family;
760     }
761 
762     public Pair<Path,Path> call() throws IOException {
763       return splitStoreFile(family, sf);
764     }
765   }
766 
767   @Override
768   public boolean rollback(final Server server, final RegionServerServices services)
769       throws IOException {
770     if (User.isHBaseSecurityEnabled(parent.getBaseConf())) {
771       LOG.warn("Should use rollback(Server, RegionServerServices, User)");
772     }
773     return rollback(server, services, null);
774   }
775 
776   @Override
777   public boolean rollback(final Server server, final RegionServerServices services, User user)
778       throws IOException {
779     this.server = server;
780     this.rsServices = services;
781     // Coprocessor callback
782     if (this.parent.getCoprocessorHost() != null) {
783       if (user == null) {
784         this.parent.getCoprocessorHost().preRollBackSplit();
785       } else {
786         try {
787           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
788             @Override
789             public Void run() throws Exception {
790               parent.getCoprocessorHost().preRollBackSplit();
791               return null;
792             }
793           });
794         } catch (InterruptedException ie) {
795           InterruptedIOException iioe = new InterruptedIOException();
796           iioe.initCause(ie);
797           throw iioe;
798         }
799       }
800     }
801 
802     boolean result = true;
803     ListIterator<JournalEntry> iterator =
804       this.journal.listIterator(this.journal.size());
805     // Iterate in reverse.
806     while (iterator.hasPrevious()) {
807       JournalEntry je = iterator.previous();
808 
809       transition(je.getPhase(), true);
810 
811       switch (je.getPhase()) {
812 
813       case SET_SPLITTING:
814         if (services != null
815             && !services.reportRegionStateTransition(TransitionCode.SPLIT_REVERTED,
816                 parent.getRegionInfo(), hri_a, hri_b)) {
817           return false;
818         }
819         break;
820 
821       case CREATE_SPLIT_DIR:
822         this.parent.writestate.writesEnabled = true;
823         this.parent.getRegionFileSystem().cleanupSplitsDir();
824         break;
825 
826       case CLOSED_PARENT_REGION:
827         try {
828           // So, this returns a seqid but if we just closed and then reopened, we
829           // should be ok. On close, we flushed using sequenceid obtained from
830           // hosting regionserver so no need to propagate the sequenceid returned
831           // out of initialize below up into regionserver as we normally do.
832           // TODO: Verify.
833           this.parent.initialize();
834         } catch (IOException e) {
835           LOG.error("Failed rollbacking CLOSED_PARENT_REGION of region " +
836             parent.getRegionInfo().getRegionNameAsString(), e);
837           throw new RuntimeException(e);
838         }
839         break;
840 
841       case STARTED_REGION_A_CREATION:
842         this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_a);
843         break;
844 
845       case STARTED_REGION_B_CREATION:
846         this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_b);
847         break;
848 
849       case OFFLINED_PARENT:
850         if (services != null) services.addToOnlineRegions(this.parent);
851         break;
852 
853       case PONR:
854         // We got to the point-of-no-return so we need to just abort. Return
855         // immediately.  Do not clean up created daughter regions.  They need
856         // to be in place so we don't delete the parent region mistakenly.
857         // See HBASE-3872.
858         return false;
859 
860       // Informational only cases
861       case STARTED:
862       case PREPARED:
863       case BEFORE_PRE_SPLIT_HOOK:
864       case AFTER_PRE_SPLIT_HOOK:
865       case BEFORE_POST_SPLIT_HOOK:
866       case AFTER_POST_SPLIT_HOOK:
867       case OPENED_REGION_A:
868       case OPENED_REGION_B:
869       case COMPLETED:
870         break;
871 
872       default:
873         throw new RuntimeException("Unhandled journal entry: " + je);
874       }
875     }
876     // Coprocessor callback
877     if (this.parent.getCoprocessorHost() != null) {
878       if (user == null) {
879         this.parent.getCoprocessorHost().postRollBackSplit();
880       } else {
881         try {
882           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
883             @Override
884             public Void run() throws Exception {
885               parent.getCoprocessorHost().postRollBackSplit();
886               return null;
887             }
888           });
889         } catch (InterruptedException ie) {
890           InterruptedIOException iioe = new InterruptedIOException();
891           iioe.initCause(ie);
892           throw iioe;
893         }
894       }
895     }
896     return result;
897   }
898 
899   /* package */ HRegionInfo getFirstDaughter() {
900     return hri_a;
901   }
902 
903   /* package */ HRegionInfo getSecondDaughter() {
904     return hri_b;
905   }
906 
907   @Override
908   public List<JournalEntry> getJournal() {
909     return journal;
910   }
911 
912   @Override
913   public SplitTransaction registerTransactionListener(TransactionListener listener) {
914     listeners.add(listener);
915     return this;
916   }
917 
918   @Override
919   public Server getServer() {
920     return server;
921   }
922 
923   @Override
924   public RegionServerServices getRegionServerServices() {
925     return rsServices;
926   }
927 
928 }