/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.datacollection.writer.hbase;

import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
import org.apache.hadoop.chukwa.datacollection.writer.PipelineableWriter;
import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessorFactory;
import org.apache.hadoop.chukwa.util.ClassUtils;
import org.apache.hadoop.chukwa.util.DaemonWatcher;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.log4j.Logger;

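/**
 * A {@link PipelineableWriter} that writes chunks directly to HBase. Each
 * incoming {@link Chunk} is run through the Demux {@link MapProcessor}
 * registered for its data type, and the emitted key/values are stored in the
 * HBase table declared by the processor's {@link Table} or {@link Tables}
 * annotation.
 *
 * A minimal collector configuration sketch (property name assumed to follow
 * the usual chukwa-collector-conf.xml conventions):
 * <pre>
 * &lt;property&gt;
 *   &lt;name&gt;chukwaCollector.pipeline&lt;/name&gt;
 *   &lt;value&gt;org.apache.hadoop.chukwa.datacollection.writer.hbase.HBaseWriter&lt;/value&gt;
 * &lt;/property&gt;
 * </pre>
 */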
public class HBaseWriter extends PipelineableWriter {
  static Logger log = Logger.getLogger(HBaseWriter.class);
  boolean reportStats;
  volatile long dataSize = 0;
  final Timer statTimer;
  private OutputCollector output;
  private Reporter reporter;
  private ChukwaConfiguration conf = new ChukwaConfiguration();
  String defaultProcessor = conf.get(
      "chukwa.demux.mapper.default.processor",
      "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
  private HTablePool pool;
  private Configuration hconf;

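  /**
   * Periodic task that logs the write throughput in bytes per second,
   * computed from the change in the running {@code dataSize} counter since
   * the previous report.
   */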
  private class StatReportingTask extends TimerTask {
    private long lastTs = System.currentTimeMillis();
    private long lastDataSize = 0;

    public void run() {
      long time = System.currentTimeMillis();
      long interval = time - lastTs;
      lastTs = time;

      long ds = dataSize;
      long dataRate = 1000 * (ds - lastDataSize) / interval;

      lastDataSize = ds;

      log.info("stat=HBaseWriter|dataRate=" + dataRate);
    }
  }

  public HBaseWriter() {
    this(true);
  }

  public HBaseWriter(boolean reportStats) {
    this.reportStats = reportStats;
    statTimer = new Timer();
    hconf = HBaseConfiguration.create();
  }

  public HBaseWriter(ChukwaConfiguration conf, Configuration hconf) {
    this(true);
    this.conf = conf;
    this.hconf = hconf;
  }

  public void close() {
    statTimer.cancel();
  }

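  /**
   * Sets up the writer: starts the stats reporting timer when enabled,
   * optionally verifies that the Demux parser annotations match the live
   * HBase schema, and creates the shared HTable pool.
   */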
  public void init(Configuration conf) throws WriterException {
    // Only schedule throughput reporting when requested by the constructor;
    // previously this flag was set but never consulted.
    if (reportStats) {
      statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000);
    }
    output = new OutputCollector();
    reporter = new Reporter();
    if (conf.getBoolean("hbase.writer.verify.schema", false)) {
      verifyHbaseSchema();
    }
    // Pool up to 60 cached HTable references.
    pool = new HTablePool(hconf, 60);
  }

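  /**
   * Checks that the given table exists in HBase and contains the column
   * family declared by its {@link Table} annotation.
   *
   * @return true if both the table and the column family exist
   */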
  private boolean verifyHbaseTable(HBaseAdmin admin, Table table) {
    boolean status = false;
    try {
      if (admin.tableExists(table.name())) {
        HTableDescriptor descriptor = admin.getTableDescriptor(table.name().getBytes());
        HColumnDescriptor[] columnDescriptors = descriptor.getColumnFamilies();
        for (HColumnDescriptor cd : columnDescriptors) {
          if (cd.getNameAsString().equals(table.columnFamily())) {
            log.info("Verified schema - table: " + table.name() + " column family: " + table.columnFamily());
            status = true;
          }
        }
      } else {
        throw new Exception("HBase table: " + table.name() + " does not exist.");
      }
    } catch (Exception e) {
      log.error(ExceptionUtil.getStackTrace(e));
      status = false;
    }
    return status;
  }

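  /**
   * Scans the configured Demux parser package for classes annotated with
   * {@link Table} or {@link Tables} and verifies every declared table against
   * the live HBase schema. On a mismatch, the collector is halted when
   * hbase.writer.halt.on.schema.mismatch is set (the default).
   */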
  private void verifyHbaseSchema() {
    log.debug("Verify Demux parser with HBase schema");
    boolean schemaVerified = true;
    try {
      HBaseAdmin admin = new HBaseAdmin(hconf);
      List<Class> demuxParsers = ClassUtils.getClassesForPackage(conf.get("hbase.demux.package"));
      for (Class<?> x : demuxParsers) {
        if (x.isAnnotationPresent(Tables.class)) {
          Tables list = x.getAnnotation(Tables.class);
          for (Table table : list.annotations()) {
            if (!verifyHbaseTable(admin, table)) {
              schemaVerified = false;
              log.warn("Validation failed - table: " + table.name() + " column family: " + table.columnFamily() + " does not exist.");
            }
          }
        } else if (x.isAnnotationPresent(Table.class)) {
          Table table = x.getAnnotation(Table.class);
          if (!verifyHbaseTable(admin, table)) {
            schemaVerified = false;
            log.warn("Validation failed - table: " + table.name() + " column family: " + table.columnFamily() + " does not exist.");
          }
        }
      }
    } catch (Exception e) {
      schemaVerified = false;
      log.error(ExceptionUtil.getStackTrace(e));
    }
    if (!schemaVerified) {
      log.error("HBase schema mismatch with demux parser.");
      if (conf.getBoolean("hbase.writer.halt.on.schema.mismatch", true)) {
        log.error("Exiting...");
        DaemonWatcher.bailout(-1);
      }
    }
  }

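  /**
   * Processes each chunk with the Demux processor mapped to its data type
   * (falling back to the configured default processor), writes the emitted
   * key/values to the processor's annotated HBase table, and then forwards
   * the chunks to the next writer in the pipeline, if any.
   */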
  @Override
  public CommitStatus add(List<Chunk> chunks) throws WriterException {
    CommitStatus rv = ChukwaWriter.COMMIT_OK;
    try {
      for (Chunk chunk : chunks) {
        String processorClass = conf.get(chunk.getDataType(),
            defaultProcessor);
        synchronized (this) {
          MapProcessor processor = MapProcessorFactory.getProcessor(processorClass);
          try {
            // Resolve the destination table from the processor's annotations.
            // With @Tables, the last listed annotation wins.
            Table table = null;
            if (processor.getClass().isAnnotationPresent(Table.class)) {
              table = processor.getClass().getAnnotation(Table.class);
            } else if (processor.getClass().isAnnotationPresent(Tables.class)) {
              Tables tables = processor.getClass().getAnnotation(Tables.class);
              for (Table t : tables.annotations()) {
                table = t;
              }
            }
            if (table != null) {
              HTableInterface hbase = pool.getTable(table.name().getBytes());
              processor.process(new ChukwaArchiveKey(), chunk, output, reporter);
              hbase.put(output.getKeyValues());
              pool.putTable(hbase);
            }
          } catch (Exception e) {
            log.warn("Failed to write to HBase: " + output.getKeyValues());
            log.warn(ExceptionUtil.getStackTrace(e));
          }
          dataSize += chunk.getData().length;
          output.clear();
          reporter.clear();
        }
      }
    } catch (Exception e) {
      log.error(ExceptionUtil.getStackTrace(e));
      throw new WriterException("Failed to store data to HBase.");
    }
    if (next != null) {
      rv = next.add(chunks);
    }
    return rv;
  }

}