/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.chukwa.dataloader;

import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.concurrent.Callable;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.database.DatabaseConfig;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.extraction.engine.RecordUtil;
import org.apache.hadoop.chukwa.util.ClusterConfig;
import org.apache.hadoop.chukwa.util.DatabaseWriter;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

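/**
 * MetricDataLoader reads ChukwaRecords from a sequence file and loads them
 * into database tables according to the MDL configuration ("report.db.*",
 * "metric.*", "normalize.*" and "conversion.*" properties), using MySQL-style
 * INSERT ... ON DUPLICATE KEY UPDATE statements.
 */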
public class MetricDataLoader implements Callable<Boolean> {
  private static Log log = LogFactory.getLog(MetricDataLoader.class);

  private Statement stmt = null;
  private ResultSet rs = null;
  private DatabaseConfig mdlConfig = null;
  private HashMap<String, String> normalize = null;
  private HashMap<String, String> transformer = null;
  private HashMap<String, Float> conversion = null;
  private HashMap<String, String> dbTables = null;
  private HashMap<String, HashMap<String, Integer>> dbSchema = null;
  private String newSpace = "-";
  private boolean batchMode = true;
  private Connection conn = null;
  private Path source = null;

  private static ChukwaConfiguration conf = null;
  private static FileSystem fs = null;
  private String jdbc_url = "";

  /** Creates a new instance of MetricDataLoader */
  public MetricDataLoader(ChukwaConfiguration conf, FileSystem fs, String fileName) {
    source = new Path(fileName);
    this.conf = conf;
    this.fs = fs;
  }

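  /**
   * Loads the MDL configuration (metric, normalize, conversion and
   * report.db.name mappings), opens a JDBC connection for the given cluster,
   * and caches the column types of each *_template table.
   */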
  private void initEnv(String cluster) throws Exception {
    mdlConfig = new DatabaseConfig();
    transformer = mdlConfig.startWith("metric.");
    conversion = new HashMap<String, Float>();
    normalize = mdlConfig.startWith("normalize.");
    dbTables = mdlConfig.startWith("report.db.name.");
    Iterator<?> entries = mdlConfig.iterator();
    while (entries.hasNext()) {
      String entry = entries.next().toString();
      if (entry.startsWith("conversion.")) {
        String[] metrics = entry.split("=");
        try {
          float convertNumber = Float.parseFloat(metrics[1]);
          conversion.put(metrics[0], convertNumber);
        } catch (NumberFormatException ex) {
          log.error(metrics[0] + " is not a number.");
        }
      }
    }
    log.debug("cluster name:" + cluster);
    if (!cluster.equals("")) {
      ClusterConfig cc = new ClusterConfig();
      jdbc_url = cc.getURL(cluster);
    }
    try {
      DatabaseWriter dbWriter = new DatabaseWriter(cluster);
      conn = dbWriter.getConnection();
    } catch (Exception ex) {
      throw new Exception("JDBC URL does not exist for:" + jdbc_url);
    }
    log.debug("Initialized JDBC URL: " + jdbc_url);
    HashMap<String, String> dbNames = mdlConfig.startWith("report.db.name.");
    Iterator<String> ki = dbNames.keySet().iterator();
    dbSchema = new HashMap<String, HashMap<String, Integer>>();
    while (ki.hasNext()) {
      String recordType = ki.next().toString();
      String table = dbNames.get(recordType);
      try {
        ResultSet rs = conn.getMetaData().getColumns(null, null, table + "_template", null);
        HashMap<String, Integer> tableSchema = new HashMap<String, Integer>();
        while (rs.next()) {
          String name = rs.getString("COLUMN_NAME");
          int type = rs.getInt("DATA_TYPE");
          tableSchema.put(name, type);
          StringBuilder metricName = new StringBuilder();
          metricName.append("metric.");
          metricName.append(recordType.substring(15));
          metricName.append(".");
          metricName.append(name);
          String mdlKey = metricName.toString().toLowerCase();
          if (!transformer.containsKey(mdlKey)) {
            transformer.put(mdlKey, name);
          }
        }
        rs.close();
        dbSchema.put(table, tableSchema);
      } catch (SQLException ex) {
        log.debug("table: " + table
          + " template does not exist, MDL will not load data for this table.");
      }
    }
    stmt = conn.createStatement();
    conn.setAutoCommit(false);
  }

  public void interrupt() {
  }

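  /** Trims the string and replaces runs of spaces with the given separator. */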
  private String escape(String s, String c) {
    String ns = s.trim();
    Pattern pattern = Pattern.compile(" +");
    Matcher matcher = pattern.matcher(ns);
    String s2 = matcher.replaceAll(c);
    return s2;
  }

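  /** Backslash-escapes double quotes, single quotes and backslashes for use in SQL literals. */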
  public static String escapeQuotes(String s) {
    StringBuffer sb = new StringBuffer();
    int index;
    int length = s.length();
    char ch;
    for (index = 0; index < length; ++index) {
      if ((ch = s.charAt(index)) == '\"') {
        sb.append("\\\"");
      } else if (ch == '\\') {
        sb.append("\\\\");
      } else if (ch == '\'') {
        sb.append("\\'");
      } else {
        sb.append(ch);
      }
    }
    return (sb.toString());
  }

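  /**
   * Reads every ChukwaRecord in the source sequence file, maps it onto the
   * configured database tables, and issues INSERT ... ON DUPLICATE KEY UPDATE
   * statements (batched when batch mode is enabled).
   *
   * @return true when the file has been processed
   * @throws IOException if loading fails, so the caller can retry
   */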
  public boolean run() throws IOException {
    boolean first = true;
    log.info("StreamName: " + source.getName());
    SequenceFile.Reader reader = null;

    try {
      // Open the sequence file containing the ChukwaRecords to load.
      reader = new SequenceFile.Reader(fs, source, conf);
    } catch (Exception ex) {
      // If the file cannot be opened there is nothing to load; propagate the
      // error so the caller can retry.
      log.error(ex, ex);
      throw new IOException(ex);
    }
    long currentTimeMillis = System.currentTimeMillis();
    boolean isSuccessful = true;
    String recordType = null;

    ChukwaRecordKey key = new ChukwaRecordKey();
    ChukwaRecord record = new ChukwaRecord();
    String cluster = null;
    int numOfRecords = 0;
    try {
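      // Record types ending in "-<number>" are mapped to time-partitioned
      // tables in the loop below.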
      Pattern p = Pattern.compile("(.*)\\-(\\d+)$");
      int batch = 0;
      while (reader.next(key, record)) {
        numOfRecords++;
        if (first) {
          try {
            cluster = RecordUtil.getClusterName(record);
            initEnv(cluster);
            first = false;
          } catch (Exception ex) {
            log.error("Initialization failed for: " + cluster + ".  Please check jdbc configuration.");
            return false;
          }
        }
        String sqlTime = DatabaseWriter.formatTimeStamp(record.getTime());
        log.debug("Timestamp: " + record.getTime());
        log.debug("DataType: " + key.getReduceType());

        String[] fields = record.getFields();
        String table = null;
        String[] priKeys = null;
        HashMap<String, HashMap<String, String>> hashReport = new HashMap<String, HashMap<String, String>>();
        StringBuilder normKey = new StringBuilder();
        String node = record.getValue("csource");
        recordType = key.getReduceType().toLowerCase();
        String dbKey = "report.db.name." + recordType;
        Matcher m = p.matcher(recordType);
        if (dbTables.containsKey(dbKey)) {
          String[] tmp = mdlConfig.findTableName(mdlConfig.get(dbKey), record
              .getTime(), record.getTime());
          table = tmp[0];
        } else if (m.matches()) {
          String timePartition = "_week";
          int timeSize = Integer.parseInt(m.group(2));
          if (timeSize == 5) {
            timePartition = "_month";
          } else if (timeSize == 30) {
            timePartition = "_quarter";
          } else if (timeSize == 180) {
            timePartition = "_year";
          } else if (timeSize == 720) {
            timePartition = "_decade";
          }
          int partition = (int) (record.getTime() / timeSize);
          StringBuilder tmpDbKey = new StringBuilder();
          tmpDbKey.append("report.db.name.");
          tmpDbKey.append(m.group(1));
          if (dbTables.containsKey(tmpDbKey.toString())) {
            StringBuilder tmpTable = new StringBuilder();
            tmpTable.append(dbTables.get(tmpDbKey.toString()));
            tmpTable.append("_");
            tmpTable.append(partition);
            tmpTable.append("_");
            tmpTable.append(timePartition);
            table = tmpTable.toString();
          } else {
            log.debug(tmpDbKey.toString() + " does not exist.");
            continue;
          }
        } else {
          log.debug(dbKey + " does not exist.");
          continue;
        }
        log.debug("table name:" + table);
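        // Look up the primary key columns configured for this record type, if any.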
        try {
          priKeys = mdlConfig.get("report.db.primary.key." + recordType).split(
              ",");
        } catch (Exception nullException) {
          log.debug(ExceptionUtil.getStackTrace(nullException));
        }
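        // First pass over the fields: build the normalize key ("<name>.<value>"
        // pairs) from any fields listed under "normalize.<recordType>.*".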
        for (String field : fields) {
          String keyName = escape(field.toLowerCase(), newSpace);
          String keyValue = escape(record.getValue(field).toLowerCase(),
              newSpace);
          StringBuilder buildKey = new StringBuilder();
          buildKey.append("normalize.");
          buildKey.append(recordType);
          buildKey.append(".");
          buildKey.append(keyName);
          if (normalize.containsKey(buildKey.toString())) {
            if (normKey.toString().equals("")) {
              normKey.append(keyName);
              normKey.append(".");
              normKey.append(keyValue);
            } else {
              normKey.append(".");
              normKey.append(keyName);
              normKey.append(".");
              normKey.append(keyValue);
            }
          }
          StringBuilder normalizedKey = new StringBuilder();
          normalizedKey.append("metric.");
          normalizedKey.append(recordType);
          normalizedKey.append(".");
          normalizedKey.append(normKey);
          if (hashReport.containsKey(node)) {
            HashMap<String, String> tmpHash = hashReport.get(node);
            tmpHash.put(normalizedKey.toString(), keyValue);
            hashReport.put(node, tmpHash);
          } else {
            HashMap<String, String> tmpHash = new HashMap<String, String>();
            tmpHash.put(normalizedKey.toString(), keyValue);
            hashReport.put(node, tmpHash);
          }
        }
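        // Second pass: record every field value under a "metric.<recordType>..."
        // key, qualified by the normalize key when one was built above.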
        for (String field : fields) {
          String valueName = escape(field.toLowerCase(), newSpace);
          String valueValue = escape(record.getValue(field).toLowerCase(),
              newSpace);
          StringBuilder buildKey = new StringBuilder();
          buildKey.append("metric.");
          buildKey.append(recordType);
          buildKey.append(".");
          buildKey.append(valueName);
          if (!normKey.toString().equals("")) {
            buildKey = new StringBuilder();
            buildKey.append("metric.");
            buildKey.append(recordType);
            buildKey.append(".");
            buildKey.append(normKey);
            buildKey.append(".");
            buildKey.append(valueName);
          }
          String normalizedKey = buildKey.toString();
          if (hashReport.containsKey(node)) {
            HashMap<String, String> tmpHash = hashReport.get(node);
            tmpHash.put(normalizedKey, valueValue);
            hashReport.put(node, tmpHash);
          } else {
            HashMap<String, String> tmpHash = new HashMap<String, String>();
            tmpHash.put(normalizedKey, valueValue);
            hashReport.put(node, tmpHash);
          }
        }
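        // Build and issue one INSERT ... ON DUPLICATE KEY UPDATE statement per node.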
        Iterator<String> i = hashReport.keySet().iterator();
        while (i.hasNext()) {
          String iteratorNode = i.next();
          HashMap<String, String> recordSet = hashReport.get(iteratorNode);
          Iterator<String> fi = recordSet.keySet().iterator();
          // Map any primary key that was not included in the report keyName
          StringBuilder sqlPriKeys = new StringBuilder();
          try {
            for (String priKey : priKeys) {
              if (priKey.equals("timestamp")) {
                sqlPriKeys.append(priKey);
                sqlPriKeys.append(" = \"");
                sqlPriKeys.append(sqlTime);
                sqlPriKeys.append("\"");
              }
              if (!priKey.equals(priKeys[priKeys.length - 1])) {
                sqlPriKeys.append(", ");
              }
            }
          } catch (Exception nullException) {
            // ignore if primary key is empty
            log.debug(ExceptionUtil.getStackTrace(nullException));
          }
          // Map the hash objects to database table columns
          StringBuilder sqlValues = new StringBuilder();
          boolean firstValue = true;
          while (fi.hasNext()) {
            String fieldKey = fi.next();
            if (transformer.containsKey(fieldKey)
                && !"_delete".equals(transformer.get(fieldKey))) {
              if (!firstValue) {
                sqlValues.append(", ");
              }
              try {
                if (dbSchema.get(dbTables.get(dbKey)).get(
                    transformer.get(fieldKey)) == java.sql.Types.VARCHAR
                    || dbSchema.get(dbTables.get(dbKey)).get(
                        transformer.get(fieldKey)) == java.sql.Types.BLOB) {
                  String conversionKey = "conversion." + fieldKey;
                  if (conversion.containsKey(conversionKey)) {
                    sqlValues.append(transformer.get(fieldKey));
                    sqlValues.append("=");
                    sqlValues.append(recordSet.get(fieldKey));
                    sqlValues.append(conversion.get(conversionKey).toString());
                  } else {
                    sqlValues.append(transformer.get(fieldKey));
                    sqlValues.append("='");
                    sqlValues.append(escapeQuotes(recordSet.get(fieldKey)));
                    sqlValues.append("'");
                  }
                } else if (dbSchema.get(dbTables.get(dbKey)).get(
                    transformer.get(fieldKey)) == java.sql.Types.TIMESTAMP) {
                  SimpleDateFormat formatter = new SimpleDateFormat(
                      "yyyy-MM-dd HH:mm:ss");
                  Date recordDate = new Date();
                  recordDate.setTime(Long.parseLong(recordSet
                      .get(fieldKey)));
                  sqlValues.append(transformer.get(fieldKey));
                  sqlValues.append("=\"");
                  sqlValues.append(formatter.format(recordDate));
                  sqlValues.append("\"");
                } else if (dbSchema.get(dbTables.get(dbKey)).get(
                    transformer.get(fieldKey)) == java.sql.Types.BIGINT
                    || dbSchema.get(dbTables.get(dbKey)).get(
                        transformer.get(fieldKey)) == java.sql.Types.TINYINT
                    || dbSchema.get(dbTables.get(dbKey)).get(
                        transformer.get(fieldKey)) == java.sql.Types.INTEGER) {
                  long tmp = 0;
                  try {
                    tmp = Long.parseLong(recordSet.get(fieldKey));
                    String conversionKey = "conversion." + fieldKey;
                    if (conversion.containsKey(conversionKey)) {
                      // The conversion factor is stored as a Float; apply it
                      // and truncate back to an integral value.
                      tmp = (long) (tmp * conversion.get(conversionKey));
                    }
                  } catch (Exception e) {
                    tmp = 0;
                  }
                  sqlValues.append(transformer.get(fieldKey));
                  sqlValues.append("=");
                  sqlValues.append(tmp);
                } else {
                  double tmp = 0;
                  tmp = Double.parseDouble(recordSet.get(fieldKey));
                  String conversionKey = "conversion." + fieldKey;
                  if (conversion.containsKey(conversionKey)) {
                    tmp = tmp * conversion.get(conversionKey);
                  }
                  if (Double.isNaN(tmp)) {
                    tmp = 0;
                  }
                  sqlValues.append(transformer.get(fieldKey));
                  sqlValues.append("=");
                  sqlValues.append(tmp);
                }
                firstValue = false;
              } catch (NumberFormatException ex) {
                String conversionKey = "conversion." + fieldKey;
                if (conversion.containsKey(conversionKey)) {
                  sqlValues.append(transformer.get(fieldKey));
                  sqlValues.append("=");
                  sqlValues.append(recordSet.get(fieldKey));
                  sqlValues.append(conversion.get(conversionKey).toString());
                } else {
                  sqlValues.append(transformer.get(fieldKey));
                  sqlValues.append("='");
                  sqlValues.append(escapeQuotes(recordSet.get(fieldKey)));
                  sqlValues.append("'");
                }
                firstValue = false;
              } catch (NullPointerException ex) {
                log.error("dbKey:" + dbKey + " fieldKey:" + fieldKey
                    + " does not contain valid MDL structure.");
              }
            }
          }

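          // Assemble the statement; MySQL's INSERT ... SET ... ON DUPLICATE KEY
          // UPDATE syntax is used so existing rows are updated rather than duplicated.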
          StringBuilder sql = new StringBuilder();
          if (sqlPriKeys.length() > 0) {
            sql.append("INSERT INTO ");
            sql.append(table);
            sql.append(" SET ");
            sql.append(sqlPriKeys.toString());
            sql.append(",");
            sql.append(sqlValues.toString());
            sql.append(" ON DUPLICATE KEY UPDATE ");
            sql.append(sqlPriKeys.toString());
            sql.append(",");
            sql.append(sqlValues.toString());
            sql.append(";");
          } else {
            if (sqlValues.length() > 0) {
              sql.append("INSERT INTO ");
              sql.append(table);
              sql.append(" SET ");
              sql.append(sqlValues.toString());
              sql.append(" ON DUPLICATE KEY UPDATE ");
              sql.append(sqlValues.toString());
              sql.append(";");
            }
          }
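          // In batch mode statements are queued and flushed every 20,000
          // statements; otherwise each statement is executed immediately.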
          if (sql.length() > 0) {
            log.trace(sql);

            if (batchMode) {
              stmt.addBatch(sql.toString());
              batch++;
            } else {
              stmt.execute(sql.toString());
            }
            if (batchMode && batch > 20000) {
              int[] updateCounts = stmt.executeBatch();
              log.info("Batch mode inserted=" + updateCounts.length + " records.");
              batch = 0;
            }
          }
        }

      }

      if (batchMode) {
        int[] updateCounts = stmt.executeBatch();
        log.info("Batch mode inserted=" + updateCounts.length + " records.");
      }
    } catch (SQLException ex) {
      // handle any errors
      isSuccessful = false;
      log.error(ex, ex);
      log.error("SQLException: " + ex.getMessage());
      log.error("SQLState: " + ex.getSQLState());
      log.error("VendorError: " + ex.getErrorCode());
      // throw an exception up the chain to give the PostProcessorManager a chance to retry
      throw new IOException(ex);
    } catch (Exception e) {
      isSuccessful = false;
      log.error(ExceptionUtil.getStackTrace(e));
      // throw an exception up the chain to give the PostProcessorManager a chance to retry
      throw new IOException(e);
    } finally {
      if (batchMode && conn != null) {
        try {
          conn.commit();
          log.info("batchMode commit done");
        } catch (SQLException ex) {
          log.error(ex, ex);
          log.error("SQLException: " + ex.getMessage());
          log.error("SQLState: " + ex.getSQLState());
          log.error("VendorError: " + ex.getErrorCode());
        }
      }
      long latencyMillis = System.currentTimeMillis() - currentTimeMillis;
      int latencySeconds = (int) ((latencyMillis + 500) / 1000);
      String logMsg = (isSuccessful ? "Saved" : "Error occurred in saving");
      log.info(logMsg + " (" + recordType + ","
          + cluster + ") " + latencySeconds + " sec. numOfRecords: " + numOfRecords);
      if (rs != null) {
        try {
          rs.close();
        } catch (SQLException ex) {
          log.error(ex, ex);
          log.error("SQLException: " + ex.getMessage());
          log.error("SQLState: " + ex.getSQLState());
          log.error("VendorError: " + ex.getErrorCode());
        }
        rs = null;
      }
      if (stmt != null) {
        try {
          stmt.close();
        } catch (SQLException ex) {
          log.error(ex, ex);
          log.error("SQLException: " + ex.getMessage());
          log.error("SQLState: " + ex.getSQLState());
          log.error("VendorError: " + ex.getErrorCode());
        }
        stmt = null;
      }
      if (conn != null) {
        try {
          conn.close();
        } catch (SQLException ex) {
          log.error(ex, ex);
          log.error("SQLException: " + ex.getMessage());
          log.error("SQLState: " + ex.getSQLState());
          log.error("VendorError: " + ex.getErrorCode());
        }
        conn = null;
      }

      if (reader != null) {
        try {
          reader.close();
        } catch (Exception e) {
          log.warn("Could not close SequenceFile.Reader:", e);
        }
        reader = null;
      }
    }
    return true;
  }

575 
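  /** Callable entry point; delegates to {@link #run()}. */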
  public Boolean call() throws IOException {
    return run();
  }

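  /**
   * Stand-alone entry point: loads the single sequence file named by the
   * first command-line argument using the default ChukwaConfiguration and
   * file system.
   */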
  public static void main(String[] args) {
    try {
      conf = new ChukwaConfiguration();
      fs = FileSystem.get(conf);
      MetricDataLoader mdl = new MetricDataLoader(conf, fs, args[0]);
      mdl.run();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

}