View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    * 
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.log4j.net;
19  
20  import java.io.IOException;
21  import java.net.DatagramPacket;
22  import java.net.InetAddress;
23  import java.net.MulticastSocket;
24  import java.net.SocketException;
25  import java.net.UnknownHostException;
26  import java.util.ArrayList;
27  import java.util.Iterator;
28  import java.util.List;
29  
30  import org.apache.log4j.plugins.Pauseable;
31  import org.apache.log4j.plugins.Receiver;
32  import org.apache.log4j.spi.Decoder;
33  import org.apache.log4j.spi.LoggingEvent;
34  
35  
36  /**
37   *  Multicast-based receiver.  Accepts LoggingEvents encoded using
38   *  MulticastAppender and XMLLayout. The the XML data is converted
39   *  back to a LoggingEvent and is posted.
40   *
41   *  @author Scott Deboy <sdeboy@apache.org>
42   *
43   */
44  public class MulticastReceiver extends Receiver implements PortBased,
45    AddressBased, Pauseable {
46    private static final int PACKET_LENGTH = 16384;
47    private int port;
48    private String address;
49    private String encoding;
50    private MulticastSocket socket = null;
51  
52    //default to log4j xml decoder
53    private String decoder = "org.apache.log4j.xml.XMLDecoder";
54    private Decoder decoderImpl;
55    private MulticastHandlerThread handlerThread;
56    private MulticastReceiverThread receiverThread;
57    private boolean paused;
58    private boolean advertiseViaMulticastDNS;
59    private ZeroConfSupport zeroConf;
60  
61    /**
62     * The MulticastDNS zone advertised by a MulticastReceiver
63     */
64    public static final String ZONE = "_log4j_xml_mcast_receiver.local.";
65  
66    public String getDecoder() {
67      return decoder;
68    }
69  
70    public void setDecoder(String decoder) {
71      this.decoder = decoder;
72    }
73  
74    public int getPort() {
75      return port;
76    }
77  
78    public void setPort(int port) {
79      this.port = port;
80    }
81  
82    public String getAddress() {
83      return address;
84    }
85  
86    /**
87        The <b>Encoding</b> option specifies how the bytes are encoded.  If this option is not specified,
88        the system encoding will be used.
89      */
90    public void setEncoding(String encoding) {
91      this.encoding = encoding;
92    }
93  
94    /**
95       Returns value of the <b>Encoding</b> option.
96     */
97    public String getEncoding() {
98      return encoding;
99    }
100 
101   public synchronized void shutdown() {
102     active = false;
103     if (advertiseViaMulticastDNS) {
104         zeroConf.unadvertise();
105     }
106     if (handlerThread != null) {
107         handlerThread.interrupt();
108     }
109     if (receiverThread != null) {
110         receiverThread.interrupt();
111     }
112     if (socket != null) {
113         socket.close();
114     }
115   }
116 
117   public void setAddress(String address) {
118     this.address = address;
119   }
120 
121   public boolean isPaused() {
122     return paused;
123   }
124 
125   public void setPaused(boolean b) {
126     paused = b;
127   }
128 
129   public void activateOptions() {
130     InetAddress addr = null;
131 
132     try {
133       Class c = Class.forName(decoder);
134       Object o = c.newInstance();
135 
136       if (o instanceof Decoder) {
137         this.decoderImpl = (Decoder) o;
138       }
139     } catch (ClassNotFoundException cnfe) {
140       getLogger().warn("Unable to find decoder", cnfe);
141     } catch (IllegalAccessException iae) {
142       getLogger().warn("Could not construct decoder", iae);
143     } catch (InstantiationException ie) {
144       getLogger().warn("Could not construct decoder", ie);
145     }
146 
147     try {
148       addr = InetAddress.getByName(address);
149     } catch (UnknownHostException uhe) {
150       uhe.printStackTrace();
151     }
152 
153     try {
154       active = true;
155       socket = new MulticastSocket(port);
156       socket.joinGroup(addr);
157       receiverThread = new MulticastReceiverThread();
158       receiverThread.start();
159       handlerThread = new MulticastHandlerThread();
160       handlerThread.start();
161       if (advertiseViaMulticastDNS) {
162         zeroConf = new ZeroConfSupport(ZONE, port, getName());
163         zeroConf.advertise();
164       }
165 
166     } catch (IOException ioe) {
167       ioe.printStackTrace();
168     }
169   }
170 
171     public void setAdvertiseViaMulticastDNS(boolean advertiseViaMulticastDNS) {
172         this.advertiseViaMulticastDNS = advertiseViaMulticastDNS;
173     }
174 
175     public boolean isAdvertiseViaMulticastDNS() {
176         return advertiseViaMulticastDNS;
177     }
178 
179     class MulticastHandlerThread extends Thread {
180     private List list = new ArrayList();
181 
182     public MulticastHandlerThread() {
183       setDaemon(true);
184     }
185 
186     public void append(String data) {
187       synchronized (list) {
188         list.add(data);
189         list.notify();
190       }
191     }
192 
193     public void run() {
194       ArrayList list2 = new ArrayList();
195 
196       while (isAlive()) {
197         synchronized (list) {
198           try {
199             while (list.size() == 0) {
200               list.wait();
201             }
202 
203             if (list.size() > 0) {
204               list2.addAll(list);
205               list.clear();
206             }
207           } catch (InterruptedException ie) {
208           }
209         }
210 
211         if (list2.size() > 0) {
212           Iterator iter = list2.iterator();
213 
214           while (iter.hasNext()) {
215             String data = (String) iter.next();
216             List v = decoderImpl.decodeEvents(data.trim());
217 
218             if (v != null) {
219               Iterator eventIter = v.iterator();
220 
221               while (eventIter.hasNext()) {
222                 if (!isPaused()) {
223                   doPost((LoggingEvent) eventIter.next());
224                 }
225               }
226             }
227           }
228 
229           list2.clear();
230         } else {
231           try {
232             synchronized (this) {
233               wait(1000);
234             }
235           } catch (InterruptedException ie) {
236           }
237         }
238       }
239     }
240   }
241 
242   class MulticastReceiverThread extends Thread {
243     public MulticastReceiverThread() {
244       setDaemon(true);
245     }
246 
247     public void run() {
248       active = true;
249 
250       byte[] b = new byte[PACKET_LENGTH];
251       DatagramPacket p = new DatagramPacket(b, b.length);
252 
253       while (active) {
254         try {
255           socket.receive(p);
256 
257           //this string constructor which accepts a charset throws an exception if it is 
258           //null
259             if (encoding == null) {
260             handlerThread.append(
261               new String(p.getData(), 0, p.getLength()));
262           } else {
263             handlerThread.append(
264               new String(p.getData(), 0, p.getLength(), encoding));
265           }
266         } catch (SocketException se) {
267           //disconnected
268         } catch (IOException ioe) {
269           ioe.printStackTrace();
270         }
271       }
272 
273       getLogger().debug("{}'s thread is ending.", MulticastReceiver.this.getName());
274     }
275   }
276 }