1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datastore;
19
20 import java.util.Calendar;
21 import java.util.HashSet;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.Set;
25 import java.util.concurrent.CopyOnWriteArraySet;
26 import java.util.regex.Matcher;
27 import java.util.regex.Pattern;
28
29 import org.apache.hadoop.chukwa.hicc.bean.Series;
30 import org.apache.hadoop.chukwa.util.ExceptionUtil;
31
32 import org.apache.hadoop.hbase.HBaseConfiguration;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.hbase.HTableDescriptor;
35 import org.apache.hadoop.hbase.KeyValue;
36 import org.apache.hadoop.hbase.client.HBaseAdmin;
37 import org.apache.hadoop.hbase.client.HTableInterface;
38 import org.apache.hadoop.hbase.client.HTablePool;
39 import org.apache.hadoop.hbase.client.Result;
40 import org.apache.hadoop.hbase.client.ResultScanner;
41 import org.apache.hadoop.hbase.client.Scan;
42 import org.apache.hadoop.hbase.filter.RowFilter;
43 import org.apache.hadoop.hbase.filter.RegexStringComparator;
44 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
45 import org.apache.log4j.Logger;
46
47 public class ChukwaHBaseStore {
48 private static Configuration hconf = HBaseConfiguration.create();
49 private static HTablePool pool = new HTablePool(hconf, 60);
50 static Logger log = Logger.getLogger(ChukwaHBaseStore.class);
51
52 public static Series getSeries(String tableName, String rkey, String family, String column,
53 long startTime, long endTime, boolean filterByRowKey) {
54 StringBuilder seriesName = new StringBuilder();
55 seriesName.append(rkey);
56 seriesName.append(":");
57 seriesName.append(family);
58 seriesName.append(":");
59 seriesName.append(column);
60
61 Series series = new Series(seriesName.toString());
62 try {
63 HTableInterface table = pool.getTable(tableName);
64 Calendar c = Calendar.getInstance();
65 c.setTimeInMillis(startTime);
66 c.set(Calendar.MINUTE, 0);
67 c.set(Calendar.SECOND, 0);
68 c.set(Calendar.MILLISECOND, 0);
69 String startRow = c.getTimeInMillis()+rkey;
70 Scan scan = new Scan();
71 scan.addColumn(family.getBytes(), column.getBytes());
72 scan.setStartRow(startRow.getBytes());
73 scan.setTimeRange(startTime, endTime);
74 scan.setMaxVersions();
75 if(filterByRowKey) {
76 RowFilter rf = new RowFilter(CompareOp.EQUAL, new
77 RegexStringComparator("[0-9]+-"+rkey+"$"));
78 scan.setFilter(rf);
79 }
80 ResultScanner results = table.getScanner(scan);
81 Iterator<Result> it = results.iterator();
82
83
84 while(it.hasNext()) {
85 Result result = it.next();
86 String temp = new String(result.getValue(family.getBytes(), column.getBytes()));
87 double value = Double.parseDouble(temp);
88
89 String buf = new String(result.getRow());
90 Long timestamp = Long.parseLong(buf.split("-")[0]);
91
92
93 series.add(timestamp, value);
94 }
95 results.close();
96 table.close();
97 } catch(Exception e) {
98 log.error(ExceptionUtil.getStackTrace(e));
99 }
100 return series;
101 }
102
103 public static Set<String> getFamilyNames(String tableName) {
104 Set<String> familyNames = new CopyOnWriteArraySet<String>();
105 try {
106 HTableInterface table = pool.getTable(tableName);
107 Set<byte[]> families = table.getTableDescriptor().getFamiliesKeys();
108 for(byte[] name : families) {
109 familyNames.add(new String(name));
110 }
111 table.close();
112 } catch(Exception e) {
113 log.error(ExceptionUtil.getStackTrace(e));
114 }
115 return familyNames;
116
117 }
118
119 public static Set<String> getTableNames() {
120 Set<String> tableNames = new CopyOnWriteArraySet<String>();
121 try {
122 HBaseAdmin admin = new HBaseAdmin(hconf);
123 HTableDescriptor[] td = admin.listTables();
124 for(HTableDescriptor table : td) {
125 tableNames.add(new String(table.getName()));
126 }
127 } catch(Exception e) {
128 log.error(ExceptionUtil.getStackTrace(e));
129 }
130 return tableNames;
131 }
132
133 public static void getColumnNamesHelper(Set<String>columnNames, Iterator<Result> it) {
134 Result result = it.next();
135 if(result!=null) {
136 List<KeyValue> kvList = result.list();
137 for(KeyValue kv : kvList) {
138 columnNames.add(new String(kv.getQualifier()));
139 }
140 }
141 }
142
143 public static Set<String> getColumnNames(String tableName, String family, long startTime, long endTime, boolean fullScan) {
144 Set<String> columnNames = new CopyOnWriteArraySet<String>();
145 try {
146 HTableInterface table = pool.getTable(tableName);
147 Scan scan = new Scan();
148 if(!fullScan) {
149
150 StringBuilder temp = new StringBuilder();
151 temp.append(endTime-300000L);
152 scan.setStartRow(temp.toString().getBytes());
153 temp.setLength(0);
154 temp.append(endTime);
155 scan.setStopRow(temp.toString().getBytes());
156 } else {
157 StringBuilder temp = new StringBuilder();
158 temp.append(startTime);
159 scan.setStartRow(temp.toString().getBytes());
160 temp.setLength(0);
161 temp.append(endTime);
162 scan.setStopRow(temp.toString().getBytes());
163 }
164 scan.addFamily(family.getBytes());
165 ResultScanner results = table.getScanner(scan);
166 Iterator<Result> it = results.iterator();
167 if(fullScan) {
168 while(it.hasNext()) {
169 getColumnNamesHelper(columnNames, it);
170 }
171 } else {
172 getColumnNamesHelper(columnNames, it);
173 }
174 results.close();
175 table.close();
176 } catch(Exception e) {
177 log.error(ExceptionUtil.getStackTrace(e));
178 }
179 return columnNames;
180 }
181
182 public static Set<String> getRowNames(String tableName, String family, String qualifier, long startTime, long endTime, boolean fullScan) {
183 Set<String> rows = new HashSet<String>();
184 HTableInterface table = pool.getTable(tableName);
185 try {
186 Scan scan = new Scan();
187 scan.addColumn(family.getBytes(), qualifier.getBytes());
188 if(!fullScan) {
189
190 StringBuilder temp = new StringBuilder();
191 temp.append(endTime-300000L);
192 scan.setStartRow(temp.toString().getBytes());
193 temp.setLength(0);
194 temp.append(endTime);
195 scan.setStopRow(temp.toString().getBytes());
196 } else {
197 StringBuilder temp = new StringBuilder();
198 temp.append(startTime);
199 scan.setStartRow(temp.toString().getBytes());
200 temp.setLength(0);
201 temp.append(endTime);
202 scan.setStopRow(temp.toString().getBytes());
203 }
204 ResultScanner results = table.getScanner(scan);
205 Iterator<Result> it = results.iterator();
206 while(it.hasNext()) {
207 Result result = it.next();
208 String buffer = new String(result.getRow());
209 String[] parts = buffer.split("-", 2);
210 if(!rows.contains(parts[1])) {
211 rows.add(parts[1]);
212 }
213 }
214 results.close();
215 table.close();
216 } catch(Exception e) {
217 log.error(ExceptionUtil.getStackTrace(e));
218 }
219 return rows;
220 }
221
222 public static Set<String> getHostnames(String cluster, long startTime, long endTime, boolean fullScan) {
223 return getRowNames("SystemMetrics","system", "csource", startTime, endTime, fullScan);
224 }
225
226 public static Set<String> getClusterNames(long startTime, long endTime) {
227 String tableName = "SystemMetrics";
228 String family = "system";
229 String column = "ctags";
230 Set<String> clusters = new HashSet<String>();
231 HTableInterface table = pool.getTable(tableName);
232 Pattern p = Pattern.compile("\\s*cluster=\"(.*?)\"");
233 try {
234 Scan scan = new Scan();
235 scan.addColumn(family.getBytes(), column.getBytes());
236 scan.setTimeRange(startTime, endTime);
237 ResultScanner results = table.getScanner(scan);
238 Iterator<Result> it = results.iterator();
239 while(it.hasNext()) {
240 Result result = it.next();
241 String buffer = new String(result.getValue(family.getBytes(), column.getBytes()));
242 Matcher m = p.matcher(buffer);
243 if(m.matches()) {
244 clusters.add(m.group(1));
245 }
246 }
247 results.close();
248 table.close();
249 } catch(Exception e) {
250 log.error(ExceptionUtil.getStackTrace(e));
251 }
252 return clusters;
253 }
254 }