/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.datacollection.writer.hbase;

import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
import org.apache.hadoop.chukwa.datacollection.writer.PipelineableWriter;
import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessorFactory;
import org.apache.hadoop.chukwa.util.ClassUtils;
import org.apache.hadoop.chukwa.util.DaemonWatcher;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
import org.apache.log4j.Logger;

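/**
 * Writes chunks from the collector pipeline into HBase.  Each chunk is
 * handed to the Demux {@link MapProcessor} registered for its data type, and
 * the key/value pairs it emits are put into the HBase table named by the
 * processor's {@link Table} or {@link Tables} annotation.  A background
 * timer task periodically logs the write throughput.
 */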
public class HBaseWriter extends PipelineableWriter {
  static Logger log = Logger.getLogger(HBaseWriter.class);
  boolean reportStats;
  volatile long dataSize = 0;
  final Timer statTimer;
  private OutputCollector output;
  private Reporter reporter;
  private ChukwaConfiguration conf = new ChukwaConfiguration();
  String defaultProcessor = conf.get(
      "chukwa.demux.mapper.default.processor",
      "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
  private HTablePool pool;
  private Configuration hconf;

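  /**
   * Timer task that logs the data rate (bytes/sec of chunk data written)
   * since the previous run.
   */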
  private class StatReportingTask extends TimerTask {
    private long lastTs = System.currentTimeMillis();
    private long lastDataSize = 0;

    public void run() {
      long time = System.currentTimeMillis();
      long interval = time - lastTs;
      lastTs = time;

      long ds = dataSize;
      long dataRate = 1000 * (ds - lastDataSize) / interval; // bytes/sec
      // refers only to data field, not including http or chukwa headers
      lastDataSize = ds;

      log.info("stat=HBaseWriter|dataRate="
          + dataRate);
    }
  }

  public HBaseWriter() {
    this(true);
  }

  public HBaseWriter(boolean reportStats) {
    this.reportStats = reportStats;
    statTimer = new Timer();
    /* HBase Version 0.20.x */
    //hconf = new HBaseConfiguration();

    /* HBase Version 0.89.x */
    hconf = HBaseConfiguration.create();
  }

  public HBaseWriter(ChukwaConfiguration conf, Configuration hconf) {
    this(true);
    this.conf = conf;
    this.hconf = hconf;
  }

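  /**
   * Stops the statistics reporting timer.
   */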
  public void close() {
    statTimer.cancel();
  }

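  /**
   * Starts the statistics reporting task, optionally verifies the HBase
   * schema against the Demux parsers, and creates the HTable pool.
   */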
  public void init(Configuration conf) throws WriterException {
    statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000);
    output = new OutputCollector();
    reporter = new Reporter();
    if(conf.getBoolean("hbase.writer.verify.schema", false)) {
      verifyHbaseSchema();
    }
    pool = new HTablePool(hconf, 60);
  }

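  /**
   * Checks that the table and column family referenced by a parser's
   * {@link Table} annotation exist in HBase.
   */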
  private boolean verifyHbaseTable(HBaseAdmin admin, Table table) {
    boolean status = false;
    try {
      if(admin.tableExists(table.name())) {
        HTableDescriptor descriptor = admin.getTableDescriptor(table.name().getBytes());
        HColumnDescriptor[] columnDescriptors = descriptor.getColumnFamilies();
        for(HColumnDescriptor cd : columnDescriptors) {
          if(cd.getNameAsString().equals(table.columnFamily())) {
            log.info("Verified schema - table: "+table.name()+" column family: "+table.columnFamily());
            status = true;
          }
        }
      } else {
        throw new Exception("HBase table: "+table.name()+ " does not exist.");
      }
    } catch(Exception e) {
      log.error(ExceptionUtil.getStackTrace(e));
      status = false;
    }
    return status;
  }

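  /**
   * Scans the classes in the package named by hbase.demux.package for
   * {@link Table}/{@link Tables} annotations and verifies each referenced
   * table against HBase.  On a mismatch, the process exits if
   * hbase.writer.halt.on.schema.mismatch is true (the default).
   */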
  private void verifyHbaseSchema() {
    log.debug("Verify Demux parser with HBase schema");
    boolean schemaVerified = true;
    try {
      HBaseAdmin admin = new HBaseAdmin(hconf);
      List<Class> demuxParsers = ClassUtils.getClassesForPackage(conf.get("hbase.demux.package"));
      for(Class<?> x : demuxParsers) {
        if(x.isAnnotationPresent(Tables.class)) {
          Tables list = x.getAnnotation(Tables.class);
          for(Table table : list.annotations()) {
            if(!verifyHbaseTable(admin, table)) {
              schemaVerified = false;
              log.warn("Validation failed - table: "+table.name()+" column family: "+table.columnFamily()+" does not exist.");
            }
          }
        } else if(x.isAnnotationPresent(Table.class)) {
          Table table = x.getAnnotation(Table.class);
          if(!verifyHbaseTable(admin, table)) {
            schemaVerified = false;
            log.warn("Validation failed - table: "+table.name()+" column family: "+table.columnFamily()+" does not exist.");
          }
        }
      }
    } catch (Exception e) {
      schemaVerified = false;
      log.error(ExceptionUtil.getStackTrace(e));
    }
    if(!schemaVerified) {
      log.error("Hbase schema mismatch with demux parser.");
      if(conf.getBoolean("hbase.writer.halt.on.schema.mismatch", true)) {
        log.error("Exiting...");
        DaemonWatcher.bailout(-1);
      }
    }
  }

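  /**
   * Processes each chunk with the MapProcessor registered for its data type,
   * writes the emitted puts to the table named by the processor's annotation,
   * and passes the chunks on to the next writer in the pipeline.
   */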
  @Override
  public CommitStatus add(List<Chunk> chunks) throws WriterException {
    CommitStatus rv = ChukwaWriter.COMMIT_OK;
    try {
      for(Chunk chunk : chunks) {
        String processorClass = conf.get(chunk.getDataType(),
            defaultProcessor);
        synchronized (this) {
          MapProcessor processor = MapProcessorFactory.getProcessor(processorClass);
          try {
            Table table = null;
            if(processor.getClass().isAnnotationPresent(Table.class)) {
              table = processor.getClass().getAnnotation(Table.class);
            } else if(processor.getClass().isAnnotationPresent(Tables.class)) {
              Tables tables = processor.getClass().getAnnotation(Tables.class);
              // when multiple tables are declared, the last annotation wins
              for(Table t : tables.annotations()) {
                table = t;
              }
            }
            if(table!=null) {
              HTableInterface hbase = pool.getTable(table.name().getBytes());
              try {
                processor.process(new ChukwaArchiveKey(), chunk, output, reporter);
                hbase.put(output.getKeyValues());
              } finally {
                // return the table to the pool even if the put fails
                pool.putTable(hbase);
              }
            }
          } catch (Exception e) {
            log.warn(output.getKeyValues());
            log.warn(ExceptionUtil.getStackTrace(e));
          }
          dataSize += chunk.getData().length;
          output.clear();
          reporter.clear();
        }
      }
    } catch (Exception e) {
      log.error(ExceptionUtil.getStackTrace(e));
      throw new WriterException("Failed to store data to HBase.");
    }
    if (next != null) {
      rv = next.add(chunks); //pass data through
    }
    return rv;
  }

}