1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.chukwa.datacollection.adaptor.jms;
19
20 import junit.framework.TestCase;
21 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
22 import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
23 import org.apache.hadoop.chukwa.Chunk;
24
25 import org.apache.activemq.ActiveMQConnection;
26
27 import javax.jms.Message;
28 import javax.jms.TopicConnection;
29 import javax.jms.TopicSession;
30 import javax.jms.Session;
31 import javax.jms.Topic;
32 import javax.jms.TopicPublisher;
33
34
35
36
37 public class TestJMSAdaptor extends TestCase implements ChunkReceiver {
38 String DATA_TYPE = "Test";
39 String MESSAGE_PAYLOAD = "Some JMS message payload";
40
41 TopicConnection connection = null;
42 TopicSession session = null;
43 TopicPublisher publisher = null;
44 int bytesReceived = 0;
45 int messagesReceived = 0;
46
47 protected void setUp() throws Exception {
48 connection = ActiveMQConnection.makeConnection("vm://localhost");
49 session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
50 Topic topic = session.createTopic("test.topic");
51 publisher = session.createPublisher(topic);
52 messagesReceived = 0;
53 bytesReceived = 0;
54 }
55
56 protected void tearDown() throws Exception {
57 session.close();
58 connection.close();
59 }
60
61 public void testJMSTextMessage() throws Exception {
62
63 JMSAdaptor adaptor = new JMSAdaptor();
64 adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic",
65 AdaptorManager.NULL);
66 adaptor.start("id", DATA_TYPE, 0, this);
67
68 Message message = session.createTextMessage(MESSAGE_PAYLOAD);
69 publisher.publish(message);
70
71 synchronized(this) {
72 wait(1000);
73 }
74 assertEquals("Message not received", 1, messagesReceived);
75 }
76
77 public void testJMSTextMessageWithTransformer() throws Exception {
78
79 JMSAdaptor adaptor = new JMSAdaptor();
80 adaptor.parseArgs(DATA_TYPE, "vm://localhost -t test.topic -x org.apache.hadoop.chukwa.datacollection.adaptor.jms.JMSTextMessageTransformer",
81 AdaptorManager.NULL);
82 adaptor.start("id", DATA_TYPE, 0, this);
83
84 Message message = session.createTextMessage(MESSAGE_PAYLOAD);
85 publisher.publish(message);
86
87 synchronized(this) {
88 wait(1000);
89 }
90 assertEquals("Message not received", 1, messagesReceived);
91 }
92
93 public void testJMSTextMessageWithSelector() throws Exception {
94
95 JMSAdaptor adaptor = new JMSAdaptor();
96 adaptor.parseArgs(DATA_TYPE,
97 "vm://localhost -t test.topic -s \"foo='bar'\"",
98 AdaptorManager.NULL);
99 adaptor.start("id", DATA_TYPE, 0, this);
100
101 Message message = session.createTextMessage(MESSAGE_PAYLOAD);
102 publisher.publish(message);
103
104 message = session.createTextMessage(MESSAGE_PAYLOAD);
105 message.setStringProperty("foo", "bar");
106 publisher.publish(message);
107
108 synchronized(this) {
109 wait(1000);
110 }
111
112 assertEquals("Message not received", 1, messagesReceived);
113 }
114
115 public void testJMSTextMessageWithMultiWordSelector() throws Exception {
116
117 JMSAdaptor adaptor = new JMSAdaptor();
118 adaptor.parseArgs(DATA_TYPE,
119 "vm://localhost -t test.topic -s \"foo='bar' and bar='foo'\"",
120 AdaptorManager.NULL);
121 adaptor.start("id", DATA_TYPE, 0, this);
122
123 Message message = session.createTextMessage(MESSAGE_PAYLOAD);
124 publisher.publish(message);
125
126 message = session.createTextMessage(MESSAGE_PAYLOAD);
127 message.setStringProperty("foo", "bar");
128 publisher.publish(message);
129
130 message = session.createTextMessage(MESSAGE_PAYLOAD);
131 message.setStringProperty("foo", "bar");
132 message.setStringProperty("bar", "foo");
133 publisher.publish(message);
134
135 synchronized(this) {
136 wait(1000);
137 }
138
139 assertEquals("Message not received", 1, messagesReceived);
140 }
141
142 public void add(Chunk c) {
143 bytesReceived += c.getData().length;
144 assertEquals("Unexpected data length",
145 MESSAGE_PAYLOAD.length(), c.getData().length);
146 assertEquals("Unexpected data type", DATA_TYPE, c.getDataType());
147 assertEquals("Chunk sequenceId should be total bytes received.",
148 bytesReceived, c.getSeqID());
149 assertEquals("Unexpected message payload",
150 MESSAGE_PAYLOAD, new String(c.getData()));
151 messagesReceived++;
152 }
153 }