1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.log4j.net;
19
20 import java.io.IOException;
21 import java.net.DatagramPacket;
22 import java.net.DatagramSocket;
23 import java.net.SocketException;
24 import java.util.ArrayList;
25 import java.util.Iterator;
26 import java.util.List;
27
28 import org.apache.log4j.plugins.Pauseable;
29 import org.apache.log4j.plugins.Receiver;
30 import org.apache.log4j.spi.Decoder;
31 import org.apache.log4j.spi.LoggingEvent;
32
33
/**
 * Receives datagrams over UDP, decodes each one into LoggingEvents using the
 * configured decoder (by default an XML decoder, i.e. events encoded with an
 * XMLLayout) and posts the resulting LoggingEvents.
 *
 * @author Scott Deboy <sdeboy@apache.org>
 *
 */
41 public class UDPReceiver extends Receiver implements PortBased, Pauseable {
42 private static final int PACKET_LENGTH = 16384;
43 private UDPReceiverThread receiverThread;
44 private String encoding;
45
46
47 private String decoder = "org.apache.log4j.xml.XMLDecoder";
48 private Decoder decoderImpl;
49 protected boolean paused;
50 private transient boolean closed = false;
51 private int port;
52 private DatagramSocket socket;
53 UDPHandlerThread handlerThread;
54
55 public int getPort() {
56 return port;
57 }
58
59 public void setPort(int port) {
60 this.port = port;
61 }
62
63 /***
64 * The <b>Encoding</b> option specifies how the bytes are encoded. If this
65 * option is not specified, the system encoding will be used.
66 * */
67 public void setEncoding(String encoding) {
68 this.encoding = encoding;
69 }
70
71 /***
72 * Returns value of the <b>Encoding</b> option.
73 */
74 public String getEncoding() {
75 return encoding;
76 }
77
78 public String getDecoder() {
79 return decoder;
80 }
81
82 public void setDecoder(String decoder) {
83 this.decoder = decoder;
84 }
85
86 public boolean isPaused() {
87 return paused;
88 }
89
90 public void setPaused(boolean b) {
91 paused = b;
92 }
93
94 public synchronized void shutdown() {
95 if(closed == true) {
96 return;
97 }
98 closed = true;
99
100
101 if (socket != null) {
102 socket.close();
103 }
104
105 try {
106 if(handlerThread != null) {
107 handlerThread.close();
108 handlerThread.join();
109 }
110 if(receiverThread != null) {
111 receiverThread.join();
112 }
113 } catch(InterruptedException ie) {
114 }
115 }
116
117 /***
118 Returns true if this receiver is active. */
119
120
121
122
123 public void activateOptions() {
124 try {
125 Class c = Class.forName(decoder);
126 Object o = c.newInstance();
127
128 if (o instanceof Decoder) {
129 this.decoderImpl = (Decoder) o;
130 }
131 } catch (ClassNotFoundException cnfe) {
132 getLogger().warn("Unable to find decoder", cnfe);
133 } catch (IllegalAccessException iae) {
134 getLogger().warn("Could not construct decoder", iae);
135 } catch (InstantiationException ie) {
136 getLogger().warn("Could not construct decoder", ie);
137 }
138
139 try {
140 socket = new DatagramSocket(port);
141 receiverThread = new UDPReceiverThread();
142 receiverThread.start();
143 handlerThread = new UDPHandlerThread();
144 handlerThread.start();
145 } catch (IOException ioe) {
146 ioe.printStackTrace();
147 }
148 }
149
150 class UDPHandlerThread extends Thread {
151 private List list = new ArrayList();
152
153 public UDPHandlerThread() {
154 setDaemon(true);
155 }
156
157 public void append(String data) {
158 synchronized (list) {
159 list.add(data);
160 list.notify();
161 }
162 }
163
164 /***
165 * Allow the UDPHandlerThread to wakeup and exit gracefully.
166 */
167 void close() {
168 synchronized(list) {
169 list.notify();
170 }
171 }
172
173 public void run() {
174 ArrayList list2 = new ArrayList();
175
176 while (!UDPReceiver.this.closed) {
177 synchronized (list) {
178 try {
179 while (!UDPReceiver.this.closed && list.size() == 0) {
180 list.wait();
181 }
182
183 if (list.size() > 0) {
184 list2.addAll(list);
185 list.clear();
186 }
187 } catch (InterruptedException ie) {
188 }
189 }
190
191 if (list2.size() > 0) {
192 Iterator iter = list2.iterator();
193
194 while (iter.hasNext()) {
195 String data = (String) iter.next();
196 List v = decoderImpl.decodeEvents(data);
197
198 if (v != null) {
199 Iterator eventIter = v.iterator();
200
201 while (eventIter.hasNext()) {
202 if (!isPaused()) {
203 doPost((LoggingEvent) eventIter.next());
204 }
205 }
206 }
207 }
208
209 list2.clear();
210 } else {
211 try {
212 synchronized (this) {
213 wait(1000);
214 }
215 } catch (InterruptedException ie) {
216 }
217 }
218 }
219 getLogger().debug(UDPReceiver.this.getName()+ "'s handler thread is exiting");
220 }
221 }
222
223 class UDPReceiverThread extends Thread {
224 public UDPReceiverThread() {
225 setDaemon(true);
226 }
227
228 public void run() {
229 byte[] b = new byte[PACKET_LENGTH];
230 DatagramPacket p = new DatagramPacket(b, b.length);
231
232 while (!UDPReceiver.this.closed) {
233 try {
234 socket.receive(p);
235
236
237
238 if (encoding == null) {
239 handlerThread.append(
240 new String(p.getData(), 0, p.getLength()));
241 } else {
242 handlerThread.append(
243 new String(p.getData(), 0, p.getLength(), encoding));
244 }
245 } catch (SocketException se) {
246
247 } catch (IOException ioe) {
248 ioe.printStackTrace();
249 }
250 }
251
252
253 }
254 }
255 }