1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.writer;
19
20 import java.io.File;
21 import java.util.LinkedList;
22 import java.util.List;
23 import java.util.Date;
24 import java.text.SimpleDateFormat;
25
26 import junit.framework.Assert;
27 import junit.framework.TestCase;
28
29 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
30 import org.apache.hadoop.chukwa.Chunk;
31 import org.apache.hadoop.chukwa.ChunkBuilder;
32 import org.apache.hadoop.chukwa.ChunkImpl;
33 import org.apache.hadoop.chukwa.datacollection.writer.localfs.LocalWriter;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.fs.FileSystem;
36 import org.apache.hadoop.fs.Path;
37 import org.apache.hadoop.io.SequenceFile;
38
39
40
41
42 public class TestChukwaWriters extends TestCase{
43
44 public void testWriters() {
45 try {
46
47 Configuration conf = new Configuration();
48 FileSystem fs = FileSystem.getLocal(conf);
49
50
51 ChukwaWriter seqWriter = new SeqFileWriter();
52 ChukwaWriter localWriter = new LocalWriter();
53
54 List<Chunk> chunksSeqWriter = new LinkedList<Chunk>();
55 List<Chunk> chunksLocalWriter = new LinkedList<Chunk>();
56 for(int i=0;i<10;i++) {
57 ChunkBuilder cb1 = new ChunkBuilder();
58 cb1.addRecord(("record-" +i) .getBytes());
59 cb1.addRecord("foo" .getBytes());
60 cb1.addRecord("bar".getBytes());
61 cb1.addRecord("baz".getBytes());
62 chunksSeqWriter.add(cb1.getChunk());
63
64 ChunkBuilder cb2 = new ChunkBuilder();
65 cb2.addRecord(("record-" +i) .getBytes());
66 cb2.addRecord("foo" .getBytes());
67 cb2.addRecord("bar".getBytes());
68 cb2.addRecord("baz".getBytes());
69 chunksLocalWriter.add(cb2.getChunk());
70
71 }
72
73 File tempDir = new File(System.getProperty("test.build.data", "/tmp"));
74 if (!tempDir.exists()) {
75 tempDir.mkdirs();
76 }
77
78 String outputDirectory = tempDir.getPath() + "/testChukwaWriters_testWriters_" + System.currentTimeMillis() + "/";
79
80
81 Configuration confSeqWriter = new Configuration();
82 confSeqWriter.set("chukwaCollector.rotateInterval", "300000");
83 confSeqWriter.set("writer.hdfs.filesystem", "file:///");
84 String seqWriterOutputDir = outputDirectory +"/seqWriter/seqOutputDir";
85 confSeqWriter.set(SeqFileWriter.OUTPUT_DIR_OPT, seqWriterOutputDir );
86
87 seqWriter.init(confSeqWriter);
88 Thread.sleep(5000);
89 seqWriter.add(chunksSeqWriter);
90 seqWriter.close();
91
92 String seqWriterFile = null;
93
94 File directory = new File(seqWriterOutputDir);
95 String[] files = directory.list();
96 for(String file: files) {
97 if ( file.endsWith(".done") ){
98 seqWriterFile = seqWriterOutputDir + File.separator + file;
99 break;
100 }
101 }
102
103 Assert.assertFalse(seqWriterFile == null);
104
105 String seqWriterDump = dumpArchive(fs,conf,seqWriterFile);
106
107 Configuration confLocalWriter = new Configuration();
108 confSeqWriter.set("writer.hdfs.filesystem", "file:///");
109 String localWriterOutputDir = outputDirectory +"/localWriter/localOutputDir";
110 confLocalWriter.set("chukwaCollector.localOutputDir",localWriterOutputDir);
111 confLocalWriter.set("chukwaCollector.rotateInterval", "300000");
112 confLocalWriter.set("chukwaCollector.minPercentFreeDisk", "2");
113
114
115
116 String localWriterFile = null;
117 localWriter.init(confLocalWriter);
118 Thread.sleep(5000);
119 localWriter.add(chunksLocalWriter);
120 localWriter.close();
121
122 directory = new File(localWriterOutputDir);
123 files = directory.list();
124 for(String file: files) {
125 if ( file.endsWith(".done") ){
126 localWriterFile = localWriterOutputDir + File.separator + file;
127 break;
128 }
129 }
130
131 Assert.assertFalse(localWriterFile == null);
132 String localWriterDump = dumpArchive(fs,conf,localWriterFile);
133
134 Assert.assertTrue(seqWriterDump.intern() == localWriterDump.intern());
135
136 File fOutputDirectory = new File(outputDirectory);
137 fOutputDirectory.delete();
138 } catch (Throwable e) {
139 e.printStackTrace();
140 Assert.fail("Exception in TestChukwaWriters," + e.getMessage());
141 }
142
143 }
144
145 protected String dumpArchive(FileSystem fs,Configuration conf, String file) throws Throwable {
146 SequenceFile.Reader reader = null;
147 try {
148 reader = new SequenceFile.Reader(fs, new Path(file), conf);
149
150 ChukwaArchiveKey key = new ChukwaArchiveKey();
151 ChunkImpl chunk = ChunkImpl.getBlankChunk();
152
153 StringBuilder sb = new StringBuilder();
154 while (reader.next(key, chunk)) {
155 sb.append("\nTimePartition: " + key.getTimePartition());
156 sb.append("DataType: " + key.getDataType());
157 sb.append("StreamName: " + key.getStreamName());
158 sb.append("SeqId: " + key.getSeqId());
159 sb.append("\t\t =============== ");
160
161 sb.append("Cluster : " + chunk.getTags());
162 sb.append("DataType : " + chunk.getDataType());
163 sb.append("Source : " + chunk.getSource());
164 sb.append("Application : " + chunk.getStreamName());
165 sb.append("SeqID : " + chunk.getSeqID());
166 sb.append("Data : " + new String(chunk.getData()));
167 return sb.toString();
168 }
169 } catch (Throwable e) {
170 Assert.fail("Exception while reading SeqFile"+ e.getMessage());
171 throw e;
172 }
173
174 finally {
175 if (reader != null) {
176 reader.close();
177 }
178 }
179 return null;
180 }
181
182
183
184
185
186
187
188
189
190
191
192 public void testSeqWriterFixedCloseInterval() {
193 try {
194 long rotateInterval = 10000;
195 long intervalOffset = 3000;
196
197 ChukwaWriter seqWriter = new SeqFileWriter();
198
199 File tempDir = new File(System.getProperty("test.build.data", "/tmp"));
200 if (!tempDir.exists()) {
201 tempDir.mkdirs();
202 }
203
204 String outputDirectory = tempDir.getPath() + "/testChukwaWriters_testSeqWriterFixedCloseInterval_" +
205 System.currentTimeMillis() + "/";
206
207 Configuration confSeqWriter = new Configuration();
208 confSeqWriter.set("chukwaCollector.rotateInterval", String.valueOf(rotateInterval));
209 confSeqWriter.set("writer.hdfs.filesystem", "file:///");
210 String seqWriterOutputDir = outputDirectory +"/seqWriter/seqOutputDir";
211 confSeqWriter.set(SeqFileWriter.OUTPUT_DIR_OPT, seqWriterOutputDir );
212 confSeqWriter.set("chukwaCollector.isFixedTimeRotatorScheme", "true");
213 confSeqWriter.set("chukwaCollector.fixedTimeIntervalOffset", String.valueOf(intervalOffset));
214
215 File directory = new File(seqWriterOutputDir);
216
217
218
219 File[] files = directory.listFiles();
220 if (files != null) {
221 for(File file: files) {
222 file.delete();
223 }
224 }
225
226
227
228
229
230 long currentTime = System.currentTimeMillis();
231 long currentTimeInSec = currentTime/1000;
232 long timeAfterPrevRotateInterval = currentTimeInSec % rotateInterval;
233 if(timeAfterPrevRotateInterval > (rotateInterval - 2)){
234 Thread.sleep(2000);
235 }
236
237 seqWriter.init(confSeqWriter);
238 String [] fileNames = directory.list();
239 String firstFileName = "";
240 String initialTimestamp = "";
241
242
243
244 for(String file: fileNames) {
245 if ( file.endsWith(".chukwa") ){
246
247
248 firstFileName = file;
249
250
251 initialTimestamp = file.split("_")[0];
252
253
254 initialTimestamp = initialTimestamp.substring(0, initialTimestamp.length()-3);
255 break;
256 }
257 }
258
259 SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddhhmmss");
260 Date initialDate = formatter.parse(initialTimestamp);
261 long initialDateInMillis = initialDate.getTime();
262
263
264 long prevRoundedInterval = initialDateInMillis - (initialDateInMillis %
265 rotateInterval);
266 long expectedNextCloseDate = prevRoundedInterval +
267 rotateInterval + intervalOffset;
268
269
270
271
272 long sleepTime = rotateInterval + intervalOffset;
273
274 Thread.sleep(sleepTime);
275 fileNames = directory.list();
276 String nextTimestamp = "";
277
278 for(String file: fileNames) {
279 if ( file.endsWith(".chukwa") && !file.equals(firstFileName)){
280 nextTimestamp = file.split("_")[0];
281 nextTimestamp = nextTimestamp.substring(0, nextTimestamp.length()-3);
282 break;
283 }
284 }
285
286 Date nextDate = formatter.parse(nextTimestamp);
287 long nextDateInMillis = nextDate.getTime();
288
289 long threshold = 500;
290
291
292
293
294 Assert.assertTrue("File not closed at expected time",
295 (nextDateInMillis - expectedNextCloseDate < threshold));
296 seqWriter.close();
297
298 } catch (Throwable e) {
299 e.printStackTrace();
300 Assert.fail("Exception in TestChukwaWriters - " +
301 "testSeqFileFixedCloseInterval()," + e.getMessage());
302 }
303 }
304
305
306
307
308
309
310
311
312
313 public void testFixedIntervalOffsetCalculation(){
314 try{
315 SeqFileWriter seqFileWriter = new SeqFileWriter();
316 SimpleDateFormat formatter = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
317
318
319 long rotateInterval = 300000;
320 long offsetInterval = 60000;
321 long currentTimestamps[] = new long[5] ;
322 long expectedRotateTimestamps[] = new long[5];
323
324 Date date = formatter.parse("2011/06/15 01:05:00");
325 currentTimestamps[0] = date.getTime();
326 expectedRotateTimestamps[0] = 1308125460000L;
327
328 date = formatter.parse("2011/06/15 01:06:00");
329 currentTimestamps[1] = date.getTime();
330 expectedRotateTimestamps[1] = 1308125460000L;
331
332 date = formatter.parse("2011/06/15 01:02:00");
333 currentTimestamps[2] = date.getTime();
334 expectedRotateTimestamps[2] = 1308125160000L;
335
336 date = formatter.parse("2011/06/15 01:04:00");
337 currentTimestamps[3] = date.getTime();
338 expectedRotateTimestamps[3] = 1308125160000L;
339
340
341 date = formatter.parse("2011/06/15 01:56:00");
342 currentTimestamps[4] = date.getTime();
343 expectedRotateTimestamps[4] = 1308128460000L;
344
345 int i=0;
346 long expectedDelay = 0;
347 long actualRotateTimestamp = 0;
348 for(; i<5; i++){
349 expectedDelay = seqFileWriter.getDelayForFixedInterval(
350 currentTimestamps[i], rotateInterval, offsetInterval);
351 actualRotateTimestamp = currentTimestamps[i] + expectedDelay;
352 Assert.assertTrue("Incorrect value for delay",
353 (actualRotateTimestamp==expectedRotateTimestamps[i]));
354 }
355
356
357 rotateInterval = 60000;
358 offsetInterval = 30000;
359
360 date = formatter.parse("2011/06/15 01:05:00");
361 currentTimestamps[0] = date.getTime();
362 expectedRotateTimestamps[0] = 1308125190000L;
363
364 date = formatter.parse("2011/06/15 01:04:30");
365 currentTimestamps[1] = date.getTime();
366 expectedRotateTimestamps[1] = 1308125130000L;
367
368 date = formatter.parse("2011/06/15 01:05:30");
369 currentTimestamps[2] = date.getTime();
370 expectedRotateTimestamps[2] = 1308125190000L;
371
372 date = formatter.parse("2011/06/15 01:04:00");
373 currentTimestamps[3] = date.getTime();
374 expectedRotateTimestamps[3] = 1308125130000L;
375
376
377 date = formatter.parse("2011/06/15 01:59:30");
378 currentTimestamps[4] = date.getTime();
379 expectedRotateTimestamps[4] = 1308128430000L;
380
381 for(i=0; i<5; i++){
382 expectedDelay = seqFileWriter.getDelayForFixedInterval(
383 currentTimestamps[i], rotateInterval, offsetInterval);
384 actualRotateTimestamp = currentTimestamps[i] + expectedDelay;
385 Assert.assertTrue("Incorrect value for delay",
386 (actualRotateTimestamp==expectedRotateTimestamps[i]));
387 }
388
389
390 rotateInterval = 60000;
391 offsetInterval = 60000;
392
393 date = formatter.parse("2011/06/15 01:02:00");
394 currentTimestamps[0] = date.getTime();
395 expectedRotateTimestamps[0] = 1308125040000L;
396
397 date = formatter.parse("2011/06/15 01:02:30");
398 currentTimestamps[1] = date.getTime();
399 expectedRotateTimestamps[1] = 1308125040000L;
400
401
402 date = formatter.parse("2011/06/15 01:59:30");
403 currentTimestamps[2] = date.getTime();
404 expectedRotateTimestamps[2] = 1308128460000L;
405
406 for(i=0; i<3; i++){
407 expectedDelay = seqFileWriter.getDelayForFixedInterval(
408 currentTimestamps[i], rotateInterval, offsetInterval);
409 actualRotateTimestamp = currentTimestamps[i] + expectedDelay;
410 Assert.assertTrue("Incorrect value for delay",
411 (actualRotateTimestamp==expectedRotateTimestamps[i]));
412 }
413
414
415 rotateInterval = 60000;
416 offsetInterval = 120000;
417
418 date = formatter.parse("2011/06/15 01:02:00");
419 currentTimestamps[0] = date.getTime();
420 expectedRotateTimestamps[0] = 1308125100000L;
421
422 date = formatter.parse("2011/06/15 01:02:30");
423 currentTimestamps[1] = date.getTime();
424 expectedRotateTimestamps[1] = 1308125100000L;
425
426
427 date = formatter.parse("2011/06/15 01:59:30");
428 currentTimestamps[2] = date.getTime();
429 expectedRotateTimestamps[2] = 1308128520000L;
430
431 for(i=0; i<3; i++){
432 expectedDelay = seqFileWriter.getDelayForFixedInterval(
433 currentTimestamps[i], rotateInterval, offsetInterval);
434 actualRotateTimestamp = currentTimestamps[i] + expectedDelay;
435 Assert.assertTrue("Incorrect value for delay",
436 (actualRotateTimestamp==expectedRotateTimestamps[i]));
437 }
438
439 } catch (Throwable e) {
440 e.printStackTrace();
441 Assert.fail("Exception in TestChukwaWriters - " +
442 "testFixedIntervalOffsetCalculation()," + e.getMessage());
443 }
444 }
445 }