1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.writer.test.demux;
20
21 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
22 import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.AbstractProcessor;
23 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
24 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
25 import org.apache.hadoop.mapred.OutputCollector;
26 import org.apache.hadoop.mapred.Reporter;
27 import org.apache.log4j.Logger;
28
29 @Table(name="Test",columnFamily="TestColumnFamily")
30 public class TextParser extends AbstractProcessor {
31 static Logger log = Logger.getLogger(TextParser.class);
32 public static final String reduceType = "TestColumnFamily";
33 public final String recordType = this.getClass().getName();
34
35 public TextParser() {
36 }
37
38 public String getDataType() {
39 return recordType;
40 }
41
42 @Override
43 protected void parse(String recordEntry,
44 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
45 throws Throwable {
46 ChukwaRecord record = new ChukwaRecord();
47 String[] parts = recordEntry.split("\\s");
48 record.add("timestamp", parts[0]);
49 record.add(parts[1], parts[2]);
50 key.setKey(parts[0]+"/"+parts[1]+"/"+parts[0]);
51 long timestamp = Long.parseLong(parts[0]);
52 this.buildGenericRecord(record, null, timestamp, reduceType);
53 output.collect(key, record);
54 }
55 }