View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.datacollection.collector;
20  
21  
22  import junit.framework.TestCase;
23  import org.apache.hadoop.conf.Configuration;
24  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
25  import org.apache.hadoop.chukwa.*;
26  import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
27  import org.apache.hadoop.chukwa.datacollection.sender.*;
28  import org.apache.hadoop.chukwa.datacollection.writer.*;
29  import java.util.*;
30  import org.mortbay.jetty.Server;
31  import org.mortbay.jetty.servlet.Context;
32  import org.mortbay.jetty.servlet.ServletHolder;
33  
34  public class TestCollector extends TestCase {
35  
36    public void testCollector() {
37      try {
38        Configuration conf = new Configuration();
39        conf.set("chukwaCollector.chunkSuppressBufferSize", "10");
40        conf.set("chukwaCollector.pipeline",
41                "org.apache.hadoop.chukwa.datacollection.writer.Dedup,"// note
42                                                                       // comma
43                    + "org.apache.hadoop.chukwa.datacollection.collector.CaptureWriter");
44        conf.set("chukwaCollector.writerClass", PipelineStageWriter.class
45            .getCanonicalName());
46        ChukwaHttpSender sender = new ChukwaHttpSender(conf);
47        ArrayList<String> collectorList = new ArrayList<String>();
48        collectorList.add("http://localhost:9990/chukwa");
49        sender.setCollectors(new RetryListOfCollectors(collectorList, conf));
50        Server server = new Server(9990);
51        Context root = new Context(server, "/", Context.SESSIONS);
52  
53        root.addServlet(new ServletHolder(new ServletCollector(conf)), "/*");
54        server.start();
55        server.setStopAtShutdown(false);
56        Thread.sleep(1000);
57  
58        Chunk c = new ChunkImpl("data", "stream", 0,
59            "testing -- this should appear once".getBytes(), null);
60        ArrayList<Chunk> toSend = new ArrayList<Chunk>();
61        toSend.add(c);
62        toSend.add(c);
63        sender.send(toSend);
64        Thread.sleep(1000);
65        assertEquals(1, CaptureWriter.outputs.size());
66      } catch (Exception e) {
67        fail(e.toString());
68      }
69  
70    }
71  
72  }