View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    * 
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.log4j.net;
19  
20  import java.io.IOException;
21  import java.net.DatagramPacket;
22  import java.net.DatagramSocket;
23  import java.net.SocketException;
24  import java.util.ArrayList;
25  import java.util.Iterator;
26  import java.util.List;
27  
28  import org.apache.log4j.plugins.Pauseable;
29  import org.apache.log4j.plugins.Receiver;
30  import org.apache.log4j.spi.Decoder;
31  import org.apache.log4j.spi.LoggingEvent;
32  
33  
34  /***
35   *  Receive LoggingEvents encoded with an XMLLayout, convert the XML data to a
36   *  LoggingEvent and post the LoggingEvent.
37   *
38   *  @author Scott Deboy <sdeboy@apache.org>
39   *
40   */
41  public class UDPReceiver extends Receiver implements PortBased, Pauseable {
42    private static final int PACKET_LENGTH = 16384;
43    private UDPReceiverThread receiverThread;
44    private String encoding;
45  
46    //default to log4j xml decoder
47    private String decoder = "org.apache.log4j.xml.XMLDecoder";
48    private Decoder decoderImpl;
49    protected boolean paused;
50    private transient boolean closed = false;
51    private int port;
52    private DatagramSocket socket;
53    UDPHandlerThread handlerThread;
54  
55    public int getPort() {
56      return port;
57    }
58  
59    public void setPort(int port) {
60      this.port = port;
61    }
62  
63    /***
64     * The <b>Encoding</b> option specifies how the bytes are encoded.  If this 
65     * option is not specified, the system encoding will be used.
66     * */
67    public void setEncoding(String encoding) {
68      this.encoding = encoding;
69    }
70  
71    /***
72     * Returns value of the <b>Encoding</b> option.
73     */
74    public String getEncoding() {
75      return encoding;
76    }
77  
78    public String getDecoder() {
79      return decoder;
80    }
81  
82    public void setDecoder(String decoder) {
83      this.decoder = decoder;
84    }
85  
86    public boolean isPaused() {
87      return paused;
88    }
89  
90    public void setPaused(boolean b) {
91      paused = b;
92    }
93  
94    public synchronized void shutdown() {
95      if(closed == true) {
96        return;
97      }
98      closed = true;
99      // Closing the datagram socket will unblock the UDPReceiverThread if it is
100     // was waiting to receive data from the socket.
101     if (socket != null) {
102     	socket.close();
103     }
104 
105     try {
106       if(handlerThread != null) {
107       	handlerThread.close();
108         handlerThread.join();
109       }
110       if(receiverThread != null) {
111         receiverThread.join();
112       }
113     } catch(InterruptedException ie) {
114     }
115   }
116 
117   /***
118     Returns true if this receiver is active. */
119 //  public synchronized boolean isActive() {
120 //    return isActive;
121 //}
122 
123   public void activateOptions() {
124     try {
125       Class c = Class.forName(decoder);
126       Object o = c.newInstance();
127 
128       if (o instanceof Decoder) {
129         this.decoderImpl = (Decoder) o;
130       }
131     } catch (ClassNotFoundException cnfe) {
132       getLogger().warn("Unable to find decoder", cnfe);
133     } catch (IllegalAccessException iae) {
134       getLogger().warn("Could not construct decoder", iae);
135     } catch (InstantiationException ie) {
136       getLogger().warn("Could not construct decoder", ie);
137     }
138 
139     try {
140       socket = new DatagramSocket(port);
141       receiverThread = new UDPReceiverThread();
142       receiverThread.start();
143       handlerThread = new UDPHandlerThread();
144       handlerThread.start();
145     } catch (IOException ioe) {
146       ioe.printStackTrace();
147     }
148   }
149 
150   class UDPHandlerThread extends Thread {
151     private List list = new ArrayList();
152 
153     public UDPHandlerThread() {
154       setDaemon(true);
155     }
156 
157     public void append(String data) {
158       synchronized (list) {
159         list.add(data);
160         list.notify();
161       }
162     }
163   
164     /***
165      * Allow the UDPHandlerThread to wakeup and exit gracefully.
166      */
167     void close() {
168       synchronized(list) {
169       	list.notify();
170       }
171     }
172 
173     public void run() {
174       ArrayList list2 = new ArrayList();
175 
176       while (!UDPReceiver.this.closed) {
177         synchronized (list) {
178           try {
179             while (!UDPReceiver.this.closed && list.size() == 0) {
180               list.wait();
181             }
182 
183             if (list.size() > 0) {
184               list2.addAll(list);
185               list.clear();
186             }
187           } catch (InterruptedException ie) {
188           }
189         }
190 
191         if (list2.size() > 0) {
192           Iterator iter = list2.iterator();
193 
194           while (iter.hasNext()) {
195             String data = (String) iter.next();
196             List v = decoderImpl.decodeEvents(data);
197 
198             if (v != null) {
199               Iterator eventIter = v.iterator();
200 
201               while (eventIter.hasNext()) {
202                 if (!isPaused()) {
203                   doPost((LoggingEvent) eventIter.next());
204                 }
205               }
206             }
207           }
208 
209           list2.clear();
210         } else {
211           try {
212             synchronized (this) {
213               wait(1000);
214             }
215           } catch (InterruptedException ie) {
216           }
217         }
218       } // while
219       getLogger().debug(UDPReceiver.this.getName()+ "'s handler thread is exiting");
220     } // run
221   } // UDPHandlerThread
222 
223   class UDPReceiverThread extends Thread {
224     public UDPReceiverThread() {
225       setDaemon(true);
226     }
227     
228     public void run() {
229       byte[] b = new byte[PACKET_LENGTH];
230       DatagramPacket p = new DatagramPacket(b, b.length);
231 
232       while (!UDPReceiver.this.closed) {
233         try {
234           socket.receive(p);
235           
236           //this string constructor which accepts a charset throws an exception if it is 
237           //null
238           if (encoding == null) {
239             handlerThread.append(
240               new String(p.getData(), 0, p.getLength()));
241           } else {
242             handlerThread.append(
243               new String(p.getData(), 0, p.getLength(), encoding));
244           }
245         } catch (SocketException se) {
246           //disconnected
247         } catch (IOException ioe) {
248           ioe.printStackTrace();
249         }
250       }
251 
252       //LogLog.debug(UDPReceiver.this.getName() + "'s thread is ending.");
253     }
254   }
255 }