1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.log4j.net;
19
20 import org.apache.log4j.plugins.Pauseable;
21 import org.apache.log4j.plugins.Receiver;
22 import org.apache.log4j.spi.Decoder;
23 import org.apache.log4j.spi.LoggingEvent;
24
25 import java.io.IOException;
26
27 import java.net.DatagramPacket;
28 import java.net.InetAddress;
29 import java.net.MulticastSocket;
30 import java.net.SocketException;
31 import java.net.UnknownHostException;
32
33 import java.util.ArrayList;
34 import java.util.Iterator;
35 import java.util.List;
36
37
38 /***
39 * Multicast-based receiver. Accepts LoggingEvents encoded using
40 * MulticastAppender and XMLLayout. The the XML data is converted
41 * back to a LoggingEvent and is posted.
42 *
43 * @author Scott Deboy <sdeboy@apache.org>
44 *
45 */
46 public class MulticastReceiver extends Receiver implements PortBased,
47 AddressBased, Pauseable {
48 private static final int PACKET_LENGTH = 16384;
49 private boolean isActive = false;
50 private int port;
51 private String address;
52 private String encoding;
53 private MulticastSocket socket = null;
54
55
56 private String decoder = "org.apache.log4j.xml.XMLDecoder";
57 private Decoder decoderImpl;
58 private MulticastHandlerThread handlerThread;
59 private MulticastReceiverThread receiverThread;
60 private boolean paused;
61
62 public String getDecoder() {
63 return decoder;
64 }
65
66 public void setDecoder(String decoder) {
67 this.decoder = decoder;
68 }
69
70 public int getPort() {
71 return port;
72 }
73
74 public void setPort(int port) {
75 this.port = port;
76 }
77
78 public String getAddress() {
79 return address;
80 }
81
82 /***
83 The <b>Encoding</b> option specifies how the bytes are encoded. If this option is not specified,
84 the system encoding will be used.
85 */
86 public void setEncoding(String encoding) {
87 this.encoding = encoding;
88 }
89
90 /***
91 Returns value of the <b>Encoding</b> option.
92 */
93 public String getEncoding() {
94 return encoding;
95 }
96
97 public synchronized void shutdown() {
98 isActive = false;
99 handlerThread.interrupt();
100 receiverThread.interrupt();
101 socket.close();
102 }
103
104 public void setAddress(String address) {
105 this.address = address;
106 }
107
108 public boolean isPaused() {
109 return paused;
110 }
111
112 public void setPaused(boolean b) {
113 paused = b;
114 }
115
116 /***
117 Returns true if this receiver is active. */
118 public synchronized boolean isActive() {
119 return isActive;
120 }
121
122 public void activateOptions() {
123 InetAddress addr = null;
124
125 try {
126 Class c = Class.forName(decoder);
127 Object o = c.newInstance();
128
129 if (o instanceof Decoder) {
130 this.decoderImpl = (Decoder) o;
131 }
132 } catch (ClassNotFoundException cnfe) {
133 getLogger().warn("Unable to find decoder", cnfe);
134 } catch (IllegalAccessException iae) {
135 getLogger().warn("Could not construct decoder", iae);
136 } catch (InstantiationException ie) {
137 getLogger().warn("Could not construct decoder", ie);
138 }
139
140 try {
141 addr = InetAddress.getByName(address);
142 } catch (UnknownHostException uhe) {
143 uhe.printStackTrace();
144 }
145
146 try {
147 isActive = true;
148 socket = new MulticastSocket(port);
149 socket.joinGroup(addr);
150 receiverThread = new MulticastReceiverThread();
151 receiverThread.start();
152 handlerThread = new MulticastHandlerThread();
153 handlerThread.start();
154 } catch (IOException ioe) {
155 ioe.printStackTrace();
156 }
157 }
158
159 class MulticastHandlerThread extends Thread {
160 private List list = new ArrayList();
161
162 public MulticastHandlerThread() {
163 setDaemon(true);
164 }
165
166 public void append(String data) {
167 synchronized (list) {
168 list.add(data);
169 list.notify();
170 }
171 }
172
173 public void run() {
174 ArrayList list2 = new ArrayList();
175
176 while (isAlive()) {
177 synchronized (list) {
178 try {
179 while (list.size() == 0) {
180 list.wait();
181 }
182
183 if (list.size() > 0) {
184 list2.addAll(list);
185 list.clear();
186 }
187 } catch (InterruptedException ie) {
188 }
189 }
190
191 if (list2.size() > 0) {
192 Iterator iter = list2.iterator();
193
194 while (iter.hasNext()) {
195 String data = (String) iter.next();
196 List v = decoderImpl.decodeEvents(data.trim());
197
198 if (v != null) {
199 Iterator eventIter = v.iterator();
200
201 while (eventIter.hasNext()) {
202 if (!isPaused()) {
203 doPost((LoggingEvent) eventIter.next());
204 }
205 }
206 }
207 }
208
209 list2.clear();
210 } else {
211 try {
212 synchronized (this) {
213 wait(1000);
214 }
215 } catch (InterruptedException ie) {
216 }
217 }
218 }
219 }
220 }
221
222 class MulticastReceiverThread extends Thread {
223 public MulticastReceiverThread() {
224 setDaemon(true);
225 }
226
227 public void run() {
228 isActive = true;
229
230 byte[] b = new byte[PACKET_LENGTH];
231 DatagramPacket p = new DatagramPacket(b, b.length);
232
233 while (isActive) {
234 try {
235 socket.receive(p);
236
237
238
239 if (encoding == null) {
240 handlerThread.append(
241 new String(p.getData(), 0, p.getLength()));
242 } else {
243 handlerThread.append(
244 new String(p.getData(), 0, p.getLength(), encoding));
245 }
246 } catch (SocketException se) {
247
248 } catch (IOException ioe) {
249 ioe.printStackTrace();
250 }
251 }
252
253 getLogger().debug("{}'s thread is ending.", MulticastReceiver.this.getName());
254 }
255 }
256 }