1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import org.apache.flume.Channel;
20 import org.apache.flume.ChannelFactory;
21 import org.apache.flume.ChannelSelector;
22 import org.apache.flume.Context;
23 import org.apache.flume.Sink;
24 import org.apache.flume.SinkFactory;
25 import org.apache.flume.SinkProcessor;
26 import org.apache.flume.SinkRunner;
27 import org.apache.flume.Source;
28 import org.apache.flume.SourceFactory;
29 import org.apache.flume.SourceRunner;
30 import org.apache.flume.channel.ChannelProcessor;
31 import org.apache.flume.channel.ChannelSelectorFactory;
32 import org.apache.flume.channel.DefaultChannelFactory;
33 import org.apache.flume.conf.BasicConfigurationConstants;
34 import org.apache.flume.conf.ComponentConfiguration;
35 import org.apache.flume.conf.Configurables;
36 import org.apache.flume.conf.FlumeConfiguration;
37 import org.apache.flume.conf.FlumeConfigurationError;
38 import org.apache.flume.conf.channel.ChannelSelectorConfiguration;
39 import org.apache.flume.conf.file.SimpleNodeConfiguration;
40 import org.apache.flume.conf.sink.SinkConfiguration;
41 import org.apache.flume.conf.sink.SinkGroupConfiguration;
42 import org.apache.flume.conf.source.SourceConfiguration;
43 import org.apache.flume.node.NodeConfiguration;
44 import org.apache.flume.node.nodemanager.NodeConfigurationAware;
45 import org.apache.flume.sink.DefaultSinkFactory;
46 import org.apache.flume.sink.DefaultSinkProcessor;
47 import org.apache.flume.sink.SinkGroup;
48 import org.apache.flume.source.DefaultSourceFactory;
49 import org.apache.logging.log4j.Logger;
50 import org.apache.logging.log4j.core.config.ConfigurationException;
51 import org.apache.logging.log4j.status.StatusLogger;
52
53 import java.util.ArrayList;
54 import java.util.HashMap;
55 import java.util.List;
56 import java.util.Map;
57 import java.util.Properties;
58 import java.util.Set;
59 import java.util.TreeSet;
60
61
62
63
64
65 public class FlumeConfigurationBuilder {
66
67 private static final Logger LOGGER = StatusLogger.getLogger();
68
69 private final ChannelFactory channelFactory = new DefaultChannelFactory();
70 private final SourceFactory sourceFactory = new DefaultSourceFactory();
71 private final SinkFactory sinkFactory = new DefaultSinkFactory();
72
73 public NodeConfiguration load(String name, Properties props, NodeConfigurationAware configurationAware) {
74 NodeConfiguration conf = new SimpleNodeConfiguration();
75 FlumeConfiguration fconfig;
76 try {
77 fconfig = new FlumeConfiguration(props);
78 List<FlumeConfigurationError> errors = fconfig.getConfigurationErrors();
79 if (errors.size() > 0) {
80 boolean isError = false;
81 for (FlumeConfigurationError error : errors) {
82 StringBuilder sb = new StringBuilder();
83 sb.append("Component: ").append(error.getComponentName()).append(" ");
84 sb.append("Key: ").append(error.getKey()).append(" ");
85 sb.append(error.getErrorType().name()).append(" - ").append(error.getErrorType().getError());
86 switch (error.getErrorOrWarning()) {
87 case ERROR:
88 isError = true;
89 LOGGER.error(sb.toString());
90 break;
91 case WARNING:
92 LOGGER.warn(sb.toString());
93 break;
94 }
95 }
96 if (isError) {
97 throw new ConfigurationException("Unable to configure Flume due to errors");
98 }
99 }
100 } catch (RuntimeException ex) {
101 printProps(props);
102 throw ex;
103 }
104
105 FlumeConfiguration.AgentConfiguration agentConf = fconfig.getConfigurationFor(name);
106
107 if (agentConf != null) {
108
109 loadChannels(agentConf, conf);
110 loadSources(agentConf, conf);
111 loadSinks(agentConf, conf);
112
113 configurationAware.startAllComponents(conf);
114 } else {
115 LOGGER.warn("No configuration found for: {}", name);
116 }
117 return conf;
118 }
119
120 private void printProps(Properties props) {
121 for (String key : new TreeSet<String>(props.stringPropertyNames())) {
122 LOGGER.error(key + "=" + props.getProperty(key));
123 }
124 }
125
126 protected void loadChannels(FlumeConfiguration.AgentConfiguration agentConf, NodeConfiguration conf) {
127 LOGGER.info("Creating channels");
128 Set<String> channels = agentConf.getChannelSet();
129 Map<String, ComponentConfiguration> compMap = agentConf.getChannelConfigMap();
130 for (String chName : channels) {
131 ComponentConfiguration comp = compMap.get(chName);
132 if (comp != null) {
133 Channel channel = channelFactory.create(comp.getComponentName(), comp.getType());
134
135 Configurables.configure(channel, comp);
136
137 conf.getChannels().put(comp.getComponentName(), channel);
138 }
139 }
140
141 for (String ch : channels) {
142 Context context = agentConf.getChannelContext().get(ch);
143 if (context != null) {
144 Channel channel = channelFactory.create(ch, context.getString(BasicConfigurationConstants.CONFIG_TYPE));
145 Configurables.configure(channel, context);
146 conf.getChannels().put(ch, channel);
147 LOGGER.info("created channel " + ch);
148 }
149 }
150 }
151
152 protected void loadSources(FlumeConfiguration.AgentConfiguration agentConf, NodeConfiguration conf) {
153
154 Set<String> sources = agentConf.getSourceSet();
155 Map<String, ComponentConfiguration> compMap = agentConf.getSourceConfigMap();
156 for (String sourceName : sources) {
157 ComponentConfiguration comp = compMap.get(sourceName);
158 if (comp != null) {
159 SourceConfiguration config = (SourceConfiguration) comp;
160
161 Source source = sourceFactory.create(comp.getComponentName(), comp.getType());
162
163 Configurables.configure(source, config);
164 Set<String> channelNames = config.getChannels();
165 List<Channel> channels = new ArrayList<Channel>();
166 for (String chName : channelNames) {
167 channels.add(conf.getChannels().get(chName));
168 }
169
170 ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration();
171
172 ChannelSelector selector = ChannelSelectorFactory.create(channels, selectorConfig);
173
174 ChannelProcessor channelProcessor = new ChannelProcessor(selector);
175 Configurables.configure(channelProcessor, config);
176
177 source.setChannelProcessor(channelProcessor);
178 conf.getSourceRunners().put(comp.getComponentName(), SourceRunner.forSource(source));
179 }
180 }
181 Map<String, Context> sourceContexts = agentConf.getSourceContext();
182
183 for (String src : sources) {
184 Context context = sourceContexts.get(src);
185 if (context != null){
186 Source source = sourceFactory.create(src, context.getString(BasicConfigurationConstants.CONFIG_TYPE));
187 List<Channel> channels = new ArrayList<Channel>();
188 Configurables.configure(source, context);
189 String[] channelNames = context.getString(BasicConfigurationConstants.CONFIG_CHANNELS).split("\\s+");
190 for (String chName : channelNames) {
191 channels.add(conf.getChannels().get(chName));
192 }
193
194 Map<String, String> selectorConfig = context.getSubProperties(
195 BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);
196
197 ChannelSelector selector = ChannelSelectorFactory.create(channels, selectorConfig);
198
199 ChannelProcessor channelProcessor = new ChannelProcessor(selector);
200 Configurables.configure(channelProcessor, context);
201
202 source.setChannelProcessor(channelProcessor);
203 conf.getSourceRunners().put(src, SourceRunner.forSource(source));
204 }
205 }
206 }
207
208 protected void loadSinks(FlumeConfiguration.AgentConfiguration agentConf, NodeConfiguration conf) {
209 Set<String> sinkNames = agentConf.getSinkSet();
210 Map<String, ComponentConfiguration> compMap = agentConf.getSinkConfigMap();
211 Map<String, Sink> sinks = new HashMap<String, Sink>();
212 for (String sinkName : sinkNames) {
213 ComponentConfiguration comp = compMap.get(sinkName);
214 if (comp != null) {
215 SinkConfiguration config = (SinkConfiguration) comp;
216 Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType());
217
218 Configurables.configure(sink, config);
219
220 sink.setChannel(conf.getChannels().get(config.getChannel()));
221 sinks.put(comp.getComponentName(), sink);
222 }
223 }
224
225 Map<String, Context> sinkContexts = agentConf.getSinkContext();
226 for (String sinkName : sinkNames) {
227 Context context = sinkContexts.get(sinkName);
228 if (context != null) {
229 Sink sink = sinkFactory.create(sinkName, context.getString(BasicConfigurationConstants.CONFIG_TYPE));
230 Configurables.configure(sink, context);
231
232 sink.setChannel(conf.getChannels().get(context.getString(BasicConfigurationConstants.CONFIG_CHANNEL)));
233 sinks.put(sinkName, sink);
234 }
235 }
236
237 loadSinkGroups(agentConf, sinks, conf);
238 }
239
240 protected void loadSinkGroups(FlumeConfiguration.AgentConfiguration agentConf,
241 Map<String, Sink> sinks, NodeConfiguration conf) {
242 Set<String> sinkgroupNames = agentConf.getSinkgroupSet();
243 Map<String, ComponentConfiguration> compMap = agentConf.getSinkGroupConfigMap();
244 Map<String, String> usedSinks = new HashMap<String, String>();
245 for (String groupName : sinkgroupNames) {
246 ComponentConfiguration comp = compMap.get(groupName);
247 if (comp != null) {
248 SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
249 List<String> groupSinkList = groupConf.getSinks();
250 List<Sink> groupSinks = new ArrayList<Sink>();
251 for (String sink : groupSinkList) {
252 Sink s = sinks.remove(sink);
253 if (s == null) {
254 String sinkUser = usedSinks.get(sink);
255 if (sinkUser != null) {
256 throw new ConfigurationException(String.format(
257 "Sink %s of group %s already in use by group %s", sink, groupName, sinkUser));
258 } else {
259 throw new ConfigurationException(String.format(
260 "Sink %s of group %s does not exist or is not properly configured", sink,
261 groupName));
262 }
263 }
264 groupSinks.add(s);
265 usedSinks.put(sink, groupName);
266 }
267 SinkGroup group = new SinkGroup(groupSinks);
268 Configurables.configure(group, groupConf);
269 conf.getSinkRunners().put(comp.getComponentName(), new SinkRunner(group.getProcessor()));
270 }
271 }
272
273 for (Map.Entry<String, Sink> entry : sinks.entrySet()) {
274 if (!usedSinks.containsValue(entry.getKey())) {
275 SinkProcessor pr = new DefaultSinkProcessor();
276 List<Sink> sinkMap = new ArrayList<Sink>();
277 sinkMap.add(entry.getValue());
278 pr.setSinks(sinkMap);
279 Configurables.configure(pr, new Context());
280 conf.getSinkRunners().put(entry.getKey(), new SinkRunner(pr));
281 }
282 }
283 }
284 }