/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.analysis.salsa.fsm;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FilenameFilter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.regex.*;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.datacollection.DataFactory;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent.AlreadyRunningException;
import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
import org.apache.hadoop.chukwa.datacollection.sender.ChukwaHttpSender;
import org.apache.hadoop.chukwa.datacollection.sender.RetryListOfCollectors;
import org.apache.hadoop.chukwa.extraction.demux.ChukwaRecordOutputFormat;
import org.apache.hadoop.chukwa.extraction.demux.ChukwaRecordPartitioner;
import org.apache.hadoop.chukwa.extraction.demux.Demux;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.nio.SelectChannelConnector;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;

import junit.framework.TestCase;

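/**
 * End-to-end test for FSMBuilder: brings up mini HDFS and MapReduce clusters
 * together with a Chukwa collector and agent, streams the sample JobHistory
 * and ClientTrace logs through Demux, then runs FSMBuilder over the Demux
 * output and verifies the state records it generates.
 */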
public class TestFSMBuilder extends TestCase {
  private static Log log = LogFactory.getLog(TestFSMBuilder.class);

  int LINES = 10000;
  int THREADS = 2;
  private MiniDFSCluster dfs = null;
  int NUM_HADOOP_SLAVES = 4;
  private FileSystem fileSys = null;
  private MiniMRCluster mr = null;
  private Server jettyCollector = null;
  private ChukwaAgent agent = null;
  private HttpConnector conn = null;
  private ChukwaHttpSender sender = null;
  private int agentPort = 9093;
  private int collectorPort = 9990;
  private static final String dataSink = "/demux/input";
  private static final String fsmSink = "/analysis/salsafsm";
  private static Path DEMUX_INPUT_PATH = null;
  private static Path DEMUX_OUTPUT_PATH = null;
  private static Path FSM_OUTPUT_PATH = null;
  private ChukwaConfiguration conf = new ChukwaConfiguration();
  private static SimpleDateFormat day = new SimpleDateFormat("yyyyMMdd_HH_mm");
  private static String cluster = "demo";
  long[] timeWindow = {7, 30, 91, 365, 3650};
  long current = 1244617200000L;  // 2009-06-10

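  // The sample logs under test/samples contain one job with 10 map attempts
  // and 8 reduce attempts; the assertions in the tests below are hard-coded
  // to those counts.
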
  public void setUp() {
    // Start up HDFS cluster - stores the collected JobHistory chunks
    // Start up MR cluster - runs Demux and FSMBuilder
    // Start up collector
    // Start up agent

    System.out.println("In setUp()");
    try {
      System.setProperty("hadoop.log.dir", System.getProperty(
          "test.build.data", "/tmp"));
    } catch (Exception e) {
      e.printStackTrace();
      fail("Could not set up: " + e.toString());
    }

    // Start up HDFS cluster - stores the collected JobHistory chunks
    try {
      dfs = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true, null);
      fileSys = dfs.getFileSystem();
      // dataSink already begins with "/", so no extra separator is needed
      DEMUX_INPUT_PATH = new Path(fileSys.getUri().toString()+dataSink);
      DEMUX_OUTPUT_PATH = new Path(fileSys.getUri().toString()+"/demux/output");
    } catch(Exception e) {
      e.printStackTrace();
      fail("Failed to start up HDFS cluster.");
    }
    // Start up MR cluster
    try {
      mr = new MiniMRCluster(NUM_HADOOP_SLAVES, fileSys.getUri()
          .toString(), 1);
    } catch(Exception e) {
      fail("Failed to start up MapReduce cluster.");
    }
    // Start up collector
    try {
      // Configure collector
      conf.set("chukwaCollector.chunkSuppressBufferSize", "10");
      conf.set("writer.hdfs.filesystem",fileSys.getUri().toString());
      conf.set("chukwaCollector.outputDir",dataSink);
      conf.set("chukwaCollector.rotateInterval", "10000");

      // Set up Jetty connector
      SelectChannelConnector jettyConnector = new SelectChannelConnector();
      jettyConnector.setLowResourcesConnections(THREADS-1);
      jettyConnector.setLowResourceMaxIdleTime(1500);
      jettyConnector.setPort(collectorPort);

      // Set up the Jetty server proper, using the connector configured above
      jettyCollector = new Server();
      jettyCollector.addConnector(jettyConnector);
      Context root = new Context(jettyCollector, "/", Context.SESSIONS);
      root.addServlet(new ServletHolder(new ServletCollector(conf)), "/*");
      jettyCollector.start();
      jettyCollector.setStopAtShutdown(true);
      Thread.sleep(10000);
    } catch(Exception e) {
      fail("Failed to start up collector.");
    }

    // Start up agent
    try {
      // Configure agent
      conf.set("chukwaAgent.tags", "cluster=\"demo\"");
      DataFactory.getInstance().addDefaultTag(conf.get("chukwaAgent.tags", "cluster=\"unknown\""));
      conf.set("chukwaAgent.checkpoint.dir", System.getenv("CHUKWA_DATA_DIR")+File.separator+"tmp");
      conf.set("chukwaAgent.checkpoint.interval", "10000");
      // Pick up a configured control port so the controller below connects
      // to the right place
      agentPort = conf.getInt("chukwaAgent.control.port", agentPort);
      agent = new ChukwaAgent(conf);
      conn = new HttpConnector(agent, "http://localhost:"+collectorPort+"/chukwa");
      conn.start();
      sender = new ChukwaHttpSender(conf);
      ArrayList<String> collectorList = new ArrayList<String>();
      collectorList.add("http://localhost:"+collectorPort+"/chukwa");
      sender.setCollectors(new RetryListOfCollectors(collectorList, conf));
    } catch (AlreadyRunningException e) {
      fail("Chukwa Agent is already running");
    }
    System.out.println("Done setUp().");
  }

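  /**
   * Reads a file into a single String, preserving line separators.
   * Not currently called by the tests; kept as a debugging helper.
   */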
  public String readFile(File aFile) {
    StringBuilder contents = new StringBuilder();
    try {
      BufferedReader input = new BufferedReader(new FileReader(aFile));
      try {
        String line = null; // not declared within while loop
        while ((line = input.readLine()) != null) {
          contents.append(line);
          contents.append(System.getProperty("line.separator"));
        }
      } finally {
        input.close();
      }
    } catch (IOException ex) {
      ex.printStackTrace();
    }
    return contents.toString();
  }

  public void tearDown() {
    FileSystem fs;
    System.out.println("In tearDown()");
    try {
      fs = dfs.getFileSystem();
      fs.delete(DEMUX_OUTPUT_PATH, true);

      agent.shutdown();
      conn.shutdown();
      jettyCollector.stop();
      mr.shutdown();
      dfs.shutdown();
      Thread.sleep(2000);
    } catch(Exception e) {
      e.printStackTrace();
      fail(e.toString());
    }
    System.out.println("Done tearDown()");
  }

  /**
   * Performs tasks common to all tests.
   * Sets up the agent to collect samples of the two log types in use
   * (job history logs via the JobLog type and clienttrace logs via the
   * ClientTrace type), then calls Demux to process the logs.
   */
  protected void initialTasks () {
    System.out.println("In initialTasks()");
    try {
      // Test Chukwa Agent Controller and Agent communication
      ChukwaAgentController cli = new ChukwaAgentController("localhost", agentPort);
      String[] source = new File(System.getenv("CHUKWA_DATA_DIR") + File.separator + "log").list(new FilenameFilter() {
        public boolean accept(File dir, String name) {
          return name.endsWith(".log");
        }
      });
      System.out.println(System.getenv("CHUKWA_DATA_DIR") + File.separator + "log");
      for(String fname : source) {
        if (!(fname.equals("JobHistory.log") || fname.equals("ClientTrace.log"))) {
          continue;
        }
        StringBuilder fullPath = new StringBuilder();
        // CHUKWA_DATA_DIR is an environment variable, so it must be read with
        // System.getenv (System.getProperty would return null here)
        fullPath.append(System.getenv("CHUKWA_DATA_DIR"));
        fullPath.append(File.separator);
        fullPath.append("log");
        fullPath.append(File.separator);
        fullPath.append(fname);
        String recordType = fname.substring(0,fname.indexOf("."));
        // Adaptor params "0 <file>" tell the file tailer to start at offset 0
        String adaptorId = cli.add(
          "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped",
          recordType, "0 " + fullPath.toString(), 0);
        assertNotNull(adaptorId);
        Thread.sleep(2000);
      }
      cli.removeAll();
      Thread.sleep(30000);
    } catch (Exception e) {
      e.printStackTrace();
      fail(e.toString());
    }

    // Test data sink files written by the collector
    Path demuxDir = new Path(dataSink+"/*");
    FileSystem fs;
    try {
      fs = dfs.getFileSystem();
      FileStatus[] events = fs.globStatus(demuxDir);
      log.info("Number of data sink files written: "+events.length);
      assertTrue(events.length!=0);
    } catch (IOException e) {
      e.printStackTrace();
      fail("File system error.");
    }

    // Test Demux
    log.info("Testing demux");
    try {
      System.setProperty("hadoop.log.dir", System.getProperty(
          "test.build.data", "/tmp"));

      JobConf job = new JobConf(new ChukwaConfiguration(), Demux.class);
      job.addResource(System.getenv("CHUKWA_CONF_DIR")+File.separator+"chukwa-demux-conf.xml");
      job.setJobName("Chukwa-Demux_" + day.format(new Date()));
      job.setInputFormat(SequenceFileInputFormat.class);
      job.setMapperClass(Demux.MapClass.class);
      job.setPartitionerClass(ChukwaRecordPartitioner.class);
      job.setReducerClass(Demux.ReduceClass.class);

      job.setOutputKeyClass(ChukwaRecordKey.class);
      job.setOutputValueClass(ChukwaRecord.class);
      job.setOutputFormat(ChukwaRecordOutputFormat.class);
      job.setJobPriority(JobPriority.VERY_HIGH);
      job.setNumMapTasks(2);
      job.setNumReduceTasks(1);
      // dataSink begins with "/", so no extra separator is needed
      Path input = new Path(fileSys.getUri().toString()+dataSink+"/*.done");
      FileInputFormat.setInputPaths(job, input);
      FileOutputFormat.setOutputPath(job, DEMUX_OUTPUT_PATH);
      String[] jars = new File(System.getenv("CHUKWA_HOME")).list(new FilenameFilter() {
        public boolean accept(File dir, String name) {
          return name.endsWith(".jar");
        }
      });
      job.setJar(System.getenv("CHUKWA_HOME")+File.separator+jars[0]);
      JobClient.runJob(job);
    } catch (Exception e) {
      fail(ExceptionUtil.getStackTrace(e));
    }

    System.out.println("Done initialTasks()");
  }

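  // FSMBuilder emits one MAP state per map attempt and, for each reduce
  // attempt, a REDUCE state plus REDUCE_SHUFFLEWAIT, REDUCE_SORT and
  // REDUCE_REDUCER sub-states, so the sample job should yield
  // 10 + 8*4 = 42 state records in total.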
  public void testFSMBuilder_JobHistory020 () {
    initialTasks();
    // Test FSMBuilder (job history only)
    log.info("Testing FSMBuilder (Job History only)");
    System.out.println("In JobHistory020");
    // Run FSMBuilder on the Demux output
    try {
      JobConf job = new JobConf(new ChukwaConfiguration(), FSMBuilder.class);
      job.addResource(System.getenv("CHUKWA_CONF_DIR")+File.separator+"chukwa-demux-conf.xml");
      job.setJobName("Chukwa-FSMBuilder_" + day.format(new Date()));
      job.setMapperClass(JobHistoryTaskDataMapper.class);
      job.setPartitionerClass(FSMIntermedEntryPartitioner.class);
      job.setReducerClass(FSMBuilder.FSMReducer.class);
      job.setMapOutputValueClass(FSMIntermedEntry.class);
      job.setMapOutputKeyClass(ChukwaRecordKey.class);

      job.setInputFormat(SequenceFileInputFormat.class);
      job.setOutputKeyClass(ChukwaRecordKey.class);
      job.setOutputValueClass(ChukwaRecord.class);
      job.setOutputFormat(ChukwaRecordOutputFormat.class);
      job.setNumReduceTasks(1);

      Path inputPath = new Path(DEMUX_OUTPUT_PATH.toString()+"/*/*/TaskData*.evt");
      FSM_OUTPUT_PATH = new Path(fileSys.getUri().toString()+fsmSink);

      FileInputFormat.setInputPaths(job, inputPath);
      FileOutputFormat.setOutputPath(job, FSM_OUTPUT_PATH);

      String[] jars = new File(System.getenv("CHUKWA_HOME")).list(new FilenameFilter() {
        public boolean accept(File dir, String name) {
          return name.endsWith(".jar");
        }
      });
      job.setJar(System.getenv("CHUKWA_HOME")+File.separator+jars[0]);
      JobClient.runJob(job);
    } catch (Exception e) {
      fail("Error running FSMBuilder: "+e.toString());
    }
    System.out.println("Done running FSMBuilder; checking results");

    // Check FSMBuilder output by reading the sequence file(s) generated.
    // Hard-coded to check the contents of test/samples/JobLog.log.
    try {

      Pattern task_id_pat = Pattern.compile("attempt_[0-9]*_[0-9]*_[mr]_([0-9]*)_[0-9]*");
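      // Task attempt IDs look like
      // attempt_<timestamp>_<jobnum>_<m|r>_<tasknum>_<attemptnum>;
      // group(1) captures <tasknum>.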

      ChukwaRecordKey key = new ChukwaRecordKey();
      ChukwaRecord record = new ChukwaRecord();

      // Data structures for checking FSM coverage:
      // we should see 10 maps and 8 reduces.
      // (Java initializes boolean array elements to false.)
      boolean[] mapSeen = new boolean[10];
      boolean[] reduceSeen = new boolean[8];
      boolean[] reduceShuffleSeen = new boolean[8];
      boolean[] reduceSortSeen = new boolean[8];
      boolean[] reduceReducerSeen = new boolean[8];

      Path fsm_outputs = new Path(FSM_OUTPUT_PATH.toString()+
        "/*/MAPREDUCE_FSM/MAPREDUCE_FSM*.evt");
      FileStatus [] files;
      files = fileSys.globStatus(fsm_outputs);
      int count = 0;

      for (int i = 0; i < files.length; i++) {
        SequenceFile.Reader r = new SequenceFile.Reader(fileSys, files[i].getPath(), conf);
        System.out.println("Processing file " + files[i].getPath().toString());
        while (r.next(key, record)) {
          String state_name = record.getValue("STATE_NAME");
          String task_id = record.getValue("TASK_ID");

          Matcher m = task_id_pat.matcher(task_id);
          if (!m.matches()) {
            continue;
          }
          String tasknum_string = m.group(1);
          if (tasknum_string == null) {
            continue;
          }
          int tasknum = Integer.parseInt(tasknum_string);

          if (state_name.equals("MAP")) {
            assertTrue("Map sequence number should be < 10", tasknum < 10);
            mapSeen[tasknum] = true;
          } else if (state_name.equals("REDUCE")) {
            assertTrue("Reduce sequence number should be < 8", tasknum < 8);
            reduceSeen[tasknum] = true;
          } else if (state_name.equals("REDUCE_SHUFFLEWAIT")) {
            assertTrue("Reduce sequence number should be < 8", tasknum < 8);
            reduceShuffleSeen[tasknum] = true;
          } else if (state_name.equals("REDUCE_SORT")) {
            assertTrue("Reduce sequence number should be < 8", tasknum < 8);
            reduceSortSeen[tasknum] = true;
          } else if (state_name.equals("REDUCE_REDUCER")) {
            assertTrue("Reduce sequence number should be < 8", tasknum < 8);
            reduceReducerSeen[tasknum] = true;
          }
          count++;
        }
        r.close();
      }
      System.out.println("Processed " + count + " records.");
      assertTrue("Total number of states should be 42 (10 maps + 8 reduces * 4)", count == 42);

      // We must have seen all 10 maps and all 8 reduces;
      // check for that here
      boolean passed = true;
      for (int i = 0; i < 10; i++) passed &= mapSeen[i];
      for (int i = 0; i < 8; i++) {
        passed &= reduceSeen[i];
        passed &= reduceShuffleSeen[i];
        passed &= reduceSortSeen[i];
        passed &= reduceReducerSeen[i];
      }

      assertTrue("Should have seen all Maps and Reduces in the generated states.", passed);

    } catch (Exception e) {
      fail("Error checking FSMBuilder output: "+e.toString());
    }

  }

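  // The ClientTrace test runs FSMBuilder twice over the same Demux output:
  // once with TaskTrackerClientTraceMapper (shuffle traffic) and once with
  // DataNodeClientTraceMapper (HDFS read/write traffic), writing to two
  // separate output directories, then checks the combined state counts.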
  public void testFSMBuilder_ClientTrace020 () {
    initialTasks();
    // Test FSMBuilder (ClientTrace only)
    log.info("Testing FSMBuilder (ClientTrace only)");
    System.out.println("In ClientTrace020");
    // Run FSMBuilder on the Demux output
    try {
      // Process TaskTracker shuffle clienttrace entries first
      JobConf job = new JobConf(new ChukwaConfiguration(), FSMBuilder.class);
      job.addResource(System.getenv("CHUKWA_CONF_DIR")+File.separator+"chukwa-demux-conf.xml");
      job.setJobName("Chukwa-FSMBuilder_" + day.format(new Date()));
      job.setMapperClass(TaskTrackerClientTraceMapper.class);
      job.setPartitionerClass(FSMIntermedEntryPartitioner.class);
      job.setReducerClass(FSMBuilder.FSMReducer.class);
      job.setMapOutputValueClass(FSMIntermedEntry.class);
      job.setMapOutputKeyClass(ChukwaRecordKey.class);

      job.setInputFormat(SequenceFileInputFormat.class);
      job.setOutputKeyClass(ChukwaRecordKey.class);
      job.setOutputValueClass(ChukwaRecord.class);
      job.setOutputFormat(ChukwaRecordOutputFormat.class);
      job.setNumReduceTasks(1);

      Path inputPath = new Path(DEMUX_OUTPUT_PATH.toString()+"/*/*/ClientTraceDetailed*.evt");
      Path fsmOutputPath1 = new Path(fileSys.getUri().toString()+fsmSink+"1");

      FileInputFormat.setInputPaths(job, inputPath);
      FileOutputFormat.setOutputPath(job, fsmOutputPath1);

      String[] jars = new File(System.getenv("CHUKWA_HOME")).list(new FilenameFilter() {
        public boolean accept(File dir, String name) {
          return name.endsWith(".jar");
        }
      });
      job.setJar(System.getenv("CHUKWA_HOME")+File.separator+jars[0]);
      JobClient.runJob(job);
      System.out.println("Processed TaskTracker ClientTrace.");

      // Process DataNode clienttrace entries
      job = new JobConf(new ChukwaConfiguration(), FSMBuilder.class);
      job.addResource(System.getenv("CHUKWA_CONF_DIR")+File.separator+"chukwa-demux-conf.xml");
      job.setJobName("Chukwa-FSMBuilder_" + day.format(new Date()));
      job.setMapperClass(DataNodeClientTraceMapper.class);
      job.setPartitionerClass(FSMIntermedEntryPartitioner.class);
      job.setReducerClass(FSMBuilder.FSMReducer.class);
      job.setMapOutputValueClass(FSMIntermedEntry.class);
      job.setMapOutputKeyClass(ChukwaRecordKey.class);

      job.setInputFormat(SequenceFileInputFormat.class);
      job.setOutputKeyClass(ChukwaRecordKey.class);
      job.setOutputValueClass(ChukwaRecord.class);
      job.setOutputFormat(ChukwaRecordOutputFormat.class);
      job.setNumReduceTasks(1);

      inputPath = new Path(DEMUX_OUTPUT_PATH.toString()+"/*/*/ClientTraceDetailed*.evt");
      Path fsmOutputPath2 = new Path(fileSys.getUri().toString()+fsmSink+"2");

      FileInputFormat.setInputPaths(job, inputPath);
      FileOutputFormat.setOutputPath(job, fsmOutputPath2);

      jars = new File(System.getenv("CHUKWA_HOME")).list(new FilenameFilter() {
        public boolean accept(File dir, String name) {
          return name.endsWith(".jar");
        }
      });
      job.setJar(System.getenv("CHUKWA_HOME")+File.separator+jars[0]);
      JobClient.runJob(job);
      System.out.println("Processed DataNode ClientTrace.");

    } catch (Exception e) {
      fail("Error running FSMBuilder: "+e.toString());
    }
    System.out.println("Done running FSMBuilder; checking results");

    try {
      // The glob matches both output directories (fsmSink+"1" and fsmSink+"2")
      Path fsm_outputs = new Path(fileSys.getUri().toString()+
        fsmSink + "*/*/*/*.evt");
      FileStatus [] files;
      files = fileSys.globStatus(fsm_outputs);
      int count = 0;
      int numHDFSRead = 0, numHDFSWrite = 0, numShuffles = 0;
      ChukwaRecordKey key = new ChukwaRecordKey();
      ChukwaRecord record = new ChukwaRecord();

      for (int i = 0; i < files.length; i++) {
        SequenceFile.Reader r = new SequenceFile.Reader(fileSys, files[i].getPath(), conf);
        System.out.println("Processing file " + files[i].getPath().toString());
        while (r.next(key, record)) {
          String state_name = record.getValue("STATE_NAME");

          if (state_name.equals("READ_LOCAL") || state_name.equals("READ_REMOTE"))
          {
            numHDFSRead++;
          } else if (state_name.equals("WRITE_LOCAL") || state_name.equals("WRITE_REMOTE")
              || state_name.equals("WRITE_REPLICATED"))
          {
            numHDFSWrite++;
          } else if (state_name.equals("SHUFFLE_LOCAL") || state_name.equals("SHUFFLE_REMOTE"))
          {
            numShuffles++;
          }
          count++;
        }
        r.close();
      }
      System.out.println("Processed " + count + " records.");
      System.out.println("HDFSRD: " + numHDFSRead + " HDFSWR: " + numHDFSWrite + " SHUF: " + numShuffles);
      assertTrue("Number of HDFS reads should be 10", numHDFSRead == 10);
      assertTrue("Number of HDFS writes should be 8", numHDFSWrite == 8);
      assertTrue("Number of shuffles should be 80", numShuffles == 80);

    } catch (Exception e) {
      fail("Error checking FSMBuilder results: " + e.toString());
    }
  }

}