/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.analysis.salsa.fsm;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FilenameFilter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.regex.*;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.database.TableCreator;

import org.apache.hadoop.chukwa.datacollection.DataFactory;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent.AlreadyRunningException;
import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
import org.apache.hadoop.chukwa.datacollection.collector.CaptureWriter;
import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
import org.apache.hadoop.chukwa.datacollection.sender.ChukwaHttpSender;
import org.apache.hadoop.chukwa.datacollection.sender.RetryListOfCollectors;
import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
import org.apache.hadoop.chukwa.datacollection.writer.PipelineStageWriter;
import org.apache.hadoop.chukwa.dataloader.MetricDataLoader;
import org.apache.hadoop.conf.Configuration;
import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.nio.SelectChannelConnector;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.chukwa.extraction.demux.ChukwaRecordOutputFormat;
import org.apache.hadoop.chukwa.extraction.demux.ChukwaRecordPartitioner;
import org.apache.hadoop.chukwa.extraction.demux.Demux;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.util.DatabaseWriter;
import org.apache.hadoop.chukwa.util.ExceptionUtil;

import junit.framework.TestCase;

public class TestFSMBuilder extends TestCase {
  private static Log log = LogFactory.getLog(TestFSMBuilder.class);

  int LINES = 10000;
  int THREADS = 2;
  private MiniDFSCluster dfs = null;
  int NUM_HADOOP_SLAVES = 4;
  private FileSystem fileSys = null;
  private MiniMRCluster mr = null;
  private Server jettyCollector = null;
  private ChukwaAgent agent = null;
  private HttpConnector conn = null;
  private ChukwaHttpSender sender = null;
  private int agentPort = 9093;
  private int collectorPort = 9990;
  private static final String dataSink = "/demux/input";
  private static final String fsmSink = "/analysis/salsafsm";
  private static Path DEMUX_INPUT_PATH = null;
  private static Path DEMUX_OUTPUT_PATH = null;
  private static Path FSM_OUTPUT_PATH = null;
  private ChukwaConfiguration conf = new ChukwaConfiguration();
  private static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd_HH_mm");
  private static String cluster = "demo";
  long[] timeWindow = {7, 30, 91, 365, 3650};
  long current = 1244617200000L;

  public void setUp() {
    System.out.println("In setUp()");
    try {
      System.setProperty("hadoop.log.dir", System.getProperty(
          "test.build.data", "/tmp"));
    } catch (Exception e) {
      e.printStackTrace();
      fail("Could not set up: " + e.toString());
    }

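    // Start a mini HDFS cluster and derive the demux input/output paths from it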
    try {
      dfs = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true, null);
      fileSys = dfs.getFileSystem();
      DEMUX_INPUT_PATH = new Path(fileSys.getUri().toString()+File.separator+dataSink);
      DEMUX_OUTPUT_PATH = new Path(fileSys.getUri().toString()+File.separator+"/demux/output");
    } catch(Exception e) {
      e.printStackTrace();
      fail("Failed to start up HDFS cluster.");
    }

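    // Start a mini MapReduce cluster on top of the mini DFS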
    try {
      mr = new MiniMRCluster(NUM_HADOOP_SLAVES, fileSys.getUri()
          .toString(), 1);
    } catch(Exception e) {
      fail("Failed to start up Map/Reduce cluster.");
    }

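    // Configure and start the Jetty-based Chukwa collector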
    try {
      conf.set("chukwaCollector.chunkSuppressBufferSize", "10");
      conf.set("writer.hdfs.filesystem", fileSys.getUri().toString());
      conf.set("chukwaCollector.outputDir", dataSink);
      conf.set("chukwaCollector.rotateInterval", "10000");

      SelectChannelConnector jettyConnector = new SelectChannelConnector();
      jettyConnector.setLowResourcesConnections(THREADS-1);
      jettyConnector.setLowResourceMaxIdleTime(1500);
      jettyConnector.setPort(collectorPort);

      jettyCollector = new Server(collectorPort);
      Context root = new Context(jettyCollector, "/", Context.SESSIONS);
      root.addServlet(new ServletHolder(new ServletCollector(conf)), "/*");
      jettyCollector.start();
      jettyCollector.setStopAtShutdown(true);
      Thread.sleep(10000);
    } catch(Exception e) {
      fail("Failed to start up collector.");
    }

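    // Start a Chukwa agent and an HTTP connector that posts collected chunks to the local collector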
    try {
      conf.set("chukwaAgent.tags", "cluster=\"demo\"");
      DataFactory.getInstance().addDefaultTag(conf.get("chukwaAgent.tags", "cluster=\"unknown\""));
      conf.set("chukwaAgent.checkpoint.dir", System.getenv("CHUKWA_DATA_DIR")+File.separator+"tmp");
      conf.set("chukwaAgent.checkpoint.interval", "10000");
      int portno = conf.getInt("chukwaAgent.control.port", agentPort);
      agent = new ChukwaAgent(conf);
      conn = new HttpConnector(agent, "http://localhost:"+collectorPort+"/chukwa");
      conn.start();
      sender = new ChukwaHttpSender(conf);
      ArrayList<String> collectorList = new ArrayList<String>();
      collectorList.add("http://localhost:"+collectorPort+"/chukwa");
      sender.setCollectors(new RetryListOfCollectors(collectorList, conf));
    } catch (AlreadyRunningException e) {
      fail("Chukwa Agent is already running");
    }
    System.out.println("Done setUp().");
  }

  public String readFile(File aFile) {
    StringBuffer contents = new StringBuffer();
    try {
      BufferedReader input = new BufferedReader(new FileReader(aFile));
      try {
        String line = null;
        while ((line = input.readLine()) != null) {
          contents.append(line);
          contents.append(System.getProperty("line.separator"));
        }
      } finally {
        input.close();
      }
    } catch (IOException ex) {
      ex.printStackTrace();
    }
    return contents.toString();
  }

  public void tearDown() {
    FileSystem fs;
    System.out.println("In tearDown()");
    try {
      fs = dfs.getFileSystem();
      fs.delete(DEMUX_OUTPUT_PATH, true);

      agent.shutdown();
      conn.shutdown();
      jettyCollector.stop();
      mr.shutdown();
      dfs.shutdown();
      Thread.sleep(2000);
    } catch(Exception e) {
      e.printStackTrace();
      fail(e.toString());
    }
    System.out.println("Done tearDown()");
  }
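  /**
   * Feeds the sample JobHistory and ClientTrace logs (from CHUKWA_DATA_DIR/log)
   * through the agent/collector pipeline, verifies that data sink files were
   * written to HDFS, and then runs the Demux job to produce FSMBuilder's input.
   */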
  protected void initialTasks () {
    System.out.println("In initialTasks()");
    try {
      ChukwaAgentController cli = new ChukwaAgentController("localhost", agentPort);
      String[] source = new File(System.getenv("CHUKWA_DATA_DIR") + File.separator + "log").list(new FilenameFilter() {
        public boolean accept(File dir, String name) {
          return name.endsWith(".log");
        }
      });
      System.out.println(System.getenv("CHUKWA_DATA_DIR") + File.separator + "log");
      for(String fname : source) {
        if (!(fname.equals("JobHistory.log") || fname.equals("ClientTrace.log"))) {
          continue;
        }
        StringBuilder fullPath = new StringBuilder();
        fullPath.append(System.getenv("CHUKWA_DATA_DIR"));
        fullPath.append(File.separator);
        fullPath.append("log");
        fullPath.append(File.separator);
        fullPath.append(fname);
        String recordType = fname.substring(0, fname.indexOf("."));
        String adaptorId = cli.add(
          "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped",
          recordType, "0 " + fullPath.toString(), 0);
        assertNotNull(adaptorId);
        Thread.sleep(2000);
      }
      cli.removeAll();
      Thread.sleep(30000);
    } catch (Exception e) {
      e.printStackTrace();
      fail(e.toString());
    }

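    // Verify that the collector wrote data sink files into HDFS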
    Path demuxDir = new Path(dataSink+"/*");
    FileSystem fs;
    try {
      fs = dfs.getFileSystem();
      FileStatus[] events = fs.globStatus(demuxDir);
      log.info("Number of data sink files written:"+events.length);
      assertTrue(events.length != 0);
    } catch (IOException e) {
      e.printStackTrace();
      fail("File System Error.");
    }

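    // Run the Demux MapReduce job over the completed (.done) data sink files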
    log.info("Testing demux");
    try {
      System.setProperty("hadoop.log.dir", System.getProperty(
          "test.build.data", "/tmp"));

      String[] sortArgs = { DEMUX_INPUT_PATH.toString(), DEMUX_OUTPUT_PATH.toString() };

      JobConf job = new JobConf(new ChukwaConfiguration(), Demux.class);
      job.addResource(System.getenv("CHUKWA_CONF_DIR")+File.separator+"chukwa-demux-conf.xml");
      job.setJobName("Chukwa-Demux_" + day.format(new Date()));
      job.setInputFormat(SequenceFileInputFormat.class);
      job.setMapperClass(Demux.MapClass.class);
      job.setPartitionerClass(ChukwaRecordPartitioner.class);
      job.setReducerClass(Demux.ReduceClass.class);

      job.setOutputKeyClass(ChukwaRecordKey.class);
      job.setOutputValueClass(ChukwaRecord.class);
      job.setOutputFormat(ChukwaRecordOutputFormat.class);
      job.setJobPriority(JobPriority.VERY_HIGH);
      job.setNumMapTasks(2);
      job.setNumReduceTasks(1);
      Path input = new Path(fileSys.getUri().toString()+File.separator+dataSink+File.separator+"*.done");
      FileInputFormat.setInputPaths(job, input);
      FileOutputFormat.setOutputPath(job, DEMUX_OUTPUT_PATH);
      String[] jars = new File(System.getenv("CHUKWA_HOME")).list(new FilenameFilter() {
        public boolean accept(File dir, String name) {
          return name.endsWith(".jar");
        }
      });
      job.setJar(System.getenv("CHUKWA_HOME")+File.separator+jars[0]);

      JobClient.runJob(job);
    } catch (Exception e) {
      fail(ExceptionUtil.getStackTrace(e));
    }

    System.out.println("Done initialTasks()");
  }

  public void testFSMBuilder_JobHistory020 () {
    initialTasks();

    log.info("Testing FSMBuilder (Job History only)");
    System.out.println("In JobHistory020");

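    // Run FSMBuilder over the demuxed JobHistory TaskData records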
    try {
      JobConf job = new JobConf(new ChukwaConfiguration(), FSMBuilder.class);
      job.addResource(System.getenv("CHUKWA_CONF_DIR")+File.separator+"chukwa-demux-conf.xml");
      job.setJobName("Chukwa-FSMBuilder_" + day.format(new Date()));
      job.setMapperClass(JobHistoryTaskDataMapper.class);
      job.setPartitionerClass(FSMIntermedEntryPartitioner.class);
      job.setReducerClass(FSMBuilder.FSMReducer.class);
      job.setMapOutputValueClass(FSMIntermedEntry.class);
      job.setMapOutputKeyClass(ChukwaRecordKey.class);

      job.setInputFormat(SequenceFileInputFormat.class);
      job.setOutputKeyClass(ChukwaRecordKey.class);
      job.setOutputValueClass(ChukwaRecord.class);
      job.setOutputFormat(ChukwaRecordOutputFormat.class);
      job.setNumReduceTasks(1);

      Path inputPath = new Path(DEMUX_OUTPUT_PATH.toString()+File.separator+"/*/*/TaskData*.evt");
      FSM_OUTPUT_PATH = new Path(fileSys.getUri().toString()+File.separator+fsmSink);

      FileInputFormat.setInputPaths(job, inputPath);
      FileOutputFormat.setOutputPath(job, FSM_OUTPUT_PATH);

      String[] jars = new File(System.getenv("CHUKWA_HOME")).list(new FilenameFilter() {
        public boolean accept(File dir, String name) {
          return name.endsWith(".jar");
        }
      });
      job.setJar(System.getenv("CHUKWA_HOME")+File.separator+jars[0]);
      JobClient.runJob(job);
    } catch (Exception e) {
      fail("Error running FSMBuilder: "+e.toString());
    }
    System.out.println("Done running FSMBuilder; Checking results");

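    // Verify the FSM output: the sample job should produce 10 MAP states and
    // 8 each of REDUCE, REDUCE_SHUFFLEWAIT, REDUCE_SORT, and REDUCE_REDUCER (42 total)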
    try {
      Pattern task_id_pat = Pattern.compile("attempt_[0-9]*_[0-9]*_[mr]_([0-9]*)_[0-9]*");

      ChukwaRecordKey key = new ChukwaRecordKey();
      ChukwaRecord record = new ChukwaRecord();

      boolean[] mapSeen = new boolean[10];
      boolean[] reduceSeen = new boolean[8];
      boolean[] reduceShuffleSeen = new boolean[8];
      boolean[] reduceSortSeen = new boolean[8];
      boolean[] reduceReducerSeen = new boolean[8];
      for (int i = 0; i < 10; i++) mapSeen[i] = false;
      for (int i = 0; i < 8; i++) {
        reduceSeen[i] = false;
        reduceShuffleSeen[i] = false;
        reduceSortSeen[i] = false;
        reduceReducerSeen[i] = false;
      }

      Path fsm_outputs = new Path(FSM_OUTPUT_PATH.toString()+File.separator+
        "/*/MAPREDUCE_FSM/MAPREDUCE_FSM*.evt");
      FileStatus[] files;
      files = fileSys.globStatus(fsm_outputs);
      int count = 0;

      for (int i = 0; i < files.length; i++) {
        SequenceFile.Reader r = new SequenceFile.Reader(fileSys, files[i].getPath(), conf);
        System.out.println("Processing file " + files[i].getPath().toString());
        while (r.next(key, record)) {
          String state_name = record.getValue("STATE_NAME");
          String task_id = record.getValue("TASK_ID");

          Matcher m = task_id_pat.matcher(task_id);
          if (!m.matches()) {
            continue;
          }
          String tasknum_string = m.group(1);
          if (tasknum_string == null) {
            continue;
          }
          int tasknum = Integer.parseInt(tasknum_string);

          if (state_name.equals("MAP")) {
            assertTrue("Map sequence number should be < 10", tasknum < 10);
            mapSeen[tasknum] = true;
          } else if (state_name.equals("REDUCE")) {
            assertTrue("Reduce sequence number should be < 8", tasknum < 8);
            reduceSeen[tasknum] = true;
          } else if (state_name.equals("REDUCE_SHUFFLEWAIT")) {
            assertTrue("Reduce sequence number should be < 8", tasknum < 8);
            reduceShuffleSeen[tasknum] = true;
          } else if (state_name.equals("REDUCE_SORT")) {
            assertTrue("Reduce sequence number should be < 8", tasknum < 8);
            reduceSortSeen[tasknum] = true;
          } else if (state_name.equals("REDUCE_REDUCER")) {
            assertTrue("Reduce sequence number should be < 8", tasknum < 8);
            reduceReducerSeen[tasknum] = true;
          }
          count++;
        }
      }
      System.out.println("Processed " + count + " records.");
      assertTrue("Total number of states should be 42 (10 maps + 8 reduces * 4 states)", count == 42);

      boolean passed = true;
      for (int i = 0; i < 10; i++) passed &= mapSeen[i];
      for (int i = 0; i < 8; i++) {
        passed &= reduceSeen[i];
        passed &= reduceShuffleSeen[i];
        passed &= reduceSortSeen[i];
        passed &= reduceReducerSeen[i];
      }

      assertTrue("Should have seen all Maps and Reduces in the generated states.", passed);

    } catch (Exception e) {
      fail("Error checking FSMBuilder output: "+e.toString());
    }
  }

  public void testFSMBuilder_ClientTrace020 () {
    initialTasks();

    log.info("Testing FSMBuilder (ClientTrace only)");
    System.out.println("In ClientTrace020");

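    // First pass: build FSM states from the TaskTracker ClientTrace (shuffle) records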
    try {
      JobConf job = new JobConf(new ChukwaConfiguration(), FSMBuilder.class);
      job.addResource(System.getenv("CHUKWA_CONF_DIR")+File.separator+"chukwa-demux-conf.xml");
      job.setJobName("Chukwa-FSMBuilder_" + day.format(new Date()));
      job.setMapperClass(TaskTrackerClientTraceMapper.class);
      job.setPartitionerClass(FSMIntermedEntryPartitioner.class);
      job.setReducerClass(FSMBuilder.FSMReducer.class);
      job.setMapOutputValueClass(FSMIntermedEntry.class);
      job.setMapOutputKeyClass(ChukwaRecordKey.class);

      job.setInputFormat(SequenceFileInputFormat.class);
      job.setOutputKeyClass(ChukwaRecordKey.class);
      job.setOutputValueClass(ChukwaRecord.class);
      job.setOutputFormat(ChukwaRecordOutputFormat.class);
      job.setNumReduceTasks(1);

      Path inputPath = new Path(DEMUX_OUTPUT_PATH.toString()+File.separator+"/*/*/ClientTraceDetailed*.evt");
      Path fsmOutputPath1 = new Path(fileSys.getUri().toString()+File.separator+fsmSink+"1");

      FileInputFormat.setInputPaths(job, inputPath);
      FileOutputFormat.setOutputPath(job, fsmOutputPath1);

      String[] jars = new File(System.getenv("CHUKWA_HOME")).list(new FilenameFilter() {
        public boolean accept(File dir, String name) {
          return name.endsWith(".jar");
        }
      });
      job.setJar(System.getenv("CHUKWA_HOME")+File.separator+jars[0]);
      JobClient.runJob(job);
      System.out.println("Processed TaskTracker ClientTrace.");

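      // Second pass: build FSM states from the DataNode ClientTrace (HDFS read/write) records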
      job = new JobConf(new ChukwaConfiguration(), FSMBuilder.class);
      job.addResource(System.getenv("CHUKWA_CONF_DIR")+File.separator+"chukwa-demux-conf.xml");
      job.setJobName("Chukwa-FSMBuilder_" + day.format(new Date()));
      job.setMapperClass(DataNodeClientTraceMapper.class);
      job.setPartitionerClass(FSMIntermedEntryPartitioner.class);
      job.setReducerClass(FSMBuilder.FSMReducer.class);
      job.setMapOutputValueClass(FSMIntermedEntry.class);
      job.setMapOutputKeyClass(ChukwaRecordKey.class);

      job.setInputFormat(SequenceFileInputFormat.class);
      job.setOutputKeyClass(ChukwaRecordKey.class);
      job.setOutputValueClass(ChukwaRecord.class);
      job.setOutputFormat(ChukwaRecordOutputFormat.class);
      job.setNumReduceTasks(1);

      inputPath = new Path(DEMUX_OUTPUT_PATH.toString()+File.separator+"/*/*/ClientTraceDetailed*.evt");
      Path fsmOutputPath2 = new Path(fileSys.getUri().toString()+File.separator+fsmSink+"2");

      FileInputFormat.setInputPaths(job, inputPath);
      FileOutputFormat.setOutputPath(job, fsmOutputPath2);

      jars = new File(System.getenv("CHUKWA_HOME")).list(new FilenameFilter() {
        public boolean accept(File dir, String name) {
          return name.endsWith(".jar");
        }
      });
      job.setJar(System.getenv("CHUKWA_HOME")+File.separator+jars[0]);
      JobClient.runJob(job);
      System.out.println("Processed DataNode ClientTrace.");

    } catch (Exception e) {
      fail("Error running FSMBuilder: "+e.toString());
    }
    System.out.println("Done running FSMBuilder; Checking results");

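    // Check the combined FSM output: count the HDFS read, HDFS write, and shuffle states
    // produced from the ClientTrace records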
    try {
      Path fsm_outputs = new Path(fileSys.getUri().toString()+File.separator+
        fsmSink + "*/*/*/*.evt");
      FileStatus[] files;
      files = fileSys.globStatus(fsm_outputs);
      int count = 0;
      int numHDFSRead = 0, numHDFSWrite = 0, numShuffles = 0;
      ChukwaRecordKey key = new ChukwaRecordKey();
      ChukwaRecord record = new ChukwaRecord();

      for (int i = 0; i < files.length; i++) {
        SequenceFile.Reader r = new SequenceFile.Reader(fileSys, files[i].getPath(), conf);
        System.out.println("Processing file " + files[i].getPath().toString());
        while (r.next(key, record)) {
          String state_name = record.getValue("STATE_NAME");

          if (state_name.equals("READ_LOCAL") || state_name.equals("READ_REMOTE")) {
            numHDFSRead++;
          } else if (state_name.equals("WRITE_LOCAL") || state_name.equals("WRITE_REMOTE")
              || state_name.equals("WRITE_REPLICATED")) {
            numHDFSWrite++;
          } else if (state_name.equals("SHUFFLE_LOCAL") || state_name.equals("SHUFFLE_REMOTE")) {
            numShuffles++;
          }
          count++;
        }
      }
      System.out.println("Processed " + count + " records.");
      System.out.println("HDFSRD: " + numHDFSRead + " HDFSWR: " + numHDFSWrite + " SHUF: " + numShuffles);
      assertTrue("Number of HDFS reads should be 10", numHDFSRead == 10);
      assertTrue("Number of HDFS writes should be 8", numHDFSWrite == 8);
      assertTrue("Number of shuffles should be 80", numShuffles == 80);

    } catch (Exception e) {
      fail("Error checking FSMBuilder results: " + e.toString());
    }
  }

}