View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import org.apache.flume.Channel;
20  import org.apache.flume.ChannelFactory;
21  import org.apache.flume.ChannelSelector;
22  import org.apache.flume.Context;
23  import org.apache.flume.Sink;
24  import org.apache.flume.SinkFactory;
25  import org.apache.flume.SinkProcessor;
26  import org.apache.flume.SinkRunner;
27  import org.apache.flume.Source;
28  import org.apache.flume.SourceFactory;
29  import org.apache.flume.SourceRunner;
30  import org.apache.flume.channel.ChannelProcessor;
31  import org.apache.flume.channel.ChannelSelectorFactory;
32  import org.apache.flume.channel.DefaultChannelFactory;
33  import org.apache.flume.conf.BasicConfigurationConstants;
34  import org.apache.flume.conf.ComponentConfiguration;
35  import org.apache.flume.conf.Configurables;
36  import org.apache.flume.conf.FlumeConfiguration;
37  import org.apache.flume.conf.FlumeConfigurationError;
38  import org.apache.flume.conf.channel.ChannelSelectorConfiguration;
39  import org.apache.flume.conf.file.SimpleNodeConfiguration;
40  import org.apache.flume.conf.sink.SinkConfiguration;
41  import org.apache.flume.conf.sink.SinkGroupConfiguration;
42  import org.apache.flume.conf.source.SourceConfiguration;
43  import org.apache.flume.node.NodeConfiguration;
44  import org.apache.flume.node.nodemanager.NodeConfigurationAware;
45  import org.apache.flume.sink.DefaultSinkFactory;
46  import org.apache.flume.sink.DefaultSinkProcessor;
47  import org.apache.flume.sink.SinkGroup;
48  import org.apache.flume.source.DefaultSourceFactory;
49  import org.apache.logging.log4j.Logger;
50  import org.apache.logging.log4j.core.config.ConfigurationException;
51  import org.apache.logging.log4j.status.StatusLogger;
52  
53  import java.util.ArrayList;
54  import java.util.HashMap;
55  import java.util.List;
56  import java.util.Map;
57  import java.util.Properties;
58  import java.util.Set;
59  import java.util.TreeSet;
60  
61  /**
62   * See Flume's PropertiesFileConfigurationProvider. This class would extend that if it were possible.
63   */
64  
65  public class FlumeConfigurationBuilder {
66  
67      private static final Logger LOGGER = StatusLogger.getLogger();
68  
69      private final ChannelFactory channelFactory = new DefaultChannelFactory();
70      private final SourceFactory sourceFactory = new DefaultSourceFactory();
71      private final SinkFactory sinkFactory = new DefaultSinkFactory();
72  
73      public NodeConfiguration load(String name, Properties props, NodeConfigurationAware configurationAware) {
74          NodeConfiguration conf = new SimpleNodeConfiguration();
75          FlumeConfiguration fconfig;
76          try {
77              fconfig = new FlumeConfiguration(props);
78              List<FlumeConfigurationError> errors = fconfig.getConfigurationErrors();
79              if (errors.size() > 0) {
80                  boolean isError = false;
81                  for (FlumeConfigurationError error : errors) {
82                      StringBuilder sb = new StringBuilder();
83                      sb.append("Component: ").append(error.getComponentName()).append(" ");
84                      sb.append("Key: ").append(error.getKey()).append(" ");
85                      sb.append(error.getErrorType().name()).append(" - ").append(error.getErrorType().getError());
86                      switch (error.getErrorOrWarning()) {
87                          case ERROR:
88                              isError = true;
89                              LOGGER.error(sb.toString());
90                              break;
91                          case WARNING:
92                              LOGGER.warn(sb.toString());
93                              break;
94                      }
95                  }
96                  if (isError) {
97                      throw new ConfigurationException("Unable to configure Flume due to errors");
98                  }
99              }
100         } catch (RuntimeException ex) {
101             printProps(props);
102             throw ex;
103         }
104 
105         FlumeConfiguration.AgentConfiguration agentConf = fconfig.getConfigurationFor(name);
106 
107         if (agentConf != null) {
108 
109             loadChannels(agentConf, conf);
110             loadSources(agentConf, conf);
111             loadSinks(agentConf, conf);
112 
113             configurationAware.startAllComponents(conf);
114         } else {
115             LOGGER.warn("No configuration found for: {}", name);
116         }
117         return conf;
118     }
119 
120     private void printProps(Properties props) {
121         for (String key : new TreeSet<String>(props.stringPropertyNames())) {
122             LOGGER.error(key + "=" + props.getProperty(key));
123         }
124     }
125 
126     protected void loadChannels(FlumeConfiguration.AgentConfiguration agentConf, NodeConfiguration conf) {
127         LOGGER.info("Creating channels");
128         Set<String> channels = agentConf.getChannelSet();
129         Map<String, ComponentConfiguration> compMap = agentConf.getChannelConfigMap();
130         for (String chName : channels) {
131             ComponentConfiguration comp = compMap.get(chName);
132             if (comp != null) {
133                 Channel channel = channelFactory.create(comp.getComponentName(), comp.getType());
134 
135                 Configurables.configure(channel, comp);
136 
137                 conf.getChannels().put(comp.getComponentName(), channel);
138             }
139         }
140 
141         for (String ch : channels) {
142             Context context = agentConf.getChannelContext().get(ch);
143             if (context != null) {
144                 Channel channel = channelFactory.create(ch, context.getString(BasicConfigurationConstants.CONFIG_TYPE));
145                 Configurables.configure(channel, context);
146                 conf.getChannels().put(ch, channel);
147                 LOGGER.info("created channel " + ch);
148             }
149         }
150     }
151 
152     protected void loadSources(FlumeConfiguration.AgentConfiguration agentConf, NodeConfiguration conf) {
153 
154         Set<String> sources = agentConf.getSourceSet();
155         Map<String, ComponentConfiguration> compMap = agentConf.getSourceConfigMap();
156         for (String sourceName : sources) {
157             ComponentConfiguration comp = compMap.get(sourceName);
158             if (comp != null) {
159                 SourceConfiguration config = (SourceConfiguration) comp;
160 
161                 Source source = sourceFactory.create(comp.getComponentName(), comp.getType());
162 
163                 Configurables.configure(source, config);
164                 Set<String> channelNames = config.getChannels();
165                 List<Channel> channels = new ArrayList<Channel>();
166                 for (String chName : channelNames) {
167                     channels.add(conf.getChannels().get(chName));
168                 }
169 
170                 ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration();
171 
172                 ChannelSelector selector = ChannelSelectorFactory.create(channels, selectorConfig);
173 
174                 ChannelProcessor channelProcessor = new ChannelProcessor(selector);
175                 Configurables.configure(channelProcessor, config);
176 
177                 source.setChannelProcessor(channelProcessor);
178                 conf.getSourceRunners().put(comp.getComponentName(), SourceRunner.forSource(source));
179             }
180         }
181         Map<String, Context> sourceContexts = agentConf.getSourceContext();
182 
183         for (String src : sources) {
184             Context context = sourceContexts.get(src);
185             if (context != null){
186                 Source source = sourceFactory.create(src, context.getString(BasicConfigurationConstants.CONFIG_TYPE));
187                 List<Channel> channels = new ArrayList<Channel>();
188                 Configurables.configure(source, context);
189                 String[] channelNames = context.getString(BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+");
190                 for (String chName : channelNames) {
191                     channels.add(conf.getChannels().get(chName));
192                 }
193 
194                 Map<String, String> selectorConfig = context.getSubProperties(
195                     BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);
196 
197                 ChannelSelector selector = ChannelSelectorFactory.create(channels, selectorConfig);
198 
199                 ChannelProcessor channelProcessor = new ChannelProcessor(selector);
200                 Configurables.configure(channelProcessor, context);
201 
202                 source.setChannelProcessor(channelProcessor);
203                 conf.getSourceRunners().put(src, SourceRunner.forSource(source));
204             }
205         }
206     }
207 
208     protected void loadSinks(FlumeConfiguration.AgentConfiguration agentConf, NodeConfiguration conf) {
209         Set<String> sinkNames = agentConf.getSinkSet();
210         Map<String, ComponentConfiguration> compMap = agentConf.getSinkConfigMap();
211         Map<String, Sink> sinks = new HashMap<String, Sink>();
212         for (String sinkName : sinkNames) {
213             ComponentConfiguration comp = compMap.get(sinkName);
214             if (comp != null) {
215                 SinkConfiguration config = (SinkConfiguration) comp;
216                 Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType());
217 
218                 Configurables.configure(sink, config);
219 
220                 sink.setChannel(conf.getChannels().get(config.getChannel()));
221                 sinks.put(comp.getComponentName(), sink);
222             }
223         }
224 
225         Map<String, Context> sinkContexts = agentConf.getSinkContext();
226         for (String sinkName : sinkNames) {
227             Context context = sinkContexts.get(sinkName);
228             if (context != null) {
229                 Sink sink = sinkFactory.create(sinkName, context.getString(BasicConfigurationConstants.CONFIG_TYPE));
230                 Configurables.configure(sink, context);
231 
232                 sink.setChannel(conf.getChannels().get(context.getString(BasicConfigurationConstants.CONFIG_CHANNEL)));
233                 sinks.put(sinkName, sink);
234             }
235         }
236 
237         loadSinkGroups(agentConf, sinks, conf);
238     }
239 
240     protected void loadSinkGroups(FlumeConfiguration.AgentConfiguration agentConf,
241                                   Map<String, Sink> sinks, NodeConfiguration conf) {
242         Set<String> sinkgroupNames = agentConf.getSinkgroupSet();
243         Map<String, ComponentConfiguration> compMap = agentConf.getSinkGroupConfigMap();
244         Map<String, String> usedSinks = new HashMap<String, String>();
245         for (String groupName : sinkgroupNames) {
246             ComponentConfiguration comp = compMap.get(groupName);
247             if (comp != null) {
248                 SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
249                 List<String> groupSinkList = groupConf.getSinks();
250                 List<Sink> groupSinks = new ArrayList<Sink>();
251                 for (String sink : groupSinkList) {
252                     Sink s = sinks.remove(sink);
253                     if (s == null) {
254                         String sinkUser = usedSinks.get(sink);
255                         if (sinkUser != null) {
256                             throw new ConfigurationException(String.format(
257                                 "Sink %s of group %s already in use by group %s", sink, groupName, sinkUser));
258                         } else {
259                             throw new ConfigurationException(String.format(
260                                 "Sink %s of group %s does not exist or is not properly configured", sink,
261                                 groupName));
262                         }
263                     }
264                     groupSinks.add(s);
265                     usedSinks.put(sink, groupName);
266                 }
267                 SinkGroup group = new SinkGroup(groupSinks);
268                 Configurables.configure(group, groupConf);
269                 conf.getSinkRunners().put(comp.getComponentName(), new SinkRunner(group.getProcessor()));
270             }
271         }
272         // add any unasigned sinks to solo collectors
273         for (Map.Entry<String, Sink> entry : sinks.entrySet()) {
274             if (!usedSinks.containsValue(entry.getKey())) {
275                 SinkProcessor pr = new DefaultSinkProcessor();
276                 List<Sink> sinkMap = new ArrayList<Sink>();
277                 sinkMap.add(entry.getValue());
278                 pr.setSinks(sinkMap);
279                 Configurables.configure(pr, new Context());
280                 conf.getSinkRunners().put(entry.getKey(), new SinkRunner(pr));
281             }
282         }
283     }
284 }