View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.chukwa.datacollection.adaptor.jms;
19  
20  import junit.framework.TestCase;
21  import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
22  import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
23  import org.apache.hadoop.chukwa.Chunk;
24  
25  import org.apache.activemq.ActiveMQConnection;
26  
27  import javax.jms.Message;
28  import javax.jms.TopicConnection;
29  import javax.jms.TopicSession;
30  import javax.jms.Session;
31  import javax.jms.Topic;
32  import javax.jms.TopicPublisher;
33  
34  /**
35   * Tests the functionality of JMSAdapter and JMSTextMessageTransformer
36   */
37  public class TestJMSAdaptor extends TestCase implements ChunkReceiver {
38    String DATA_TYPE = "Test";
39    String MESSAGE_PAYLOAD = "Some JMS message payload";
40  
41    TopicConnection connection = null;
42    TopicSession session = null;
43    TopicPublisher publisher = null;
44    int bytesReceived = 0;
45    int messagesReceived = 0;
46  
47    protected void setUp() throws Exception {
48      connection =  ActiveMQConnection.makeConnection("vm://localhost");
49      session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
50      Topic topic = session.createTopic("test.topic");
51      publisher = session.createPublisher(topic);
52      messagesReceived = 0;
53      bytesReceived = 0;
54    }
55  
56    protected void tearDown() throws Exception {
57      session.close();
58      connection.close();
59    }
60  
61    public void testJMSTextMessage() throws Exception {
62  
63      JMSAdaptor adaptor = new JMSAdaptor();
64      adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic",
65              AdaptorManager.NULL);
66      adaptor.start("id", DATA_TYPE, 0, this);
67  
68      Message message = session.createTextMessage(MESSAGE_PAYLOAD);
69      publisher.publish(message);
70  
71      synchronized(this) {
72        wait(1000);
73      }
74      assertEquals("Message not received", 1, messagesReceived);
75    }
76  
77    public void testJMSTextMessageWithTransformer() throws Exception {
78  
79      JMSAdaptor adaptor = new JMSAdaptor();
80      adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic -x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSTextMessageTransformer",
81              AdaptorManager.NULL);
82      adaptor.start("id", DATA_TYPE, 0, this);
83  
84      Message message = session.createTextMessage(MESSAGE_PAYLOAD);
85      publisher.publish(message);
86  
87      synchronized(this) {
88        wait(1000);
89      }
90      assertEquals("Message not received", 1, messagesReceived);
91    }
92  
93    public void testJMSTextMessageWithSelector() throws Exception {
94  
95      JMSAdaptor adaptor = new JMSAdaptor();
96      adaptor.parseArgs(DATA_TYPE,
97              "vm://localhost -t test.topic -s \"foo='bar'\"",
98              AdaptorManager.NULL);
99      adaptor.start("id", DATA_TYPE, 0, this);
100 
101     Message message = session.createTextMessage(MESSAGE_PAYLOAD);
102     publisher.publish(message);
103     
104     message = session.createTextMessage(MESSAGE_PAYLOAD);
105     message.setStringProperty("foo", "bar");
106     publisher.publish(message);
107 
108     synchronized(this) {
109       wait(1000);
110     }
111 
112     assertEquals("Message not received", 1, messagesReceived);
113   }
114 
115   public void testJMSTextMessageWithMultiWordSelector() throws Exception {
116 
117     JMSAdaptor adaptor = new JMSAdaptor();
118     adaptor.parseArgs(DATA_TYPE,
119             "vm://localhost -t test.topic -s \"foo='bar' and bar='foo'\"",
120             AdaptorManager.NULL);    
121     adaptor.start("id", DATA_TYPE, 0, this);
122 
123     Message message = session.createTextMessage(MESSAGE_PAYLOAD);
124     publisher.publish(message);
125 
126     message = session.createTextMessage(MESSAGE_PAYLOAD);
127     message.setStringProperty("foo", "bar");
128     publisher.publish(message);
129 
130     message = session.createTextMessage(MESSAGE_PAYLOAD);
131     message.setStringProperty("foo", "bar");
132     message.setStringProperty("bar", "foo");
133     publisher.publish(message);
134 
135     synchronized(this) {
136       wait(1000);
137     }
138 
139     assertEquals("Message not received", 1, messagesReceived);
140   }
141 
142   public void add(Chunk c) {
143     bytesReceived += c.getData().length;
144     assertEquals("Unexpected data length",
145             MESSAGE_PAYLOAD.length(), c.getData().length);
146     assertEquals("Unexpected data type", DATA_TYPE, c.getDataType());
147     assertEquals("Chunk sequenceId should be total bytes received.",
148             bytesReceived, c.getSeqID());
149     assertEquals("Unexpected message payload",
150             MESSAGE_PAYLOAD, new String(c.getData()));
151     messagesReceived++;
152   }
153 }