1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.util;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.NoSuchElementException;
25 import java.util.Random;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.FileSystem;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.hbase.HConstants;
33 import org.apache.hadoop.hbase.HRegionInfo;
34 import org.apache.hadoop.hbase.HTableDescriptor;
35 import org.apache.hadoop.hbase.MetaTableAccessor;
36 import org.apache.hadoop.hbase.TableName;
37 import org.apache.hadoop.hbase.TableNotDisabledException;
38 import org.apache.hadoop.hbase.classification.InterfaceAudience;
39 import org.apache.hadoop.hbase.client.Admin;
40 import org.apache.hadoop.hbase.client.Connection;
41 import org.apache.hadoop.hbase.client.ConnectionFactory;
42 import org.apache.hadoop.hbase.client.Delete;
43 import org.apache.hadoop.hbase.client.HConnection;
44 import org.apache.hadoop.hbase.client.Result;
45 import org.apache.hadoop.hbase.client.ResultScanner;
46 import org.apache.hadoop.hbase.client.Table;
47 import org.apache.hadoop.hbase.regionserver.HRegion;
48 import org.apache.hadoop.hbase.wal.WALFactory;
49 import org.apache.hadoop.ipc.RemoteException;
50
51
52
53
54
55 @InterfaceAudience.Private
56 class HMerge {
57
58 private static final Log LOG = LogFactory.getLog(HMerge.class);
59 static final Random rand = new Random();
60
61
62
63
64 private HMerge() {
65 super();
66 }
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81 public static void merge(Configuration conf, FileSystem fs,
82 final TableName tableName)
83 throws IOException {
84 merge(conf, fs, tableName, true);
85 }
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102 public static void merge(Configuration conf, FileSystem fs,
103 final TableName tableName, final boolean testMasterRunning)
104 throws IOException {
105 boolean masterIsRunning = false;
106 HConnection hConnection = null;
107 if (testMasterRunning) {
108 try {
109 hConnection = (HConnection) ConnectionFactory.createConnection(conf);
110 masterIsRunning = hConnection.isMasterRunning();
111 } finally {
112 if (hConnection != null) {
113 hConnection.close();
114 }
115 }
116 }
117 if (tableName.equals(TableName.META_TABLE_NAME)) {
118 if (masterIsRunning) {
119 throw new IllegalStateException(
120 "Can not compact hbase:meta table if instance is on-line");
121 }
122
123 } else {
124 if(!masterIsRunning) {
125 throw new IllegalStateException(
126 "HBase instance must be running to merge a normal table");
127 }
128 try (Connection conn = ConnectionFactory.createConnection(conf);
129 Admin admin = conn.getAdmin()) {
130 if (!admin.isTableDisabled(tableName)) {
131 throw new TableNotDisabledException(tableName);
132 }
133 }
134 new OnlineMerger(conf, fs, tableName).process();
135 }
136 }
137
138 private static abstract class Merger {
139 protected final Configuration conf;
140 protected final FileSystem fs;
141 protected final Path rootDir;
142 protected final HTableDescriptor htd;
143 protected final WALFactory walFactory;
144 private final long maxFilesize;
145
146
147 protected Merger(Configuration conf, FileSystem fs, final TableName tableName)
148 throws IOException {
149 this.conf = conf;
150 this.fs = fs;
151 this.maxFilesize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
152 HConstants.DEFAULT_MAX_FILE_SIZE);
153
154 this.rootDir = FSUtils.getRootDir(conf);
155 Path tabledir = FSUtils.getTableDir(this.rootDir, tableName);
156 this.htd = FSTableDescriptors.getTableDescriptorFromFs(this.fs, tabledir)
157 .getHTableDescriptor();
158 String logname = "merge_" + System.currentTimeMillis() + HConstants.HREGION_LOGDIR_NAME;
159
160 final Configuration walConf = new Configuration(conf);
161 FSUtils.setRootDir(walConf, tabledir);
162 this.walFactory = new WALFactory(walConf, null, logname);
163 }
164
165 void process() throws IOException {
166 try {
167 for (HRegionInfo[] regionsToMerge = next();
168 regionsToMerge != null;
169 regionsToMerge = next()) {
170 if (!merge(regionsToMerge)) {
171 return;
172 }
173 }
174 } finally {
175 try {
176 walFactory.close();
177 } catch(IOException e) {
178 LOG.error(e);
179 }
180 }
181 }
182
183 protected boolean merge(final HRegionInfo[] info) throws IOException {
184 if (info.length < 2) {
185 LOG.info("only one region - nothing to merge");
186 return false;
187 }
188
189 HRegion currentRegion = null;
190 long currentSize = 0;
191 HRegion nextRegion = null;
192 long nextSize = 0;
193 for (int i = 0; i < info.length - 1; i++) {
194 if (currentRegion == null) {
195 currentRegion = HRegion.openHRegion(conf, fs, this.rootDir, info[i], this.htd,
196 walFactory.getWAL(info[i].getEncodedNameAsBytes(),
197 info[i].getTable().getNamespace()));
198 currentSize = currentRegion.getLargestHStoreSize();
199 }
200 nextRegion = HRegion.openHRegion(conf, fs, this.rootDir, info[i + 1], this.htd,
201 walFactory.getWAL(info[i + 1].getEncodedNameAsBytes(),
202 info[i + 1].getTable().getNamespace()));
203 nextSize = nextRegion.getLargestHStoreSize();
204
205 if ((currentSize + nextSize) <= (maxFilesize / 2)) {
206
207
208 LOG.info("Merging regions " + currentRegion.getRegionInfo().getRegionNameAsString() +
209 " and " + nextRegion.getRegionInfo().getRegionNameAsString());
210 HRegion mergedRegion =
211 HRegion.mergeAdjacent(currentRegion, nextRegion);
212 updateMeta(currentRegion.getRegionInfo().getRegionName(),
213 nextRegion.getRegionInfo().getRegionName(), mergedRegion);
214 break;
215 }
216 LOG.info("not merging regions " +
217 Bytes.toStringBinary(currentRegion.getRegionInfo().getRegionName()) +
218 " and " + Bytes.toStringBinary(nextRegion.getRegionInfo().getRegionName()));
219 currentRegion.close();
220 currentRegion = nextRegion;
221 currentSize = nextSize;
222 }
223 if(currentRegion != null) {
224 currentRegion.close();
225 }
226 return true;
227 }
228
229 protected abstract HRegionInfo[] next() throws IOException;
230
231 protected abstract void updateMeta(final byte [] oldRegion1,
232 final byte [] oldRegion2, HRegion newRegion)
233 throws IOException;
234
235 }
236
237
238 private static class OnlineMerger extends Merger {
239 private final TableName tableName;
240 private final Table table;
241 private final ResultScanner metaScanner;
242 private HRegionInfo latestRegion;
243
244 OnlineMerger(Configuration conf, FileSystem fs,
245 final TableName tableName)
246 throws IOException {
247 super(conf, fs, tableName);
248 this.tableName = tableName;
249 Connection connection = ConnectionFactory.createConnection(conf);
250 this.table = connection.getTable(TableName.META_TABLE_NAME);
251 this.metaScanner = table.getScanner(HConstants.CATALOG_FAMILY,
252 HConstants.REGIONINFO_QUALIFIER);
253 this.latestRegion = null;
254 }
255
256 private HRegionInfo nextRegion() throws IOException {
257 try {
258 Result results = getMetaRow();
259 if (results == null) {
260 return null;
261 }
262 HRegionInfo region = MetaTableAccessor.getHRegionInfo(results);
263 if (region == null) {
264 throw new NoSuchElementException("meta region entry missing " +
265 Bytes.toString(HConstants.CATALOG_FAMILY) + ":" +
266 Bytes.toString(HConstants.REGIONINFO_QUALIFIER));
267 }
268 if (!region.getTable().equals(this.tableName)) {
269 return null;
270 }
271 return region;
272 } catch (IOException e) {
273 e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
274 LOG.error("meta scanner error", e);
275 metaScanner.close();
276 throw e;
277 }
278 }
279
280
281
282
283
284
285 private Result getMetaRow() throws IOException {
286 Result currentRow = metaScanner.next();
287 boolean foundResult = false;
288 while (currentRow != null) {
289 LOG.info("Row: <" + Bytes.toStringBinary(currentRow.getRow()) + ">");
290 byte[] regionInfoValue = currentRow.getValue(HConstants.CATALOG_FAMILY,
291 HConstants.REGIONINFO_QUALIFIER);
292 if (regionInfoValue == null || regionInfoValue.length == 0) {
293 currentRow = metaScanner.next();
294 continue;
295 }
296 HRegionInfo region = MetaTableAccessor.getHRegionInfo(currentRow);
297 if (!region.getTable().equals(this.tableName)) {
298 currentRow = metaScanner.next();
299 continue;
300 }
301 foundResult = true;
302 break;
303 }
304 return foundResult ? currentRow : null;
305 }
306
307 @Override
308 protected HRegionInfo[] next() throws IOException {
309 List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
310 if(latestRegion == null) {
311 latestRegion = nextRegion();
312 }
313 if(latestRegion != null) {
314 regions.add(latestRegion);
315 }
316 latestRegion = nextRegion();
317 if(latestRegion != null) {
318 regions.add(latestRegion);
319 }
320 return regions.toArray(new HRegionInfo[regions.size()]);
321 }
322
323 @Override
324 protected void updateMeta(final byte [] oldRegion1,
325 final byte [] oldRegion2,
326 HRegion newRegion)
327 throws IOException {
328 byte[][] regionsToDelete = {oldRegion1, oldRegion2};
329 for (int r = 0; r < regionsToDelete.length; r++) {
330 if(Bytes.equals(regionsToDelete[r], latestRegion.getRegionName())) {
331 latestRegion = null;
332 }
333 Delete delete = new Delete(regionsToDelete[r]);
334 table.delete(delete);
335 if(LOG.isDebugEnabled()) {
336 LOG.debug("updated columns in row: " + Bytes.toStringBinary(regionsToDelete[r]));
337 }
338 }
339 newRegion.getRegionInfo().setOffline(true);
340
341 MetaTableAccessor.addRegionToMeta(table, newRegion.getRegionInfo());
342
343 if(LOG.isDebugEnabled()) {
344 LOG.debug("updated columns in row: "
345 + Bytes.toStringBinary(newRegion.getRegionInfo().getRegionName()));
346 }
347 }
348 }
349 }