1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.adaptor.jms;
19
20 import junit.framework.TestCase;
21 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
22 import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
23 import org.apache.hadoop.chukwa.Chunk;
24
25 import org.apache.activemq.ActiveMQConnection;
26
27 import javax.jms.Message;
28 import javax.jms.TopicConnection;
29 import javax.jms.TopicSession;
30 import javax.jms.Session;
31 import javax.jms.Topic;
32 import javax.jms.TopicPublisher;
33 import java.util.ArrayList;
34
35
36
37
38 public class TestJMSMessagePropertyTransformer extends TestCase implements ChunkReceiver {
39 String DATA_TYPE = "Test";
40 String MESSAGE_PAYLOAD = "Some JMS message payload";
41
42 TopicConnection connection = null;
43 TopicSession session = null;
44 TopicPublisher publisher = null;
45 ArrayList<String> chunkPayloads;
46 int bytesReceived = 0;
47
48 protected void setUp() throws Exception {
49 connection = ActiveMQConnection.makeConnection("vm://localhost");
50 session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
51 Topic topic = session.createTopic("test.topic");
52 publisher = session.createPublisher(topic);
53 chunkPayloads = new ArrayList<String>();
54 bytesReceived = 0;
55 }
56
57 protected void tearDown() throws Exception {
58 session.close();
59 connection.close();
60 }
61
62 public void testJMSMessageProperties() throws Exception {
63
64 JMSAdaptor adaptor = new JMSAdaptor();
65 adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
66 "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
67 "-p \"foo,bar,num\"",
68 AdaptorManager.NULL);
69 adaptor.start("id", DATA_TYPE, 0, this);
70
71 Message message = session.createTextMessage(MESSAGE_PAYLOAD);
72 message.setStringProperty("bar", "bar_value");
73 message.setStringProperty("bat", "bat_value");
74 message.setStringProperty("foo", "foo_value");
75 message.setIntProperty("num", 1);
76 publisher.publish(message);
77
78 synchronized(this) {
79 wait(1000);
80 }
81 assertEquals("Message not received", 1, chunkPayloads.size());
82 assertEquals("Incorrect chunk payload found",
83 "foo_value\tbar_value\t1", chunkPayloads.get(0));
84 }
85
86 public void testJMSMessagePropertiesNoQuotes() throws Exception {
87
88 JMSAdaptor adaptor = new JMSAdaptor();
89 adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
90 "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
91 "-p foo,bar,num",
92 AdaptorManager.NULL);
93 adaptor.start("id", DATA_TYPE, 0, this);
94
95 Message message = session.createTextMessage(MESSAGE_PAYLOAD);
96 message.setStringProperty("bar", "bar_value");
97 message.setStringProperty("bat", "bat_value");
98 message.setStringProperty("foo", "foo_value");
99 message.setIntProperty("num", 1);
100 publisher.publish(message);
101
102 synchronized(this) {
103 wait(1000);
104 }
105 assertEquals("Message not received", 1, chunkPayloads.size());
106 assertEquals("Incorrect chunk payload found",
107 "foo_value\tbar_value\t1", chunkPayloads.get(0));
108 }
109
110 public void testJMSMessagePropertiesWithDelimiter() throws Exception {
111
112 JMSAdaptor adaptor = new JMSAdaptor();
113 adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
114 "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
115 "-p \"foo,bar,num -d ' '\"",
116 AdaptorManager.NULL);
117 adaptor.start("id", DATA_TYPE, 0, this);
118
119 Message message = session.createTextMessage(MESSAGE_PAYLOAD);
120 message.setStringProperty("bar", "bar_value");
121 message.setStringProperty("bat", "bat_value");
122 message.setStringProperty("foo", "foo_value");
123 message.setIntProperty("num", 1);
124 publisher.publish(message);
125
126 synchronized(this) {
127 wait(1000);
128 }
129 assertEquals("Message not received", 1, chunkPayloads.size());
130 assertEquals("Incorrect chunk payload found", "foo_value bar_value 1", chunkPayloads.get(0));
131 }
132
133 public void testJMSMessagePropertiesWithNoQuotesDelimiter() throws Exception {
134
135 JMSAdaptor adaptor = new JMSAdaptor();
136 adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
137 "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
138 "-p \"foo,bar,num -d ^^^\"",
139 AdaptorManager.NULL);
140 adaptor.start("id", DATA_TYPE, 0, this);
141
142 Message message = session.createTextMessage(MESSAGE_PAYLOAD);
143 message.setStringProperty("bar", "bar_value");
144 message.setStringProperty("bat", "bat_value");
145 message.setStringProperty("foo", "foo_value");
146 message.setIntProperty("num", 1);
147 publisher.publish(message);
148
149 synchronized(this) {
150 wait(1000);
151 }
152 assertEquals("Message not received", 1, chunkPayloads.size());
153 assertEquals("Incorrect chunk payload found", "foo_value^^^bar_value^^^1", chunkPayloads.get(0));
154 }
155
156 public void testJMSMessagePropertiesWithMultiWordDelimiter() throws Exception {
157
158 JMSAdaptor adaptor = new JMSAdaptor();
159 adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
160 "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
161 "-p \"foo,bar,num -d '[ insert between values ]'\"",
162 AdaptorManager.NULL);
163 adaptor.start("id", DATA_TYPE, 0, this);
164
165 Message message = session.createTextMessage(MESSAGE_PAYLOAD);
166 message.setStringProperty("bar", "bar_value");
167 message.setStringProperty("bat", "bat_value");
168 message.setStringProperty("foo", "foo_value");
169 message.setIntProperty("num", 1);
170 publisher.publish(message);
171
172 synchronized(this) {
173 wait(1000);
174 }
175 assertEquals("Message not received", 1, chunkPayloads.size());
176 assertEquals("Incorrect chunk payload found",
177 "foo_value[ insert between values ]bar_value[ insert between values ]1",
178 chunkPayloads.get(0));
179 }
180
181 public void testJMSPropMissingWithAllRequired() throws Exception {
182
183 JMSAdaptor adaptor = new JMSAdaptor();
184 adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
185 "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
186 "-p \"foo,bar,num\"",
187 AdaptorManager.NULL);
188 adaptor.start("id", DATA_TYPE, 0, this);
189
190 Message message = session.createTextMessage(MESSAGE_PAYLOAD);
191 message.setStringProperty("bar", "bar_value");
192 message.setStringProperty("bat", "bat_value");
193 message.setIntProperty("num", 1);
194 publisher.publish(message);
195
196 synchronized(this) {
197 wait(1000);
198 }
199 assertEquals("Message should not have been received", 0, chunkPayloads.size());
200 }
201
202 public void testJMSPropMissingWithSomeRequired() throws Exception {
203
204 JMSAdaptor adaptor = new JMSAdaptor();
205 adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
206 "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
207 "-p \"foo,bar,num -r foo\"",
208 AdaptorManager.NULL);
209 adaptor.start("id", DATA_TYPE, 0, this);
210
211 Message message = session.createTextMessage(MESSAGE_PAYLOAD);
212 message.setStringProperty("bar", "bar_value");
213 message.setStringProperty("bat", "bat_value");
214 message.setIntProperty("num", 1);
215 publisher.publish(message);
216
217 synchronized(this) {
218 wait(1000);
219 }
220 assertEquals("Message should not have been received", 0, chunkPayloads.size());
221 }
222
223 public void testJMSPropMissingWithSomeRequired2() throws Exception {
224
225 JMSAdaptor adaptor = new JMSAdaptor();
226 adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
227 "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
228 "-p \"foo,bar,num -r foo\"",
229 AdaptorManager.NULL);
230 adaptor.start("id", DATA_TYPE, 0, this);
231
232 Message message = session.createTextMessage(MESSAGE_PAYLOAD);
233 message.setStringProperty("foo", "foo_value");
234 message.setStringProperty("bat", "bat_value");
235 message.setIntProperty("num", 1);
236 publisher.publish(message);
237
238 synchronized(this) {
239 wait(1000);
240 }
241 assertEquals("Message not received", 1, chunkPayloads.size());
242 assertEquals("Incorrect chunk payload found", "foo_value\t\t1", chunkPayloads.get(0));
243 }
244
245 public void testJMSPropfoundWithSomeRequired() throws Exception {
246
247 JMSAdaptor adaptor = new JMSAdaptor();
248 adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic " +
249 "-x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSMessagePropertyTransformer " +
250 "-p \"foo,bar,num -r foo\"",
251 AdaptorManager.NULL);
252 adaptor.start("id", DATA_TYPE, 0, this);
253
254 Message message = session.createTextMessage(MESSAGE_PAYLOAD);
255 message.setStringProperty("bar", "bar_value");
256 message.setStringProperty("bat", "bat_value");
257 message.setStringProperty("foo", "foo_value");
258 message.setIntProperty("num", 1);
259 publisher.publish(message);
260
261 synchronized(this) {
262 wait(1000);
263 }
264 assertEquals("Message not received", 1, chunkPayloads.size());
265 assertEquals("Incorrect chunk payload found", "foo_value\tbar_value\t1", chunkPayloads.get(0));
266 }
267
268
269
270 public void add(Chunk c) {
271 bytesReceived += c.getData().length;
272 assertEquals("Unexpected data type", DATA_TYPE, c.getDataType());
273 assertEquals("Chunk sequenceId should be total bytes received.",
274 bytesReceived, c.getSeqID());
275 chunkPayloads.add(new String(c.getData()));
276 }
277 }