/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.datastore;

import java.util.Calendar;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.chukwa.hicc.bean.Series;
import org.apache.hadoop.chukwa.util.ExceptionUtil;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.log4j.Logger;

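/**
 * Read-only helper for retrieving Chukwa metrics stored in HBase. Tables are
 * assumed to use row keys of the form &lt;timestamp&gt;-&lt;source&gt;, which
 * is the format implied by the row-key parsing in the methods below.
 *
 * A minimal usage sketch (table, key, and column names are illustrative):
 *
 * <pre>
 *   Series cpu = ChukwaHBaseStore.getSeries("SystemMetrics", "host1",
 *       "system", "cpu", startTime, endTime, true);
 * </pre>
 */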
public class ChukwaHBaseStore {
  private static Configuration hconf = HBaseConfiguration.create();
  private static HTablePool pool = new HTablePool(hconf, 60);
  static Logger log = Logger.getLogger(ChukwaHBaseStore.class);

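  /**
   * Scans tableName for values of family:column between startTime and
   * endTime and returns them as a time series. When filterByRowKey is set,
   * rows are restricted to keys of the form &lt;timestamp&gt;-&lt;rkey&gt;.
   */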
  public static Series getSeries(String tableName, String rkey, String family, String column,
      long startTime, long endTime, boolean filterByRowKey) {
    StringBuilder seriesName = new StringBuilder();
    seriesName.append(rkey);
    seriesName.append(":");
    seriesName.append(family);
    seriesName.append(":");
    seriesName.append(column);

    Series series = new Series(seriesName.toString());
    try {
      HTableInterface table = pool.getTable(tableName);
      // Truncate the start time to the beginning of its hour so the scan
      // starts at the first row key that could fall inside the range.
      Calendar c = Calendar.getInstance();
      c.setTimeInMillis(startTime);
      c.set(Calendar.MINUTE, 0);
      c.set(Calendar.SECOND, 0);
      c.set(Calendar.MILLISECOND, 0);
      // Row keys are formatted as <timestamp>-<key>; include the separator
      // so the start row sorts consistently with the stored keys.
      String startRow = c.getTimeInMillis() + "-" + rkey;
      Scan scan = new Scan();
      scan.addColumn(family.getBytes(), column.getBytes());
      scan.setStartRow(startRow.getBytes());
      scan.setTimeRange(startTime, endTime);
      scan.setMaxVersions();
      if(filterByRowKey) {
        RowFilter rf = new RowFilter(CompareOp.EQUAL,
            new RegexStringComparator("[0-9]+-" + rkey + "$"));
        scan.setFilter(rf);
      }
      ResultScanner results = table.getScanner(scan);
      Iterator<Result> it = results.iterator();
      // TODO: Apply discrete wavelet transformation to limit the output
      // size to 1000 data points for graphing optimization. (i.e. jwave)
      while(it.hasNext()) {
        Result result = it.next();
        String temp = new String(result.getValue(family.getBytes(), column.getBytes()));
        double value = Double.parseDouble(temp);
        // TODO: The Pig Store function does not honor the HBase timestamp,
        // so the timestamp has to be parsed out of the row key instead.
        String buf = new String(result.getRow());
        Long timestamp = Long.parseLong(buf.split("-")[0]);
        // If the Pig Store function honored the HBase timestamp, the
        // following line would be preferable:
        // series.add(result.getCellValue().getTimestamp(), value);
        series.add(timestamp, value);
      }
      results.close();
      table.close();
    } catch(Exception e) {
      log.error(ExceptionUtil.getStackTrace(e));
    }
    return series;
  }

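  /**
   * Returns the column family names defined for tableName, or an empty set
   * if the table cannot be read.
   */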
  public static Set<String> getFamilyNames(String tableName) {
    Set<String> familyNames = new CopyOnWriteArraySet<String>();
    try {
      HTableInterface table = pool.getTable(tableName);
      Set<byte[]> families = table.getTableDescriptor().getFamiliesKeys();
      for(byte[] name : families) {
        familyNames.add(new String(name));
      }
      table.close();
    } catch(Exception e) {
      log.error(ExceptionUtil.getStackTrace(e));
    }
    return familyNames;
  }

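  /**
   * Returns the names of all tables visible to this HBase instance.
   */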
  public static Set<String> getTableNames() {
    Set<String> tableNames = new CopyOnWriteArraySet<String>();
    try {
      HBaseAdmin admin = new HBaseAdmin(hconf);
      HTableDescriptor[] td = admin.listTables();
      for(HTableDescriptor table : td) {
        tableNames.add(table.getNameAsString());
      }
    } catch(Exception e) {
      log.error(ExceptionUtil.getStackTrace(e));
    }
    return tableNames;
  }

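  /**
   * Collects the column qualifiers of the next scan result into columnNames.
   * Relies on the HBase scanner iterator returning null (rather than
   * throwing) once the scan is exhausted.
   */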
  public static void getColumnNamesHelper(Set<String> columnNames, Iterator<Result> it) {
    Result result = it.next();
    if(result != null) {
      // Result.list() can return null for an empty row, so guard against it.
      List<KeyValue> kvList = result.list();
      if(kvList != null) {
        for(KeyValue kv : kvList) {
          columnNames.add(new String(kv.getQualifier()));
        }
      }
    }
  }

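  /**
   * Returns the column qualifiers present in family. A full scan walks the
   * whole [startTime, endTime) key range; otherwise only the five minutes of
   * rows leading up to endTime are sampled.
   */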
  public static Set<String> getColumnNames(String tableName, String family, long startTime, long endTime, boolean fullScan) {
    Set<String> columnNames = new CopyOnWriteArraySet<String>();
    try {
      HTableInterface table = pool.getTable(tableName);
      Scan scan = new Scan();
      // A full scan covers the whole time range; otherwise sample only the
      // five minutes of rows leading up to endTime.
      long scanStart = fullScan ? startTime : endTime - 300000L;
      scan.setStartRow(Long.toString(scanStart).getBytes());
      scan.setStopRow(Long.toString(endTime).getBytes());
      scan.addFamily(family.getBytes());
      ResultScanner results = table.getScanner(scan);
      Iterator<Result> it = results.iterator();
      if(fullScan) {
        while(it.hasNext()) {
          getColumnNamesHelper(columnNames, it);
        }
      } else {
        getColumnNamesHelper(columnNames, it);
      }
      results.close();
      table.close();
    } catch(Exception e) {
      log.error(ExceptionUtil.getStackTrace(e));
    }
    return columnNames;
  }

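  /**
   * Returns the distinct source portion of row keys (the part after the
   * first '-') for rows that carry family:qualifier within the time range.
   */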
  public static Set<String> getRowNames(String tableName, String family, String qualifier, long startTime, long endTime, boolean fullScan) {
    Set<String> rows = new HashSet<String>();
    HTableInterface table = pool.getTable(tableName);
    try {
      Scan scan = new Scan();
      scan.addColumn(family.getBytes(), qualifier.getBytes());
      // A full scan covers the whole time range; otherwise sample only the
      // five minutes of rows leading up to endTime.
      long scanStart = fullScan ? startTime : endTime - 300000L;
      scan.setStartRow(Long.toString(scanStart).getBytes());
      scan.setStopRow(Long.toString(endTime).getBytes());
      ResultScanner results = table.getScanner(scan);
      Iterator<Result> it = results.iterator();
      while(it.hasNext()) {
        Result result = it.next();
        // Row keys are formatted as <timestamp>-<source>; keep the source part.
        String[] parts = new String(result.getRow()).split("-", 2);
        if(parts.length > 1) {
          rows.add(parts[1]);
        }
      }
      results.close();
      table.close();
    } catch(Exception e) {
      log.error(ExceptionUtil.getStackTrace(e));
    }
    return rows;
  }

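  /**
   * Returns the host names recorded in the SystemMetrics table. Note that
   * the cluster parameter is currently not applied to the scan.
   */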
  public static Set<String> getHostnames(String cluster, long startTime, long endTime, boolean fullScan) {
    return getRowNames("SystemMetrics", "system", "csource", startTime, endTime, fullScan);
  }

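  /**
   * Returns the cluster names extracted from the ctags column of the
   * SystemMetrics table within the given time range.
   */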
  public static Set<String> getClusterNames(long startTime, long endTime) {
    String tableName = "SystemMetrics";
    String family = "system";
    String column = "ctags";
    Set<String> clusters = new HashSet<String>();
    HTableInterface table = pool.getTable(tableName);
    Pattern p = Pattern.compile("\\s*cluster=\"(.*?)\"");
    try {
      Scan scan = new Scan();
      scan.addColumn(family.getBytes(), column.getBytes());
      scan.setTimeRange(startTime, endTime);
      ResultScanner results = table.getScanner(scan);
      Iterator<Result> it = results.iterator();
      while(it.hasNext()) {
        Result result = it.next();
        String buffer = new String(result.getValue(family.getBytes(), column.getBytes()));
        // Use find() rather than matches() so the cluster tag is located
        // even when ctags carries additional tags around it.
        Matcher m = p.matcher(buffer);
        if(m.find()) {
          clusters.add(m.group(1));
        }
      }
      results.close();
      table.close();
    } catch(Exception e) {
      log.error(ExceptionUtil.getStackTrace(e));
    }
    return clusters;
  }
}