1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
import java.io.IOException;
import java.io.InterruptedIOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.google.common.base.Preconditions;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72 @InterfaceAudience.Public
73 @InterfaceStability.Stable
74 public class CellCounter extends Configured implements Tool {
75 private static final Log LOG =
76 LogFactory.getLog(CellCounter.class.getName());
77
78
79
80
81
82 static final String NAME = "CellCounter";
83
84 private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
85
86
87
88
89 static class CellCounterMapper
90 extends TableMapper<Text, IntWritable> {
91
92
93
94 public static enum Counters {
95 ROWS
96 }
97
98
99
100
101
102
103
104
105
106
107
108
109 @Override
110 public void map(ImmutableBytesWritable row, Result values,
111 Context context)
112 throws IOException {
113 Preconditions.checkState(values != null,
114 "values passed to the map is null");
115 String currentFamilyName = null;
116 String currentQualifierName = null;
117 String currentRowKey = null;
118 Configuration config = context.getConfiguration();
119 String separator = config.get("ReportSeparator",":");
120 try {
121 context.getCounter(Counters.ROWS).increment(1);
122 context.write(new Text("Total ROWS"), new IntWritable(1));
123
124 for (Cell value : values.listCells()) {
125 currentRowKey = Bytes.toStringBinary(CellUtil.cloneRow(value));
126 String thisRowFamilyName = Bytes.toStringBinary(CellUtil.cloneFamily(value));
127 if (!thisRowFamilyName.equals(currentFamilyName)) {
128 currentFamilyName = thisRowFamilyName;
129 context.getCounter("CF", thisRowFamilyName).increment(1);
130 if (1 == context.getCounter("CF", thisRowFamilyName).getValue()) {
131 context.write(new Text("Total Families Across all Rows"), new IntWritable(1));
132 context.write(new Text(thisRowFamilyName), new IntWritable(1));
133 }
134 }
135 String thisRowQualifierName = thisRowFamilyName + separator
136 + Bytes.toStringBinary(CellUtil.cloneQualifier(value));
137 if (!thisRowQualifierName.equals(currentQualifierName)) {
138 currentQualifierName = thisRowQualifierName;
139 context.getCounter("CFQL", thisRowQualifierName).increment(1);
140 context.write(new Text("Total Qualifiers across all Rows"),
141 new IntWritable(1));
142 context.write(new Text(thisRowQualifierName), new IntWritable(1));
143
144 context.getCounter("QL_VERSIONS", currentRowKey + separator +
145 thisRowQualifierName).increment(1);
146 context.write(new Text(currentRowKey + separator
147 + thisRowQualifierName + "_Versions"), new IntWritable(1));
148
149 } else {
150
151 currentQualifierName = thisRowQualifierName;
152 context.getCounter("QL_VERSIONS", currentRowKey + separator +
153 thisRowQualifierName).increment(1);
154 context.write(new Text(currentRowKey + separator
155 + thisRowQualifierName + "_Versions"), new IntWritable(1));
156 }
157 }
158 } catch (InterruptedException e) {
159 e.printStackTrace();
160 }
161 }
162 }
163
164 static class IntSumReducer<Key> extends Reducer<Key, IntWritable,
165 Key, IntWritable> {
166
167 private IntWritable result = new IntWritable();
168 public void reduce(Key key, Iterable<IntWritable> values,
169 Context context)
170 throws IOException, InterruptedException {
171 int sum = 0;
172 for (IntWritable val : values) {
173 sum += val.get();
174 }
175 result.set(sum);
176 context.write(key, result);
177 }
178 }
179
180
181
182
183
184
185
186
187
188 public static Job createSubmittableJob(Configuration conf, String[] args)
189 throws IOException {
190 String tableName = args[0];
191 Path outputDir = new Path(args[1]);
192 String reportSeparatorString = (args.length > 2) ? args[2]: ":";
193 conf.set("ReportSeparator", reportSeparatorString);
194 Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
195 job.setJarByClass(CellCounter.class);
196 Scan scan = getConfiguredScanForJob(conf, args);
197 TableMapReduceUtil.initTableMapperJob(tableName, scan,
198 CellCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
199 job.setNumReduceTasks(1);
200 job.setMapOutputKeyClass(Text.class);
201 job.setMapOutputValueClass(IntWritable.class);
202 job.setOutputFormatClass(TextOutputFormat.class);
203 job.setOutputKeyClass(Text.class);
204 job.setOutputValueClass(IntWritable.class);
205 FileOutputFormat.setOutputPath(job, outputDir);
206 job.setReducerClass(IntSumReducer.class);
207 return job;
208 }
209
210 private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException {
211 Scan s = new Scan();
212
213 s.setMaxVersions(Integer.MAX_VALUE);
214 s.setCacheBlocks(false);
215
216 if (conf.get(TableInputFormat.SCAN_COLUMN_FAMILY) != null) {
217 s.addFamily(Bytes.toBytes(conf.get(TableInputFormat.SCAN_COLUMN_FAMILY)));
218 }
219
220 Filter rowFilter = getRowFilter(args);
221 if (rowFilter!= null) {
222 LOG.info("Setting Row Filter for counter.");
223 s.setFilter(rowFilter);
224 }
225
226 long timeRange[] = getTimeRange(args);
227 if (timeRange != null) {
228 LOG.info("Setting TimeRange for counter.");
229 s.setTimeRange(timeRange[0], timeRange[1]);
230 }
231 return s;
232 }
233
234
235 private static Filter getRowFilter(String[] args) {
236 Filter rowFilter = null;
237 String filterCriteria = (args.length > 3) ? args[3]: null;
238 if (filterCriteria == null) return null;
239 if (filterCriteria.startsWith("^")) {
240 String regexPattern = filterCriteria.substring(1, filterCriteria.length());
241 rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regexPattern));
242 } else {
243 rowFilter = new PrefixFilter(Bytes.toBytes(filterCriteria));
244 }
245 return rowFilter;
246 }
247
248 private static long[] getTimeRange(String[] args) throws IOException {
249 final String startTimeArgKey = "--starttime=";
250 final String endTimeArgKey = "--endtime=";
251 long startTime = 0L;
252 long endTime = 0L;
253
254 for (int i = 1; i < args.length; i++) {
255 System.out.println("i:" + i + "arg[i]" + args[i]);
256 if (args[i].startsWith(startTimeArgKey)) {
257 startTime = Long.parseLong(args[i].substring(startTimeArgKey.length()));
258 }
259 if (args[i].startsWith(endTimeArgKey)) {
260 endTime = Long.parseLong(args[i].substring(endTimeArgKey.length()));
261 }
262 }
263
264 if (startTime == 0 && endTime == 0)
265 return null;
266
267 endTime = endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime;
268 return new long [] {startTime, endTime};
269 }
270
271 @Override
272 public int run(String[] args) throws Exception {
273 if (args.length < 2) {
274 System.err.println("ERROR: Wrong number of parameters: " + args.length);
275 System.err.println("Usage: CellCounter ");
276 System.err.println(" <tablename> <outputDir> <reportSeparator> [^[regex pattern] or " +
277 "[Prefix] for row filter]] --starttime=[starttime] --endtime=[endtime]");
278 System.err.println(" Note: -D properties will be applied to the conf used. ");
279 System.err.println(" Additionally, the following SCAN properties can be specified");
280 System.err.println(" to get fine grained control on what is counted..");
281 System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<familyName>");
282 System.err.println(" <reportSeparator> parameter can be used to override the default report separator " +
283 "string : used to separate the rowId/column family name and qualifier name.");
284 System.err.println(" [^[regex pattern] or [Prefix] parameter can be used to limit the cell counter count " +
285 "operation to a limited subset of rows from the table based on regex or prefix pattern.");
286 return -1;
287 }
288 Job job = createSubmittableJob(getConf(), args);
289 return (job.waitForCompletion(true) ? 0 : 1);
290 }
291
292
293
294
295
296
297 public static void main(String[] args) throws Exception {
298 int errCode = ToolRunner.run(HBaseConfiguration.create(), new CellCounter(), args);
299 System.exit(errCode);
300 }
301
302 }