/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.chukwa.datacollection.writer;

import java.util.ArrayList;

import junit.framework.Assert;
import junit.framework.TestCase;

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.HBaseWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
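
/**
 * End-to-end test for {@link HBaseWriter}: starts an in-process HBase mini
 * cluster, pushes one chunk through the writer, and verifies that the demux
 * parser stored the expected key/value pair in the test table.
 */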
public class TestHBaseWriter extends TestCase {
  static Logger log = Logger.getLogger(TestHBaseWriter.class);
  private HBaseTestingUtility util;
  private HBaseWriter hbw;
  private Configuration conf;
  private byte[] columnFamily = Bytes.toBytes("TestColumnFamily");
  private byte[] qualifier = Bytes.toBytes("Key");
  private byte[] expectedValue = Bytes.toBytes("Value");

  private byte[] table = Bytes.toBytes("Test");
  // Sample record in the "<timestamp> <key> <value>" format the test parser expects.
  private byte[] test = Bytes.toBytes("1234567890 Key Value");
  private ChukwaConfiguration cc;
  long timestamp = 1234567890;

  public TestHBaseWriter() {
    cc = new ChukwaConfiguration();

    conf = HBaseConfiguration.create();
    conf.set("hbase.hregion.memstore.flush.size", String.valueOf(128 * 1024));
    try {
      // Bring up an in-process ZooKeeper quorum and a two-node HBase mini
      // cluster, then create the table the writer is expected to populate.
      util = new HBaseTestingUtility(conf);
      util.startMiniZKCluster();
      util.getConfiguration().setBoolean("dfs.support.append", true);
      util.startMiniCluster(2);
      HTableDescriptor desc = new HTableDescriptor(table);
      desc.addFamily(new HColumnDescriptor(columnFamily));
      util.getHBaseAdmin().createTable(desc);
    } catch (Exception e) {
      e.printStackTrace();
      Assert.fail(e.getMessage());
    }
  }

  // JUnit 3 lifecycle hooks (note the setUp spelling, which JUnit requires).
  // The mini cluster is managed in the constructor and in testWriters(),
  // so there is nothing to set up or tear down here.
  protected void setUp() {
  }

  protected void tearDown() {
  }
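
  /**
   * Writes a single chunk through {@link HBaseWriter} and scans the test
   * table to verify that the expected value was stored under
   * TestColumnFamily:Key.
   */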
  public void testWriters() {
    ArrayList<Chunk> chunks = new ArrayList<Chunk>();
    chunks.add(new ChunkImpl("TextParser", "name", timestamp, test, null));
    try {
      // Point the writer at the test demux parser for the
      // "<timestamp> <key> <value>" record format.
      cc.set("hbase.demux.package", "org.apache.hadoop.chukwa.datacollection.writer.test.demux");
      cc.set("TextParser", "org.apache.hadoop.chukwa.datacollection.writer.test.demux.TextParser");
      conf.set(HConstants.ZOOKEEPER_QUORUM, "127.0.0.1");
      hbw = new HBaseWriter(cc, conf);
      hbw.init(cc);
      if (hbw.add(chunks) != ChukwaWriter.COMMIT_OK) {
        Assert.fail("Commit status is not OK.");
      }
      // Compare the data in HBase with the generated chunk; reuse the mini
      // cluster's configuration so the scan connects to the right ZooKeeper.
      HTable testTable = new HTable(conf, table);
      ResultScanner scanner = testTable.getScanner(columnFamily, qualifier);
      int rows = 0;
      for (Result res : scanner) {
        Assert.assertEquals(new String(expectedValue),
            new String(res.getValue(columnFamily, qualifier)));
        rows++;
      }
      // Guard against a vacuous pass when nothing was written.
      Assert.assertTrue("No rows found in the test table.", rows > 0);
      // Clean up and shut down the mini cluster.
      scanner.close();
      hbw.close();
      util.shutdownMiniCluster();
    } catch (Exception e) {
      e.printStackTrace();
      Assert.fail(e.getMessage());
    }
  }
}