1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import org.apache.flume.Event;
20 import org.apache.flume.SourceRunner;
21 import org.apache.flume.node.NodeConfiguration;
22 import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager;
23 import org.apache.logging.log4j.core.appender.ManagerFactory;
24 import org.apache.logging.log4j.core.config.ConfigurationException;
25 import org.apache.logging.log4j.core.config.Property;
26 import org.apache.logging.log4j.core.helpers.NameUtil;
27 import org.apache.logging.log4j.util.PropertiesUtil;
28
29 import java.util.Locale;
30 import java.util.Properties;
31
32
33
34
35 public class FlumeEmbeddedManager extends AbstractFlumeManager {
36
37
38 protected static final String SOURCE_NAME = "log4j-source";
39
40 private static FlumeManagerFactory factory = new FlumeManagerFactory();
41
42 private static final String FiLE_SEP = PropertiesUtil.getProperties().getStringProperty("file.separator");
43
44 private static final String IN_MEMORY = "InMemory";
45
46 private final FlumeNode node;
47
48 private NodeConfiguration conf;
49
50 private final Log4jEventSource source;
51
52 private final String shortName;
53
54
55
56
57
58
59
60 protected FlumeEmbeddedManager(final String name, final String shortName, final FlumeNode node) {
61 super(name);
62 this.node = node;
63 this.shortName = shortName;
64 final SourceRunner runner = node.getConfiguration().getSourceRunners().get(SOURCE_NAME);
65 if (runner == null || runner.getSource() == null) {
66 throw new IllegalStateException("No Source has been created for Appender " + shortName);
67 }
68 source = (Log4jEventSource) runner.getSource();
69 }
70
71
72
73
74
75
76
77
78
79
80 public static FlumeEmbeddedManager getManager(final String name, final Agent[] agents, final Property[] properties,
81 int batchSize, final String dataDir) {
82
83 if (batchSize <= 0) {
84 batchSize = 1;
85 }
86
87 if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
88 throw new IllegalArgumentException("Either an Agent or properties are required");
89 } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
90 throw new IllegalArgumentException("Cannot configure both Agents and Properties.");
91 }
92
93 final StringBuilder sb = new StringBuilder();
94 boolean first = true;
95
96 if (agents != null && agents.length > 0) {
97 sb.append("FlumeEmbedded[");
98 for (final Agent agent : agents) {
99 if (!first) {
100 sb.append(",");
101 }
102 sb.append(agent.getHost()).append(":").append(agent.getPort());
103 first = false;
104 }
105 sb.append("]");
106 } else {
107 String sep = "";
108 sb.append(name).append(":");
109 final StringBuilder props = new StringBuilder();
110 for (final Property prop : properties) {
111 props.append(sep);
112 props.append(prop.getName()).append("=").append(prop.getValue());
113 sep = ",";
114 }
115 sb.append(NameUtil.md5(props.toString()));
116 }
117 return getManager(sb.toString(), factory,
118 new FactoryData(name, agents, properties, batchSize, dataDir));
119 }
120
121 @Override
122 public void send(final Event event) {
123 source.send(event);
124 }
125
126 @Override
127 protected void releaseSub() {
128 node.stop();
129 }
130
131
132
133
134 private static class FactoryData {
135 private final Agent[] agents;
136 private final Property[] properties;
137 private final int batchSize;
138 private final String dataDir;
139 private final String name;
140
141
142
143
144
145
146
147
148
149 public FactoryData(final String name, final Agent[] agents, final Property[] properties, final int batchSize,
150 final String dataDir) {
151 this.name = name;
152 this.agents = agents;
153 this.batchSize = batchSize;
154 this.properties = properties;
155 this.dataDir = dataDir;
156 }
157 }
158
159
160
161
162 private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> {
163 private static final String SOURCE_TYPE = Log4jEventSource.class.getName();
164
165
166
167
168
169
170
171 public FlumeEmbeddedManager createManager(final String name, final FactoryData data) {
172 try {
173 final DefaultLogicalNodeManager nodeManager = new DefaultLogicalNodeManager();
174 final Properties props = createProperties(data.name, data.agents, data.properties, data.batchSize,
175 data.dataDir);
176 final FlumeConfigurationBuilder builder = new FlumeConfigurationBuilder();
177 final NodeConfiguration conf = builder.load(data.name, props, nodeManager);
178
179 final FlumeNode node = new FlumeNode(nodeManager, nodeManager, conf);
180
181 node.start();
182
183 return new FlumeEmbeddedManager(name, data.name, node);
184 } catch (final Exception ex) {
185 LOGGER.error("Could not create FlumeEmbeddedManager", ex);
186 }
187 return null;
188 }
189
190 private Properties createProperties(final String name, final Agent[] agents, final Property[] properties,
191 final int batchSize, String dataDir) {
192 final Properties props = new Properties();
193
194 if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
195 LOGGER.error("No Flume configuration provided");
196 throw new ConfigurationException("No Flume configuration provided");
197 }
198
199 if ((agents != null && agents.length > 0 && properties != null && properties.length > 0)) {
200 LOGGER.error("Agents and Flume configuration cannot both be specified");
201 throw new ConfigurationException("Agents and Flume configuration cannot both be specified");
202 }
203
204 if (agents != null && agents.length > 0) {
205 props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
206 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", SOURCE_TYPE);
207
208 if (dataDir != null && dataDir.length() > 0) {
209 if (dataDir.equals(IN_MEMORY)) {
210 props.put(name + ".channels", "primary");
211 props.put(name + ".channels.primary.type", "memory");
212 } else {
213 props.put(name + ".channels", "primary");
214 props.put(name + ".channels.primary.type", "file");
215
216 if (!dataDir.endsWith(FiLE_SEP)) {
217 dataDir = dataDir + FiLE_SEP;
218 }
219
220 props.put(name + ".channels.primary.checkpointDir", dataDir + "checkpoint");
221 props.put(name + ".channels.primary.dataDirs", dataDir + "data");
222 }
223
224 } else {
225 props.put(name + ".channels", "primary");
226 props.put(name + ".channels.primary.type", "file");
227 }
228
229 final StringBuilder sb = new StringBuilder();
230 String leading = "";
231 int priority = agents.length;
232 for (int i = 0; i < agents.length; ++i) {
233 sb.append(leading).append("agent").append(i);
234 leading = " ";
235 final String prefix = name + ".sinks.agent" + i;
236 props.put(prefix + ".channel", "primary");
237 props.put(prefix + ".type", "avro");
238 props.put(prefix + ".hostname", agents[i].getHost());
239 props.put(prefix + ".port", Integer.toString(agents[i].getPort()));
240 props.put(prefix + ".batch-size", Integer.toString(batchSize));
241 props.put(name + ".sinkgroups.group1.processor.priority.agent" + i, Integer.toString(priority));
242 --priority;
243 }
244 props.put(name + ".sinks", sb.toString());
245 props.put(name + ".sinkgroups", "group1");
246 props.put(name + ".sinkgroups.group1.sinks", sb.toString());
247 props.put(name + ".sinkgroups.group1.processor.type", "failover");
248 final String sourceChannels = "primary";
249 props.put(name + ".channels", sourceChannels);
250 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
251 } else {
252 String channels = null;
253 String[] sinks = null;
254
255 props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
256 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", SOURCE_TYPE);
257
258 for (final Property property : properties) {
259 final String key = property.getName();
260
261 if (key == null || key.length() == 0) {
262 final String msg = "A property name must be provided";
263 LOGGER.error(msg);
264 throw new ConfigurationException(msg);
265 }
266
267 final String upperKey = key.toUpperCase(Locale.ENGLISH);
268
269 if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) {
270 final String msg =
271 "Specification of the agent name is allowed in Flume Appender configuration: " + key;
272 LOGGER.error(msg);
273 throw new ConfigurationException(msg);
274 }
275
276 if (upperKey.startsWith("SOURCES.")) {
277 final String msg = "Specification of Sources is not allowed in Flume Appender: " + key;
278 LOGGER.error(msg);
279 throw new ConfigurationException(msg);
280 }
281
282 final String value = property.getValue();
283 if (value == null || value.length() == 0) {
284 final String msg = "A value for property " + key + " must be provided";
285 LOGGER.error(msg);
286 throw new ConfigurationException(msg);
287 }
288
289 if (upperKey.equals("CHANNELS")) {
290 channels = value.trim();
291 } else if (upperKey.equals("SINKS")) {
292 sinks = value.trim().split(" ");
293 }
294
295 props.put(name + '.' + key, value);
296 }
297
298 String sourceChannels = channels;
299
300 if (channels == null) {
301 sourceChannels = "primary";
302 props.put(name + ".channels", sourceChannels);
303 }
304
305 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
306
307 if (sinks == null || sinks.length == 0) {
308 final String msg = "At least one Sink must be specified";
309 LOGGER.error(msg);
310 throw new ConfigurationException(msg);
311 }
312 }
313 return props;
314 }
315
316 }
317
318 }