View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.adaptor;
19  
20  import static org.apache.hadoop.chukwa.util.TempFileUtil.makeTestFile;
21  import java.io.File;
22  import java.io.IOException;
23  import java.net.DatagramPacket;
24  import java.net.DatagramSocket;
25  import java.net.InetSocketAddress;
26  import junit.framework.TestCase;
27  import org.apache.hadoop.chukwa.Chunk;
28  import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
29  import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
30  import org.apache.hadoop.conf.Configuration;
31  
32  public class TestBufferingWrappers extends TestCase {
33  
34    Configuration conf = new Configuration();
35    static File baseDir;
36    ChunkCatcherConnector chunks;
37    
38    public TestBufferingWrappers() throws IOException {
39      baseDir = new File(System.getProperty("test.build.data", "/tmp"));
40      conf.setInt("chukwaAgent.control.port", 0);
41      conf.set("chukwaAgent.checkpoint.dir", baseDir.getCanonicalPath());
42      conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
43      conf.setInt("chukwaAgent.adaptor.fileadaptor.timeoutperiod", 100);
44      conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
45  
46      chunks = new ChunkCatcherConnector();
47      chunks.start();
48    }
49    
50    public void testMBResendAfterStop() throws Exception{
51      resendAfterStop("MemBuffered");
52    }
53  
54    public void testWriteaheadResendAfterStop() throws Exception{
55      resendAfterStop("WriteaheadBuffered");
56    }
57  
58    
59    //start a wrapped FileAdaptor. Pushes a chunk. Stop it and restart.
60    //chunk hasn't been acked, so should get pushed again.
61    //we delete the file and also change the data type each time through the loop
62    //to make sure we get the cached chunk.
63    public void resendAfterStop(String adaptor)  throws IOException,
64    ChukwaAgent.AlreadyRunningException, InterruptedException {
65      
66      ChukwaAgent agent = new ChukwaAgent(conf);
67      String ADAPTORID = "adaptor_test" + System.currentTimeMillis(); 
68      String STR = "test data";
69      int PORTNO = 9878;
70      DatagramSocket send = new DatagramSocket();
71      byte[] buf = STR.getBytes();
72      DatagramPacket p = new DatagramPacket(buf, buf.length);
73      p.setSocketAddress(new InetSocketAddress("127.0.0.1",PORTNO));
74      
75      assertEquals(0, agent.adaptorCount());
76      String name =agent.processAddCommand("add "+ ADAPTORID + " = "+adaptor+" UDPAdaptor raw "+PORTNO+ " 0");
77      assertEquals(name, ADAPTORID);
78      Thread.sleep(500);
79      send.send(p);
80      
81      for(int i=0; i< 5; ++i) {
82        Chunk c = chunks.waitForAChunk(5000);
83        System.out.println("received " + i);
84        assertNotNull(c);
85        String dat = new String(c.getData());
86        assertTrue(dat.equals(STR));
87        assertTrue(c.getDataType().equals("raw"));
88        assertEquals(c.getSeqID(), STR.length());
89        
90        agent.stopAdaptor(name, AdaptorShutdownPolicy.RESTARTING);
91        Thread.sleep(500); //for socket to deregister
92        name =agent.processAddCommand("add "+ADAPTORID + " = "+adaptor+" UDPAdaptor raw "+PORTNO + " 0");
93        assertEquals(name, ADAPTORID);
94      }
95      Chunk c = chunks.waitForAChunk(5000);
96  
97      Thread.sleep(500);
98      
99      buf = "different data".getBytes();
100     p = new DatagramPacket(buf, buf.length);   
101     p.setSocketAddress(new InetSocketAddress("127.0.0.1",PORTNO));
102     send.send(p);
103     c = chunks.waitForAChunk(5000);
104     assertNotNull(c);
105     assertEquals(buf.length + STR.length(), c.getSeqID());
106     
107     agent.stopAdaptor(name, true);
108     assertEquals(0, agent.adaptorCount());
109     Thread.sleep(500);//before re-binding
110     agent.shutdown();
111   }
112 }