/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.extraction.demux;


import java.io.IOException;
import java.util.Calendar;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import junit.framework.TestCase;

/**
 * End-to-end test for Demux: writes a sink file of random chunks into a
 * MiniDFSCluster, runs the Demux job on a MiniMRCluster, and reports
 * throughput.
 */
public class TestDemux extends TestCase {

  java.util.Random r = new java.util.Random();

  /** Builds one chunk wrapping a fake HDFS DataNode log line. */
  public ChunkImpl getARandomChunk() {
    int ms = r.nextInt(1000);
    String line = "2008-05-29 10:42:22," + ms
        + " INFO org.apache.hadoop.dfs.DataNode: Some text goes here"
        + r.nextInt() + "\n";

    ChunkImpl c = new ChunkImpl("HadoopLogProcessor", "test",
        line.length(), line.getBytes(), null);
    return c;
  }

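  // A quick sanity sketch, not in the original test: the random chunk should
  // carry the data type and stream name the rest of the test relies on, and
  // its sequence ID was set to the line length above, which for this ASCII
  // line equals the data length.
  public void testGetARandomChunk() {
    ChunkImpl c = getARandomChunk();
    assertEquals("HadoopLogProcessor", c.getDataType());
    assertEquals("test", c.getStreamName());
    assertEquals(c.getData().length, c.getSeqID());
  }
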
  /**
   * Writes the given number of random chunks to dest as an uncompressed
   * sequence file of (ChukwaArchiveKey, ChunkImpl) pairs, mimicking a
   * collector sink file.
   */
  public void writeASinkFile(Configuration conf, FileSystem fileSys, Path dest,
      int chunks) throws IOException {
    FSDataOutputStream out = fileSys.create(dest);

    Calendar calendar = Calendar.getInstance();
    SequenceFile.Writer seqFileWriter = SequenceFile.createWriter(conf, out,
        ChukwaArchiveKey.class, ChunkImpl.class,
        SequenceFile.CompressionType.NONE, null);
    for (int i = 0; i < chunks; ++i) {
      ChunkImpl chunk = getARandomChunk();
      ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();

      // Keys are partitioned by the hour: zero everything below the hour field.
      calendar.setTimeInMillis(System.currentTimeMillis());
      calendar.set(Calendar.MINUTE, 0);
      calendar.set(Calendar.SECOND, 0);
      calendar.set(Calendar.MILLISECOND, 0);
      archiveKey.setTimePartition(calendar.getTimeInMillis());
      archiveKey.setDataType(chunk.getDataType());
      archiveKey.setStreamName(chunk.getStreamName());
      archiveKey.setSeqId(chunk.getSeqID());
      seqFileWriter.append(archiveKey, chunk);
    }
    seqFileWriter.close();
    out.close();
  }

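  // A debugging aid, not part of the original test: read a sink file back and
  // count the chunks it holds, using the stock SequenceFile.Reader API. The
  // method name is illustrative, and ChunkImpl.getBlankChunk() is assumed to
  // be the usual Chukwa factory for a reusable, empty chunk to deserialize
  // into.
  private int countChunksInSinkFile(Configuration conf, FileSystem fileSys,
      Path src) throws IOException {
    SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, src, conf);
    ChukwaArchiveKey key = new ChukwaArchiveKey();
    ChunkImpl chunk = ChunkImpl.getBlankChunk();
    int count = 0;
    while (reader.next(key, chunk)) {
      ++count;
    }
    reader.close();
    return count;
  }
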
  private void runDemux(JobConf job, Path sortInput, Path sortOutput)
      throws Exception {

    String[] sortArgs = { sortInput.toString(), sortOutput.toString() };

    // ToolRunner returns the job's exit code; zero means success.
    assertEquals(0, ToolRunner.run(job, new Demux(), sortArgs));
  }

  static final int NUM_HADOOP_SLAVES = 1;
  static final int LINES = 10000;
  private static final Path DEMUX_INPUT_PATH = new Path("/demux/input");
  private static final Path DEMUX_OUTPUT_PATH = new Path("/demux/output");

  public void testDemux() {
    MiniDFSCluster dfs = null;
    MiniMRCluster mr = null;
    try {
      System.out.println("testing demux");
      Configuration conf = new Configuration();
      System.setProperty("hadoop.log.dir", System.getProperty(
          "test.build.data", "/tmp"));
      dfs = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true, null);
      FileSystem fileSys = dfs.getFileSystem();
      mr = new MiniMRCluster(NUM_HADOOP_SLAVES, fileSys.getUri().toString(), 1);
      writeASinkFile(conf, fileSys, DEMUX_INPUT_PATH, LINES);

      System.out.println("wrote "
          + fileSys.getFileStatus(DEMUX_INPUT_PATH).getLen()
          + " bytes of temp test data");
      long ts_start = System.currentTimeMillis();
      runDemux(mr.createJobConf(), DEMUX_INPUT_PATH, DEMUX_OUTPUT_PATH);

      long time = (System.currentTimeMillis() - ts_start);
      long bytes = fileSys.getContentSummary(DEMUX_OUTPUT_PATH).getLength();
      System.out.println("result was " + bytes + " bytes long");
      System.out.println("processing took " + time + " milliseconds");
      System.out.println("aka " + time * 1.0 / LINES + " ms per line or "
          + time * 1000.0 / bytes + " ms per kilobyte of log data");

    } catch (Exception e) {
      // Fail loudly instead of swallowing the exception, so a broken
      // Demux run actually fails the test.
      e.printStackTrace();
      fail(e.toString());
    } finally {
      // Tear the mini clusters down so repeated runs start clean.
      if (mr != null)
        mr.shutdown();
      if (dfs != null)
        dfs.shutdown();
    }
  }

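  // A hedged sketch, not in the original test: list whatever Demux wrote under
  // the output path, to make a failing or empty run easier to diagnose. It
  // assumes only that Demux places its results somewhere under
  // DEMUX_OUTPUT_PATH; no particular file layout is asserted.
  private void printDemuxOutput(FileSystem fileSys) throws IOException {
    FileStatus[] files = fileSys.listStatus(DEMUX_OUTPUT_PATH);
    if (files == null) {
      System.out.println("no demux output at " + DEMUX_OUTPUT_PATH);
      return;
    }
    for (FileStatus status : files) {
      System.out.println("demux output: " + status.getPath() + " ("
          + status.getLen() + " bytes)");
    }
  }
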
}