/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Export an HBase table.
 * Writes content to sequence files up in HDFS.  Use {@link Import} to read it
 * back in again.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Export extends Configured implements Tool {
  private static final Log LOG = LogFactory.getLog(Export.class);
  final static String NAME = "export";
  final static String RAW_SCAN = "hbase.mapreduce.include.deleted.rows";
  final static String EXPORT_BATCHING = "hbase.export.scanner.batch";

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";

  /**
   * Sets up the actual job.
   *
   * @param conf The current configuration.
   * @param args The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public static Job createSubmittableJob(Configuration conf, String[] args)
      throws IOException {
    String tableName = args[0];
    Path outputDir = new Path(args[1]);
    // Honor a user-supplied -D mapreduce.job.name, falling back to
    // "export_<tablename>". Calling setJobName() afterwards would clobber the
    // user's choice, so the job name is set only here.
    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
    job.setJarByClass(Export.class);

    Scan s = getConfiguredScanForJob(conf, args);
    IdentityTableMapper.initJob(tableName, s, IdentityTableMapper.class, job);

    // Map-only job: mapper output goes straight to sequence files.
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Result.class);
    FileOutputFormat.setOutputPath(job, outputDir);
    return job;
  }
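
  // Illustrative sketch (not part of the original class): submitting an export
  // programmatically instead of through main(). The table name "mytable" and
  // the output path "/export/mytable" are hypothetical placeholders.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   Job job = Export.createSubmittableJob(conf,
  //       new String[] { "mytable", "/export/mytable" });
  //   System.exit(job.waitForCompletion(true) ? 0 : 1);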

  private static Scan getConfiguredScanForJob(Configuration conf, String[] args)
      throws IOException {
    Scan s = new Scan();
    // Set scan versions: optional third argument, default one version per cell.
    int versions = args.length > 2 ? Integer.parseInt(args[2]) : 1;
    s.setMaxVersions(versions);
    // Set scan time range: optional fourth and fifth arguments.
    long startTime = args.length > 3 ? Long.parseLong(args[3]) : 0L;
    long endTime = args.length > 4 ? Long.parseLong(args[4]) : Long.MAX_VALUE;
    s.setTimeRange(startTime, endTime);
    // Disable block caching: a one-pass export would only churn the cache.
    s.setCacheBlocks(false);
    // Set start and stop rows if they were supplied as -D properties.
    if (conf.get(TableInputFormat.SCAN_ROW_START) != null) {
      s.setStartRow(Bytes.toBytes(conf.get(TableInputFormat.SCAN_ROW_START)));
    }
    if (conf.get(TableInputFormat.SCAN_ROW_STOP) != null) {
      s.setStopRow(Bytes.toBytes(conf.get(TableInputFormat.SCAN_ROW_STOP)));
    }
    // A raw scan also returns delete markers and not-yet-collected deleted cells.
    boolean raw = Boolean.parseBoolean(conf.get(RAW_SCAN));
    if (raw) {
      s.setRaw(raw);
    }
    // Restrict the export to a single column family when one is configured.
    if (conf.get(TableInputFormat.SCAN_COLUMN_FAMILY) != null) {
      s.addFamily(Bytes.toBytes(conf.get(TableInputFormat.SCAN_COLUMN_FAMILY)));
    }
    // Apply the optional row filter given as the sixth command line argument.
    Filter exportFilter = getExportFilter(args);
    if (exportFilter != null) {
      LOG.info("Setting Scan Filter for Export.");
      s.setFilter(exportFilter);
    }
    // Optionally cap the number of cells returned per RPC, for very wide rows.
    // Batching is incompatible with some filters, hence the try/catch.
    int batching = conf.getInt(EXPORT_BATCHING, -1);
    if (batching != -1) {
      try {
        s.setBatch(batching);
      } catch (IncompatibleFilterException e) {
        LOG.error("Batching could not be set", e);
      }
    }
    LOG.info("versions=" + versions + ", starttime=" + startTime +
      ", endtime=" + endTime + ", keepDeletedCells=" + raw);
    return s;
  }
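
  // Illustrative sketch (not from the original source): the scan built above
  // can be narrowed from the command line without code changes, e.g.
  //
  //   -D hbase.mapreduce.scan.column.family=cf
  //   -D hbase.mapreduce.scan.row.start=row-000
  //   -D hbase.mapreduce.scan.row.stop=row-999
  //   -D hbase.mapreduce.include.deleted.rows=true
  //
  // The family "cf" and the row keys are hypothetical; the first three property
  // names are the values of TableInputFormat.SCAN_COLUMN_FAMILY, SCAN_ROW_START
  // and SCAN_ROW_STOP, and the last is RAW_SCAN above.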

  private static Filter getExportFilter(String[] args) {
    Filter exportFilter = null;
    String filterCriteria = (args.length > 5) ? args[5] : null;
    if (filterCriteria == null) return null;
    if (filterCriteria.startsWith("^")) {
      String regexPattern = filterCriteria.substring(1);
      exportFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator(regexPattern));
    } else {
      exportFilter = new PrefixFilter(Bytes.toBytes(filterCriteria));
    }
    return exportFilter;
  }
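
  // Worked examples for getExportFilter (hypothetical row keys): the optional
  // sixth argument selects one of two filter forms.
  //
  //   "^row-[0-9]+"  -> new RowFilter(CompareOp.EQUAL,
  //                         new RegexStringComparator("row-[0-9]+"))
  //   "row-1"        -> new PrefixFilter(Bytes.toBytes("row-1"))
  //
  // A leading '^' selects regex matching and is stripped before the pattern is
  // handed to RegexStringComparator; anything else is treated as a row prefix.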

  /*
   * @param errorMsg Error message. Can be null.
   */
  private static void usage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: Export [-D <property=value>]* <tablename> <outputdir> [<versions> " +
      "[<starttime> [<endtime> [^[regex pattern] or [Prefix] to filter]]]]\n");
    System.err.println("  Note: -D properties will be applied to the conf used.");
    System.err.println("  For example:");
    System.err.println("   -D mapreduce.output.fileoutputformat.compress=true");
    System.err.println("   -D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec");
    System.err.println("   -D mapreduce.output.fileoutputformat.compress.type=BLOCK");
    System.err.println("  Additionally, the following SCAN properties can be specified");
    System.err.println("  to control/limit what is exported.");
    System.err.println("   -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<familyName>");
    System.err.println("   -D " + RAW_SCAN + "=true");
    System.err.println("   -D " + TableInputFormat.SCAN_ROW_START + "=<ROWSTART>");
    System.err.println("   -D " + TableInputFormat.SCAN_ROW_STOP + "=<ROWSTOP>");
    System.err.println("   -D " + JOB_NAME_CONF_KEY
      + "=jobName - use the specified mapreduce job name for the export");
    System.err.println("For performance consider the following properties:\n"
      + "   -D hbase.client.scanner.caching=100\n"
      + "   -D mapreduce.map.speculative=false\n"
      + "   -D mapreduce.reduce.speculative=false");
    System.err.println("For tables with very wide rows consider setting the batch size as below:\n"
      + "   -D " + EXPORT_BATCHING + "=10");
  }
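
  // Illustrative invocation (table name and paths hypothetical):
  //
  //   hbase org.apache.hadoop.hbase.mapreduce.Export \
  //     -D hbase.export.scanner.batch=10 \
  //     mytable /export/mytable 1 0 1400000000000
  //
  // This exports at most one version of each cell of "mytable", restricted to
  // timestamps in [0, 1400000000000), to sequence files under /export/mytable.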

  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      usage("Wrong number of arguments: " + args.length);
      return -1;
    }
    Job job = createSubmittableJob(getConf(), args);
    return (job.waitForCompletion(true) ? 0 : 1);
  }

  /**
   * Main entry point.
   *
   * @param args The command line arguments.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    int errCode = ToolRunner.run(HBaseConfiguration.create(), new Export(), args);
    System.exit(errCode);
  }
}