View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
19  
20  
21  import java.io.*;
22  import junit.framework.TestCase;
23  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
24  import java.util.Map;
25  import java.util.Iterator;
26  import org.apache.hadoop.chukwa.Chunk;
27  import org.apache.hadoop.chukwa.datacollection.adaptor.*;
28  import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
29  import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
30  import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
31  import static org.apache.hadoop.chukwa.util.TempFileUtil.*;
32  
33  public class TestLogRotate extends TestCase {
34    ChunkCatcherConnector chunks;
35  
36    public TestLogRotate() {
37      chunks = new ChunkCatcherConnector();
38      chunks.start();
39    }
40  
41    public void testLogRotate() throws IOException, InterruptedException,
42        ChukwaAgent.AlreadyRunningException {
43      ChukwaAgent agent = new ChukwaAgent();
44      // Remove any adaptor left over from previous run
45      ChukwaConfiguration cc = new ChukwaConfiguration();
46      int portno = cc.getInt("chukwaAgent.control.port", 9093);
47      ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
48      cli.removeAll();
49      // sleep for some time to make sure we don't get chunk from existing streams
50      Thread.sleep(5000);
51      File testFile = makeTestFile("chukwaLogRotateTest", 80);
52      String adaptorId = agent
53          .processAddCommand("add lr =org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8"
54              + " lines " + testFile + " 0");
55      assertTrue(adaptorId.equals("adaptor_lr"));
56      System.out.println("getting a chunk...");
57      Chunk c = chunks.waitForAChunk();
58      System.out.println("got chunk");
59      while (!c.getDataType().equals("lines")) {
60        c = chunks.waitForAChunk();
61      }
62      assertTrue(c.getSeqID() == testFile.length());
63      assertTrue(c.getRecordOffsets().length == 80);
64      int recStart = 0;
65      for (int rec = 0; rec < c.getRecordOffsets().length; ++rec) {
66        String record = new String(c.getData(), recStart,
67            c.getRecordOffsets()[rec] - recStart + 1);
68        System.out.println("record " + rec + " was: " + record);
69        assertTrue(record.equals(rec + " abcdefghijklmnopqrstuvwxyz\n"));
70        recStart = c.getRecordOffsets()[rec] + 1;
71      }
72      assertTrue(c.getDataType().equals("lines"));
73      testFile = makeTestFile("chukwaLogRotateTest", 40);
74      c = chunks.waitForAChunk();
75      System.out.println("got chunk");
76      while (!c.getDataType().equals("lines")) {
77        c = chunks.waitForAChunk();
78      }
79      // assertTrue(c.getSeqID() == testFile.length());
80      assertTrue(c.getRecordOffsets().length == 40);
81      recStart = 0;
82      for (int rec = 0; rec < c.getRecordOffsets().length; ++rec) {
83        String record = new String(c.getData(), recStart,
84            c.getRecordOffsets()[rec] - recStart + 1);
85        System.out.println("record " + rec + " was: " + record);
86        assertTrue(record.equals(rec + " abcdefghijklmnopqrstuvwxyz\n"));
87        recStart = c.getRecordOffsets()[rec] + 1;
88      }
89      assertTrue(c.getDataType().equals("lines"));
90      agent.stopAdaptor(adaptorId, false);
91      agent.shutdown();
92      Thread.sleep(2000);
93    }
94  
95  
96  }