View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import org.apache.flume.SourceRunner;
20  import org.apache.flume.lifecycle.LifecycleController;
21  import org.apache.flume.lifecycle.LifecycleState;
22  import org.apache.flume.node.NodeConfiguration;
23  import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager;
24  import org.apache.logging.log4j.core.appender.ManagerFactory;
25  import org.apache.logging.log4j.core.config.ConfigurationException;
26  import org.apache.logging.log4j.core.config.Property;
27  
28  import java.security.MessageDigest;
29  import java.util.Locale;
30  import java.util.Properties;
31  
32  /**
33   *
34   */
35  public class FlumeEmbeddedManager extends AbstractFlumeManager {
36  
37      private static ManagerFactory factory = new FlumeManagerFactory();
38  
39      private final FlumeNode node;
40  
41      private NodeConfiguration conf;
42  
43      protected static final String SOURCE_NAME = "log4j-source";
44  
45      private static final String LINE_SEP = System.getProperty("file.separator");
46  
47      private final Log4jEventSource source;
48  
49      private final String shortName;
50  
51  
52      /**
53       * Constructor
54       * @param name The unique name of this manager.
55       * @param node The Flume Node.
56       */
57      protected FlumeEmbeddedManager(String name, String shortName, FlumeNode node) {
58          super(name);
59          this.node = node;
60          this.shortName = shortName;
61          SourceRunner runner = node.getConfiguration().getSourceRunners().get(SOURCE_NAME);
62          if (runner == null || runner.getSource() == null) {
63              throw new IllegalStateException("No Source has been created for Appender " + shortName);
64          }
65          source  = (Log4jEventSource) runner.getSource();
66      }
67  
68      /**
69       * Returns a FlumeEmbeddedManager.
70       * @param agents The agents to use.
71       * @param batchSize The number of events to include in a batch.
72       * @return A FlumeAvroManager.
73       */
74      public static FlumeEmbeddedManager getManager(String name, Agent[] agents, Property[] properties, int batchSize,
75                                                    String dataDir) {
76  
77          if (batchSize <= 0) {
78              batchSize = 1;
79          }
80  
81          if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
82              throw new IllegalArgumentException("Either an Agent or properties are required");
83          } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
84              throw new IllegalArgumentException("Cannot configure both Agents and Properties.");
85          }
86  
87          StringBuilder sb = new StringBuilder();
88          boolean first = true;
89  
90          if (agents != null && agents.length > 0) {
91              sb.append("FlumeEmbedded[");
92              for (Agent agent : agents) {
93                  if (!first) {
94                      sb.append(",");
95                  }
96                  sb.append(agent.getHost()).append(":").append(agent.getPort());
97                  first = false;
98              }
99              sb.append("]");
100         } else {
101             String sep = "";
102             sb.append(name).append(":");
103             StringBuilder props = new StringBuilder();
104             for (Property prop : properties) {
105                 props.append(sep);
106                 props.append(prop.getName()).append("=").append(prop.getValue());
107                 sep = ",";
108             }
109             try {
110                 MessageDigest digest = MessageDigest.getInstance("MD5");
111                 digest.update(sb.toString().getBytes());
112                 byte[] bytes = digest.digest();
113                 StringBuilder md5 = new StringBuilder();
114                 for (byte b : bytes) {
115                     String hex = Integer.toHexString(0xff & b);
116                     if (hex.length() == 1) {
117                         md5.append('0');
118                     }
119                     md5.append(hex);
120                 }
121                 sb.append(md5.toString());
122             } catch (Exception ex) {
123                 sb.append(props);
124             }
125         }
126         return (FlumeEmbeddedManager) getManager(sb.toString(), factory,
127             new FactoryData(name, agents, properties, batchSize, dataDir));
128     }
129 
130     @Override
131     public void send(FlumeEvent event, int delay, int retries) {
132         source.send(event);
133     }
134 
135     @Override
136     protected void releaseSub() {
137         node.stop();
138     }
139 
140     /**
141      * Factory data.
142      */
143     private static class FactoryData {
144         private Agent[] agents;
145         private Property[] properties;
146         private int batchSize;
147         private String dataDir;
148         private String name;
149 
150         /**
151          * Constructor.
152          * @param name The name of the Appender.
153          * @param agents The agents.
154          * @param properties The Flume configuration properties.
155          * @param batchSize The number of events to include in a batch.
156          * @param dataDir The directory where Flume should write to.
157          */
158         public FactoryData(String name, Agent[] agents, Property[] properties, int batchSize, String dataDir) {
159             this.name = name;
160             this.agents = agents;
161             this.batchSize = batchSize;
162             this.properties = properties;
163             this.dataDir = dataDir;
164         }
165     }
166 
167     /**
168      * Avro Manager Factory.
169      */
170     private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> {
171         private static final String sourceType = Log4jEventSource.class.getName();
172 
173         /**
174          * Create the FlumeAvroManager.
175          * @param name The name of the entity to manage.
176          * @param data The data required to create the entity.
177          * @return The FlumeAvroManager.
178          */
179         public FlumeEmbeddedManager createManager(String name, FactoryData data) {
180             try {
181                 DefaultLogicalNodeManager nodeManager = new DefaultLogicalNodeManager();
182                 Properties props = createProperties(data.name, data.agents, data.properties, data.batchSize,
183                     data.dataDir);
184                 FlumeConfigurationBuilder builder = new FlumeConfigurationBuilder();
185                 NodeConfiguration conf = builder.load(data.name, props, nodeManager);
186 
187                 FlumeNode node = new FlumeNode(nodeManager, conf);
188 
189                 node.start();
190                 LifecycleController.waitForOneOf(node, LifecycleState.START_OR_ERROR);
191 
192                 return new FlumeEmbeddedManager(name, data.name, node);
193             } catch (Exception ex) {
194                 LOGGER.error("Could not create FlumeEmbeddedManager", ex);
195             }
196             return null;
197         }
198 
199         private Properties createProperties(String name, Agent[] agents, Property[] properties, int batchSize,
200                                             String dataDir) {
201             Properties props = new Properties();
202 
203             if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
204                 LOGGER.error("No Flume configuration provided");
205                 throw new ConfigurationException("No Flume configuration provided");
206             }
207 
208             if ((agents != null && agents.length > 0 && properties != null && properties.length > 0)) {
209                 LOGGER.error("Agents and Flume configuration cannot both be specified");
210                 throw new ConfigurationException("Agents and Flume configuration cannot both be specified");
211             }
212 
213             if (agents != null && agents.length > 0) {
214                 props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
215                 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", sourceType);
216                 props.put(name + ".channels", "file");
217                 props.put(name + ".channels.file.type", "file");
218                 if (dataDir != null && dataDir.length() > 0) {
219                     if (!dataDir.endsWith(LINE_SEP)) {
220                         dataDir = dataDir + LINE_SEP;
221                     }
222 
223                     props.put(name + ".channels.file.checkpointDir", dataDir + "checkpoint");
224                     props.put(name + ".channels.file.dataDirs", dataDir + "data");
225                 }
226 
227                 StringBuilder sb = new StringBuilder();
228                 String leading = "";
229                 int priority = agents.length;
230                 for (int i=0; i < agents.length; ++i) {
231                     sb.append(leading).append("agent").append(i);
232                     leading = " ";
233                     String prefix = name + ".sinks.agent" + i;
234                     props.put(prefix + ".channel", "file");
235                     props.put(prefix + ".type", "avro");
236                     props.put(prefix + ".hostname", agents[i].getHost());
237                     props.put(prefix + ".port", Integer.toString(agents[i].getPort()));
238                     props.put(prefix + ".batch-size", Integer.toString(batchSize));
239                     props.put(name + ".sinkgroups.group1.processor.priority.agent" + i, Integer.toString(priority));
240                     --priority;
241                 }
242                 props.put(name + ".sinks", sb.toString());
243                 props.put(name + ".sinkgroups", "group1");
244                 props.put(name + ".sinkgroups.group1.sinks", sb.toString());
245                 props.put(name + ".sinkgroups.group1.processor.type", "failover");
246                 String sourceChannels = "file";
247                 props.put(name + ".channels", sourceChannels);
248                 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
249             } else {
250                 String channels = null;
251                 String[] sinks = null;
252 
253                 props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
254                 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", sourceType);
255 
256                 for (Property property : properties) {
257                     String key = property.getName();
258 
259                     if (key == null || key.length() == 0) {
260                         String msg = "A property name must be provided";
261                         LOGGER.error(msg);
262                         throw new ConfigurationException(msg);
263                     }
264 
265                     String upperKey = key.toUpperCase(Locale.ENGLISH);
266 
267                     if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) {
268                         String msg = "Specification of the agent name is allowed in Flume Appender configuration: " + key;
269                         LOGGER.error(msg);
270                         throw new ConfigurationException(msg);
271                     }
272 
273                     if (upperKey.startsWith("SOURCES.")) {
274                         String msg = "Specification of Sources is not allowed in Flume Appender: " + key;
275                         LOGGER.error(msg);
276                         throw new ConfigurationException(msg);
277                     }
278 
279                     String value = property.getValue();
280                     if (value == null || value.length() == 0) {
281                         String msg = "A value for property " + key + " must be provided";
282                         LOGGER.error(msg);
283                         throw new ConfigurationException(msg);
284                     }
285 
286                     if (upperKey.equals("CHANNELS")) {
287                         channels = value.trim();
288                     } else if (upperKey.equals("SINKS")) {
289                         sinks = value.trim().split(" ");
290                     }
291 
292                     props.put(name + '.' + key, value);
293                 }
294 
295                 String sourceChannels = channels;
296 
297                 if (channels == null) {
298                     sourceChannels = "file";
299                     props.put(name + ".channels", sourceChannels);
300                 }
301 
302                 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
303 
304                 if (sinks == null || sinks.length == 0) {
305                     String msg = "At least one Sink must be specified";
306                     LOGGER.error(msg);
307                     throw new ConfigurationException(msg);
308                 }
309             }
310             return props;
311         }
312 
313     }
314 
315 }