1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.accumulo.core.file.rfile;
18
19 import java.util.ArrayList;
20 import java.util.List;
21
22 import org.apache.accumulo.core.cli.Help;
23 import org.apache.accumulo.core.conf.DefaultConfiguration;
24 import org.apache.accumulo.core.conf.Property;
25 import org.apache.accumulo.core.data.ByteSequence;
26 import org.apache.accumulo.core.data.Key;
27 import org.apache.accumulo.core.data.Range;
28 import org.apache.accumulo.core.data.Value;
29 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
30 import org.apache.accumulo.core.file.rfile.RFile.Reader;
31 import org.apache.accumulo.core.file.rfile.RFile.Writer;
32 import org.apache.accumulo.core.util.CachedConfiguration;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.fs.FileSystem;
35 import org.apache.hadoop.fs.Path;
36
37 import com.beust.jcommander.Parameter;
38
39 /**
40 * Split an RFile into large and small key/value files.
41 *
42 */
43 public class SplitLarge {
44
45 static class Opts extends Help {
46 @Parameter(names="-m", description="the maximum size of the key/value pair to shunt to the small file")
47 long maxSize = 10 * 1024 * 1024;
48 @Parameter(description="<file.rf> { <file.rf> ... }")
49 List<String> files = new ArrayList<String>();
50 }
51
52
53 public static void main(String[] args) throws Exception {
54 Configuration conf = CachedConfiguration.getInstance();
55 FileSystem fs = FileSystem.get(conf);
56 long maxSize = 10 * 1024 * 1024;
57 Opts opts = new Opts();
58 opts.parseArgs(SplitLarge.class.getName(), args);
59
60 for (String file : opts.files) {
61 Path path = new Path(file);
62 CachableBlockFile.Reader rdr = new CachableBlockFile.Reader(fs, path, conf, null, null);
63 Reader iter = new RFile.Reader(rdr);
64
65 if (!file.endsWith(".rf")) {
66 throw new IllegalArgumentException("File must end with .rf");
67 }
68 String smallName = file.substring(0, file.length() - 3) + "_small.rf";
69 String largeName = file.substring(0, file.length() - 3) + "_large.rf";
70
71 int blockSize = (int) DefaultConfiguration.getDefaultConfiguration().getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE);
72 Writer small = new RFile.Writer(new CachableBlockFile.Writer(fs, new Path(smallName), "gz", conf), blockSize);
73 small.startDefaultLocalityGroup();
74 Writer large = new RFile.Writer(new CachableBlockFile.Writer(fs, new Path(largeName), "gz", conf), blockSize);
75 large.startDefaultLocalityGroup();
76
77 iter.seek(new Range(), new ArrayList<ByteSequence>(), false);
78 while (iter.hasTop()) {
79 Key key = iter.getTopKey();
80 Value value = iter.getTopValue();
81 if (key.getSize() + value.getSize() < maxSize) {
82 small.append(key, value);
83 } else {
84 large.append(key, value);
85 }
86 iter.next();
87 }
88
89 iter.close();
90 large.close();
91 small.close();
92 }
93 }
94
95 }