View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
19  
20  import java.io.File;
21  import java.io.FileOutputStream;
22  import java.io.IOException;
23  import java.io.PrintWriter;
24  import org.apache.hadoop.chukwa.Chunk;
25  import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
26  import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
27  import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
28  import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
29  import org.apache.hadoop.conf.Configuration;
30  import junit.framework.TestCase;
31  import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy;
32  import org.apache.hadoop.chukwa.datacollection.adaptor.TestDirTailingAdaptor;
33  import org.apache.log4j.Level;
34  
35  public class TestRCheckAdaptor extends TestCase implements ChunkReceiver {
36    
37    ChunkCatcherConnector chunks;
38  
39    public TestRCheckAdaptor() {
40      chunks = new ChunkCatcherConnector();
41      chunks.start();
42    }
43  
44    public void testBaseCases() throws IOException, InterruptedException,
45        ChukwaAgent.AlreadyRunningException {
46      Configuration conf = new Configuration();
47      conf.set("chukwaAgent.control.port", "0");
48      conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
49          
50  //    RCheckFTAdaptor.log.setLevel(Level.DEBUG);
51      File baseDir = new File(System.getProperty("test.build.data", "/tmp") + "/rcheck");
52      TestDirTailingAdaptor.createEmptyDir(baseDir);
53      File tmpOutput = new File(baseDir, "rotateTest.1");
54      PrintWriter pw = new PrintWriter(new FileOutputStream(tmpOutput));
55      pw.println("First");
56      pw.close();
57      Thread.sleep(1000);//to make sure mod dates are distinguishing.
58      tmpOutput = new File(baseDir, "rotateTest");
59      pw = new PrintWriter(new FileOutputStream(tmpOutput));
60      pw.println("Second");
61      pw.close();
62      
63      
64      ChukwaAgent agent = new ChukwaAgent(conf);
65      String adaptorID = agent.processAddCommand("add lr = filetailer.RCheckFTAdaptor test " + tmpOutput.getAbsolutePath() + " 0");
66      assertNotNull(adaptorID);
67      
68      Chunk c = chunks.waitForAChunk(2000);
69      assertNotNull(c);
70      assertTrue(c.getData().length == 6);
71      assertTrue("First\n".equals(new String(c.getData())));
72      c = chunks.waitForAChunk(2000);
73      assertNotNull(c);
74      assertTrue(c.getData().length == 7);    
75      assertTrue("Second\n".equals(new String(c.getData())));
76  
77      pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
78      pw.println("Third");
79      pw.close();
80      c = chunks.waitForAChunk(2000);
81      
82      assertNotNull(c);
83      assertTrue(c.getData().length == 6);    
84      assertTrue("Third\n".equals(new String(c.getData())));
85      Thread.sleep(1500);
86      
87      tmpOutput.renameTo(new File(baseDir, "rotateTest.2"));
88      pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
89      pw.println("Fourth");
90      pw.close();
91      c = chunks.waitForAChunk(2000);
92  
93      assertNotNull(c);
94      System.out.println("got " + new String(c.getData()));
95      assertTrue("Fourth\n".equals(new String(c.getData())));
96  
97      Thread.sleep(1500);
98      
99      tmpOutput.renameTo(new File(baseDir, "rotateTest.3"));
100     Thread.sleep(400);
101     pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
102     pw.println("Fifth");
103     pw.close();
104     c = chunks.waitForAChunk(2000);
105     assertNotNull(c);
106     System.out.println("got " + new String(c.getData()));
107     assertTrue("Fifth\n".equals(new String(c.getData())));
108 
109     agent.shutdown();
110     Thread.sleep(2000);
111   }
112   
113   
114   public void testContinuously() throws Exception {
115     File baseDir = new File(System.getProperty("test.build.data", "/tmp") + "/rcheck");
116     TestDirTailingAdaptor.createEmptyDir(baseDir);
117     File tmpOutput = new File(baseDir, "continuousTest");
118     PrintWriter pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
119     LWFTAdaptor.tailer.SAMPLE_PERIOD_MS = 2000;
120 
121 //    RCheckFTAdaptor.log.setLevel(Level.DEBUG);
122     RCheckFTAdaptor rca = new RCheckFTAdaptor();
123     rca.parseArgs("Test", tmpOutput.getAbsolutePath(), AdaptorManager.NULL);
124     rca.start("id", "Test", 0, this);
125     
126 
127     Thread.sleep(1000);
128     for(int i= 0; i < 200; ++i) {
129       Thread.sleep(120);
130       pw.println("This is line:" + i);
131       if( i % 5 == 0)
132         pw.flush();
133       if(i % 20 == 0) {
134         System.err.println("rotating");
135         pw.close();
136         tmpOutput.renameTo( new File(baseDir, "continuousTest."+(i/10)));
137         pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
138       }
139     }
140     Thread.sleep(1000);
141 
142     rca.shutdown(AdaptorShutdownPolicy.HARD_STOP);
143     
144   }
145 
146   volatile int nextExpectedLine = 0;
147   
148   @Override
149   public void add(Chunk event) throws InterruptedException {
150 //    System.out.println("got a chunk; len = " + event.getData().length);
151     String[] lines = new String(event.getData()).split("\n");
152     System.err.println("got chunk; " + lines.length + " lines " + event.getData().length + " bytes");
153     for(String line: lines) {
154       String n = line.substring(line.indexOf(':')+1);
155       int i = Integer.parseInt(n);
156 //      System.out.println("saw "+i);
157       if(i != nextExpectedLine) {
158         System.err.println("lines out of order: saw " + i + " expected " + nextExpectedLine);
159         System.exit(0);
160         fail();
161       }
162       nextExpectedLine = i+1;
163     
164     }
165   }
166 
167 }