1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.util;
20
21 import junit.framework.TestCase;
22
23 import java.text.SimpleDateFormat;
24 import java.text.ParseException;
25 import java.net.InetAddress;
26 import java.io.File;
27 import java.io.IOException;
28 import java.io.BufferedReader;
29 import java.io.FileReader;
30 import java.util.Calendar;
31
32 import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.TsProcessor;
33 import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
34 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
35 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.fs.FileSystem;
38 import org.apache.hadoop.fs.Path;
39 import org.apache.hadoop.io.SequenceFile;
40
41 public class TestCreateRecordFile extends TestCase {
42 private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
43 private Calendar calendar = Calendar.getInstance();
44
45 public void testWriteSequenceFile() throws IOException, ParseException {
46 String outputDir = System.getProperty("test.build.data", "/tmp");
47
48
49 String datadir = System.getenv("CHUKWA_DATA_DIR");
50 if(datadir == null)
51 datadir = "test/samples";
52 else
53 datadir = datadir + File.separator + "log";
54 File inputFile = new File( datadir+ File.separator + "ClientTrace.log");
55 Path outputFile = new Path(outputDir + "/" + this.getClass().getName() + "/ClientTrace.evt");
56 String clusterName = "testClusterName";
57 String dataType = "testDataType";
58 String streamName = "testStreamName";
59 MapProcessor processor = new TsProcessor();
60
61
62 CreateRecordFile.makeTestSequenceFile(inputFile, outputFile, clusterName,
63 dataType, streamName, processor);
64
65 ChukwaRecordKey key = new ChukwaRecordKey();
66 ChukwaRecord record = new ChukwaRecord();
67
68 Configuration conf = new Configuration();
69 FileSystem fs = outputFile.getFileSystem(conf);
70 SequenceFile.Reader sequenceReader = new SequenceFile.Reader(fs, outputFile, conf);
71
72
73 BufferedReader inputReader = new BufferedReader(new FileReader(inputFile));
74
75 String expectedHostname = InetAddress.getLocalHost().getHostName();
76
77
78 int i = 0;
79 while (sequenceReader.next(key, record)) {
80 String line = inputReader.readLine();
81 assertNotNull("Sequence file contains more records than input file", line);
82
83 long expectedTime = sdf.parse(line.substring(0,23)).getTime();
84 calendar.setTimeInMillis(expectedTime);
85 calendar.set(Calendar.MINUTE, 0);
86 calendar.set(Calendar.SECOND, 0);
87 calendar.set(Calendar.MILLISECOND, 0);
88
89 String expectedKey = calendar.getTimeInMillis() + "/" +
90 expectedHostname + "/" + expectedTime;
91 String expectedTags = "cluster=\"" + clusterName + "\"";
92
93
94 assertEquals("Invalid key found for record " + i, expectedKey, key.getKey());
95 assertEquals("Invalid dataType found for record " + i, dataType, key.getReduceType());
96
97
98 assertEquals("Invalid record time for record " + i, expectedTime, record.getTime());
99 assertEquals("Invalid body for record " + i, line, record.getValue("body"));
100 assertEquals("Invalid capp for record " + i, streamName, record.getValue("capp"));
101 assertEquals("Invalid csource for record " + i, expectedHostname, record.getValue("csource"));
102 assertEquals("Invalid ctags for record " + i, expectedTags , record.getValue("ctags").trim());
103
104 i++;
105 }
106
107 sequenceReader.close();
108 inputReader.close();
109 }
110
111 }