View Javadoc

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17  package org.apache.accumulo.core.util;
18  
19  import java.util.ArrayList;
20  import java.util.Iterator;
21  import java.util.List;
22  import java.util.Map.Entry;
23  
24  import org.apache.accumulo.core.Constants;
25  import org.apache.accumulo.core.cli.ClientOnRequiredTable;
26  import org.apache.accumulo.core.client.Connector;
27  import org.apache.accumulo.core.client.Scanner;
28  import org.apache.accumulo.core.client.impl.Tables;
29  import org.apache.accumulo.core.conf.AccumuloConfiguration;
30  import org.apache.accumulo.core.conf.ConfigurationCopy;
31  import org.apache.accumulo.core.conf.Property;
32  import org.apache.accumulo.core.data.Key;
33  import org.apache.accumulo.core.data.KeyExtent;
34  import org.apache.accumulo.core.data.Value;
35  import org.apache.commons.cli.ParseException;
36  import org.apache.hadoop.io.Text;
37  import org.apache.log4j.Logger;
38  
39  import com.beust.jcommander.IStringConverter;
40  import com.beust.jcommander.Parameter;
41  
42  public class Merge {
43    
44    public static class MergeException extends Exception {
45      private static final long serialVersionUID = 1L;
46      
47      MergeException(Exception ex) {
48        super(ex);
49      }
50    };
51    
52    private static final Logger log = Logger.getLogger(Merge.class);
53    
54    protected void message(String format, Object... args) {
55      log.info(String.format(format, args));
56    }
57    
58    static class MemoryConverter implements IStringConverter<Long> {
59      @Override
60      public Long convert(String value) {
61        return AccumuloConfiguration.getMemoryInBytes(value);
62      }
63    }
64    static class TextConverter implements IStringConverter<Text> {
65      @Override
66      public Text convert(String value) {
67        return new Text(value);
68      }
69    }
70    
71    static class Opts extends ClientOnRequiredTable {
72      @Parameter(names={"-s", "--size"}, description="merge goal size", converter=MemoryConverter.class)
73      Long goalSize = null;
74      @Parameter(names={"-f", "--force"}, description="merge small tablets even if merging them to larger tablets might cause a split")
75      boolean force = false;
76      @Parameter(names={"-b", "--begin"}, description="start tablet", converter=TextConverter.class)
77      Text begin = null;
78      @Parameter(names={"-e", "--end"}, description="end tablet", converter=TextConverter.class)
79      Text end = null;
80    }
81    
82    public void start(String[] args) throws MergeException, ParseException {
83      Opts opts = new Opts();
84      opts.parseArgs(Merge.class.getCanonicalName(), args);
85      
86      try {
87        Connector conn = opts.getConnector();
88        
89        if (!conn.tableOperations().exists(opts.tableName)) {
90          System.err.println("table " + opts.tableName + " does not exist");
91          return;
92        }
93        if (opts.goalSize == null || opts.goalSize < 1) {
94          AccumuloConfiguration tableConfig = new ConfigurationCopy(conn.tableOperations().getProperties(opts.tableName));
95          opts.goalSize = tableConfig.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD);
96        }
97        
98        message("Merging tablets in table %s to %d bytes", opts.tableName, opts.goalSize);
99        mergomatic(conn, opts.tableName, opts.begin, opts.end, opts.goalSize, opts.force);
100     } catch (Exception ex) {
101       throw new MergeException(ex);
102     }
103   }
104   
105   public static void main(String[] args) throws MergeException, ParseException {
106     Merge merge = new Merge();
107     merge.start(args);
108   }
109   
110   public static class Size {
111     public Size(KeyExtent extent, long size) {
112       this.extent = extent;
113       this.size = size;
114     }
115     
116     KeyExtent extent;
117     long size;
118   }
119   
120   public void mergomatic(Connector conn, String table, Text start, Text end, long goalSize, boolean force) throws MergeException {
121     try {
122       if (table.equals(Constants.METADATA_TABLE_NAME)) {
123         throw new IllegalArgumentException("cannot merge tablets on the metadata table");
124       }
125       List<Size> sizes = new ArrayList<Size>();
126       long totalSize = 0;
127       // Merge any until you get larger than the goal size, and then merge one less tablet
128       Iterator<Size> sizeIterator = getSizeIterator(conn, table, start, end);
129       while (sizeIterator.hasNext()) {
130         Size next = sizeIterator.next();
131         totalSize += next.size;
132         sizes.add(next);
133         if (totalSize > goalSize) {
134           totalSize = mergeMany(conn, table, sizes, goalSize, force, false);
135         }
136       }
137       if (sizes.size() > 1)
138         mergeMany(conn, table, sizes, goalSize, force, true);
139     } catch (Exception ex) {
140       throw new MergeException(ex);
141     }
142   }
143   
144   protected long mergeMany(Connector conn, String table, List<Size> sizes, long goalSize, boolean force, boolean last) throws MergeException {
145     // skip the big tablets, which will be the typical case
146     while (!sizes.isEmpty()) {
147       if (sizes.get(0).size < goalSize)
148         break;
149       sizes.remove(0);
150     }
151     if (sizes.isEmpty()) {
152       return 0;
153     }
154     
155     // collect any small ones
156     long mergeSize = 0;
157     int numToMerge = 0;
158     for (int i = 0; i < sizes.size(); i++) {
159       if (mergeSize + sizes.get(i).size > goalSize) {
160         numToMerge = i;
161         break;
162       }
163       mergeSize += sizes.get(i).size;
164     }
165     
166     if (numToMerge > 1) {
167       mergeSome(conn, table, sizes, numToMerge);
168     } else {
169       if (numToMerge == 1 && sizes.size() > 1) {
170         // here we have the case of a merge candidate that is surrounded by candidates that would split
171         if (force) {
172           mergeSome(conn, table, sizes, 2);
173         } else {
174           sizes.remove(0);
175         }
176       }
177     }
178     if (numToMerge == 0 && sizes.size() > 1 && last) {
179       // That's the last tablet, and we have a bunch to merge
180       mergeSome(conn, table, sizes, sizes.size());
181     }
182     long result = 0;
183     for (Size s : sizes) {
184       result += s.size;
185     }
186     return result;
187   }
188   
189   protected void mergeSome(Connector conn, String table, List<Size> sizes, int numToMerge) throws MergeException {
190     merge(conn, table, sizes, numToMerge);
191     for (int i = 0; i < numToMerge; i++) {
192       sizes.remove(0);
193     }
194   }
195   
196   protected void merge(Connector conn, String table, List<Size> sizes, int numToMerge) throws MergeException {
197     try {
198       Text start = sizes.get(0).extent.getPrevEndRow();
199       Text end = sizes.get(numToMerge - 1).extent.getEndRow();
200       message("Merging %d tablets from (%s to %s]", numToMerge, start == null ? "-inf" : start, end == null ? "+inf" : end);
201       conn.tableOperations().merge(table, start, end);
202     } catch (Exception ex) {
203       throw new MergeException(ex);
204     }
205   }
206   
207   protected Iterator<Size> getSizeIterator(Connector conn, String tablename, Text start, Text end) throws MergeException {
208     // open up the !METADATA table, walk through the tablets.
209     String tableId;
210     Scanner scanner;
211     try {
212       tableId = Tables.getTableId(conn.getInstance(), tablename);
213       scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
214     } catch (Exception e) {
215       throw new MergeException(e);
216     }
217     scanner.setRange(new KeyExtent(new Text(tableId), end, start).toMetadataRange());
218     scanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY);
219     Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner);
220     final Iterator<Entry<Key,Value>> iterator = scanner.iterator();
221     
222     Iterator<Size> result = new Iterator<Size>() {
223       Size next = fetch();
224       
225       @Override
226       public boolean hasNext() {
227         return next != null;
228       }
229       
230       private Size fetch() {
231         long tabletSize = 0;
232         while (iterator.hasNext()) {
233           Entry<Key,Value> entry = iterator.next();
234           Key key = entry.getKey();
235           if (key.getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) {
236             String[] sizeEntries = new String(entry.getValue().get()).split(",");
237             if (sizeEntries.length == 2) {
238               tabletSize += Long.parseLong(sizeEntries[0]);
239             }
240           } else if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) {
241             KeyExtent extent = new KeyExtent(key.getRow(), entry.getValue());
242             return new Size(extent, tabletSize);
243           }
244         }
245         return null;
246       }
247       
248       @Override
249       public Size next() {
250         Size result = next;
251         next = fetch();
252         return result;
253       }
254       
255       @Override
256       public void remove() {
257         throw new UnsupportedOperationException();
258       }
259     };
260     return result;
261   }
262   
263 }