View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.agent;
19  
20  
21  import java.io.*;
22  import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
23  import org.apache.hadoop.conf.Configuration;
24  import junit.framework.TestCase;
25  
26  public class TestAgentConfig extends TestCase {
27    public void testInitAdaptors_vs_Checkpoint() {
28      try {
29        // create two target files, foo and bar
30        File foo = File.createTempFile("foo", "test");
31        foo.deleteOnExit();
32        PrintStream ps = new PrintStream(new FileOutputStream(foo));
33        ps.println("foo");
34        ps.close();
35  
36        File bar = File.createTempFile("bar", "test");
37        bar.deleteOnExit();
38        ps = new PrintStream(new FileOutputStream(bar));
39        ps.println("bar");
40        ps.close();
41  
42        // initially, read foo
43        File initialAdaptors = File.createTempFile("initial", "adaptors");
44        initialAdaptors.deleteOnExit();
45        ps = new PrintStream(new FileOutputStream(initialAdaptors));
46        ps.println("add adaptor_testAdaptor= org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8  raw 0 "
47                + foo.getAbsolutePath() + " 0  ");
48        ps.close();
49  
50        Configuration conf = new Configuration();
51        conf.set("chukwaAgent.control.port", "0");
52        conf.set("chukwaAgent.initial_adaptors", initialAdaptors
53            .getAbsolutePath());
54        File checkpointDir = File.createTempFile("chukwatest", "checkpoint");
55        checkpointDir.delete();
56        checkpointDir.mkdir();
57        checkpointDir.deleteOnExit();
58        conf.set("chukwaAgent.checkpoint.dir", checkpointDir.getAbsolutePath());
59  
60        ChukwaAgent agent = new ChukwaAgent(conf);
61        ConsoleOutConnector conn = new ConsoleOutConnector(agent, true);
62        conn.start();
63        assertEquals(1, agent.adaptorCount());// check that we processed initial
64                                              // adaptors
65        assertNotNull(agent.getAdaptor("adaptor_testAdaptor"));
66        assertTrue(agent.getAdaptor("adaptor_testAdaptor").getCurrentStatus().contains("foo"));
67  
68        System.out
69            .println("---------------------done with first run, now stopping");
70        agent.shutdown();
71        Thread.sleep(2000);
72        assertEquals(0, agent.adaptorCount());
73        // at this point, there should be a checkpoint file with a tailer reading
74        // foo.
75        // we're going to rewrite initial adaptors to read bar; after reboot
76        // we should  be looking at both foo andn bar.
77        ps = new PrintStream(new FileOutputStream(initialAdaptors, false));// overwrite
78        ps.println("add bar= org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8  raw 0 "
79                + bar.getAbsolutePath() + " 0  ");
80        ps.close();
81  
82        System.out.println("---------------------restarting");
83        agent = new ChukwaAgent(conf);
84        conn = new ConsoleOutConnector(agent, true);
85        conn.start();
86        assertEquals(2, agent.adaptorCount());// check that we processed initial
87                                              // adaptors
88        assertNotNull(agent.getAdaptor("adaptor_testAdaptor"));
89        assertTrue(agent.getAdaptor("adaptor_testAdaptor").getCurrentStatus().contains("foo"));
90        agent.shutdown();
91        Thread.sleep(2000);
92        System.out.println("---------------------done");
93  
94      } catch (Exception e) {
95        e.printStackTrace();
96        fail(e.toString());
97      }
98    }
99  
100   public void testNoCheckpoints() {
101     try {
102       String tmpdir = System.getProperty("test.build.data", "/tmp");
103       File NONCE_DIR = new File(tmpdir, "/test_chukwa_checkpoints");
104       if (NONCE_DIR.exists()) {
105         for (File f : NONCE_DIR.listFiles())
106           f.delete();
107         NONCE_DIR.delete();
108       }
109       // assertFalse(NONCE_DIR.exists());
110       Configuration conf = new Configuration();
111       conf.set("chukwaAgent.checkpoint.dir", NONCE_DIR.getAbsolutePath());
112       conf.setBoolean("chukwaAgent.checkpoint.enabled", true);
113       conf.setInt("chukwaAgent.control.port", 0);
114 
115       System.out.println("\n\n===checkpoints enabled, dir does not exist:");
116       ChukwaAgent agent = new ChukwaAgent(conf);
117       assertEquals(0, agent.getAdaptorList().size());
118       agent.shutdown();
119       Thread.sleep(2000);
120       assertTrue(NONCE_DIR.exists());
121       for (File f : NONCE_DIR.listFiles())
122         f.delete();
123 
124       System.out
125           .println("\n\n===checkpoints enabled, dir exists but is empty:");
126       agent = new ChukwaAgent(conf);
127       assertEquals(0, agent.getAdaptorList().size());
128       agent.shutdown();
129       Thread.sleep(2000);
130       for (File f : NONCE_DIR.listFiles())
131         f.delete();
132 
133       System.out
134           .println("\n\n===checkpoints enabled, dir exists with zero-length file:");
135       (new File(NONCE_DIR, "chukwa_checkpoint_0")).createNewFile();
136       agent = new ChukwaAgent(conf);
137       assertEquals(0, agent.getAdaptorList().size());
138       agent.processAddCommand("ADD org.apache.hadoop.chukwa.datacollection.adaptor.ChukwaTestAdaptor testdata  0");
139       agent.shutdown();
140       Thread.sleep(2000);
141       assertTrue(new File(NONCE_DIR, "chukwa_checkpoint_1").exists());
142 
143       System.out
144           .println("\n\n===checkpoints enabled, dir exists with valid checkpoint");
145       agent = new ChukwaAgent(conf);
146       assertEquals(1, agent.getAdaptorList().size());
147       agent.shutdown();
148       Thread.sleep(2000);
149       // checkpoint # increments by one on boot and reload
150       assertTrue(new File(NONCE_DIR, "chukwa_checkpoint_2").exists());
151 
152     } catch (Exception e) {
153       fail(e.toString());
154     }
155   }
156 
157 }