1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import java.io.Serializable;
20 import java.util.Locale;
21
22 import org.apache.logging.log4j.core.Filter;
23 import org.apache.logging.log4j.core.Layout;
24 import org.apache.logging.log4j.core.LogEvent;
25 import org.apache.logging.log4j.core.appender.AbstractAppender;
26 import org.apache.logging.log4j.core.config.Property;
27 import org.apache.logging.log4j.core.config.plugins.Plugin;
28 import org.apache.logging.log4j.core.config.plugins.PluginAliases;
29 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
30 import org.apache.logging.log4j.core.config.plugins.PluginElement;
31 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
32 import org.apache.logging.log4j.core.layout.Rfc5424Layout;
33 import org.apache.logging.log4j.core.net.Facility;
34 import org.apache.logging.log4j.core.util.Booleans;
35 import org.apache.logging.log4j.core.util.Integers;
36
37
38
39
40 @Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true)
41 public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory {
42
43 private static final String[] EXCLUDED_PACKAGES = {"org.apache.flume", "org.apache.avro"};
44 private static final int DEFAULT_MAX_DELAY = 60000;
45
46 private static final int DEFAULT_LOCK_TIMEOUT_RETRY_COUNT = 5;
47
48 private final AbstractFlumeManager manager;
49
50 private final String mdcIncludes;
51 private final String mdcExcludes;
52 private final String mdcRequired;
53
54 private final String eventPrefix;
55
56 private final String mdcPrefix;
57
58 private final boolean compressBody;
59
60 private final FlumeEventFactory factory;
61
62
63
64
65 private enum ManagerType {
66 AVRO, EMBEDDED, PERSISTENT;
67
68 public static ManagerType getType(final String type) {
69 return valueOf(type.toUpperCase(Locale.US));
70 }
71 }
72
73 private FlumeAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
74 final boolean ignoreExceptions, final String includes, final String excludes,
75 final String required, final String mdcPrefix, final String eventPrefix,
76 final boolean compress, final FlumeEventFactory factory, final AbstractFlumeManager manager) {
77 super(name, filter, layout, ignoreExceptions);
78 this.manager = manager;
79 this.mdcIncludes = includes;
80 this.mdcExcludes = excludes;
81 this.mdcRequired = required;
82 this.eventPrefix = eventPrefix;
83 this.mdcPrefix = mdcPrefix;
84 this.compressBody = compress;
85 this.factory = factory == null ? this : factory;
86 }
87
88
89
90
91
92 @Override
93 public void append(final LogEvent event) {
94 final String name = event.getLoggerName();
95 if (name != null) {
96 for (final String pkg : EXCLUDED_PACKAGES) {
97 if (name.startsWith(pkg)) {
98 return;
99 }
100 }
101 }
102 final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
103 eventPrefix, compressBody);
104 flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
105 manager.send(flumeEvent);
106 }
107
108 @Override
109 public void stop() {
110 super.stop();
111 manager.release();
112 }
113
114
115
116
117
118
119
120
121
122
123
124
125 @Override
126 public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes,
127 final String required, final String mdcPrefix, final String eventPrefix,
128 final boolean compress) {
129 return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
130 eventPrefix, compressBody);
131 }
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163 @PluginFactory
164 public static FlumeAppender createAppender(@PluginElement("Agents") Agent[] agents,
165 @PluginElement("Properties") final Property[] properties,
166 @PluginAttribute("embedded") final String embedded,
167 @PluginAttribute("type") final String type,
168 @PluginAttribute("dataDir") final String dataDir,
169 @PluginAliases("connectTimeout")
170 @PluginAttribute("connectTimeoutMillis") final String connectionTimeoutMillis,
171 @PluginAliases("requestTimeout")
172 @PluginAttribute("requestTimeoutMillis") final String requestTimeoutMillis,
173 @PluginAttribute("agentRetries") final String agentRetries,
174 @PluginAliases("maxDelay")
175 @PluginAttribute("maxDelayMillis") final String maxDelayMillis,
176 @PluginAttribute("name") final String name,
177 @PluginAttribute("ignoreExceptions") final String ignore,
178 @PluginAttribute("mdcExcludes") final String excludes,
179 @PluginAttribute("mdcIncludes") final String includes,
180 @PluginAttribute("mdcRequired") final String required,
181 @PluginAttribute("mdcPrefix") final String mdcPrefix,
182 @PluginAttribute("eventPrefix") final String eventPrefix,
183 @PluginAttribute("compress") final String compressBody,
184 @PluginAttribute("batchSize") final String batchSize,
185 @PluginAttribute("lockTimeoutRetries") final String lockTimeoutRetries,
186 @PluginElement("FlumeEventFactory") final FlumeEventFactory factory,
187 @PluginElement("Layout") Layout<? extends Serializable> layout,
188 @PluginElement("Filter") final Filter filter) {
189
190 final boolean embed = embedded != null ? Boolean.parseBoolean(embedded) :
191 (agents == null || agents.length == 0) && properties != null && properties.length > 0;
192 final boolean ignoreExceptions = Booleans.parseBoolean(ignore, true);
193 final boolean compress = Booleans.parseBoolean(compressBody, true);
194 ManagerType managerType;
195 if (type != null) {
196 if (embed && embedded != null) {
197 try {
198 managerType = ManagerType.getType(type);
199 LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type);
200 } catch (final Exception ex) {
201 LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type +
202 " is invalid.");
203 managerType = ManagerType.EMBEDDED;
204 }
205 } else {
206 try {
207 managerType = ManagerType.getType(type);
208 } catch (final Exception ex) {
209 LOGGER.warn("Type " + type + " is invalid.");
210 managerType = ManagerType.EMBEDDED;
211 }
212 }
213 } else if (embed) {
214 managerType = ManagerType.EMBEDDED;
215 } else {
216 managerType = ManagerType.AVRO;
217 }
218
219 final int batchCount = Integers.parseInt(batchSize, 1);
220 final int connectTimeoutMillis = Integers.parseInt(connectionTimeoutMillis, 0);
221 final int reqTimeoutMillis = Integers.parseInt(requestTimeoutMillis, 0);
222 final int retries = Integers.parseInt(agentRetries, 0);
223 final int lockTimeoutRetryCount = Integers.parseInt(lockTimeoutRetries, DEFAULT_LOCK_TIMEOUT_RETRY_COUNT);
224 final int delayMillis = Integers.parseInt(maxDelayMillis, DEFAULT_MAX_DELAY);
225
226 if (layout == null) {
227 final int enterpriseNumber = Rfc5424Layout.DEFAULT_ENTERPRISE_NUMBER;
228 layout = Rfc5424Layout.createLayout(Facility.LOCAL0, null, enterpriseNumber, true, Rfc5424Layout.DEFAULT_MDCID,
229 mdcPrefix, eventPrefix, false, null, null, null, excludes, includes, required, null, false, null,
230 null);
231 }
232
233 if (name == null) {
234 LOGGER.error("No name provided for Appender");
235 return null;
236 }
237
238 AbstractFlumeManager manager;
239
240 switch (managerType) {
241 case EMBEDDED:
242 manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
243 break;
244 case AVRO:
245 if (agents == null || agents.length == 0) {
246 LOGGER.debug("No agents provided, using defaults");
247 agents = new Agent[] {Agent.createAgent(null, null)};
248 }
249 manager = FlumeAvroManager.getManager(name, agents, batchCount, delayMillis, retries, connectTimeoutMillis, reqTimeoutMillis);
250 break;
251 case PERSISTENT:
252 if (agents == null || agents.length == 0) {
253 LOGGER.debug("No agents provided, using defaults");
254 agents = new Agent[] {Agent.createAgent(null, null)};
255 }
256 manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries,
257 connectTimeoutMillis, reqTimeoutMillis, delayMillis, lockTimeoutRetryCount, dataDir);
258 break;
259 default:
260 LOGGER.debug("No manager type specified. Defaulting to AVRO");
261 if (agents == null || agents.length == 0) {
262 LOGGER.debug("No agents provided, using defaults");
263 agents = new Agent[] {Agent.createAgent(null, null)};
264 }
265 manager = FlumeAvroManager.getManager(name, agents, batchCount, delayMillis, retries, connectTimeoutMillis, reqTimeoutMillis);
266 }
267
268 if (manager == null) {
269 return null;
270 }
271
272 return new FlumeAppender(name, filter, layout, ignoreExceptions, includes,
273 excludes, required, mdcPrefix, eventPrefix, compress, factory, manager);
274 }
275 }