1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.collector;
20
21
22 import junit.framework.TestCase;
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
25 import org.apache.hadoop.chukwa.*;
26 import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
27 import org.apache.hadoop.chukwa.datacollection.sender.*;
28 import org.apache.hadoop.chukwa.datacollection.writer.*;
29 import java.util.*;
30 import org.mortbay.jetty.Server;
31 import org.mortbay.jetty.servlet.Context;
32 import org.mortbay.jetty.servlet.ServletHolder;
33
34 public class TestCollector extends TestCase {
35
36 public void testCollector() {
37 try {
38 Configuration conf = new Configuration();
39 conf.set("chukwaCollector.chunkSuppressBufferSize", "10");
40 conf.set("chukwaCollector.pipeline",
41 "org.apache.hadoop.chukwa.datacollection.writer.Dedup,"
42
43 + "org.apache.hadoop.chukwa.datacollection.collector.CaptureWriter");
44 conf.set("chukwaCollector.writerClass", PipelineStageWriter.class
45 .getCanonicalName());
46 ChukwaHttpSender sender = new ChukwaHttpSender(conf);
47 ArrayList<String> collectorList = new ArrayList<String>();
48 collectorList.add("http://localhost:9990/chukwa");
49 sender.setCollectors(new RetryListOfCollectors(collectorList, conf));
50 Server server = new Server(9990);
51 Context root = new Context(server, "/", Context.SESSIONS);
52
53 root.addServlet(new ServletHolder(new ServletCollector(conf)), "/*");
54 server.start();
55 server.setStopAtShutdown(false);
56 Thread.sleep(1000);
57
58 Chunk c = new ChunkImpl("data", "stream", 0,
59 "testing -- this should appear once".getBytes(), null);
60 ArrayList<Chunk> toSend = new ArrayList<Chunk>();
61 toSend.add(c);
62 toSend.add(c);
63 sender.send(toSend);
64 Thread.sleep(1000);
65 assertEquals(1, CaptureWriter.outputs.size());
66 } catch (Exception e) {
67 fail(e.toString());
68 }
69
70 }
71
72 }