1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import java.io.IOException;
22 import java.util.HashMap;
23 import java.util.Map;
24 import java.util.Random;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configured;
29 import org.apache.hadoop.fs.FileSystem;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.hbase.HBaseConfiguration;
32 import org.apache.hadoop.hbase.HConstants;
33 import org.apache.hadoop.hbase.TableName;
34 import org.apache.hadoop.hbase.classification.InterfaceAudience;
35 import org.apache.hadoop.hbase.classification.InterfaceStability;
36 import org.apache.hadoop.hbase.client.Admin;
37 import org.apache.hadoop.hbase.client.Connection;
38 import org.apache.hadoop.hbase.client.ConnectionFactory;
39 import org.apache.hadoop.hbase.client.Scan;
40 import org.apache.hadoop.hbase.util.Bytes;
41 import org.apache.hadoop.mapreduce.Job;
42 import org.apache.hadoop.util.Tool;
43 import org.apache.hadoop.util.ToolRunner;
44
45
46
47
48
49
50 @InterfaceAudience.Public
51 @InterfaceStability.Stable
52 public class CopyTable extends Configured implements Tool {
53 private static final Log LOG = LogFactory.getLog(CopyTable.class);
54
55 final static String NAME = "copytable";
56 long startTime = 0;
57 long endTime = 0;
58 int versions = -1;
59 String tableName = null;
60 String startRow = null;
61 String stopRow = null;
62 String dstTableName = null;
63 String peerAddress = null;
64 String families = null;
65 boolean allCells = false;
66 static boolean shuffle = false;
67
68 boolean bulkload = false;
69 Path bulkloadDir = null;
70
71 private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
72
73
74
75
76
77
78
79
80 public Job createSubmittableJob(String[] args)
81 throws IOException {
82 if (!doCommandLine(args)) {
83 return null;
84 }
85
86 Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
87 job.setJarByClass(CopyTable.class);
88 Scan scan = new Scan();
89 scan.setCacheBlocks(false);
90 if (startTime != 0) {
91 scan.setTimeRange(startTime,
92 endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
93 }
94 if (allCells) {
95 scan.setRaw(true);
96 }
97 if (shuffle) {
98 job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
99 }
100 if (versions >= 0) {
101 scan.setMaxVersions(versions);
102 }
103
104 if (startRow != null) {
105 scan.setStartRow(Bytes.toBytes(startRow));
106 }
107
108 if (stopRow != null) {
109 scan.setStopRow(Bytes.toBytes(stopRow));
110 }
111
112 if(families != null) {
113 String[] fams = families.split(",");
114 Map<String,String> cfRenameMap = new HashMap<String,String>();
115 for(String fam : fams) {
116 String sourceCf;
117 if(fam.contains(":")) {
118
119 String[] srcAndDest = fam.split(":", 2);
120 sourceCf = srcAndDest[0];
121 String destCf = srcAndDest[1];
122 cfRenameMap.put(sourceCf, destCf);
123 } else {
124
125 sourceCf = fam;
126 }
127 scan.addFamily(Bytes.toBytes(sourceCf));
128 }
129 Import.configureCfRenaming(job.getConfiguration(), cfRenameMap);
130 }
131 job.setNumReduceTasks(0);
132
133 if (bulkload) {
134 TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.KeyValueImporter.class, null,
135 null, job);
136
137
138 TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));
139
140 FileSystem fs = FileSystem.get(getConf());
141 Random rand = new Random();
142 Path root = new Path(fs.getWorkingDirectory(), "copytable");
143 fs.mkdirs(root);
144 while (true) {
145 bulkloadDir = new Path(root, "" + rand.nextLong());
146 if (!fs.exists(bulkloadDir)) {
147 break;
148 }
149 }
150
151 System.out.println("HFiles will be stored at " + this.bulkloadDir);
152 HFileOutputFormat2.setOutputPath(job, bulkloadDir);
153 try (Connection conn = ConnectionFactory.createConnection(getConf());
154 Admin admin = conn.getAdmin()) {
155 HFileOutputFormat2.configureIncrementalLoadMap(job,
156 admin.getTableDescriptor((TableName.valueOf(dstTableName))));
157 }
158 } else {
159 TableMapReduceUtil.initTableMapperJob(tableName, scan,
160 Import.Importer.class, null, null, job);
161
162 TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
163 null);
164 }
165
166 return job;
167 }
168
169
170
171
172 private static void printUsage(final String errorMsg) {
173 if (errorMsg != null && errorMsg.length() > 0) {
174 System.err.println("ERROR: " + errorMsg);
175 }
176 System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " +
177 "[--new.name=NEW] [--peer.adr=ADR] <tablename>");
178 System.err.println();
179 System.err.println("Options:");
180 System.err.println(" rs.class hbase.regionserver.class of the peer cluster");
181 System.err.println(" specify if different from current cluster");
182 System.err.println(" rs.impl hbase.regionserver.impl of the peer cluster");
183 System.err.println(" startrow the start row");
184 System.err.println(" stoprow the stop row");
185 System.err.println(" starttime beginning of the time range (unixtime in millis)");
186 System.err.println(" without endtime means from starttime to forever");
187 System.err.println(" endtime end of the time range. Ignored if no starttime specified.");
188 System.err.println(" versions number of cell versions to copy");
189 System.err.println(" new.name new table's name");
190 System.err.println(" peer.adr Address of the peer cluster given in the format");
191 System.err.println(" hbase.zookeeper.quorum:hbase.zookeeper.client"
192 + ".port:zookeeper.znode.parent");
193 System.err.println(" families comma-separated list of families to copy");
194 System.err.println(" To copy from cf1 to cf2, give sourceCfName:destCfName. ");
195 System.err.println(" To keep the same name, just give \"cfName\"");
196 System.err.println(" all.cells also copy delete markers and deleted cells");
197 System.err.println(" bulkload Write input into HFiles and bulk load to the destination "
198 + "table");
199 System.err.println();
200 System.err.println("Args:");
201 System.err.println(" tablename Name of the table to copy");
202 System.err.println();
203 System.err.println("Examples:");
204 System.err.println(" To copy 'TestTable' to a cluster that uses replication for a 1 hour window:");
205 System.err.println(" $ bin/hbase " +
206 "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " +
207 "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
208 System.err.println("For performance consider the following general option:\n"
209 + " It is recommended that you set the following to >=100. A higher value uses more memory but\n"
210 + " decreases the round trip time to the server and may increase performance.\n"
211 + " -Dhbase.client.scanner.caching=100\n"
212 + " The following should always be set to false, to prevent writing data twice, which may produce \n"
213 + " inaccurate results.\n"
214 + " -Dmapreduce.map.speculative=false");
215 }
216
217 private boolean doCommandLine(final String[] args) {
218
219
220 if (args.length < 1) {
221 printUsage(null);
222 return false;
223 }
224 try {
225 for (int i = 0; i < args.length; i++) {
226 String cmd = args[i];
227 if (cmd.equals("-h") || cmd.startsWith("--h")) {
228 printUsage(null);
229 return false;
230 }
231
232 final String startRowArgKey = "--startrow=";
233 if (cmd.startsWith(startRowArgKey)) {
234 startRow = cmd.substring(startRowArgKey.length());
235 continue;
236 }
237
238 final String stopRowArgKey = "--stoprow=";
239 if (cmd.startsWith(stopRowArgKey)) {
240 stopRow = cmd.substring(stopRowArgKey.length());
241 continue;
242 }
243
244 final String startTimeArgKey = "--starttime=";
245 if (cmd.startsWith(startTimeArgKey)) {
246 startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
247 continue;
248 }
249
250 final String endTimeArgKey = "--endtime=";
251 if (cmd.startsWith(endTimeArgKey)) {
252 endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
253 continue;
254 }
255
256 final String versionsArgKey = "--versions=";
257 if (cmd.startsWith(versionsArgKey)) {
258 versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
259 continue;
260 }
261
262 final String newNameArgKey = "--new.name=";
263 if (cmd.startsWith(newNameArgKey)) {
264 dstTableName = cmd.substring(newNameArgKey.length());
265 continue;
266 }
267
268 final String peerAdrArgKey = "--peer.adr=";
269 if (cmd.startsWith(peerAdrArgKey)) {
270 peerAddress = cmd.substring(peerAdrArgKey.length());
271 continue;
272 }
273
274 final String familiesArgKey = "--families=";
275 if (cmd.startsWith(familiesArgKey)) {
276 families = cmd.substring(familiesArgKey.length());
277 continue;
278 }
279
280 if (cmd.startsWith("--all.cells")) {
281 allCells = true;
282 continue;
283 }
284
285 if (cmd.startsWith("--bulkload")) {
286 bulkload = true;
287 continue;
288 }
289
290 if (cmd.startsWith("--shuffle")) {
291 shuffle = true;
292 continue;
293 }
294
295 if (i == args.length-1) {
296 tableName = cmd;
297 } else {
298 printUsage("Invalid argument '" + cmd + "'");
299 return false;
300 }
301 }
302 if (dstTableName == null && peerAddress == null) {
303 printUsage("At least a new table name or a " +
304 "peer address must be specified");
305 return false;
306 }
307 if ((endTime != 0) && (startTime > endTime)) {
308 printUsage("Invalid time range filter: starttime=" + startTime + " > endtime=" + endTime);
309 return false;
310 }
311
312 if (bulkload && peerAddress != null) {
313 printUsage("Remote bulkload is not supported!");
314 return false;
315 }
316
317
318 if (dstTableName == null) {
319 dstTableName = tableName;
320 }
321 } catch (Exception e) {
322 e.printStackTrace();
323 printUsage("Can't start because " + e.getMessage());
324 return false;
325 }
326 return true;
327 }
328
329
330
331
332
333
334
335 public static void main(String[] args) throws Exception {
336 int ret = ToolRunner.run(HBaseConfiguration.create(), new CopyTable(), args);
337 System.exit(ret);
338 }
339
340 @Override
341 public int run(String[] args) throws Exception {
342 Job job = createSubmittableJob(args);
343 if (job == null) return 1;
344 if (!job.waitForCompletion(true)) {
345 LOG.info("Map-reduce job failed!");
346 if (bulkload) {
347 LOG.info("Files are not bulkloaded!");
348 }
349 return 1;
350 }
351 int code = 0;
352 if (bulkload) {
353 code = new LoadIncrementalHFiles(this.getConf()).run(new String[]{this.bulkloadDir.toString(),
354 this.dstTableName});
355 if (code == 0) {
356
357
358 FileSystem fs = FileSystem.get(this.getConf());
359 if (!fs.delete(this.bulkloadDir, true)) {
360 LOG.error("Deleting folder " + bulkloadDir + " failed!");
361 code = 1;
362 }
363 }
364 }
365 return code;
366 }
367 }