1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import org.apache.flume.SourceRunner;
20 import org.apache.flume.lifecycle.LifecycleController;
21 import org.apache.flume.lifecycle.LifecycleState;
22 import org.apache.flume.node.NodeConfiguration;
23 import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager;
24 import org.apache.logging.log4j.core.appender.ManagerFactory;
25 import org.apache.logging.log4j.core.config.ConfigurationException;
26 import org.apache.logging.log4j.core.config.Property;
27
28 import java.security.MessageDigest;
29 import java.util.Locale;
30 import java.util.Properties;
31
32
33
34
35 public class FlumeEmbeddedManager extends AbstractFlumeManager {
36
37 private static ManagerFactory factory = new FlumeManagerFactory();
38
39 private final FlumeNode node;
40
41 private NodeConfiguration conf;
42
43 protected static final String SOURCE_NAME = "log4j-source";
44
45 private static final String LINE_SEP = System.getProperty("file.separator");
46
47 private final Log4jEventSource source;
48
49 private final String shortName;
50
51
52
53
54
55
56
57 protected FlumeEmbeddedManager(String name, String shortName, FlumeNode node) {
58 super(name);
59 this.node = node;
60 this.shortName = shortName;
61 SourceRunner runner = node.getConfiguration().getSourceRunners().get(SOURCE_NAME);
62 if (runner == null || runner.getSource() == null) {
63 throw new IllegalStateException("No Source has been created for Appender " + shortName);
64 }
65 source = (Log4jEventSource) runner.getSource();
66 }
67
68
69
70
71
72
73
74 public static FlumeEmbeddedManager getManager(String name, Agent[] agents, Property[] properties, int batchSize,
75 String dataDir) {
76
77 if (batchSize <= 0) {
78 batchSize = 1;
79 }
80
81 if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
82 throw new IllegalArgumentException("Either an Agent or properties are required");
83 } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
84 throw new IllegalArgumentException("Cannot configure both Agents and Properties.");
85 }
86
87 StringBuilder sb = new StringBuilder();
88 boolean first = true;
89
90 if (agents != null && agents.length > 0) {
91 sb.append("FlumeEmbedded[");
92 for (Agent agent : agents) {
93 if (!first) {
94 sb.append(",");
95 }
96 sb.append(agent.getHost()).append(":").append(agent.getPort());
97 first = false;
98 }
99 sb.append("]");
100 } else {
101 String sep = "";
102 sb.append(name).append(":");
103 StringBuilder props = new StringBuilder();
104 for (Property prop : properties) {
105 props.append(sep);
106 props.append(prop.getName()).append("=").append(prop.getValue());
107 sep = ",";
108 }
109 try {
110 MessageDigest digest = MessageDigest.getInstance("MD5");
111 digest.update(sb.toString().getBytes());
112 byte[] bytes = digest.digest();
113 StringBuilder md5 = new StringBuilder();
114 for (byte b : bytes) {
115 String hex = Integer.toHexString(0xff & b);
116 if (hex.length() == 1) {
117 md5.append('0');
118 }
119 md5.append(hex);
120 }
121 sb.append(md5.toString());
122 } catch (Exception ex) {
123 sb.append(props);
124 }
125 }
126 return (FlumeEmbeddedManager) getManager(sb.toString(), factory,
127 new FactoryData(name, agents, properties, batchSize, dataDir));
128 }
129
130 @Override
131 public void send(FlumeEvent event, int delay, int retries) {
132 source.send(event);
133 }
134
135 @Override
136 protected void releaseSub() {
137 node.stop();
138 }
139
140
141
142
143 private static class FactoryData {
144 private Agent[] agents;
145 private Property[] properties;
146 private int batchSize;
147 private String dataDir;
148 private String name;
149
150
151
152
153
154
155
156
157
158 public FactoryData(String name, Agent[] agents, Property[] properties, int batchSize, String dataDir) {
159 this.name = name;
160 this.agents = agents;
161 this.batchSize = batchSize;
162 this.properties = properties;
163 this.dataDir = dataDir;
164 }
165 }
166
167
168
169
170 private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> {
171 private static final String sourceType = Log4jEventSource.class.getName();
172
173
174
175
176
177
178
179 public FlumeEmbeddedManager createManager(String name, FactoryData data) {
180 try {
181 DefaultLogicalNodeManager nodeManager = new DefaultLogicalNodeManager();
182 Properties props = createProperties(data.name, data.agents, data.properties, data.batchSize,
183 data.dataDir);
184 FlumeConfigurationBuilder builder = new FlumeConfigurationBuilder();
185 NodeConfiguration conf = builder.load(data.name, props, nodeManager);
186
187 FlumeNode node = new FlumeNode(nodeManager, conf);
188
189 node.start();
190 LifecycleController.waitForOneOf(node, LifecycleState.START_OR_ERROR);
191
192 return new FlumeEmbeddedManager(name, data.name, node);
193 } catch (Exception ex) {
194 LOGGER.error("Could not create FlumeEmbeddedManager", ex);
195 }
196 return null;
197 }
198
199 private Properties createProperties(String name, Agent[] agents, Property[] properties, int batchSize,
200 String dataDir) {
201 Properties props = new Properties();
202
203 if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
204 LOGGER.error("No Flume configuration provided");
205 throw new ConfigurationException("No Flume configuration provided");
206 }
207
208 if ((agents != null && agents.length > 0 && properties != null && properties.length > 0)) {
209 LOGGER.error("Agents and Flume configuration cannot both be specified");
210 throw new ConfigurationException("Agents and Flume configuration cannot both be specified");
211 }
212
213 if (agents != null && agents.length > 0) {
214 props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
215 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", sourceType);
216 props.put(name + ".channels", "file");
217 props.put(name + ".channels.file.type", "file");
218 if (dataDir != null && dataDir.length() > 0) {
219 if (!dataDir.endsWith(LINE_SEP)) {
220 dataDir = dataDir + LINE_SEP;
221 }
222
223 props.put(name + ".channels.file.checkpointDir", dataDir + "checkpoint");
224 props.put(name + ".channels.file.dataDirs", dataDir + "data");
225 }
226
227 StringBuilder sb = new StringBuilder();
228 String leading = "";
229 int priority = agents.length;
230 for (int i=0; i < agents.length; ++i) {
231 sb.append(leading).append("agent").append(i);
232 leading = " ";
233 String prefix = name + ".sinks.agent" + i;
234 props.put(prefix + ".channel", "file");
235 props.put(prefix + ".type", "avro");
236 props.put(prefix + ".hostname", agents[i].getHost());
237 props.put(prefix + ".port", Integer.toString(agents[i].getPort()));
238 props.put(prefix + ".batch-size", Integer.toString(batchSize));
239 props.put(name + ".sinkgroups.group1.processor.priority.agent" + i, Integer.toString(priority));
240 --priority;
241 }
242 props.put(name + ".sinks", sb.toString());
243 props.put(name + ".sinkgroups", "group1");
244 props.put(name + ".sinkgroups.group1.sinks", sb.toString());
245 props.put(name + ".sinkgroups.group1.processor.type", "failover");
246 String sourceChannels = "file";
247 props.put(name + ".channels", sourceChannels);
248 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
249 } else {
250 String channels = null;
251 String[] sinks = null;
252
253 props.put(name + ".sources", FlumeEmbeddedManager.SOURCE_NAME);
254 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".type", sourceType);
255
256 for (Property property : properties) {
257 String key = property.getName();
258
259 if (key == null || key.length() == 0) {
260 String msg = "A property name must be provided";
261 LOGGER.error(msg);
262 throw new ConfigurationException(msg);
263 }
264
265 String upperKey = key.toUpperCase(Locale.ENGLISH);
266
267 if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) {
268 String msg = "Specification of the agent name is allowed in Flume Appender configuration: " + key;
269 LOGGER.error(msg);
270 throw new ConfigurationException(msg);
271 }
272
273 if (upperKey.startsWith("SOURCES.")) {
274 String msg = "Specification of Sources is not allowed in Flume Appender: " + key;
275 LOGGER.error(msg);
276 throw new ConfigurationException(msg);
277 }
278
279 String value = property.getValue();
280 if (value == null || value.length() == 0) {
281 String msg = "A value for property " + key + " must be provided";
282 LOGGER.error(msg);
283 throw new ConfigurationException(msg);
284 }
285
286 if (upperKey.equals("CHANNELS")) {
287 channels = value.trim();
288 } else if (upperKey.equals("SINKS")) {
289 sinks = value.trim().split(" ");
290 }
291
292 props.put(name + '.' + key, value);
293 }
294
295 String sourceChannels = channels;
296
297 if (channels == null) {
298 sourceChannels = "file";
299 props.put(name + ".channels", sourceChannels);
300 }
301
302 props.put(name + ".sources." + FlumeEmbeddedManager.SOURCE_NAME + ".channels", sourceChannels);
303
304 if (sinks == null || sinks.length == 0) {
305 String msg = "At least one Sink must be specified";
306 LOGGER.error(msg);
307 throw new ConfigurationException(msg);
308 }
309 }
310 return props;
311 }
312
313 }
314
315 }