1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import java.io.Serializable;
20 import java.util.Locale;
21
22 import org.apache.logging.log4j.core.Filter;
23 import org.apache.logging.log4j.core.Layout;
24 import org.apache.logging.log4j.core.LogEvent;
25 import org.apache.logging.log4j.core.appender.AbstractAppender;
26 import org.apache.logging.log4j.core.config.Property;
27 import org.apache.logging.log4j.core.config.plugins.Plugin;
28 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
29 import org.apache.logging.log4j.core.config.plugins.PluginElement;
30 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
31 import org.apache.logging.log4j.core.layout.Rfc5424Layout;
32 import org.apache.logging.log4j.core.net.Facility;
33 import org.apache.logging.log4j.core.util.Booleans;
34 import org.apache.logging.log4j.core.util.Integers;
35
36
37
38
39 @Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true)
40 public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory {
41
42 private static final String[] EXCLUDED_PACKAGES = {"org.apache.flume", "org.apache.avro"};
43 private static final int DEFAULT_MAX_DELAY = 60000;
44
45 private static final int DEFAULT_LOCK_TIMEOUT_RETRY_COUNT = 5;
46
47 private final AbstractFlumeManager manager;
48
49 private final String mdcIncludes;
50 private final String mdcExcludes;
51 private final String mdcRequired;
52
53 private final String eventPrefix;
54
55 private final String mdcPrefix;
56
57 private final boolean compressBody;
58
59 private final FlumeEventFactory factory;
60
61
62
63
64 private enum ManagerType {
65 AVRO, EMBEDDED, PERSISTENT;
66
67 public static ManagerType getType(final String type) {
68 return valueOf(type.toUpperCase(Locale.US));
69 }
70 }
71
72 private FlumeAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
73 final boolean ignoreExceptions, final String includes, final String excludes,
74 final String required, final String mdcPrefix, final String eventPrefix,
75 final boolean compress, final FlumeEventFactory factory, final AbstractFlumeManager manager) {
76 super(name, filter, layout, ignoreExceptions);
77 this.manager = manager;
78 this.mdcIncludes = includes;
79 this.mdcExcludes = excludes;
80 this.mdcRequired = required;
81 this.eventPrefix = eventPrefix;
82 this.mdcPrefix = mdcPrefix;
83 this.compressBody = compress;
84 this.factory = factory == null ? this : factory;
85 }
86
87
88
89
90
91 @Override
92 public void append(final LogEvent event) {
93 final String name = event.getLoggerName();
94 if (name != null) {
95 for (final String pkg : EXCLUDED_PACKAGES) {
96 if (name.startsWith(pkg)) {
97 return;
98 }
99 }
100 }
101 final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
102 eventPrefix, compressBody);
103 flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
104 manager.send(flumeEvent);
105 }
106
107 @Override
108 public void stop() {
109 super.stop();
110 manager.release();
111 }
112
113
114
115
116
117
118
119
120
121
122
123
124 @Override
125 public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes,
126 final String required, final String mdcPrefix, final String eventPrefix,
127 final boolean compress) {
128 return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
129 eventPrefix, compressBody);
130 }
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162 @PluginFactory
163 public static FlumeAppender createAppender(@PluginElement("Agents") Agent[] agents,
164 @PluginElement("Properties") final Property[] properties,
165 @PluginAttribute("embedded") final String embedded,
166 @PluginAttribute("type") final String type,
167 @PluginAttribute("dataDir") final String dataDir,
168 @PluginAttribute("connectTimeout") final String connectionTimeout,
169 @PluginAttribute("requestTimeout") final String requestTimeout,
170 @PluginAttribute("agentRetries") final String agentRetries,
171 @PluginAttribute("maxDelay") final String maxDelay,
172 @PluginAttribute("name") final String name,
173 @PluginAttribute("ignoreExceptions") final String ignore,
174 @PluginAttribute("mdcExcludes") final String excludes,
175 @PluginAttribute("mdcIncludes") final String includes,
176 @PluginAttribute("mdcRequired") final String required,
177 @PluginAttribute("mdcPrefix") final String mdcPrefix,
178 @PluginAttribute("eventPrefix") final String eventPrefix,
179 @PluginAttribute("compress") final String compressBody,
180 @PluginAttribute("batchSize") final String batchSize,
181 @PluginAttribute("lockTimeoutRetries") final String lockTimeoutRetries,
182 @PluginElement("FlumeEventFactory") final FlumeEventFactory factory,
183 @PluginElement("Layout") Layout<? extends Serializable> layout,
184 @PluginElement("Filter") final Filter filter) {
185
186 final boolean embed = embedded != null ? Boolean.parseBoolean(embedded) :
187 (agents == null || agents.length == 0) && properties != null && properties.length > 0;
188 final boolean ignoreExceptions = Booleans.parseBoolean(ignore, true);
189 final boolean compress = Booleans.parseBoolean(compressBody, true);
190 ManagerType managerType;
191 if (type != null) {
192 if (embed && embedded != null) {
193 try {
194 managerType = ManagerType.getType(type);
195 LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type);
196 } catch (final Exception ex) {
197 LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type +
198 " is invalid.");
199 managerType = ManagerType.EMBEDDED;
200 }
201 } else {
202 try {
203 managerType = ManagerType.getType(type);
204 } catch (final Exception ex) {
205 LOGGER.warn("Type " + type + " is invalid.");
206 managerType = ManagerType.EMBEDDED;
207 }
208 }
209 } else if (embed) {
210 managerType = ManagerType.EMBEDDED;
211 } else {
212 managerType = ManagerType.AVRO;
213 }
214
215 final int batchCount = Integers.parseInt(batchSize, 1);
216 final int connectTimeout = Integers.parseInt(connectionTimeout, 0);
217 final int reqTimeout = Integers.parseInt(requestTimeout, 0);
218 final int retries = Integers.parseInt(agentRetries, 0);
219 final int lockTimeoutRetryCount = Integers.parseInt(lockTimeoutRetries, DEFAULT_LOCK_TIMEOUT_RETRY_COUNT);
220 final int delay = Integers.parseInt(maxDelay, DEFAULT_MAX_DELAY);
221
222 if (layout == null) {
223 final int enterpriseNumber = Rfc5424Layout.DEFAULT_ENTERPRISE_NUMBER;
224 layout = Rfc5424Layout.createLayout(Facility.LOCAL0, null, enterpriseNumber, true, Rfc5424Layout.DEFAULT_MDCID,
225 mdcPrefix, eventPrefix, false, null, null, null, excludes, includes, required, null, false, null,
226 null);
227 }
228
229 if (name == null) {
230 LOGGER.error("No name provided for Appender");
231 return null;
232 }
233
234 AbstractFlumeManager manager;
235
236 switch (managerType) {
237 case EMBEDDED:
238 manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
239 break;
240 case AVRO:
241 if (agents == null || agents.length == 0) {
242 LOGGER.debug("No agents provided, using defaults");
243 agents = new Agent[] {Agent.createAgent(null, null)};
244 }
245 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
246 break;
247 case PERSISTENT:
248 if (agents == null || agents.length == 0) {
249 LOGGER.debug("No agents provided, using defaults");
250 agents = new Agent[] {Agent.createAgent(null, null)};
251 }
252 manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries,
253 connectTimeout, reqTimeout, delay, lockTimeoutRetryCount, dataDir);
254 break;
255 default:
256 LOGGER.debug("No manager type specified. Defaulting to AVRO");
257 if (agents == null || agents.length == 0) {
258 LOGGER.debug("No agents provided, using defaults");
259 agents = new Agent[] {Agent.createAgent(null, null)};
260 }
261 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
262 }
263
264 if (manager == null) {
265 return null;
266 }
267
268 return new FlumeAppender(name, filter, layout, ignoreExceptions, includes,
269 excludes, required, mdcPrefix, eventPrefix, compress, factory, manager);
270 }
271 }