View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
19  
20  import java.io.File;
21  import java.io.FileOutputStream;
22  import java.io.IOException;
23  import java.io.PrintWriter;
24  import org.apache.hadoop.chukwa.Chunk;
25  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
26  import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
27  import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
28  import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
29  import junit.framework.Assert;
30  import junit.framework.TestCase;
31  
32  public class TestFileTailingAdaptorBigRecord extends TestCase {
33  
34    ChunkCatcherConnector chunks;
35  
36    public void testBigRecord() {
37      File f = null;
38      try {
39        File tempDir = new File(System.getProperty("test.build.data", "/tmp"));
40        if (!tempDir.exists()) {
41          tempDir.mkdirs();
42        }
43        String logFile = tempDir.getPath() + "/Chukwa-bigRecord.txt";
44        f = makeTestFile(logFile);
45  
46        chunks = new ChunkCatcherConnector();
47        chunks.start();
48  
49        // Remove any adaptor left over from previous run
50        ChukwaConfiguration cc = new ChukwaConfiguration();
51        cc.set("chukwaAgent.control.port", "0");
52        cc.setInt("chukwaAgent.fileTailingAdaptor.maxReadSize", 55);
53        ChukwaAgent agent = new ChukwaAgent(cc);
54        int portno = agent.getControllerPort();
55        while (portno == -1) {
56          Thread.sleep(1000);
57          portno = agent.getControllerPort();
58        }
59  
60        // System.out.println("Port number:" + portno);
61        ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
62        cli.removeAll();
63        // sleep for some time to make sure we don't get chunk from existing
64        // streams
65        Thread.sleep(5000);
66        String adaptorId = agent
67            .processAddCommand("add org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped"
68                + " BigRecord " + logFile + " 0");
69        assertNotNull(adaptorId);
70  
71        boolean record8Found = false;
72        Chunk c = null;
73        // Keep reading until record8
74        // If the adaptor is stopped then Junit will fail with a timeOut
75        while (!record8Found) {
76          c = chunks.waitForAChunk();//only wait three minutes
77          String data = new String(c.getData());
78          if (c.getDataType().equals("BigRecord")
79              && data.indexOf("8 abcdefghijklmnopqrstuvwxyz") >= 0) {
80            record8Found = true;
81          }
82        }
83        agent.stopAdaptor(adaptorId, true);
84        agent.shutdown();
85        Thread.sleep(2000);
86      } catch (Exception e) {
87        Assert.fail("Exception in testBigRecord: " + e.getMessage());
88      } finally {
89        if (f != null) {
90          f.delete();
91        }
92      }
93    }
94  
95    private File makeTestFile(String name) throws IOException {
96      File tmpOutput = new File(name);
97      FileOutputStream fos = new FileOutputStream(tmpOutput);
98  
99      PrintWriter pw = new PrintWriter(fos);
100     for (int i = 0; i < 5; ++i) {
101       pw.print(i + " ");
102       pw.println("abcdefghijklmnopqrstuvwxyz");
103     }
104     pw.print("6 ");
105     for (int i = 0; i < 10; ++i) {
106       pw.print("abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz");
107     }
108     pw.print("\n");
109     pw.print("7 ");
110     pw.println("abcdefghijklmnopqrstuvwxyz");
111     pw.print("8 ");
112     pw.println("abcdefghijklmnopqrstuvwxyz");
113 
114     pw.flush();
115     pw.close();
116     return tmpOutput;
117   }
118 
119 }