View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import org.apache.flume.Event;
20  import org.apache.flume.SourceRunner;
21  import org.apache.flume.node.NodeConfiguration;
22  import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager;
23  import org.apache.logging.log4j.core.appender.ManagerFactory;
24  import org.apache.logging.log4j.core.config.ConfigurationException;
25  import org.apache.logging.log4j.core.config.Property;
26  import org.apache.logging.log4j.core.helpers.NameUtil;
27  import org.apache.logging.log4j.util.PropertiesUtil;
28  
29  import java.util.Locale;
30  import java.util.Properties;
31  
32  /**
33   *
34   */
35  public class FlumeEmbeddedManager extends AbstractFlumeManager {
36  
37      /** Name for the Flume source */
38      protected static final String SOURCE_NAME = "log4j-source";
39  
40      private static FlumeManagerFactory factory = new FlumeManagerFactory();
41  
42      private static final String FiLE_SEP = PropertiesUtil.getProperties().getStringProperty("file.separator");
43  
44      private static final String IN_MEMORY = "InMemory";
45  
46      private final FlumeNode node;
47  
48      private NodeConfiguration conf;
49  
50      private final Log4jEventSource source;
51  
52      private final String shortName;
53  
54  
55      /**
56       * Constructor
57       * @param name The unique name of this manager.
58       * @param node The Flume Node.
59       */
60      protected FlumeEmbeddedManager(final String name, final String shortName, final FlumeNode node) {
61          super(name);
62          this.node = node;
63          this.shortName = shortName;
64          final SourceRunner runner = node.getConfiguration().getSourceRunners().get(SOURCE_NAME);
65          if (runner == null || runner.getSource() == null) {
66              throw new IllegalStateException("No Source has been created for Appender " + shortName);
67          }
68          source  = (Log4jEventSource) runner.getSource();
69      }
70  
71      /**
72       * Returns a FlumeEmbeddedManager.
73       * @param name The name of the manager.
74       * @param agents The agents to use.
75       * @param properties Properties for the embedded manager.
76       * @param batchSize The number of events to include in a batch.
77       * @param dataDir The directory where the Flume FileChannel should write to.
78       * @return A FlumeAvroManager.
79       */
80      public static FlumeEmbeddedManager getManager(final String name, final Agent[] agents, final Property[] properties,
81                                                    int batchSize, final String dataDir) {
82  
83          if (batchSize <= 0) {
84              batchSize = 1;
85          }
86  
87          if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
88              throw new IllegalArgumentException("Either an Agent or properties are required");
89          } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
90              throw new IllegalArgumentException("Cannot configure both Agents and Properties.");
91          }
92  
93          final StringBuilder sb = new StringBuilder();
94          boolean first = true;
95  
96          if (agents != null && agents.length > 0) {
97              sb.append("FlumeEmbedded[");
98              for (final Agent agent : agents) {
99                  if (!first) {
100                     sb.append(",");
101                 }
102                 sb.append(agent.getHost()).append(":").append(agent.getPort());
103                 first = false;
104             }
105             sb.append("]");
106         } else {
107             String sep = "";
108             sb.append(name).append(":");
109             final StringBuilder props = new StringBuilder();
110             for (final Property prop : properties) {
111                 props.append(sep);
112                 props.append(prop.getName()).append("=").append(prop.getValue());
113                 sep = ",";
114             }
115             sb.append(NameUtil.md5(props.toString()));
116         }
117         return getManager(sb.toString(), factory,
118                 new FactoryData(name, agents, properties, batchSize, dataDir));
119     }
120 
121     @Override
122     public void send(final Event event) {
123         source.send(event);
124     }
125 
126     @Override
127     protected void releaseSub() {
128         node.stop();
129     }
130 
131     /**
132      * Factory data.
133      */
134     private static class FactoryData {
135         private final Agent[] agents;
136         private final Property[] properties;
137         private final int batchSize;
138         private final String dataDir;
139         private final String name;
140 
141         /**
142          * Constructor.
143          * @param name The name of the Appender.
144          * @param agents The agents.
145          * @param properties The Flume configuration properties.
146          * @param batchSize The number of events to include in a batch.
147          * @param dataDir The directory where Flume should write to.
148          */
149         public FactoryData(final String name, final Agent[] agents, final Property[] properties, final int batchSize,
150                            final String dataDir) {
151             this.name = name;
152             this.agents = agents;
153             this.batchSize = batchSize;
154             this.properties = properties;
155             this.dataDir = dataDir;
156         }
157     }
158 
159     /**
160      * Avro Manager Factory.
161      */
162     private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> {
163         private static final String SOURCE_TYPE = Log4jEventSource.class.getName();
164 
165         /**
166          * Create the FlumeAvroManager.
167          * @param name The name of the entity to manage.
168          * @param data The data required to create the entity.
169          * @return The FlumeAvroManager.
170          */
171         public FlumeEmbeddedManager createManager(final String name, final FactoryData data) {
172             try {
173                 final DefaultLogicalNodeManager nodeManager = new DefaultLogicalNodeManager();
174                 final Properties props = createProperties(data.name, data.agents, data.properties, data.batchSize,
175                     data.dataDir);
176                 final FlumeConfigurationBuilder builder = new FlumeConfigurationBuilder();
177                 final NodeConfiguration conf = builder.load(data.name, props, nodeManager);
178 
179                 final FlumeNode node = new FlumeNode(nodeManager, nodeManager, conf);
180 
181                 node.start();
182 
183                 return new FlumeEmbeddedManager(name, data.name, node);
184             } catch (final Exception ex) {
185                 LOGGER.error("Could not create FlumeEmbeddedManager", ex);
186             }
187             return null;
188         }
189 
190         private Properties createProperties(final String name, final Agent[] agents, final Property[] properties,
191                                             final int batchSize, String dataDir) {
192             final Properties props = new Properties();
193 
194             if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
195                 LOGGER.error("No Flume configuration provided");
196                 throw new ConfigurationException("No Flume configuration provided");
197             }
198 
199             if ((agents != null && agents.length > 0 && properties != null && properties.length > 0)) {
200                 LOGGER.error("Agents and Flume configuration cannot both be specified");
201                 throw new ConfigurationException("Agents and Flume configuration cannot both be specified");
202             }
203 
204             if (agents != null && agents.length > 0) {
205                 props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
206                 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", SOURCE_TYPE);
207 
208                 if (dataDir != null && dataDir.length() > 0) {
209                     if (dataDir.equals(IN_MEMORY)) {
210                         props.put(name + ".channels", "primary");
211                         props.put(name + ".channels.primary.type", "memory");
212                     } else {
213                         props.put(name + ".channels", "primary");
214                         props.put(name + ".channels.primary.type", "file");
215 
216                         if (!dataDir.endsWith(FiLE_SEP)) {
217                             dataDir = dataDir + FiLE_SEP;
218                         }
219 
220                         props.put(name + ".channels.primary.checkpointDir", dataDir + "checkpoint");
221                         props.put(name + ".channels.primary.dataDirs", dataDir + "data");
222                     }
223 
224                 } else {
225                     props.put(name + ".channels", "primary");
226                     props.put(name + ".channels.primary.type", "file");
227                 }
228 
229                 final StringBuilder sb = new StringBuilder();
230                 String leading = "";
231                 int priority = agents.length;
232                 for (int i = 0; i < agents.length; ++i) {
233                     sb.append(leading).append("agent").append(i);
234                     leading = " ";
235                     final String prefix = name + ".sinks.agent" + i;
236                     props.put(prefix + ".channel", "primary");
237                     props.put(prefix + ".type", "avro");
238                     props.put(prefix + ".hostname", agents[i].getHost());
239                     props.put(prefix + ".port", Integer.toString(agents[i].getPort()));
240                     props.put(prefix + ".batch-size", Integer.toString(batchSize));
241                     props.put(name + ".sinkgroups.group1.processor.priority.agent" + i, Integer.toString(priority));
242                     --priority;
243                 }
244                 props.put(name + ".sinks", sb.toString());
245                 props.put(name + ".sinkgroups", "group1");
246                 props.put(name + ".sinkgroups.group1.sinks", sb.toString());
247                 props.put(name + ".sinkgroups.group1.processor.type", "failover");
248                 final String sourceChannels = "primary";
249                 props.put(name + ".channels", sourceChannels);
250                 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
251             } else {
252                 String channels = null;
253                 String[] sinks = null;
254 
255                 props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
256                 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", SOURCE_TYPE);
257 
258                 for (final Property property : properties) {
259                     final String key = property.getName();
260 
261                     if (key == null || key.length() == 0) {
262                         final String msg = "A property name must be provided";
263                         LOGGER.error(msg);
264                         throw new ConfigurationException(msg);
265                     }
266 
267                     final String upperKey = key.toUpperCase(Locale.ENGLISH);
268 
269                     if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) {
270                         final String msg =
271                             "Specification of the agent name is allowed in Flume Appender configuration: " + key;
272                         LOGGER.error(msg);
273                         throw new ConfigurationException(msg);
274                     }
275 
276                     if (upperKey.startsWith("SOURCES.")) {
277                         final String msg = "Specification of Sources is not allowed in Flume Appender: " + key;
278                         LOGGER.error(msg);
279                         throw new ConfigurationException(msg);
280                     }
281 
282                     final String value = property.getValue();
283                     if (value == null || value.length() == 0) {
284                         final String msg = "A value for property " + key + " must be provided";
285                         LOGGER.error(msg);
286                         throw new ConfigurationException(msg);
287                     }
288 
289                     if (upperKey.equals("CHANNELS")) {
290                         channels = value.trim();
291                     } else if (upperKey.equals("SINKS")) {
292                         sinks = value.trim().split(" ");
293                     }
294 
295                     props.put(name + '.' + key, value);
296                 }
297 
298                 String sourceChannels = channels;
299 
300                 if (channels == null) {
301                     sourceChannels = "primary";
302                     props.put(name + ".channels", sourceChannels);
303                 }
304 
305                 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
306 
307                 if (sinks == null || sinks.length == 0) {
308                     final String msg = "At least one Sink must be specified";
309                     LOGGER.error(msg);
310                     throw new ConfigurationException(msg);
311                 }
312             }
313             return props;
314         }
315 
316     }
317 
318 }