View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    * 
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.log4j.net;
19  
20  import org.apache.log4j.plugins.Pauseable;
21  import org.apache.log4j.plugins.Receiver;
22  import org.apache.log4j.spi.Decoder;
23  import org.apache.log4j.spi.LoggingEvent;
24  
25  import java.io.IOException;
26  
27  import java.net.DatagramPacket;
28  import java.net.InetAddress;
29  import java.net.MulticastSocket;
30  import java.net.SocketException;
31  import java.net.UnknownHostException;
32  
33  import java.util.ArrayList;
34  import java.util.Iterator;
35  import java.util.List;
36  
37  
38  /***
39   *  Multicast-based receiver.  Accepts LoggingEvents encoded using
40   *  MulticastAppender and XMLLayout. The the XML data is converted
41   *  back to a LoggingEvent and is posted.
42   *
43   *  @author Scott Deboy <sdeboy@apache.org>
44   *
45   */
46  public class MulticastReceiver extends Receiver implements PortBased,
47    AddressBased, Pauseable {
48    private static final int PACKET_LENGTH = 16384;
49    private boolean isActive = false;
50    private int port;
51    private String address;
52    private String encoding;
53    private MulticastSocket socket = null;
54  
55    //default to log4j xml decoder
56    private String decoder = "org.apache.log4j.xml.XMLDecoder";
57    private Decoder decoderImpl;
58    private MulticastHandlerThread handlerThread;
59    private MulticastReceiverThread receiverThread;
60    private boolean paused;
61  
62    public String getDecoder() {
63      return decoder;
64    }
65  
66    public void setDecoder(String decoder) {
67      this.decoder = decoder;
68    }
69  
70    public int getPort() {
71      return port;
72    }
73  
74    public void setPort(int port) {
75      this.port = port;
76    }
77  
78    public String getAddress() {
79      return address;
80    }
81  
82    /***
83        The <b>Encoding</b> option specifies how the bytes are encoded.  If this option is not specified,
84        the system encoding will be used.
85      */
86    public void setEncoding(String encoding) {
87      this.encoding = encoding;
88    }
89  
90    /***
91       Returns value of the <b>Encoding</b> option.
92     */
93    public String getEncoding() {
94      return encoding;
95    }
96  
97    public synchronized void shutdown() {
98      isActive = false;
99      handlerThread.interrupt();
100     receiverThread.interrupt();
101     socket.close();
102   }
103 
104   public void setAddress(String address) {
105     this.address = address;
106   }
107 
108   public boolean isPaused() {
109     return paused;
110   }
111 
112   public void setPaused(boolean b) {
113     paused = b;
114   }
115 
116   /***
117     Returns true if this receiver is active. */
118   public synchronized boolean isActive() {
119     return isActive;
120   }
121 
122   public void activateOptions() {
123     InetAddress addr = null;
124 
125     try {
126       Class c = Class.forName(decoder);
127       Object o = c.newInstance();
128 
129       if (o instanceof Decoder) {
130         this.decoderImpl = (Decoder) o;
131       }
132     } catch (ClassNotFoundException cnfe) {
133       getLogger().warn("Unable to find decoder", cnfe);
134     } catch (IllegalAccessException iae) {
135       getLogger().warn("Could not construct decoder", iae);
136     } catch (InstantiationException ie) {
137       getLogger().warn("Could not construct decoder", ie);
138     }
139 
140     try {
141       addr = InetAddress.getByName(address);
142     } catch (UnknownHostException uhe) {
143       uhe.printStackTrace();
144     }
145 
146     try {
147       isActive = true;
148       socket = new MulticastSocket(port);
149       socket.joinGroup(addr);
150       receiverThread = new MulticastReceiverThread();
151       receiverThread.start();
152       handlerThread = new MulticastHandlerThread();
153       handlerThread.start();
154     } catch (IOException ioe) {
155       ioe.printStackTrace();
156     }
157   }
158 
159   class MulticastHandlerThread extends Thread {
160     private List list = new ArrayList();
161 
162     public MulticastHandlerThread() {
163       setDaemon(true);
164     }
165 
166     public void append(String data) {
167       synchronized (list) {
168         list.add(data);
169         list.notify();
170       }
171     }
172 
173     public void run() {
174       ArrayList list2 = new ArrayList();
175 
176       while (isAlive()) {
177         synchronized (list) {
178           try {
179             while (list.size() == 0) {
180               list.wait();
181             }
182 
183             if (list.size() > 0) {
184               list2.addAll(list);
185               list.clear();
186             }
187           } catch (InterruptedException ie) {
188           }
189         }
190 
191         if (list2.size() > 0) {
192           Iterator iter = list2.iterator();
193 
194           while (iter.hasNext()) {
195             String data = (String) iter.next();
196             List v = decoderImpl.decodeEvents(data.trim());
197 
198             if (v != null) {
199               Iterator eventIter = v.iterator();
200 
201               while (eventIter.hasNext()) {
202                 if (!isPaused()) {
203                   doPost((LoggingEvent) eventIter.next());
204                 }
205               }
206             }
207           }
208 
209           list2.clear();
210         } else {
211           try {
212             synchronized (this) {
213               wait(1000);
214             }
215           } catch (InterruptedException ie) {
216           }
217         }
218       }
219     }
220   }
221 
222   class MulticastReceiverThread extends Thread {
223     public MulticastReceiverThread() {
224       setDaemon(true);
225     }
226 
227     public void run() {
228       isActive = true;
229 
230       byte[] b = new byte[PACKET_LENGTH];
231       DatagramPacket p = new DatagramPacket(b, b.length);
232 
233       while (isActive) {
234         try {
235           socket.receive(p);
236 
237           //this string constructor which accepts a charset throws an exception if it is 
238           //null
239             if (encoding == null) {
240             handlerThread.append(
241               new String(p.getData(), 0, p.getLength()));
242           } else {
243             handlerThread.append(
244               new String(p.getData(), 0, p.getLength(), encoding));
245           }
246         } catch (SocketException se) {
247           //disconnected
248         } catch (IOException ioe) {
249           ioe.printStackTrace();
250         }
251       }
252 
253       getLogger().debug("{}'s thread is ending.", MulticastReceiver.this.getName());
254     }
255   }
256 }