1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.dataloader;
19
20 import junit.framework.TestCase;
21 import org.apache.hadoop.conf.Configuration;
22 import org.apache.hadoop.chukwa.ChunkImpl;
23 import org.apache.hadoop.chukwa.Chunk;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.NoSuchElementException;
27 import java.util.regex.Matcher;
28
29 import org.apache.hadoop.chukwa.datacollection.collector.CaptureWriter;
30 import org.apache.hadoop.chukwa.datacollection.writer.PipelineStageWriter;
31 import org.apache.hadoop.chukwa.datacollection.writer.SocketTeeWriter;
32 import org.apache.hadoop.chukwa.rest.bean.ClientTraceBean;
33
34 import java.net.*;
35 import java.io.*;
36
37 public class TestSocketDataLoader extends TestCase{
38 public void testSocketTee() throws Exception {
39
40 Configuration conf = new Configuration();
41
42 conf.set("chukwaCollector.pipeline",
43 SocketTeeWriter.class.getCanonicalName());
44
45 conf.set("chukwaCollector.writerClass",
46 PipelineStageWriter.class.getCanonicalName());
47
48 PipelineStageWriter psw = new PipelineStageWriter();
49 psw.init(conf);
50
51 SocketDataLoader sdl = new SocketDataLoader("all");
52
53 System.out.println("pipeline established; now pushing a chunk");
54 ArrayList<Chunk> l = new ArrayList<Chunk>();
55 l.add(new ChunkImpl("dt", "name", 1, new byte[] {'a'}, null));
56 psw.add(l);
57
58
59 try {
60 Collection<Chunk> clist = sdl.read();
61 for(Chunk c : clist) {
62 if(c!=null && c.getData()!=null) {
63 assertTrue("a".equals(new String(c.getData())));
64 }
65 }
66 } catch(NoSuchElementException e) {
67 }
68 }
69
70 }