View Javadoc

1   /**
2    * Copyright The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements. See the NOTICE file distributed with this
6    * work for additional information regarding copyright ownership. The ASF
7    * licenses this file to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance with the License.
9    * You may obtain a copy of the License at
10   *
11   * http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16   * License for the specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.security.PrivilegedExceptionAction;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.ListIterator;
27  import java.util.Map;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.fs.Path;
33  import org.apache.hadoop.hbase.HConstants;
34  import org.apache.hadoop.hbase.HRegionInfo;
35  import org.apache.hadoop.hbase.MetaMutationAnnotation;
36  import org.apache.hadoop.hbase.MetaTableAccessor;
37  import org.apache.hadoop.hbase.Server;
38  import org.apache.hadoop.hbase.ServerName;
39  import org.apache.hadoop.hbase.client.Delete;
40  import org.apache.hadoop.hbase.client.Mutation;
41  import org.apache.hadoop.hbase.client.Put;
42  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
43  import org.apache.hadoop.hbase.regionserver.SplitTransactionImpl.LoggingProgressable;
44  import org.apache.hadoop.hbase.security.User;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
47  import org.apache.hadoop.hbase.util.Pair;
48  
49  import com.google.common.annotations.VisibleForTesting;
50  
51  @InterfaceAudience.Private
52  public class RegionMergeTransactionImpl implements RegionMergeTransaction {
  private static final Log LOG = LogFactory.getLog(RegionMergeTransactionImpl.class);

  // Merged region info; assigned in prepare() from the infos of region_a and region_b
  private HRegionInfo mergedRegionInfo;
  // region_a sorts before region_b (the constructor orders the two regions by
  // comparing their region infos)
  private final HRegion region_a;
  private final HRegion region_b;
  // merges dir is under region_a (obtained from region_a's region file system)
  private final Path mergesdir;
  // We only merge adjacent regions if forcible is false
  private final boolean forcible;
  // Time passed in by the caller as the master's time; prepareMutationsForMerge()
  // uses the larger of this value and the local time as the mutation timestamp
  private final long masterSystemTime;

  /*
   * Transaction state for listener, only valid during execute and
   * rollback. currentPhase is advanced by transition(); server and rsServices
   * are set on entry to execute() and rollback().
   */
  private RegionMergeTransactionPhase currentPhase = RegionMergeTransactionPhase.STARTED;
  private Server server;
  private RegionServerServices rsServices;
73  
74    public static class JournalEntryImpl implements JournalEntry {
75      private RegionMergeTransactionPhase type;
76      private long timestamp;
77  
78      public JournalEntryImpl(RegionMergeTransactionPhase type) {
79        this(type, EnvironmentEdgeManager.currentTime());
80      }
81  
82      public JournalEntryImpl(RegionMergeTransactionPhase type, long timestamp) {
83        this.type = type;
84        this.timestamp = timestamp;
85      }
86  
87      @Override
88      public String toString() {
89        StringBuilder sb = new StringBuilder();
90        sb.append(type);
91        sb.append(" at ");
92        sb.append(timestamp);
93        return sb.toString();
94      }
95  
96      @Override
97      public RegionMergeTransactionPhase getPhase() {
98        return type;
99      }
100 
101     @Override
102     public long getTimeStamp() {
103       return timestamp;
104     }
105   }
106 
107   /*
108    * Journal of how far the merge transaction has progressed.
109    */
110   private final List<JournalEntry> journal = new ArrayList<JournalEntry>();
111 
112   /**
113    * Listeners
114    */
115   private final ArrayList<TransactionListener> listeners = new ArrayList<TransactionListener>();
116 
117   private static IOException closedByOtherException = new IOException(
118       "Failed to close region: already closed by another thread");
119 
120   private RegionServerCoprocessorHost rsCoprocessorHost = null;
121 
122   /**
123    * Constructor
124    * @param a region a to merge
125    * @param b region b to merge
126    * @param forcible if false, we will only merge adjacent regions
127    */
128   public RegionMergeTransactionImpl(final Region a, final Region b,
129       final boolean forcible) {
130     this(a, b, forcible, EnvironmentEdgeManager.currentTime());
131   }
132   /**
133    * Constructor
134    * @param a region a to merge
135    * @param b region b to merge
136    * @param forcible if false, we will only merge adjacent regions
137    * @param masterSystemTime the time at the master side
138    */
139   public RegionMergeTransactionImpl(final Region a, final Region b,
140       final boolean forcible, long masterSystemTime) {
141     if (a.getRegionInfo().compareTo(b.getRegionInfo()) <= 0) {
142       this.region_a = (HRegion)a;
143       this.region_b = (HRegion)b;
144     } else {
145       this.region_a = (HRegion)b;
146       this.region_b = (HRegion)a;
147     }
148     this.forcible = forcible;
149     this.masterSystemTime = masterSystemTime;
150     this.mergesdir = region_a.getRegionFileSystem().getMergesDir();
151   }
152 
153   private void transition(RegionMergeTransactionPhase nextPhase) throws IOException {
154     transition(nextPhase, false);
155   }
156 
157   private void transition(RegionMergeTransactionPhase nextPhase, boolean isRollback)
158       throws IOException {
159     if (!isRollback) {
160       // Add to the journal first, because if the listener throws an exception
161       // we need to roll back starting at 'nextPhase'
162       this.journal.add(new JournalEntryImpl(nextPhase));
163     }
164     for (int i = 0; i < listeners.size(); i++) {
165       TransactionListener listener = listeners.get(i);
166       if (!isRollback) {
167         listener.transition(this, currentPhase, nextPhase);
168       } else {
169         listener.rollback(this, currentPhase, nextPhase);
170       }
171     }
172     currentPhase = nextPhase;
173   }
174 
175   @Override
176   public boolean prepare(final RegionServerServices services) throws IOException {
177     if (!region_a.getTableDesc().getTableName()
178         .equals(region_b.getTableDesc().getTableName())) {
179       LOG.info("Can't merge regions " + region_a + "," + region_b
180           + " because they do not belong to the same table");
181       return false;
182     }
183     if (region_a.getRegionInfo().equals(region_b.getRegionInfo())) {
184       LOG.info("Can't merge the same region " + region_a);
185       return false;
186     }
187     if (!forcible && !HRegionInfo.areAdjacent(region_a.getRegionInfo(),
188             region_b.getRegionInfo())) {
189       String msg = "Skip merging " + region_a.getRegionInfo().getRegionNameAsString()
190           + " and " + region_b.getRegionInfo().getRegionNameAsString()
191           + ", because they are not adjacent.";
192       LOG.info(msg);
193       return false;
194     }
195     if (!this.region_a.isMergeable() || !this.region_b.isMergeable()) {
196       return false;
197     }
198     try {
199       boolean regionAHasMergeQualifier = hasMergeQualifierInMeta(services,
200           region_a.getRegionInfo().getRegionName());
201       if (regionAHasMergeQualifier ||
202           hasMergeQualifierInMeta(services, region_b.getRegionInfo().getRegionName())) {
203         LOG.debug("Region " + (regionAHasMergeQualifier ? 
204             region_a.getRegionInfo().getRegionNameAsString()
205                 : region_b.getRegionInfo().getRegionNameAsString())
206             + " is not mergeable because it has merge qualifier in META");
207         return false;
208       }
209     } catch (IOException e) {
210       LOG.warn("Failed judging whether merge transaction is available for "
211               + region_a.getRegionInfo().getRegionNameAsString() + " and "
212               + region_b.getRegionInfo().getRegionNameAsString(), e);
213       return false;
214     }
215 
216     // WARN: make sure there is no parent region of the two merging regions in
217     // hbase:meta If exists, fixing up daughters would cause daughter regions(we
218     // have merged one) online again when we restart master, so we should clear
219     // the parent region to prevent the above case
220     // Since HBASE-7721, we don't need fix up daughters any more. so here do
221     // nothing
222 
223     this.mergedRegionInfo = getMergedRegionInfo(region_a.getRegionInfo(),
224         region_b.getRegionInfo());
225 
226     transition(RegionMergeTransactionPhase.PREPARED);
227     return true;
228   }
229 
230   @Override
231   public Region execute(final Server server, final RegionServerServices services)
232       throws IOException {
233     if (User.isHBaseSecurityEnabled(region_a.getBaseConf())) {
234       LOG.warn("Should use execute(Server, RegionServerServices, User)");
235     }
236     return execute(server, services, null);
237   }
238 
239   @Override
240   public Region execute(final Server server, final RegionServerServices services, User user)
241       throws IOException {
242     this.server = server;
243     this.rsServices = services;
244     if (rsCoprocessorHost == null) {
245       rsCoprocessorHost = server != null ?
246         ((HRegionServer) server).getRegionServerCoprocessorHost() : null;
247     }
248     final HRegion mergedRegion = createMergedRegion(server, services, user);
249     if (rsCoprocessorHost != null) {
250       if (user == null) {
251         rsCoprocessorHost.postMergeCommit(this.region_a, this.region_b, mergedRegion);
252       } else {
253         try {
254           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
255             @Override
256             public Void run() throws Exception {
257               rsCoprocessorHost.postMergeCommit(region_a, region_b, mergedRegion);
258               return null;
259             }
260           });
261         } catch (InterruptedException ie) {
262           InterruptedIOException iioe = new InterruptedIOException();
263           iioe.initCause(ie);
264           throw iioe;
265         }
266       }
267     }
268     stepsAfterPONR(server, services, mergedRegion, user);
269 
270     transition(RegionMergeTransactionPhase.COMPLETED);
271 
272     return mergedRegion;
273   }
274 
275   @VisibleForTesting
276   public void stepsAfterPONR(final Server server, final RegionServerServices services,
277       final HRegion mergedRegion, User user) throws IOException {
278     openMergedRegion(server, services, mergedRegion);
279     if (rsCoprocessorHost != null) {
280       if (user == null) {
281         rsCoprocessorHost.postMerge(region_a, region_b, mergedRegion);
282       } else {
283         try {
284           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
285             @Override
286             public Void run() throws Exception {
287               rsCoprocessorHost.postMerge(region_a, region_b, mergedRegion);
288               return null;
289             }
290           });
291         } catch (InterruptedException ie) {
292           InterruptedIOException iioe = new InterruptedIOException();
293           iioe.initCause(ie);
294           throw iioe;
295         }
296       }
297     }
298   }
299 
300   /**
301    * Prepare the merged region and region files.
302    * @param server Hosting server instance. Can be null when testing
303    * @param services Used to online/offline regions.
304    * @return merged region
305    * @throws IOException If thrown, transaction failed. Call
306    *           {@link #rollback(Server, RegionServerServices)}
307    */
308   private HRegion createMergedRegion(final Server server, final RegionServerServices services,
309       User user) throws IOException {
310     LOG.info("Starting merge of " + region_a + " and "
311         + region_b.getRegionInfo().getRegionNameAsString() + ", forcible=" + forcible);
312     if ((server != null && server.isStopped())
313         || (services != null && services.isStopping())) {
314       throw new IOException("Server is stopped or stopping");
315     }
316 
317     if (rsCoprocessorHost != null) {
318       boolean ret = false;
319       if (user == null) {
320         ret = rsCoprocessorHost.preMerge(region_a, region_b);
321       } else {
322         try {
323           ret = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
324             @Override
325             public Boolean run() throws Exception {
326               return rsCoprocessorHost.preMerge(region_a, region_b);
327             }
328           });
329         } catch (InterruptedException ie) {
330           InterruptedIOException iioe = new InterruptedIOException();
331           iioe.initCause(ie);
332           throw iioe;
333         }
334       }
335       if (ret) {
336         throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
337             + this.region_b + " merge.");
338       }
339     }
340 
341     // If true, no cluster to write meta edits to or to use coordination.
342     boolean testing = server == null ? true : server.getConfiguration()
343         .getBoolean("hbase.testing.nocluster", false);
344 
345     HRegion mergedRegion = stepsBeforePONR(server, services, testing);
346 
347     @MetaMutationAnnotation
348     final List<Mutation> metaEntries = new ArrayList<Mutation>();
349     if (rsCoprocessorHost != null) {
350       boolean ret = false;
351       if (user == null) {
352         ret = rsCoprocessorHost.preMergeCommit(region_a, region_b, metaEntries);
353       } else {
354         try {
355           ret = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
356             @Override
357             public Boolean run() throws Exception {
358               return rsCoprocessorHost.preMergeCommit(region_a, region_b, metaEntries);
359             }
360           });
361         } catch (InterruptedException ie) {
362           InterruptedIOException iioe = new InterruptedIOException();
363           iioe.initCause(ie);
364           throw iioe;
365         }
366       }
367 
368       if (ret) {
369         throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
370             + this.region_b + " merge.");
371       }
372       try {
373         for (Mutation p : metaEntries) {
374           HRegionInfo.parseRegionName(p.getRow());
375         }
376       } catch (IOException e) {
377         LOG.error("Row key of mutation from coprocessor is not parsable as region name."
378             + "Mutations from coprocessor should only be for hbase:meta table.", e);
379         throw e;
380       }
381     }
382 
383     // This is the point of no return. Similar with SplitTransaction.
384     // IF we reach the PONR then subsequent failures need to crash out this
385     // regionserver
386     transition(RegionMergeTransactionPhase.PONR);
387 
388     // Add merged region and delete region_a and region_b
389     // as an atomic update. See HBASE-7721. This update to hbase:meta makes the region
390     // will determine whether the region is merged or not in case of failures.
391     // If it is successful, master will roll-forward, if not, master will
392     // rollback
393     if (services != null && !services.reportRegionStateTransition(TransitionCode.MERGE_PONR,
394         mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
395       // Passed PONR, let SSH clean it up
396       throw new IOException("Failed to notify master that merge passed PONR: "
397         + region_a.getRegionInfo().getRegionNameAsString() + " and "
398         + region_b.getRegionInfo().getRegionNameAsString());
399     }
400     return mergedRegion;
401   }
402 
403   @VisibleForTesting
404   public void prepareMutationsForMerge(HRegionInfo mergedRegion, HRegionInfo regionA,
405       HRegionInfo regionB, ServerName serverName, List<Mutation> mutations) throws IOException {
406     HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion);
407 
408     // use the maximum of what master passed us vs local time.
409     long time = Math.max(EnvironmentEdgeManager.currentTime(), masterSystemTime);
410 
411     // Put for parent
412     Put putOfMerged = MetaTableAccessor.makePutFromRegionInfo(copyOfMerged, time);
413     putOfMerged.addColumn(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER,
414         regionA.toByteArray());
415     putOfMerged.addColumn(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER,
416         regionB.toByteArray());
417     mutations.add(putOfMerged);
418     // Deletes for merging regions
419     Delete deleteA = MetaTableAccessor.makeDeleteFromRegionInfo(regionA, time);
420     Delete deleteB = MetaTableAccessor.makeDeleteFromRegionInfo(regionB, time);
421     mutations.add(deleteA);
422     mutations.add(deleteB);
423     // The merged is a new region, openSeqNum = 1 is fine.
424     addLocation(putOfMerged, serverName, 1);
425   }
426 
427   @VisibleForTesting
428   Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
429     p.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes
430             .toBytes(sn.getHostAndPort()));
431     p.addColumn(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER, Bytes.toBytes(sn
432             .getStartcode()));
433     p.addColumn(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER, Bytes.toBytes(openSeqNum));
434     return p;
435   }
436 
437   @VisibleForTesting
438   public HRegion stepsBeforePONR(final Server server, final RegionServerServices services,
439       boolean testing) throws IOException {
440     if (services != null && !services.reportRegionStateTransition(TransitionCode.READY_TO_MERGE,
441         mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
442       throw new IOException("Failed to get ok from master to merge "
443         + region_a.getRegionInfo().getRegionNameAsString() + " and "
444         + region_b.getRegionInfo().getRegionNameAsString());
445     }
446 
447     transition(RegionMergeTransactionPhase.SET_MERGING);
448 
449     this.region_a.getRegionFileSystem().createMergesDir();
450 
451     transition(RegionMergeTransactionPhase.CREATED_MERGE_DIR);
452 
453     Map<byte[], List<StoreFile>> hstoreFilesOfRegionA = closeAndOfflineRegion(
454         services, this.region_a, true, testing);
455     Map<byte[], List<StoreFile>> hstoreFilesOfRegionB = closeAndOfflineRegion(
456         services, this.region_b, false, testing);
457 
458     assert hstoreFilesOfRegionA != null && hstoreFilesOfRegionB != null;
459 
460     // mergeStoreFiles creates merged region dirs under the region_a merges dir
461     // Nothing to unroll here if failure -- clean up of CREATE_MERGE_DIR will
462     // clean this up.
463     mergeStoreFiles(hstoreFilesOfRegionA, hstoreFilesOfRegionB);
464 
465     // Log to the journal that we are creating merged region. We could fail
466     // halfway through. If we do, we could have left
467     // stuff in fs that needs cleanup -- a storefile or two. Thats why we
468     // add entry to journal BEFORE rather than AFTER the change.
469 
470     transition(RegionMergeTransactionPhase.STARTED_MERGED_REGION_CREATION);
471 
472     HRegion mergedRegion = createMergedRegionFromMerges(this.region_a,
473         this.region_b, this.mergedRegionInfo);
474     return mergedRegion;
475   }
476 
477   /**
478    * Create a merged region from the merges directory under region a. In order
479    * to mock it for tests, place it with a new method.
480    * @param a hri of region a
481    * @param b hri of region b
482    * @param mergedRegion hri of merged region
483    * @return merged HRegion.
484    * @throws IOException
485    */
486   @VisibleForTesting
487   HRegion createMergedRegionFromMerges(final HRegion a, final HRegion b,
488       final HRegionInfo mergedRegion) throws IOException {
489     return a.createMergedRegionFromMerges(mergedRegion, b);
490   }
491 
492   /**
493    * Close the merging region and offline it in regionserver
494    * @param services
495    * @param region
496    * @param isRegionA true if it is merging region a, false if it is region b
497    * @param testing true if it is testing
498    * @return a map of family name to list of store files
499    * @throws IOException
500    */
501   private Map<byte[], List<StoreFile>> closeAndOfflineRegion(
502       final RegionServerServices services, final HRegion region,
503       final boolean isRegionA, final boolean testing) throws IOException {
504     Map<byte[], List<StoreFile>> hstoreFilesToMerge = null;
505     Exception exceptionToThrow = null;
506     try {
507       hstoreFilesToMerge = region.close(false);
508     } catch (Exception e) {
509       exceptionToThrow = e;
510     }
511     if (exceptionToThrow == null && hstoreFilesToMerge == null) {
512       // The region was closed by a concurrent thread. We can't continue
513       // with the merge, instead we must just abandon the merge. If we
514       // reopen or merge this could cause problems because the region has
515       // probably already been moved to a different server, or is in the
516       // process of moving to a different server.
517       exceptionToThrow = closedByOtherException;
518     }
519     if (exceptionToThrow != closedByOtherException) {
520       transition(isRegionA ? RegionMergeTransactionPhase.CLOSED_REGION_A
521           : RegionMergeTransactionPhase.CLOSED_REGION_B);
522     }
523     if (exceptionToThrow != null) {
524       if (exceptionToThrow instanceof IOException)
525         throw (IOException) exceptionToThrow;
526       throw new IOException(exceptionToThrow);
527     }
528     if (!testing) {
529       services.removeFromOnlineRegions(region, null);
530     }
531 
532     transition(isRegionA ? RegionMergeTransactionPhase.OFFLINED_REGION_A
533         : RegionMergeTransactionPhase.OFFLINED_REGION_B);
534 
535     return hstoreFilesToMerge;
536   }
537 
538   /**
539    * Get merged region info through the specified two regions
540    * @param a merging region A
541    * @param b merging region B
542    * @return the merged region info
543    */
544   @VisibleForTesting
545   static HRegionInfo getMergedRegionInfo(final HRegionInfo a, final HRegionInfo b) {
546     long rid = EnvironmentEdgeManager.currentTime();
547     // Regionid is timestamp. Merged region's id can't be less than that of
548     // merging regions else will insert at wrong location in hbase:meta
549     if (rid < a.getRegionId() || rid < b.getRegionId()) {
550       LOG.warn("Clock skew; merging regions id are " + a.getRegionId()
551           + " and " + b.getRegionId() + ", but current time here is " + rid);
552       rid = Math.max(a.getRegionId(), b.getRegionId()) + 1;
553     }
554 
555     byte[] startKey = null;
556     byte[] endKey = null;
557     // Choose the smaller as start key
558     if (a.compareTo(b) <= 0) {
559       startKey = a.getStartKey();
560     } else {
561       startKey = b.getStartKey();
562     }
563     // Choose the bigger as end key
564     if (Bytes.equals(a.getEndKey(), HConstants.EMPTY_BYTE_ARRAY)
565         || (!Bytes.equals(b.getEndKey(), HConstants.EMPTY_BYTE_ARRAY)
566             && Bytes.compareTo(a.getEndKey(), b.getEndKey()) > 0)) {
567       endKey = a.getEndKey();
568     } else {
569       endKey = b.getEndKey();
570     }
571 
572     // Merged region is sorted between two merging regions in META
573     HRegionInfo mergedRegionInfo = new HRegionInfo(a.getTable(), startKey,
574         endKey, false, rid);
575     return mergedRegionInfo;
576   }
577 
578   /**
579    * Perform time consuming opening of the merged region.
580    * @param server Hosting server instance. Can be null when testing
581    * @param services Used to online/offline regions.
582    * @param merged the merged region
583    * @throws IOException If thrown, transaction failed. Call
584    *           {@link #rollback(Server, RegionServerServices)}
585    */
586   @VisibleForTesting
587   void openMergedRegion(final Server server,  final RegionServerServices services,
588       HRegion merged) throws IOException {
589     boolean stopped = server != null && server.isStopped();
590     boolean stopping = services != null && services.isStopping();
591     if (stopped || stopping) {
592       LOG.info("Not opening merged region  " + merged.getRegionInfo().getRegionNameAsString()
593           + " because stopping=" + stopping + ", stopped=" + stopped);
594       return;
595     }
596     HRegionInfo hri = merged.getRegionInfo();
597     LoggingProgressable reporter = server == null ? null
598         : new LoggingProgressable(hri, server.getConfiguration().getLong(
599             "hbase.regionserver.regionmerge.open.log.interval", 10000));
600     merged.openHRegion(reporter);
601 
602     if (services != null) {
603       if (!services.reportRegionStateTransition(TransitionCode.MERGED,
604           mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
605         throw new IOException("Failed to report merged region to master: "
606           + mergedRegionInfo.getShortNameToLog());
607       }
608       services.addToOnlineRegions(merged);
609     }
610   }
611 
612   /**
613    * Create reference file(s) of merging regions under the region_a merges dir
614    * @param hstoreFilesOfRegionA
615    * @param hstoreFilesOfRegionB
616    * @throws IOException
617    */
618   private void mergeStoreFiles(
619       Map<byte[], List<StoreFile>> hstoreFilesOfRegionA,
620       Map<byte[], List<StoreFile>> hstoreFilesOfRegionB)
621       throws IOException {
622     // Create reference file(s) of region A in mergdir
623     HRegionFileSystem fs_a = this.region_a.getRegionFileSystem();
624     for (Map.Entry<byte[], List<StoreFile>> entry : hstoreFilesOfRegionA
625         .entrySet()) {
626       String familyName = Bytes.toString(entry.getKey());
627       for (StoreFile storeFile : entry.getValue()) {
628         fs_a.mergeStoreFile(this.mergedRegionInfo, familyName, storeFile,
629             this.mergesdir);
630       }
631     }
632     // Create reference file(s) of region B in mergedir
633     HRegionFileSystem fs_b = this.region_b.getRegionFileSystem();
634     for (Map.Entry<byte[], List<StoreFile>> entry : hstoreFilesOfRegionB
635         .entrySet()) {
636       String familyName = Bytes.toString(entry.getKey());
637       for (StoreFile storeFile : entry.getValue()) {
638         fs_b.mergeStoreFile(this.mergedRegionInfo, familyName, storeFile,
639             this.mergesdir);
640       }
641     }
642   }
643 
644   @Override
645   public boolean rollback(final Server server,
646       final RegionServerServices services) throws IOException {
647     if (User.isHBaseSecurityEnabled(region_a.getBaseConf())) {
648       LOG.warn("Should use execute(Server, RegionServerServices, User)");
649     }
650     return rollback(server, services, null);
651   }
652 
653   @Override
654   public boolean rollback(final Server server,
655       final RegionServerServices services, User user) throws IOException {
656     assert this.mergedRegionInfo != null;
657     this.server = server;
658     this.rsServices = services;
659     // Coprocessor callback
660     if (rsCoprocessorHost != null) {
661       if (user == null) {
662         rsCoprocessorHost.preRollBackMerge(region_a, region_b);
663       } else {
664         try {
665           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
666             @Override
667             public Void run() throws Exception {
668               rsCoprocessorHost.preRollBackMerge(region_a, region_b);
669               return null;
670             }
671           });
672         } catch (InterruptedException ie) {
673           InterruptedIOException iioe = new InterruptedIOException();
674           iioe.initCause(ie);
675           throw iioe;
676         }
677       }
678     }
679 
680     boolean result = true;
681     ListIterator<JournalEntry> iterator = this.journal
682         .listIterator(this.journal.size());
683     // Iterate in reverse.
684     while (iterator.hasPrevious()) {
685       JournalEntry je = iterator.previous();
686 
687       transition(je.getPhase(), true);
688 
689       switch (je.getPhase()) {
690 
691         case SET_MERGING:
692           if (services != null
693               && !services.reportRegionStateTransition(TransitionCode.MERGE_REVERTED,
694                   mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
695             return false;
696         }
697           break;
698 
699         case CREATED_MERGE_DIR:
700           this.region_a.writestate.writesEnabled = true;
701           this.region_b.writestate.writesEnabled = true;
702           this.region_a.getRegionFileSystem().cleanupMergesDir();
703           break;
704 
705         case CLOSED_REGION_A:
706           try {
707             // So, this returns a seqid but if we just closed and then reopened,
708             // we should be ok. On close, we flushed using sequenceid obtained
709             // from hosting regionserver so no need to propagate the sequenceid
710             // returned out of initialize below up into regionserver as we
711             // normally do.
712             this.region_a.initialize();
713           } catch (IOException e) {
714             LOG.error("Failed rollbacking CLOSED_REGION_A of region "
715                 + region_a.getRegionInfo().getRegionNameAsString(), e);
716             throw new RuntimeException(e);
717           }
718           break;
719 
720         case OFFLINED_REGION_A:
721           if (services != null)
722             services.addToOnlineRegions(this.region_a);
723           break;
724 
725         case CLOSED_REGION_B:
726           try {
727             this.region_b.initialize();
728           } catch (IOException e) {
729             LOG.error("Failed rollbacking CLOSED_REGION_A of region "
730                 + region_b.getRegionInfo().getRegionNameAsString(), e);
731             throw new RuntimeException(e);
732           }
733           break;
734 
735         case OFFLINED_REGION_B:
736           if (services != null)
737             services.addToOnlineRegions(this.region_b);
738           break;
739 
740         case STARTED_MERGED_REGION_CREATION:
741           this.region_a.getRegionFileSystem().cleanupMergedRegion(
742               this.mergedRegionInfo);
743           break;
744 
745         case PONR:
746           // We got to the point-of-no-return so we need to just abort. Return
747           // immediately. Do not clean up created merged regions.
748           return false;
749 
750          // Informational states only
751         case STARTED:
752         case PREPARED:
753         case COMPLETED:
754           break;
755 
756         default:
757           throw new RuntimeException("Unhandled journal entry: " + je);
758       }
759     }
760     // Coprocessor callback
761     if (rsCoprocessorHost != null) {
762       if (user == null) {
763         rsCoprocessorHost.postRollBackMerge(region_a, region_b);
764       } else {
765         try {
766           user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
767             @Override
768             public Void run() throws Exception {
769               rsCoprocessorHost.postRollBackMerge(region_a, region_b);
770               return null;
771             }
772           });
773         } catch (InterruptedException ie) {
774           InterruptedIOException iioe = new InterruptedIOException();
775           iioe.initCause(ie);
776           throw iioe;
777         }
778       }
779     }
780 
781     return result;
782   }
783 
784   @Override
785   public HRegionInfo getMergedRegionInfo() {
786     return this.mergedRegionInfo;
787   }
788 
789   @VisibleForTesting
790   Path getMergesDir() {
791     return this.mergesdir;
792   }
793 
794   /**
795    * Checks if the given region has merge qualifier in hbase:meta
796    * @param services
797    * @param regionName name of specified region
798    * @return true if the given region has merge qualifier in META.(It will be
799    *         cleaned by CatalogJanitor)
800    * @throws IOException
801    */
802   @VisibleForTesting
803   boolean hasMergeQualifierInMeta(final RegionServerServices services, final byte[] regionName)
804       throws IOException {
805     if (services == null) return false;
806     // Get merge regions if it is a merged region and already has merge
807     // qualifier
808     Pair<HRegionInfo, HRegionInfo> mergeRegions = MetaTableAccessor
809         .getRegionsFromMergeQualifier(services.getConnection(), regionName);
810     if (mergeRegions != null &&
811         (mergeRegions.getFirst() != null || mergeRegions.getSecond() != null)) {
812       // It has merge qualifier
813       return true;
814     }
815     return false;
816   }
817 
818   @Override
819   public List<JournalEntry> getJournal() {
820     return journal;
821   }
822 
823   @Override
824   public RegionMergeTransaction registerTransactionListener(TransactionListener listener) {
825     listeners.add(listener);
826     return this;
827   }
828 
829   @Override
830   public Server getServer() {
831     return server;
832   }
833 
834   @Override
835   public RegionServerServices getRegionServerServices() {
836     return rsServices;
837   }
838 }