View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.adaptor.jms;
19  
20  import junit.framework.TestCase;
21  import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
22  import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
23  import org.apache.hadoop.chukwa.Chunk;
24  
25  import org.apache.activemq.ActiveMQConnection;
26  
27  import javax.jms.Message;
28  import javax.jms.TopicConnection;
29  import javax.jms.TopicSession;
30  import javax.jms.Session;
31  import javax.jms.Topic;
32  import javax.jms.TopicPublisher;
33  import java.util.ArrayList;
34  
35  /**
36   * Tests the functionality JMSMessagePropertyTransformer.
37   */
38  public class TestJMSMessagePropertyTransformer extends TestCase implements ChunkReceiver {
39    String DATA_TYPE = "Test";
40    String MESSAGE_PAYLOAD = "Some JMS message payload";
41  
42    TopicConnection connection = null;
43    TopicSession session = null;
44    TopicPublisher publisher = null;
45    ArrayList<String> chunkPayloads;
46    int bytesReceived = 0;
47  
48    protected void setUp() throws Exception {
49      connection =  ActiveMQConnection.makeConnection("vm://localhost");
50      session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
51      Topic topic = session.createTopic("test.topic");
52      publisher = session.createPublisher(topic);
53      chunkPayloads = new ArrayList<String>();
54      bytesReceived = 0;
55    }
56  
57    protected void tearDown() throws Exception {
58      session.close();
59      connection.close();
60    }
61  
62    public void testJMSMessageProperties() throws Exception {
63  
64      JMSAdaptor adaptor = new JMSAdaptor();
65      adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
66                      "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
67                      "-p \"foo,bar,num\"",
68              AdaptorManager.NULL);
69      adaptor.start("id", DATA_TYPE, 0, this);
70  
71      Message message = session.createTextMessage(MESSAGE_PAYLOAD);
72      message.setStringProperty("bar", "bar_value");
73      message.setStringProperty("bat", "bat_value");
74      message.setStringProperty("foo", "foo_value");
75      message.setIntProperty("num", 1);
76      publisher.publish(message);
77  
78      synchronized(this) {
79        wait(1000);
80      }
81      assertEquals("Message not received", 1, chunkPayloads.size());
82      assertEquals("Incorrect chunk payload found",
83              "foo_value\tbar_value\t1", chunkPayloads.get(0));
84    }
85  
86    public void testJMSMessagePropertiesNoQuotes() throws Exception {
87  
88      JMSAdaptor adaptor = new JMSAdaptor();
89      adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
90                      "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
91                      "-p foo,bar,num",
92              AdaptorManager.NULL);
93      adaptor.start("id", DATA_TYPE, 0, this);
94  
95      Message message = session.createTextMessage(MESSAGE_PAYLOAD);
96      message.setStringProperty("bar", "bar_value");
97      message.setStringProperty("bat", "bat_value");
98      message.setStringProperty("foo", "foo_value");
99      message.setIntProperty("num", 1);
100     publisher.publish(message);
101 
102     synchronized(this) {
103       wait(1000);
104     }
105     assertEquals("Message not received", 1, chunkPayloads.size());
106     assertEquals("Incorrect chunk payload found",
107             "foo_value\tbar_value\t1", chunkPayloads.get(0));
108   }
109 
110   public void testJMSMessagePropertiesWithDelimiter() throws Exception {
111 
112     JMSAdaptor adaptor = new JMSAdaptor();
113     adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
114                     "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
115                     "-p \"foo,bar,num -d ' '\"",
116             AdaptorManager.NULL);
117     adaptor.start("id", DATA_TYPE, 0, this);
118 
119     Message message = session.createTextMessage(MESSAGE_PAYLOAD);
120     message.setStringProperty("bar", "bar_value");
121     message.setStringProperty("bat", "bat_value");
122     message.setStringProperty("foo", "foo_value");
123     message.setIntProperty("num", 1);
124     publisher.publish(message);
125 
126     synchronized(this) {
127       wait(1000);
128     }
129     assertEquals("Message not received", 1, chunkPayloads.size());
130     assertEquals("Incorrect chunk payload found", "foo_value bar_value 1", chunkPayloads.get(0));
131   }
132 
133   public void testJMSMessagePropertiesWithNoQuotesDelimiter() throws Exception {
134 
135     JMSAdaptor adaptor = new JMSAdaptor();
136     adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
137                     "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
138                     "-p \"foo,bar,num -d ^^^\"",
139             AdaptorManager.NULL);
140     adaptor.start("id", DATA_TYPE, 0, this);
141 
142     Message message = session.createTextMessage(MESSAGE_PAYLOAD);
143     message.setStringProperty("bar", "bar_value");
144     message.setStringProperty("bat", "bat_value");
145     message.setStringProperty("foo", "foo_value");
146     message.setIntProperty("num", 1);
147     publisher.publish(message);
148 
149     synchronized(this) {
150       wait(1000);
151     }
152     assertEquals("Message not received", 1, chunkPayloads.size());
153     assertEquals("Incorrect chunk payload found", "foo_value^^^bar_value^^^1", chunkPayloads.get(0));
154   }
155 
156   public void testJMSMessagePropertiesWithMultiWordDelimiter() throws Exception {
157 
158     JMSAdaptor adaptor = new JMSAdaptor();
159     adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
160                     "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
161                     "-p \"foo,bar,num -d '[ insert between values ]'\"",
162             AdaptorManager.NULL);
163     adaptor.start("id", DATA_TYPE, 0, this);
164 
165     Message message = session.createTextMessage(MESSAGE_PAYLOAD);
166     message.setStringProperty("bar", "bar_value");
167     message.setStringProperty("bat", "bat_value");
168     message.setStringProperty("foo", "foo_value");
169     message.setIntProperty("num", 1);
170     publisher.publish(message);
171 
172     synchronized(this) {
173       wait(1000);
174     }
175     assertEquals("Message not received", 1, chunkPayloads.size());
176     assertEquals("Incorrect chunk payload found",
177             "foo_value[ insert between values ]bar_value[ insert between values ]1",
178             chunkPayloads.get(0));
179   }
180 
181   public void testJMSPropMissingWithAllRequired() throws Exception {
182 
183     JMSAdaptor adaptor = new JMSAdaptor();
184     adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
185                     "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
186                     "-p \"foo,bar,num\"",
187             AdaptorManager.NULL);
188     adaptor.start("id", DATA_TYPE, 0, this);
189 
190     Message message = session.createTextMessage(MESSAGE_PAYLOAD);
191     message.setStringProperty("bar", "bar_value");
192     message.setStringProperty("bat", "bat_value");
193     message.setIntProperty("num", 1);
194     publisher.publish(message);
195 
196     synchronized(this) {
197       wait(1000);
198     }
199     assertEquals("Message should not have been received", 0, chunkPayloads.size());
200   }
201 
202   public void testJMSPropMissingWithSomeRequired() throws Exception {
203 
204     JMSAdaptor adaptor = new JMSAdaptor();
205     adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
206                     "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
207                     "-p \"foo,bar,num -r foo\"",
208             AdaptorManager.NULL);
209     adaptor.start("id", DATA_TYPE, 0, this);
210 
211     Message message = session.createTextMessage(MESSAGE_PAYLOAD);
212     message.setStringProperty("bar", "bar_value");
213     message.setStringProperty("bat", "bat_value");
214     message.setIntProperty("num", 1);
215     publisher.publish(message);
216 
217     synchronized(this) {
218       wait(1000);
219     }
220     assertEquals("Message should not have been received", 0, chunkPayloads.size());
221   }
222 
223   public void testJMSPropMissingWithSomeRequired2() throws Exception {
224 
225     JMSAdaptor adaptor = new JMSAdaptor();
226     adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
227                     "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
228                     "-p \"foo,bar,num -r foo\"",
229             AdaptorManager.NULL);
230     adaptor.start("id", DATA_TYPE, 0, this);
231 
232     Message message = session.createTextMessage(MESSAGE_PAYLOAD);
233     message.setStringProperty("foo", "foo_value");
234     message.setStringProperty("bat", "bat_value");
235     message.setIntProperty("num", 1);
236     publisher.publish(message);
237 
238     synchronized(this) {
239       wait(1000);
240     }
241     assertEquals("Message not received", 1, chunkPayloads.size());
242     assertEquals("Incorrect chunk payload found", "foo_value\t\t1", chunkPayloads.get(0));
243   }
244 
245   public void testJMSPropfoundWithSomeRequired() throws Exception {
246 
247     JMSAdaptor adaptor = new JMSAdaptor();
248     adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
249                     "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
250                     "-p \"foo,bar,num -r foo\"",
251             AdaptorManager.NULL);
252     adaptor.start("id", DATA_TYPE, 0, this);
253 
254     Message message = session.createTextMessage(MESSAGE_PAYLOAD);
255     message.setStringProperty("bar", "bar_value");
256     message.setStringProperty("bat", "bat_value");
257     message.setStringProperty("foo", "foo_value");
258     message.setIntProperty("num", 1);
259     publisher.publish(message);
260 
261     synchronized(this) {
262       wait(1000);
263     }
264     assertEquals("Message not received", 1, chunkPayloads.size());
265     assertEquals("Incorrect chunk payload found", "foo_value\tbar_value\t1", chunkPayloads.get(0));
266   }
267 
268 
269 
270   public void add(Chunk c) {
271     bytesReceived += c.getData().length;
272     assertEquals("Unexpected data type", DATA_TYPE, c.getDataType());
273     assertEquals("Chunk sequenceId should be total bytes received.",
274             bytesReceived, c.getSeqID());
275     chunkPayloads.add(new String(c.getData()));
276   }
277 }