1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.validationframework.util;
19
20
21 import java.io.DataInputStream;
22 import java.io.EOFException;
23 import java.io.File;
24 import java.io.FileInputStream;
25 import java.io.FileOutputStream;
26 import java.io.FileWriter;
27 import java.io.IOException;
28 import java.net.URI;
29 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
30 import org.apache.hadoop.chukwa.Chunk;
31 import org.apache.hadoop.chukwa.ChunkImpl;
32 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
33 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
34 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.fs.FileSystem;
37 import org.apache.hadoop.fs.Path;
38 import org.apache.hadoop.io.SequenceFile;
39 import org.apache.log4j.Logger;
40
41 public class DataOperations {
42 static Logger log = Logger.getLogger(DataOperations.class);
43
44 public static void copyFile(String fromFileName, String toFileName)
45 throws IOException {
46 File fromFile = new File(fromFileName);
47 File toFile = new File(toFileName);
48
49 FileInputStream from = null;
50 FileOutputStream to = null;
51 try {
52 from = new FileInputStream(fromFile);
53 to = new FileOutputStream(toFile);
54 byte[] buffer = new byte[4096];
55 int bytesRead;
56
57 while ((bytesRead = from.read(buffer)) != -1)
58 to.write(buffer, 0, bytesRead);
59 } finally {
60 if (from != null)
61 try {
62 from.close();
63 } catch (IOException e) {
64 ;
65 }
66 if (to != null)
67 try {
68 to.close();
69 } catch (IOException e) {
70
71 }
72 }
73 }
74
75 public static boolean validateMD5(String inputFile, String testFile) {
76
77
78 String md5_1 = MD5.checksum(new File(inputFile));
79 String md5_2 = MD5.checksum(new File(testFile));
80
81 return md5_1.intern() == md5_2.intern();
82 }
83
84 public static boolean validateMD5(FileSystem fs, Path inputFile, Path testFile) {
85
86
87 String md5_1 = MD5.checksum(fs, inputFile);
88 String md5_2 = MD5.checksum(fs, testFile);
89
90 return md5_1.intern() == md5_2.intern();
91 }
92
93 public static boolean validateChukwaRecords(FileSystem fs,
94 Configuration conf, Path inputFile, Path testFile) {
95 SequenceFile.Reader goldReader = null;
96 SequenceFile.Reader testReader = null;
97 try {
98
99
100 goldReader = new SequenceFile.Reader(fs, inputFile, conf);
101 testReader = new SequenceFile.Reader(fs, testFile, conf);
102
103 ChukwaRecordKey goldKey = new ChukwaRecordKey();
104 ChukwaRecord goldRecord = new ChukwaRecord();
105
106 ChukwaRecordKey testKey = new ChukwaRecordKey();
107 ChukwaRecord testRecord = new ChukwaRecord();
108
109
110 while (goldReader.next(goldKey, goldRecord)) {
111 testReader.next(testKey, testRecord);
112
113 if (goldKey.compareTo(testKey) != 0) {
114 log.info(">>>>>>>>>>>>>> Not the same Key");
115 log.info(">>>>>>>>>>>>>> Record [" + goldKey.getKey() + "] ["
116 + goldKey.getReduceType() + "]");
117 log.info(">>>>>>>>>>>>>> Record [" + testKey.getKey() + "] ["
118 + testKey.getReduceType() + "]");
119 return false;
120 }
121
122 if (goldRecord.compareTo(testRecord) != 0) {
123 log.info(">>>>>>>>>>>>>> Not the same Value");
124 log.info(">>>>>>>>>>>>>> Record [" + goldKey.getKey() + "] ["
125 + goldKey.getReduceType() + "]");
126 log.info(">>>>>>>>>>>>>> Record [" + testKey.getKey() + "] ["
127 + testKey.getReduceType() + "]");
128 log.info(">>>>>>>>>>>>>> Gold Value [" + goldRecord.toString() + "]");
129 log.info(">>>>>>>>>>>>>> Test value [" + testRecord.toString() + "]");
130
131 return false;
132 }
133 }
134
135 return true;
136 } catch (IOException e) {
137 e.printStackTrace();
138 return false;
139 } finally {
140 try {
141 goldReader.close();
142 testReader.close();
143 } catch (IOException e) {
144 }
145
146 }
147 }
148
149 public static void extractRawLogFromdataSink(String directory, String fileName)
150 throws Exception {
151 ChukwaConfiguration conf = new ChukwaConfiguration();
152 String fsName = conf.get("writer.hdfs.filesystem");
153 FileSystem fs = FileSystem.get(new URI(fsName), conf);
154
155 SequenceFile.Reader r = new SequenceFile.Reader(fs, new Path(directory
156 + fileName + ".done"), conf);
157
158 File outputFile = new File(directory + fileName + ".raw");
159
160 ChukwaArchiveKey key = new ChukwaArchiveKey();
161 ChunkImpl chunk = ChunkImpl.getBlankChunk();
162 FileWriter out = new FileWriter(outputFile);
163 try {
164 while (r.next(key, chunk)) {
165 out.write(new String(chunk.getData()));
166 }
167 } finally {
168 out.close();
169 r.close();
170 }
171
172 }
173
174 public static void extractRawLogFromDump(String directory, String fileName)
175 throws Exception {
176 File inputFile = new File(directory + fileName + ".bin");
177 File outputFile = new File(directory + fileName + ".raw");
178 DataInputStream dis = new DataInputStream(new FileInputStream(inputFile));
179 Chunk chunk = null;
180 FileWriter out = new FileWriter(outputFile);
181 boolean eof = false;
182 do {
183 try {
184 chunk = ChunkImpl.read(dis);
185 out.write(new String(chunk.getData()));
186 } catch (EOFException e) {
187 eof = true;
188 }
189
190 } while (!eof);
191
192 dis.close();
193 out.close();
194 }
195 }