1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.writer;
19
20 import junit.framework.TestCase;
21 import org.apache.hadoop.conf.Configuration;
22 import org.apache.hadoop.chukwa.ChunkImpl;
23 import org.apache.hadoop.chukwa.Chunk;
24 import java.util.ArrayList;
25 import org.apache.hadoop.chukwa.datacollection.collector.CaptureWriter;
26 import org.apache.hadoop.io.IOUtils;
27 import java.net.*;
28 import java.io.*;
29
30 public class TestSocketTee extends TestCase{
31 public void testSocketTee() throws Exception {
32
33 Configuration conf = new Configuration();
34
35 conf.set("chukwaCollector.pipeline",
36 SocketTeeWriter.class.getCanonicalName()+","
37 + CaptureWriter.class.getCanonicalName());
38
39 conf.set("chukwaCollector.writerClass",
40 PipelineStageWriter.class.getCanonicalName());
41
42 PipelineStageWriter psw = new PipelineStageWriter();
43 psw.init(conf);
44
45 System.out.println("pipeline established; now pushing a chunk");
46 ArrayList<Chunk> l = new ArrayList<Chunk>();
47 l.add(new ChunkImpl("dt", "name", 1, new byte[] {'a'}, null));
48 psw.add(l);
49
50 assertEquals(1, CaptureWriter.outputs.size());
51
52
53 System.out.println("connecting to localhost");
54 Socket s = new Socket("localhost", SocketTeeWriter.DEFAULT_PORT);
55
56 DataOutputStream dos = new DataOutputStream (s.getOutputStream());
57 dos.write((SocketTeeWriter.WRITABLE + " datatype=dt3\n").getBytes());
58 DataInputStream dis = new DataInputStream(s.getInputStream());
59
60 System.out.println("command send");
61
62 dis.readFully(new byte[3]);
63
64 l = new ArrayList<Chunk>();
65 l.add(new ChunkImpl("dt2", "name", 1, new byte[] {'b'}, null));
66 psw.add(l);
67 assertEquals(2, CaptureWriter.outputs.size());
68
69 System.out.println("sent nonmatching chunk");
70
71
72
73 l = new ArrayList<Chunk>();
74 l.add(new ChunkImpl("dt3", "name", 1, new byte[] {'c'}, null));
75 psw.add(l);
76 assertEquals(3, CaptureWriter.outputs.size());
77
78 System.out.println("sent matching chunk");
79
80 System.out.println("reading...");
81 ChunkImpl chunk = ChunkImpl.read(dis);
82 assertTrue(chunk.getDataType().equals("dt3"));
83 System.out.println(chunk);
84
85 dis.close();
86 dos.close();
87 s.close();
88
89 Socket s2 = new Socket("localhost", SocketTeeWriter.DEFAULT_PORT);
90 s2.getOutputStream().write((SocketTeeWriter.RAW+" content=.*d.*\n").getBytes());
91 dis = new DataInputStream(s2.getInputStream());
92 dis.readFully(new byte[3]);
93 l = new ArrayList<Chunk>();
94 l.add(new ChunkImpl("dt3", "name", 1, new byte[] {'d'}, null));
95 psw.add(l);
96 assertEquals(4, CaptureWriter.outputs.size());
97
98 int len = dis.readInt();
99 assertTrue(len == 1);
100 byte[] data = new byte[100];
101 int read = dis.read(data);
102 assertTrue(read == 1);
103 assertTrue(data[0] == 'd');
104
105 s2.close();
106 dis.close();
107
108 l = new ArrayList<Chunk>();
109 l.add(new ChunkImpl("dt3", "name", 3, new byte[] {'c', 'a', 'd'}, null));
110 psw.add(l);
111 assertEquals(5, CaptureWriter.outputs.size());
112
113
114 Socket s3 = new Socket("localhost", SocketTeeWriter.DEFAULT_PORT);
115 s3.getOutputStream().write((SocketTeeWriter.ASCII_HEADER+" all\n").getBytes());
116 dis = new DataInputStream(s3.getInputStream());
117 dis.readFully(new byte[3]);
118 l = new ArrayList<Chunk>();
119 chunk= new ChunkImpl("dataTypeFoo", "streamName", 4, new byte[] {'t','e','x','t'}, null);
120 chunk.setSource("hostNameFoo");
121 l.add(chunk);
122 psw.add(l);
123 assertEquals(6, CaptureWriter.outputs.size());
124 len = dis.readInt();
125 data = new byte[len];
126 IOUtils.readFully(dis, data, 0, len);
127 String rcvd = new String(data);
128 System.out.println("got " + read+"/" +len +" bytes: " + rcvd);
129 assertEquals("hostNameFoo dataTypeFoo streamName 4\ntext", rcvd);
130 s3.close();
131 dis.close();
132
133 }
134
135 }