1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.accumulo.core.util;
18
19 import java.util.ArrayList;
20 import java.util.Iterator;
21 import java.util.List;
22 import java.util.Map.Entry;
23
24 import org.apache.accumulo.core.Constants;
25 import org.apache.accumulo.core.cli.ClientOnRequiredTable;
26 import org.apache.accumulo.core.client.Connector;
27 import org.apache.accumulo.core.client.Scanner;
28 import org.apache.accumulo.core.client.impl.Tables;
29 import org.apache.accumulo.core.conf.AccumuloConfiguration;
30 import org.apache.accumulo.core.conf.ConfigurationCopy;
31 import org.apache.accumulo.core.conf.Property;
32 import org.apache.accumulo.core.data.Key;
33 import org.apache.accumulo.core.data.KeyExtent;
34 import org.apache.accumulo.core.data.Value;
35 import org.apache.commons.cli.ParseException;
36 import org.apache.hadoop.io.Text;
37 import org.apache.log4j.Logger;
38
39 import com.beust.jcommander.IStringConverter;
40 import com.beust.jcommander.Parameter;
41
42 public class Merge {
43
44 public static class MergeException extends Exception {
45 private static final long serialVersionUID = 1L;
46
47 MergeException(Exception ex) {
48 super(ex);
49 }
50 };
51
52 private static final Logger log = Logger.getLogger(Merge.class);
53
54 protected void message(String format, Object... args) {
55 log.info(String.format(format, args));
56 }
57
58 static class MemoryConverter implements IStringConverter<Long> {
59 @Override
60 public Long convert(String value) {
61 return AccumuloConfiguration.getMemoryInBytes(value);
62 }
63 }
64 static class TextConverter implements IStringConverter<Text> {
65 @Override
66 public Text convert(String value) {
67 return new Text(value);
68 }
69 }
70
71 static class Opts extends ClientOnRequiredTable {
72 @Parameter(names={"-s", "--size"}, description="merge goal size", converter=MemoryConverter.class)
73 Long goalSize = null;
74 @Parameter(names={"-f", "--force"}, description="merge small tablets even if merging them to larger tablets might cause a split")
75 boolean force = false;
76 @Parameter(names={"-b", "--begin"}, description="start tablet", converter=TextConverter.class)
77 Text begin = null;
78 @Parameter(names={"-e", "--end"}, description="end tablet", converter=TextConverter.class)
79 Text end = null;
80 }
81
82 public void start(String[] args) throws MergeException, ParseException {
83 Opts opts = new Opts();
84 opts.parseArgs(Merge.class.getCanonicalName(), args);
85
86 try {
87 Connector conn = opts.getConnector();
88
89 if (!conn.tableOperations().exists(opts.tableName)) {
90 System.err.println("table " + opts.tableName + " does not exist");
91 return;
92 }
93 if (opts.goalSize == null || opts.goalSize < 1) {
94 AccumuloConfiguration tableConfig = new ConfigurationCopy(conn.tableOperations().getProperties(opts.tableName));
95 opts.goalSize = tableConfig.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD);
96 }
97
98 message("Merging tablets in table %s to %d bytes", opts.tableName, opts.goalSize);
99 mergomatic(conn, opts.tableName, opts.begin, opts.end, opts.goalSize, opts.force);
100 } catch (Exception ex) {
101 throw new MergeException(ex);
102 }
103 }
104
105 public static void main(String[] args) throws MergeException, ParseException {
106 Merge merge = new Merge();
107 merge.start(args);
108 }
109
110 public static class Size {
111 public Size(KeyExtent extent, long size) {
112 this.extent = extent;
113 this.size = size;
114 }
115
116 KeyExtent extent;
117 long size;
118 }
119
120 public void mergomatic(Connector conn, String table, Text start, Text end, long goalSize, boolean force) throws MergeException {
121 try {
122 if (table.equals(Constants.METADATA_TABLE_NAME)) {
123 throw new IllegalArgumentException("cannot merge tablets on the metadata table");
124 }
125 List<Size> sizes = new ArrayList<Size>();
126 long totalSize = 0;
127
128 Iterator<Size> sizeIterator = getSizeIterator(conn, table, start, end);
129 while (sizeIterator.hasNext()) {
130 Size next = sizeIterator.next();
131 totalSize += next.size;
132 sizes.add(next);
133 if (totalSize > goalSize) {
134 totalSize = mergeMany(conn, table, sizes, goalSize, force, false);
135 }
136 }
137 if (sizes.size() > 1)
138 mergeMany(conn, table, sizes, goalSize, force, true);
139 } catch (Exception ex) {
140 throw new MergeException(ex);
141 }
142 }
143
144 protected long mergeMany(Connector conn, String table, List<Size> sizes, long goalSize, boolean force, boolean last) throws MergeException {
145
146 while (!sizes.isEmpty()) {
147 if (sizes.get(0).size < goalSize)
148 break;
149 sizes.remove(0);
150 }
151 if (sizes.isEmpty()) {
152 return 0;
153 }
154
155
156 long mergeSize = 0;
157 int numToMerge = 0;
158 for (int i = 0; i < sizes.size(); i++) {
159 if (mergeSize + sizes.get(i).size > goalSize) {
160 numToMerge = i;
161 break;
162 }
163 mergeSize += sizes.get(i).size;
164 }
165
166 if (numToMerge > 1) {
167 mergeSome(conn, table, sizes, numToMerge);
168 } else {
169 if (numToMerge == 1 && sizes.size() > 1) {
170
171 if (force) {
172 mergeSome(conn, table, sizes, 2);
173 } else {
174 sizes.remove(0);
175 }
176 }
177 }
178 if (numToMerge == 0 && sizes.size() > 1 && last) {
179
180 mergeSome(conn, table, sizes, sizes.size());
181 }
182 long result = 0;
183 for (Size s : sizes) {
184 result += s.size;
185 }
186 return result;
187 }
188
189 protected void mergeSome(Connector conn, String table, List<Size> sizes, int numToMerge) throws MergeException {
190 merge(conn, table, sizes, numToMerge);
191 for (int i = 0; i < numToMerge; i++) {
192 sizes.remove(0);
193 }
194 }
195
196 protected void merge(Connector conn, String table, List<Size> sizes, int numToMerge) throws MergeException {
197 try {
198 Text start = sizes.get(0).extent.getPrevEndRow();
199 Text end = sizes.get(numToMerge - 1).extent.getEndRow();
200 message("Merging %d tablets from (%s to %s]", numToMerge, start == null ? "-inf" : start, end == null ? "+inf" : end);
201 conn.tableOperations().merge(table, start, end);
202 } catch (Exception ex) {
203 throw new MergeException(ex);
204 }
205 }
206
207 protected Iterator<Size> getSizeIterator(Connector conn, String tablename, Text start, Text end) throws MergeException {
208
209 String tableId;
210 Scanner scanner;
211 try {
212 tableId = Tables.getTableId(conn.getInstance(), tablename);
213 scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
214 } catch (Exception e) {
215 throw new MergeException(e);
216 }
217 scanner.setRange(new KeyExtent(new Text(tableId), end, start).toMetadataRange());
218 scanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY);
219 Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner);
220 final Iterator<Entry<Key,Value>> iterator = scanner.iterator();
221
222 Iterator<Size> result = new Iterator<Size>() {
223 Size next = fetch();
224
225 @Override
226 public boolean hasNext() {
227 return next != null;
228 }
229
230 private Size fetch() {
231 long tabletSize = 0;
232 while (iterator.hasNext()) {
233 Entry<Key,Value> entry = iterator.next();
234 Key key = entry.getKey();
235 if (key.getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) {
236 String[] sizeEntries = new String(entry.getValue().get()).split(",");
237 if (sizeEntries.length == 2) {
238 tabletSize += Long.parseLong(sizeEntries[0]);
239 }
240 } else if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) {
241 KeyExtent extent = new KeyExtent(key.getRow(), entry.getValue());
242 return new Size(extent, tabletSize);
243 }
244 }
245 return null;
246 }
247
248 @Override
249 public Size next() {
250 Size result = next;
251 next = fetch();
252 return result;
253 }
254
255 @Override
256 public void remove() {
257 throw new UnsupportedOperationException();
258 }
259 };
260 return result;
261 }
262
263 }