1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.dataloader;
20
21 import java.io.IOException;
22 import java.sql.Connection;
23 import java.sql.ResultSet;
24 import java.sql.SQLException;
25 import java.sql.Statement;
26 import java.text.SimpleDateFormat;
27 import java.util.Date;
28 import java.util.HashMap;
29 import java.util.Iterator;
30 import java.util.regex.Matcher;
31 import java.util.regex.Pattern;
32 import java.util.concurrent.Callable;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
37 import org.apache.hadoop.chukwa.database.DatabaseConfig;
38 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
39 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
40 import org.apache.hadoop.chukwa.extraction.engine.RecordUtil;
41 import org.apache.hadoop.chukwa.util.ClusterConfig;
42 import org.apache.hadoop.chukwa.util.DatabaseWriter;
43 import org.apache.hadoop.chukwa.util.ExceptionUtil;
44 import org.apache.hadoop.fs.FileSystem;
45 import org.apache.hadoop.fs.Path;
46 import org.apache.hadoop.io.SequenceFile;
47
48 public class MetricDataLoader implements Callable {
49 private static Log log = LogFactory.getLog(MetricDataLoader.class);
50
51 private Statement stmt = null;
52 private ResultSet rs = null;
53 private DatabaseConfig mdlConfig = null;
54 private HashMap<String, String> normalize = null;
55 private HashMap<String, String> transformer = null;
56 private HashMap<String, Float> conversion = null;
57 private HashMap<String, String> dbTables = null;
58 private HashMap<String, HashMap<String, Integer>> dbSchema = null;
59 private String newSpace = "-";
60 private boolean batchMode = true;
61 private Connection conn = null;
62 private Path source = null;
63
64 private static ChukwaConfiguration conf = null;
65 private static FileSystem fs = null;
66 private String jdbc_url = "";
67
68
69 public MetricDataLoader(ChukwaConfiguration conf, FileSystem fs, String fileName) {
70 source = new Path(fileName);
71 this.conf = conf;
72 this.fs = fs;
73 }
74
75 private void initEnv(String cluster) throws Exception {
76 mdlConfig = new DatabaseConfig();
77 transformer = mdlConfig.startWith("metric.");
78 conversion = new HashMap<String, Float>();
79 normalize = mdlConfig.startWith("normalize.");
80 dbTables = mdlConfig.startWith("report.db.name.");
81 Iterator<?> entries = mdlConfig.iterator();
82 while (entries.hasNext()) {
83 String entry = entries.next().toString();
84 if (entry.startsWith("conversion.")) {
85 String[] metrics = entry.split("=");
86 try {
87 float convertNumber = Float.parseFloat(metrics[1]);
88 conversion.put(metrics[0], convertNumber);
89 } catch (NumberFormatException ex) {
90 log.error(metrics[0] + " is not a number.");
91 }
92 }
93 }
94 log.debug("cluster name:" + cluster);
95 if (!cluster.equals("")) {
96 ClusterConfig cc = new ClusterConfig();
97 jdbc_url = cc.getURL(cluster);
98 }
99 try {
100 DatabaseWriter dbWriter = new DatabaseWriter(cluster);
101 conn = dbWriter.getConnection();
102 } catch(Exception ex) {
103 throw new Exception("JDBC URL does not exist for:"+jdbc_url);
104 }
105 log.debug("Initialized JDBC URL: " + jdbc_url);
106 HashMap<String, String> dbNames = mdlConfig.startWith("report.db.name.");
107 Iterator<String> ki = dbNames.keySet().iterator();
108 dbSchema = new HashMap<String, HashMap<String, Integer>>();
109 while (ki.hasNext()) {
110 String recordType = ki.next().toString();
111 String table = dbNames.get(recordType);
112 try {
113 ResultSet rs = conn.getMetaData().getColumns(null, null, table+"_template", null);
114 HashMap<String, Integer> tableSchema = new HashMap<String, Integer>();
115 while(rs.next()) {
116 String name = rs.getString("COLUMN_NAME");
117 int type = rs.getInt("DATA_TYPE");
118 tableSchema.put(name, type);
119 StringBuilder metricName = new StringBuilder();
120 metricName.append("metric.");
121 metricName.append(recordType.substring(15));
122 metricName.append(".");
123 metricName.append(name);
124 String mdlKey = metricName.toString().toLowerCase();
125 if(!transformer.containsKey(mdlKey)) {
126 transformer.put(mdlKey, name);
127 }
128 }
129 rs.close();
130 dbSchema.put(table, tableSchema);
131 } catch (SQLException ex) {
132 log.debug("table: " + table
133 + " template does not exist, MDL will not load data for this table.");
134 }
135 }
136 stmt = conn.createStatement();
137 conn.setAutoCommit(false);
138 }
139
140 public void interrupt() {
141 }
142
143 private String escape(String s, String c) {
144
145 String ns = s.trim();
146 Pattern pattern = Pattern.compile(" +");
147 Matcher matcher = pattern.matcher(ns);
148 String s2 = matcher.replaceAll(c);
149
150 return s2;
151
152 }
153
154 public static String escapeQuotes( String s ) {
155 StringBuffer sb = new StringBuffer();
156 int index;
157 int length = s.length();
158 char ch;
159 for( index = 0; index < length; ++index ) {
160 if(( ch = s.charAt( index )) == '\"' ) {
161 sb.append( "\\\"" );
162 } else if( ch == '\\' ) {
163 sb.append( "\\\\" );
164 } else if( ch == '\'' ) {
165 sb.append( "\\'" );
166 } else {
167 sb.append( ch );
168 }
169 }
170 return( sb.toString());
171 }
172
173 public boolean run() throws IOException {
174 boolean first=true;
175 log.info("StreamName: " + source.getName());
176 SequenceFile.Reader reader = null;
177
178 try {
179
180
181 reader = new SequenceFile.Reader(fs, source, conf);
182 } catch (Exception ex) {
183
184 log.error(ex, ex);
185 }
186 long currentTimeMillis = System.currentTimeMillis();
187 boolean isSuccessful = true;
188 String recordType = null;
189
190 ChukwaRecordKey key = new ChukwaRecordKey();
191 ChukwaRecord record = new ChukwaRecord();
192 String cluster = null;
193 int numOfRecords = 0;
194 try {
195 Pattern p = Pattern.compile("(.*)\\-(\\d+)$");
196 int batch = 0;
197 while (reader.next(key, record)) {
198 numOfRecords++;
199 if(first) {
200 try {
201 cluster = RecordUtil.getClusterName(record);
202 initEnv(cluster);
203 first=false;
204 } catch(Exception ex) {
205 log.error("Initialization failed for: "+cluster+". Please check jdbc configuration.");
206 return false;
207 }
208 }
209 String sqlTime = DatabaseWriter.formatTimeStamp(record.getTime());
210 log.debug("Timestamp: " + record.getTime());
211 log.debug("DataType: " + key.getReduceType());
212
213 String[] fields = record.getFields();
214 String table = null;
215 String[] priKeys = null;
216 HashMap<String, HashMap<String, String>> hashReport = new HashMap<String, HashMap<String, String>>();
217 StringBuilder normKey = new StringBuilder();
218 String node = record.getValue("csource");
219 recordType = key.getReduceType().toLowerCase();
220 String dbKey = "report.db.name." + recordType;
221 Matcher m = p.matcher(recordType);
222 if (dbTables.containsKey(dbKey)) {
223 String[] tmp = mdlConfig.findTableName(mdlConfig.get(dbKey), record
224 .getTime(), record.getTime());
225 table = tmp[0];
226 } else if(m.matches()) {
227 String timePartition = "_week";
228 int timeSize = Integer.parseInt(m.group(2));
229 if(timeSize == 5) {
230 timePartition = "_month";
231 } else if(timeSize == 30) {
232 timePartition = "_quarter";
233 } else if(timeSize == 180) {
234 timePartition = "_year";
235 } else if(timeSize == 720) {
236 timePartition = "_decade";
237 }
238 int partition = (int) (record.getTime() / timeSize);
239 StringBuilder tmpDbKey = new StringBuilder();
240 tmpDbKey.append("report.db.name.");
241 tmpDbKey.append(m.group(1));
242 if(dbTables.containsKey(tmpDbKey.toString())) {
243 StringBuilder tmpTable = new StringBuilder();
244 tmpTable.append(dbTables.get(tmpDbKey.toString()));
245 tmpTable.append("_");
246 tmpTable.append(partition);
247 tmpTable.append("_");
248 tmpTable.append(timePartition);
249 table = tmpTable.toString();
250 } else {
251 log.debug(tmpDbKey.toString() + " does not exist.");
252 continue;
253 }
254 } else {
255 log.debug(dbKey + " does not exist.");
256 continue;
257 }
258 log.debug("table name:" + table);
259 try {
260 priKeys = mdlConfig.get("report.db.primary.key." + recordType).split(
261 ",");
262 } catch (Exception nullException) {
263 log.debug(ExceptionUtil.getStackTrace(nullException));
264 }
265 for (String field : fields) {
266 String keyName = escape(field.toLowerCase(), newSpace);
267 String keyValue = escape(record.getValue(field).toLowerCase(),
268 newSpace);
269 StringBuilder buildKey = new StringBuilder();
270 buildKey.append("normalize.");
271 buildKey.append(recordType);
272 buildKey.append(".");
273 buildKey.append(keyName);
274 if (normalize.containsKey(buildKey.toString())) {
275 if (normKey.toString().equals("")) {
276 normKey.append(keyName);
277 normKey.append(".");
278 normKey.append(keyValue);
279 } else {
280 normKey.append(".");
281 normKey.append(keyName);
282 normKey.append(".");
283 normKey.append(keyValue);
284 }
285 }
286 StringBuilder normalizedKey = new StringBuilder();
287 normalizedKey.append("metric.");
288 normalizedKey.append(recordType);
289 normalizedKey.append(".");
290 normalizedKey.append(normKey);
291 if (hashReport.containsKey(node)) {
292 HashMap<String, String> tmpHash = hashReport.get(node);
293 tmpHash.put(normalizedKey.toString(), keyValue);
294 hashReport.put(node, tmpHash);
295 } else {
296 HashMap<String, String> tmpHash = new HashMap<String, String>();
297 tmpHash.put(normalizedKey.toString(), keyValue);
298 hashReport.put(node, tmpHash);
299 }
300 }
301 for (String field : fields) {
302 String valueName = escape(field.toLowerCase(), newSpace);
303 String valueValue = escape(record.getValue(field).toLowerCase(),
304 newSpace);
305 StringBuilder buildKey = new StringBuilder();
306 buildKey.append("metric.");
307 buildKey.append(recordType);
308 buildKey.append(".");
309 buildKey.append(valueName);
310 if (!normKey.toString().equals("")) {
311 buildKey = new StringBuilder();
312 buildKey.append("metric.");
313 buildKey.append(recordType);
314 buildKey.append(".");
315 buildKey.append(normKey);
316 buildKey.append(".");
317 buildKey.append(valueName);
318 }
319 String normalizedKey = buildKey.toString();
320 if (hashReport.containsKey(node)) {
321 HashMap<String, String> tmpHash = hashReport.get(node);
322 tmpHash.put(normalizedKey, valueValue);
323 hashReport.put(node, tmpHash);
324 } else {
325 HashMap<String, String> tmpHash = new HashMap<String, String>();
326 tmpHash.put(normalizedKey, valueValue);
327 hashReport.put(node, tmpHash);
328
329 }
330
331 }
332 Iterator<String> i = hashReport.keySet().iterator();
333 while (i.hasNext()) {
334 Object iteratorNode = i.next();
335 HashMap<String, String> recordSet = hashReport.get(iteratorNode);
336 Iterator<String> fi = recordSet.keySet().iterator();
337
338 StringBuilder sqlPriKeys = new StringBuilder();
339 try {
340 for (String priKey : priKeys) {
341 if (priKey.equals("timestamp")) {
342 sqlPriKeys.append(priKey);
343 sqlPriKeys.append(" = \"");
344 sqlPriKeys.append(sqlTime);
345 sqlPriKeys.append("\"");
346 }
347 if (!priKey.equals(priKeys[priKeys.length - 1])) {
348 sqlPriKeys.append(sqlPriKeys);
349 sqlPriKeys.append(", ");
350 }
351 }
352 } catch (Exception nullException) {
353
354 log.debug(ExceptionUtil.getStackTrace(nullException));
355 }
356
357 StringBuilder sqlValues = new StringBuilder();
358 boolean firstValue = true;
359 while (fi.hasNext()) {
360 String fieldKey = fi.next();
361 if (transformer.containsKey(fieldKey) && transformer.get(fieldKey).intern()!="_delete".intern()) {
362 if (!firstValue) {
363 sqlValues.append(", ");
364 }
365 try {
366 if (dbSchema.get(dbTables.get(dbKey)).get(
367 transformer.get(fieldKey)) == java.sql.Types.VARCHAR
368 || dbSchema.get(dbTables.get(dbKey)).get(
369 transformer.get(fieldKey)) == java.sql.Types.BLOB) {
370 String conversionKey = "conversion." + fieldKey;
371 if (conversion.containsKey(conversionKey)) {
372 sqlValues.append(transformer.get(fieldKey));
373 sqlValues.append("=");
374 sqlValues.append(recordSet.get(fieldKey));
375 sqlValues.append(conversion.get(conversionKey).toString());
376 } else {
377 sqlValues.append(transformer.get(fieldKey));
378 sqlValues.append("=\'");
379 sqlValues.append(escapeQuotes(recordSet.get(fieldKey)));
380 sqlValues.append("\'");
381 }
382 } else if (dbSchema.get(dbTables.get(dbKey)).get(
383 transformer.get(fieldKey)) == java.sql.Types.TIMESTAMP) {
384 SimpleDateFormat formatter = new SimpleDateFormat(
385 "yyyy-MM-dd HH:mm:ss");
386 Date recordDate = new Date();
387 recordDate.setTime(Long.parseLong(recordSet
388 .get(fieldKey)));
389 sqlValues.append(transformer.get(fieldKey));
390 sqlValues.append("=\"");
391 sqlValues.append(formatter.format(recordDate));
392 sqlValues.append("\"");
393 } else if (dbSchema.get(dbTables.get(dbKey)).get(
394 transformer.get(fieldKey)) == java.sql.Types.BIGINT
395 || dbSchema.get(dbTables.get(dbKey)).get(
396 transformer.get(fieldKey)) == java.sql.Types.TINYINT
397 || dbSchema.get(dbTables.get(dbKey)).get(
398 transformer.get(fieldKey)) == java.sql.Types.INTEGER) {
399 long tmp = 0;
400 try {
401 tmp = Long.parseLong(recordSet.get(fieldKey).toString());
402 String conversionKey = "conversion." + fieldKey;
403 if (conversion.containsKey(conversionKey)) {
404 tmp = tmp
405 * Long.parseLong(conversion.get(conversionKey)
406 .toString());
407 }
408 } catch (Exception e) {
409 tmp = 0;
410 }
411 sqlValues.append(transformer.get(fieldKey));
412 sqlValues.append("=");
413 sqlValues.append(tmp);
414 } else {
415 double tmp = 0;
416 tmp = Double.parseDouble(recordSet.get(fieldKey).toString());
417 String conversionKey = "conversion." + fieldKey;
418 if (conversion.containsKey(conversionKey)) {
419 tmp = tmp
420 * Double.parseDouble(conversion.get(conversionKey)
421 .toString());
422 }
423 if (Double.isNaN(tmp)) {
424 tmp = 0;
425 }
426 sqlValues.append(transformer.get(fieldKey));
427 sqlValues.append("=");
428 sqlValues.append(tmp);
429 }
430 firstValue = false;
431 } catch (NumberFormatException ex) {
432 String conversionKey = "conversion." + fieldKey;
433 if (conversion.containsKey(conversionKey)) {
434 sqlValues.append(transformer.get(fieldKey));
435 sqlValues.append("=");
436 sqlValues.append(recordSet.get(fieldKey));
437 sqlValues.append(conversion.get(conversionKey).toString());
438 } else {
439 sqlValues.append(transformer.get(fieldKey));
440 sqlValues.append("='");
441 sqlValues.append(escapeQuotes(recordSet.get(fieldKey)));
442 sqlValues.append("'");
443 }
444 firstValue = false;
445 } catch (NullPointerException ex) {
446 log.error("dbKey:" + dbKey + " fieldKey:" + fieldKey
447 + " does not contain valid MDL structure.");
448 }
449 }
450 }
451
452 StringBuilder sql = new StringBuilder();
453 if (sqlPriKeys.length() > 0) {
454 sql.append("INSERT INTO ");
455 sql.append(table);
456 sql.append(" SET ");
457 sql.append(sqlPriKeys.toString());
458 sql.append(",");
459 sql.append(sqlValues.toString());
460 sql.append(" ON DUPLICATE KEY UPDATE ");
461 sql.append(sqlPriKeys.toString());
462 sql.append(",");
463 sql.append(sqlValues.toString());
464 sql.append(";");
465 } else {
466 if(sqlValues.length() > 0) {
467 sql.append("INSERT INTO ");
468 sql.append(table);
469 sql.append(" SET ");
470 sql.append(sqlValues.toString());
471 sql.append(" ON DUPLICATE KEY UPDATE ");
472 sql.append(sqlValues.toString());
473 sql.append(";");
474 }
475 }
476 if(sql.length() > 0) {
477 log.trace(sql);
478
479 if (batchMode) {
480 stmt.addBatch(sql.toString());
481 batch++;
482 } else {
483 stmt.execute(sql.toString());
484 }
485 if (batchMode && batch > 20000) {
486 int[] updateCounts = stmt.executeBatch();
487 log.info("Batch mode inserted=" + updateCounts.length + "records.");
488 batch = 0;
489 }
490 }
491 }
492
493 }
494
495 if (batchMode) {
496 int[] updateCounts = stmt.executeBatch();
497 log.info("Batch mode inserted=" + updateCounts.length + "records.");
498 }
499 } catch (SQLException ex) {
500
501 isSuccessful = false;
502 log.error(ex, ex);
503 log.error("SQLException: " + ex.getMessage());
504 log.error("SQLState: " + ex.getSQLState());
505 log.error("VendorError: " + ex.getErrorCode());
506
507 throw new IOException (ex);
508 } catch (Exception e) {
509 isSuccessful = false;
510 log.error(ExceptionUtil.getStackTrace(e));
511
512 throw new IOException (e);
513 } finally {
514 if (batchMode && conn!=null) {
515 try {
516 conn.commit();
517 log.info("batchMode commit done");
518 } catch (SQLException ex) {
519 log.error(ex, ex);
520 log.error("SQLException: " + ex.getMessage());
521 log.error("SQLState: " + ex.getSQLState());
522 log.error("VendorError: " + ex.getErrorCode());
523 }
524 }
525 long latencyMillis = System.currentTimeMillis() - currentTimeMillis;
526 int latencySeconds = ((int) (latencyMillis + 500)) / 1000;
527 String logMsg = (isSuccessful ? "Saved" : "Error occurred in saving");
528 log.info(logMsg + " (" + recordType + ","
529 + cluster + ") " + latencySeconds + " sec. numOfRecords: " + numOfRecords);
530 if (rs != null) {
531 try {
532 rs.close();
533 } catch (SQLException ex) {
534 log.error(ex, ex);
535 log.error("SQLException: " + ex.getMessage());
536 log.error("SQLState: " + ex.getSQLState());
537 log.error("VendorError: " + ex.getErrorCode());
538 }
539 rs = null;
540 }
541 if (stmt != null) {
542 try {
543 stmt.close();
544 } catch (SQLException ex) {
545 log.error(ex, ex);
546 log.error("SQLException: " + ex.getMessage());
547 log.error("SQLState: " + ex.getSQLState());
548 log.error("VendorError: " + ex.getErrorCode());
549 }
550 stmt = null;
551 }
552 if (conn != null) {
553 try {
554 conn.close();
555 } catch (SQLException ex) {
556 log.error(ex, ex);
557 log.error("SQLException: " + ex.getMessage());
558 log.error("SQLState: " + ex.getSQLState());
559 log.error("VendorError: " + ex.getErrorCode());
560 }
561 conn = null;
562 }
563
564 if (reader != null) {
565 try {
566 reader.close();
567 } catch (Exception e) {
568 log.warn("Could not close SequenceFile.Reader:" ,e);
569 }
570 reader = null;
571 }
572 }
573 return true;
574 }
575
576 public Boolean call() throws IOException {
577 return run();
578 }
579
580
581 public static void main(String[] args) {
582 try {
583 conf = new ChukwaConfiguration();
584 fs = FileSystem.get(conf);
585 MetricDataLoader mdl = new MetricDataLoader(conf, fs, args[0]);
586 mdl.run();
587 } catch (Exception e) {
588 e.printStackTrace();
589 }
590 }
591
592 }