/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce.replication;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
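
/**
 * This map-only job compares the data from a local table with a remote one.
 * Every cell is compared and must have exactly the same keys (even timestamp)
 * as well as the same value. It is possible to restrict the job by time range
 * and by families. The peer id provided must match the one given when the
 * replication stream was set up.
 * <p>
 * Two counters are provided, Verifier.Counters.GOODROWS and BADROWS. The reason
 * why a row is different is shown in the map's log.
 */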
public class VerifyReplication extends Configured implements Tool {

  private static final Log LOG =
      LogFactory.getLog(VerifyReplication.class);

  public final static String NAME = "verifyrep";
  static long startTime = 0;
  static long endTime = Long.MAX_VALUE;
  static int versions = -1;
  static String tableName = null;
  static String families = null;
  static String peerId = null;

  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
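
  /**
   * Map-only comparator for the local and peer tables.
   */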
  public static class Verifier
      extends TableMapper<ImmutableBytesWritable, Put> {

    public static enum Counters {
      GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS
    }

    private Connection connection;
    private Table replicatedTable;
    private ResultScanner replicatedScanner;
    private Result currentCompareRowInPeerTable;
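
    /**
     * Map method that compares every scanned row with the equivalent row from
     * the peer cluster.
     * @param row The current table row key.
     * @param value The columns.
     * @param context The current context.
     * @throws IOException When something is broken with the data.
     */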
    @Override
    public void map(ImmutableBytesWritable row, final Result value,
        Context context)
        throws IOException {
      if (replicatedScanner == null) {
        Configuration conf = context.getConfiguration();
        final Scan scan = new Scan();
        scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
        // Read the scan parameters from the job configuration; the static fields
        // of VerifyReplication are only populated in the client JVM that submitted the job.
        long startTime = conf.getLong(NAME + ".startTime", 0);
        long endTime = conf.getLong(NAME + ".endTime", Long.MAX_VALUE);
        String families = conf.get(NAME + ".families", null);
        int versions = conf.getInt(NAME + ".versions", -1);
        if (families != null) {
          String[] fams = families.split(",");
          for (String fam : fams) {
            scan.addFamily(Bytes.toBytes(fam));
          }
        }
        scan.setTimeRange(startTime, endTime);
        if (versions >= 0) {
          scan.setMaxVersions(versions);
        }

        final TableSplit tableSplit = (TableSplit)(context.getInputSplit());

        String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
        Configuration peerConf = HBaseConfiguration.create(conf);
        ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey);

        TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
        connection = ConnectionFactory.createConnection(peerConf);
        replicatedTable = connection.getTable(tableName);
        // Restrict the peer-side scan to the key range of the current split.
        scan.setStartRow(value.getRow());
        scan.setStopRow(tableSplit.getEndRow());
        replicatedScanner = replicatedTable.getScanner(scan);
        currentCompareRowInPeerTable = replicatedScanner.next();
      }
      while (true) {
        if (currentCompareRowInPeerTable == null) {
          // Reached the end of the peer table region: the row only exists in the source table.
          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
          break;
        }
        int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
        if (rowCmpRet == 0) {
          // Same row key: compare the cells of both rows.
          try {
            Result.compareResults(value, currentCompareRowInPeerTable);
            context.getCounter(Counters.GOODROWS).increment(1);
          } catch (Exception e) {
            logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
          }
          currentCompareRowInPeerTable = replicatedScanner.next();
          break;
        } else if (rowCmpRet < 0) {
          // The row only exists in the source table.
          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
          break;
        } else {
          // The row only exists in the peer table.
          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
              currentCompareRowInPeerTable);
          currentCompareRowInPeerTable = replicatedScanner.next();
        }
      }
    }

    private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
      context.getCounter(counter).increment(1);
      context.getCounter(Counters.BADROWS).increment(1);
      LOG.error(counter.toString() + ", rowkey=" + Bytes.toString(row.getRow()));
    }

    @Override
    protected void cleanup(Context context) {
      if (replicatedScanner != null) {
        try {
          // Any rows still pending in the peer scanner exist only in the peer table.
          while (currentCompareRowInPeerTable != null) {
            logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
                currentCompareRowInPeerTable);
            currentCompareRowInPeerTable = replicatedScanner.next();
          }
        } catch (Exception e) {
          LOG.error("Failed to scan peer table in cleanup", e);
        } finally {
          replicatedScanner.close();
          replicatedScanner = null;
        }
      }
      if (replicatedTable != null) {
        try {
          replicatedTable.close();
        } catch (Exception e) {
          LOG.error("Failed to close table in cleanup", e);
        }
      }
      if (connection != null) {
        try {
          connection.close();
        } catch (Exception e) {
          LOG.error("Failed to close connection in cleanup", e);
        }
      }
    }
  }

  private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
    ZooKeeperWatcher localZKW = null;
    ReplicationPeerZKImpl peer = null;
    try {
      localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
          new Abortable() {
            @Override public void abort(String why, Throwable e) {}
            @Override public boolean isAborted() {return false;}
          });

      ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
      rp.init();

      Pair<ReplicationPeerConfig, Configuration> pair = rp.getPeerConf(peerId);
      if (pair == null) {
        throw new IOException("Couldn't get peer conf!");
      }
      Configuration peerConf = pair.getSecond();
      return ZKUtil.getZooKeeperClusterKey(peerConf);
    } catch (ReplicationException e) {
      throw new IOException(
          "An error occurred while trying to connect to the remote peer cluster", e);
    } finally {
      if (peer != null) {
        peer.close();
      }
      if (localZKW != null) {
        localZKW.close();
      }
    }
  }
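
  /**
   * Sets up the actual job.
   *
   * @param conf The current configuration.
   * @param args The command line parameters.
   * @return The newly created job, or null if the command line could not be parsed.
   * @throws IOException When setting up the job fails.
   */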
  public static Job createSubmittableJob(Configuration conf, String[] args)
      throws IOException {
    if (!doCommandLine(args)) {
      return null;
    }
    if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
        HConstants.REPLICATION_ENABLE_DEFAULT)) {
      throw new IOException("Replication needs to be enabled to verify it.");
    }
    conf.set(NAME + ".peerId", peerId);
    conf.set(NAME + ".tableName", tableName);
    conf.setLong(NAME + ".startTime", startTime);
    conf.setLong(NAME + ".endTime", endTime);
    // Pass the number of versions to the mappers as well; the static field is
    // not visible in the mapper JVMs.
    conf.setInt(NAME + ".versions", versions);
    if (families != null) {
      conf.set(NAME + ".families", families);
    }

    String peerQuorumAddress = getPeerQuorumAddress(conf);
    conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
    LOG.info("Peer Quorum Address: " + peerQuorumAddress);

    Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
    job.setJarByClass(VerifyReplication.class);

    Scan scan = new Scan();
    scan.setTimeRange(startTime, endTime);
    if (versions >= 0) {
      scan.setMaxVersions(versions);
    }
    if (families != null) {
      String[] fams = families.split(",");
      for (String fam : fams) {
        scan.addFamily(Bytes.toBytes(fam));
      }
    }
    TableMapReduceUtil.initTableMapperJob(tableName, scan,
        Verifier.class, null, null, job);

    // Also obtain an authentication token for the peer cluster so the job can
    // scan it when security is enabled.
    TableMapReduceUtil.initCredentialsForCluster(job, peerQuorumAddress);

    job.setOutputFormatClass(NullOutputFormat.class);
    job.setNumReduceTasks(0);
    return job;
  }

  private static boolean doCommandLine(final String[] args) {
    if (args.length < 2) {
      printUsage(null);
      return false;
    }
    try {
      for (int i = 0; i < args.length; i++) {
        String cmd = args[i];
        if (cmd.equals("-h") || cmd.startsWith("--h")) {
          printUsage(null);
          return false;
        }

        final String startTimeArgKey = "--starttime=";
        if (cmd.startsWith(startTimeArgKey)) {
          startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
          continue;
        }

        final String endTimeArgKey = "--endtime=";
        if (cmd.startsWith(endTimeArgKey)) {
          endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
          continue;
        }

        final String versionsArgKey = "--versions=";
        if (cmd.startsWith(versionsArgKey)) {
          versions = Integer.parseInt(cmd.substring(versionsArgKey.length()));
          continue;
        }

        final String familiesArgKey = "--families=";
        if (cmd.startsWith(familiesArgKey)) {
          families = cmd.substring(familiesArgKey.length());
          continue;
        }

        if (i == args.length - 2) {
          peerId = cmd;
        }

        if (i == args.length - 1) {
          tableName = cmd;
        }
      }
    } catch (Exception e) {
      e.printStackTrace();
      printUsage("Can't start because " + e.getMessage());
      return false;
    }
    return true;
  }
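
  /*
   * @param errorMsg Error message. Can be null.
   */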
  private static void printUsage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: verifyrep [--starttime=X]" +
        " [--endtime=Y] [--versions=N] [--families=A] <peerid> <tablename>");
    System.err.println();
    System.err.println("Options:");
    System.err.println(" starttime    beginning of the time range");
    System.err.println("              without endtime means from starttime to forever");
    System.err.println(" endtime      end of the time range");
    System.err.println(" versions     number of cell versions to verify");
    System.err.println(" families     comma-separated list of families to verify");
    System.err.println();
    System.err.println("Args:");
    System.err.println(" peerid       Id of the peer used for verification, must match the one given for replication");
    System.err.println(" tablename    Name of the table to verify");
    System.err.println();
    System.err.println("Examples:");
    System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5");
    System.err.println(" $ bin/hbase " +
        "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" +
        " --starttime=1265875194289 --endtime=1265878794289 5 TestTable ");
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = this.getConf();
    Job job = createSubmittableJob(conf, args);
    if (job != null) {
      return job.waitForCompletion(true) ? 0 : 1;
    }
    return 1;
  }
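
  /**
   * Main entry point.
   *
   * @param args The command line parameters.
   * @throws Exception When running the job fails.
   */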
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(HBaseConfiguration.create(), new VerifyReplication(), args);
    System.exit(res);
  }
}