View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.writer;
19  
20  import junit.framework.TestCase;
21  import org.apache.hadoop.conf.Configuration;
22  import org.apache.hadoop.chukwa.ChunkImpl;
23  import org.apache.hadoop.chukwa.Chunk;
24  import java.util.ArrayList;
25  import org.apache.hadoop.chukwa.datacollection.collector.CaptureWriter;
26  import org.apache.hadoop.io.IOUtils;
27  import java.net.*;
28  import java.io.*;
29  
30  public class TestSocketTee  extends TestCase{
31    public void testSocketTee() throws Exception {
32      
33      Configuration conf = new Configuration();  
34      
35      conf.set("chukwaCollector.pipeline",
36          SocketTeeWriter.class.getCanonicalName()+","// note comma
37          + CaptureWriter.class.getCanonicalName());
38      
39      conf.set("chukwaCollector.writerClass", 
40          PipelineStageWriter.class.getCanonicalName());
41      
42      PipelineStageWriter psw = new PipelineStageWriter();
43      psw.init(conf);
44  
45      System.out.println("pipeline established; now pushing a chunk");
46      ArrayList<Chunk> l = new ArrayList<Chunk>();
47      l.add(new ChunkImpl("dt", "name", 1, new byte[] {'a'}, null));
48      psw.add(l);
49      //push a chunk through. It should get written, but the socket tee shouldn't do anything.
50      assertEquals(1, CaptureWriter.outputs.size());
51      //now connect and set up a filter.
52      
53      System.out.println("connecting to localhost");
54      Socket s = new Socket("localhost", SocketTeeWriter.DEFAULT_PORT);
55   //   s.setSoTimeout(2000);
56      DataOutputStream dos = new DataOutputStream (s.getOutputStream());
57      dos.write((SocketTeeWriter.WRITABLE + " datatype=dt3\n").getBytes());
58      DataInputStream dis = new DataInputStream(s.getInputStream());
59  
60      System.out.println("command send");
61  
62      dis.readFully(new byte[3]);
63      //push a chunk not matching filter -- nothing should happen.
64      l = new ArrayList<Chunk>();
65      l.add(new ChunkImpl("dt2", "name", 1, new byte[] {'b'}, null));
66      psw.add(l);
67      assertEquals(2, CaptureWriter.outputs.size());
68  
69      System.out.println("sent nonmatching chunk");
70  
71      //and now one that does match -- data should be available to read off the socket
72  
73      l = new ArrayList<Chunk>();
74      l.add(new ChunkImpl("dt3", "name", 1, new byte[] {'c'}, null));
75      psw.add(l);
76      assertEquals(3, CaptureWriter.outputs.size());
77  
78      System.out.println("sent matching chunk");
79      
80      System.out.println("reading...");
81      ChunkImpl chunk = ChunkImpl.read(dis);
82      assertTrue(chunk.getDataType().equals("dt3"));
83      System.out.println(chunk);
84  
85      dis.close();
86      dos.close();
87      s.close();
88      
89      Socket s2 = new Socket("localhost", SocketTeeWriter.DEFAULT_PORT);
90      s2.getOutputStream().write((SocketTeeWriter.RAW+" content=.*d.*\n").getBytes());
91      dis = new DataInputStream(s2.getInputStream());
92      dis.readFully(new byte[3]); //read "OK\n"
93      l = new ArrayList<Chunk>();
94      l.add(new ChunkImpl("dt3", "name", 1, new byte[] {'d'}, null));
95      psw.add(l);
96      assertEquals(4, CaptureWriter.outputs.size());
97  
98      int len = dis.readInt();
99      assertTrue(len == 1);
100     byte[] data = new byte[100];
101     int read = dis.read(data);
102     assertTrue(read == 1);
103     assertTrue(data[0] == 'd');
104     
105     s2.close();
106     dis.close();
107     
108     l = new ArrayList<Chunk>();
109     l.add(new ChunkImpl("dt3", "name", 3, new byte[] {'c', 'a', 'd'}, null));
110     psw.add(l);
111     assertEquals(5, CaptureWriter.outputs.size());
112     
113     
114     Socket s3 = new Socket("localhost", SocketTeeWriter.DEFAULT_PORT);
115     s3.getOutputStream().write((SocketTeeWriter.ASCII_HEADER+" all\n").getBytes());
116     dis = new DataInputStream(s3.getInputStream());
117     dis.readFully(new byte[3]); //read "OK\n"
118     l = new ArrayList<Chunk>();
119     chunk= new ChunkImpl("dataTypeFoo", "streamName", 4, new byte[] {'t','e','x','t'}, null);
120     chunk.setSource("hostNameFoo");
121     l.add(chunk);
122     psw.add(l);
123     assertEquals(6, CaptureWriter.outputs.size());
124     len = dis.readInt();
125     data = new byte[len];
126     IOUtils.readFully(dis, data, 0, len);
127     String rcvd = new String(data);
128     System.out.println("got " + read+"/" +len  +" bytes: " + rcvd);
129     assertEquals("hostNameFoo dataTypeFoo streamName 4\ntext", rcvd);
130     s3.close();
131     dis.close();
132     
133   }
134   
135 }