1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.dataloader;
19
20 import junit.framework.TestCase;
21 import java.util.Calendar;
22 import org.apache.hadoop.chukwa.database.Macro;
23 import org.apache.hadoop.chukwa.util.DatabaseWriter;
24 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
25 import org.apache.hadoop.chukwa.util.ExceptionUtil;
26 import org.apache.hadoop.chukwa.database.TableCreator;
27 import org.apache.hadoop.chukwa.dataloader.MetricDataLoader;
28 import org.apache.hadoop.fs.FileStatus;
29 import org.apache.hadoop.fs.FileSystem;
30 import org.apache.hadoop.fs.Path;
31 import java.io.BufferedReader;
32 import java.io.File;
33 import java.io.FileReader;
34 import java.io.IOException;
35 import java.sql.ResultSet;
36 import java.sql.ResultSetMetaData;
37 import java.sql.SQLException;
38 import java.util.ArrayList;
39
40 public class TestDatabaseMetricDataLoader extends TestCase {
41
42 long[] timeWindow = {7, 30, 91, 365, 3650};
43 String[] tables = {"system_metrics","disk","mr_job","mr_task"};
44 String cluster = "demo";
45 long current = Calendar.getInstance().getTimeInMillis();
46
47 public void setUp() {
48 System.setProperty("CLUSTER","demo");
49 DatabaseWriter db = new DatabaseWriter(cluster);
50 String buffer = "";
51 File aFile = new File(System.getenv("CHUKWA_CONF_DIR")
52 + File.separator + "database_create_tables.sql");
53 buffer = readFile(aFile);
54 String tables[] = buffer.split(";");
55 for(String table : tables) {
56 if(table.length()>5) {
57 try {
58 db.execute(table);
59 } catch (Exception e) {
60 fail("Fail to retrieve meta data from database table: "+table);
61 }
62 }
63 }
64 db.close();
65 for(int i=0;i<timeWindow.length;i++) {
66 TableCreator tc = new TableCreator();
67 long start = current;
68 long end = current + (timeWindow[i]*1440*60*1000);
69 try {
70 tc.createTables(start, end);
71 } catch (Exception e) {
72 fail("Fail to create database tables.");
73 }
74 }
75 }
76
77 public void tearDown() {
78 DatabaseWriter db = null;
79 try {
80 db = new DatabaseWriter(cluster);
81 ResultSet rs = db.query("show tables");
82 ArrayList<String> list = new ArrayList<String>();
83 while(rs.next()) {
84 String table = rs.getString(1);
85 list.add(table);
86 }
87 for(String table : list) {
88 db.execute("drop table "+table);
89 }
90 } catch(Throwable ex) {
91 } finally {
92 if(db!=null) {
93 db.close();
94 }
95 }
96 }
97
98 public String readFile(File aFile) {
99 StringBuffer contents = new StringBuffer();
100 try {
101 BufferedReader input = new BufferedReader(new FileReader(aFile));
102 try {
103 String line = null;
104 while ((line = input.readLine()) != null) {
105 contents.append(line);
106 contents.append(System.getProperty("line.separator"));
107 }
108 } finally {
109 input.close();
110 }
111 } catch (IOException ex) {
112 ex.printStackTrace();
113 }
114 return contents.toString();
115 }
116
117 public void testMetricDataLoader() {
118 boolean skip=false;
119 String srcDir = System.getenv("CHUKWA_DATA_DIR") + File.separator + "samples";
120 try {
121 ChukwaConfiguration conf = new ChukwaConfiguration();
122 FileSystem fs = FileSystem.get(conf);
123 FileStatus[] sources = fs.listStatus(new Path(srcDir));
124 for (FileStatus sequenceFile : sources) {
125 MetricDataLoader mdl = new MetricDataLoader(conf, fs, sequenceFile.getPath().toUri().toString());
126 mdl.call();
127 }
128 if(sources.length==0) {
129 skip=true;
130 }
131 } catch (Throwable ex) {
132 fail("SQL Exception: "+ExceptionUtil.getStackTrace(ex));
133 }
134 if(!skip) {
135 DatabaseWriter db = new DatabaseWriter(cluster);
136 for(int i=0;i<tables.length;i++) {
137 String query = "select [avg("+tables[i]+")] from ["+tables[i]+"]";
138 Macro mp = new Macro(current,query);
139 query = mp.toString();
140 try {
141 ResultSet rs = db.query(query);
142 ResultSetMetaData rsmd = rs.getMetaData();
143 int numberOfColumns = rsmd.getColumnCount();
144 while(rs.next()) {
145 for(int j=1;j<=numberOfColumns;j++) {
146 assertTrue("Table: "+tables[i]+", Column: "+rsmd.getColumnName(j)+", contains no data.",rs.getString(j)!=null);
147 }
148 }
149 } catch(Throwable ex) {
150 fail("MetricDataLoader failed: "+ExceptionUtil.getStackTrace(ex));
151 }
152 }
153 db.close();
154 assertTrue("MetricDataLoader executed successfully.",true);
155 }
156 }
157
158 }