1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.inputtools;
20
21
22 import java.io.IOException;
23 import org.apache.hadoop.mapred.Reporter;
24 import junit.framework.TestCase;
25 import org.apache.hadoop.chukwa.*;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.mapred.*;
28 import org.apache.hadoop.fs.FileSystem;
29 import org.apache.hadoop.fs.Path;
30 import org.apache.hadoop.io.*;
31
32 public class TestInputFormat extends TestCase {
33
34 String[] lines = { "the rain", "in spain", "falls mainly", "in the plain" };
35
36 public void testInputFormat() {
37
38 try {
39 JobConf conf = new JobConf();
40 String TMP_DIR = System.getProperty("test.build.data", "/tmp");
41 Path filename = new Path("file:///" + TMP_DIR + "/tmpSeqFile");
42 SequenceFile.Writer sfw = SequenceFile.createWriter(FileSystem
43 .getLocal(conf), conf, filename, ChukwaArchiveKey.class,
44 ChunkImpl.class, SequenceFile.CompressionType.NONE, Reporter.NULL);
45
46 StringBuilder buf = new StringBuilder();
47 int offsets[] = new int[lines.length];
48 for (int i = 0; i < lines.length; ++i) {
49 buf.append(lines[i]);
50 buf.append("\n");
51 offsets[i] = buf.length() - 1;
52 }
53 ChukwaArchiveKey key = new ChukwaArchiveKey(0, "datatype", "sname", 0);
54 ChunkImpl val = new ChunkImpl("datatype", "sname", 0, buf.toString()
55 .getBytes(), null);
56 val.setRecordOffsets(offsets);
57 sfw.append(key, val);
58 sfw.append(key, val);
59 sfw.close();
60
61 long len = FileSystem.getLocal(conf).getFileStatus(filename).getLen();
62 InputSplit split = new FileSplit(filename, 0, len, (String[]) null);
63 ChukwaInputFormat in = new ChukwaInputFormat();
64 RecordReader<LongWritable, Text> r = in.getRecordReader(split, conf,
65 Reporter.NULL);
66
67 LongWritable l = r.createKey();
68 Text line = r.createValue();
69 for (int i = 0; i < lines.length * 2; ++i) {
70 boolean succeeded = r.next(l, line);
71 assertTrue(succeeded);
72 assertEquals(i, l.get());
73 assertEquals(lines[i % lines.length], line.toString());
74 System.out.println("read line: " + l.get() + " " + line);
75 }
76 boolean succeeded = r.next(l, line);
77 assertFalse(succeeded);
78
79 } catch (IOException e) {
80 e.printStackTrace();
81 fail("IO exception " + e);
82 }
83 }
84
85 }