1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import java.io.Serializable;
20 import java.util.Locale;
21 import java.util.concurrent.TimeUnit;
22
23 import org.apache.logging.log4j.core.Appender;
24 import org.apache.logging.log4j.core.Filter;
25 import org.apache.logging.log4j.core.Layout;
26 import org.apache.logging.log4j.core.LogEvent;
27 import org.apache.logging.log4j.core.appender.AbstractAppender;
28 import org.apache.logging.log4j.core.config.Property;
29 import org.apache.logging.log4j.core.config.plugins.Plugin;
30 import org.apache.logging.log4j.core.config.plugins.PluginAliases;
31 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
32 import org.apache.logging.log4j.core.config.plugins.PluginElement;
33 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
34 import org.apache.logging.log4j.core.layout.Rfc5424Layout;
35 import org.apache.logging.log4j.core.net.Facility;
36 import org.apache.logging.log4j.core.util.Booleans;
37 import org.apache.logging.log4j.core.util.Integers;
38
39
40
41
42 @Plugin(name = "Flume", category = "Core", elementType = Appender.ELEMENT_TYPE, printObject = true)
43 public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory {
44
45 private static final String[] EXCLUDED_PACKAGES = {"org.apache.flume", "org.apache.avro"};
46 private static final int DEFAULT_MAX_DELAY = 60000;
47
48 private static final int DEFAULT_LOCK_TIMEOUT_RETRY_COUNT = 5;
49
50 private final AbstractFlumeManager manager;
51
52 private final String mdcIncludes;
53 private final String mdcExcludes;
54 private final String mdcRequired;
55
56 private final String eventPrefix;
57
58 private final String mdcPrefix;
59
60 private final boolean compressBody;
61
62 private final FlumeEventFactory factory;
63
64
65
66
67 private enum ManagerType {
68 AVRO, EMBEDDED, PERSISTENT;
69
70 public static ManagerType getType(final String type) {
71 return valueOf(type.toUpperCase(Locale.US));
72 }
73 }
74
75 private FlumeAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
76 final boolean ignoreExceptions, final String includes, final String excludes,
77 final String required, final String mdcPrefix, final String eventPrefix,
78 final boolean compress, final FlumeEventFactory factory, final AbstractFlumeManager manager) {
79 super(name, filter, layout, ignoreExceptions);
80 this.manager = manager;
81 this.mdcIncludes = includes;
82 this.mdcExcludes = excludes;
83 this.mdcRequired = required;
84 this.eventPrefix = eventPrefix;
85 this.mdcPrefix = mdcPrefix;
86 this.compressBody = compress;
87 this.factory = factory == null ? this : factory;
88 }
89
90
91
92
93
94 @Override
95 public void append(final LogEvent event) {
96 final String name = event.getLoggerName();
97 if (name != null) {
98 for (final String pkg : EXCLUDED_PACKAGES) {
99 if (name.startsWith(pkg)) {
100 return;
101 }
102 }
103 }
104 final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
105 eventPrefix, compressBody);
106 flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
107 manager.send(flumeEvent);
108 }
109
110 @Override
111 public boolean stop(final long timeout, final TimeUnit timeUnit) {
112 setStopping();
113 boolean stopped = super.stop(timeout, timeUnit, false);
114 stopped &= manager.stop(timeout, timeUnit);
115 setStopped();
116 return stopped;
117 }
118
119
120
121
122
123
124
125
126
127
128
129
130 @Override
131 public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes,
132 final String required, final String mdcPrefix, final String eventPrefix,
133 final boolean compress) {
134 return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
135 eventPrefix, compressBody);
136 }
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168 @PluginFactory
169 public static FlumeAppender createAppender(@PluginElement("Agents") final Agent[] agents,
170 @PluginElement("Properties") final Property[] properties,
171 @PluginAttribute("hosts") final String hosts,
172 @PluginAttribute("embedded") final String embedded,
173 @PluginAttribute("type") final String type,
174 @PluginAttribute("dataDir") final String dataDir,
175 @PluginAliases("connectTimeout")
176 @PluginAttribute("connectTimeoutMillis") final String connectionTimeoutMillis,
177 @PluginAliases("requestTimeout")
178 @PluginAttribute("requestTimeoutMillis") final String requestTimeoutMillis,
179 @PluginAttribute("agentRetries") final String agentRetries,
180 @PluginAliases("maxDelay")
181 @PluginAttribute("maxDelayMillis") final String maxDelayMillis,
182 @PluginAttribute("name") final String name,
183 @PluginAttribute("ignoreExceptions") final String ignore,
184 @PluginAttribute("mdcExcludes") final String excludes,
185 @PluginAttribute("mdcIncludes") final String includes,
186 @PluginAttribute("mdcRequired") final String required,
187 @PluginAttribute("mdcPrefix") final String mdcPrefix,
188 @PluginAttribute("eventPrefix") final String eventPrefix,
189 @PluginAttribute("compress") final String compressBody,
190 @PluginAttribute("batchSize") final String batchSize,
191 @PluginAttribute("lockTimeoutRetries") final String lockTimeoutRetries,
192 @PluginElement("FlumeEventFactory") final FlumeEventFactory factory,
193 @PluginElement("Layout") Layout<? extends Serializable> layout,
194 @PluginElement("Filter") final Filter filter) {
195
196 final boolean embed = embedded != null ? Boolean.parseBoolean(embedded) :
197 (agents == null || agents.length == 0 || hosts == null || hosts.isEmpty()) && properties != null && properties.length > 0;
198 final boolean ignoreExceptions = Booleans.parseBoolean(ignore, true);
199 final boolean compress = Booleans.parseBoolean(compressBody, true);
200 ManagerType managerType;
201 if (type != null) {
202 if (embed && embedded != null) {
203 try {
204 managerType = ManagerType.getType(type);
205 LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type);
206 } catch (final Exception ex) {
207 LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type +
208 " is invalid.");
209 managerType = ManagerType.EMBEDDED;
210 }
211 } else {
212 try {
213 managerType = ManagerType.getType(type);
214 } catch (final Exception ex) {
215 LOGGER.warn("Type " + type + " is invalid.");
216 managerType = ManagerType.EMBEDDED;
217 }
218 }
219 } else if (embed) {
220 managerType = ManagerType.EMBEDDED;
221 } else {
222 managerType = ManagerType.AVRO;
223 }
224
225 final int batchCount = Integers.parseInt(batchSize, 1);
226 final int connectTimeoutMillis = Integers.parseInt(connectionTimeoutMillis, 0);
227 final int reqTimeoutMillis = Integers.parseInt(requestTimeoutMillis, 0);
228 final int retries = Integers.parseInt(agentRetries, 0);
229 final int lockTimeoutRetryCount = Integers.parseInt(lockTimeoutRetries, DEFAULT_LOCK_TIMEOUT_RETRY_COUNT);
230 final int delayMillis = Integers.parseInt(maxDelayMillis, DEFAULT_MAX_DELAY);
231
232 if (layout == null) {
233 final int enterpriseNumber = Rfc5424Layout.DEFAULT_ENTERPRISE_NUMBER;
234 layout = Rfc5424Layout.createLayout(Facility.LOCAL0, null, enterpriseNumber, true, Rfc5424Layout.DEFAULT_MDCID,
235 mdcPrefix, eventPrefix, false, null, null, null, excludes, includes, required, null, false, null,
236 null);
237 }
238
239 if (name == null) {
240 LOGGER.error("No name provided for Appender");
241 return null;
242 }
243
244 AbstractFlumeManager manager;
245
246 switch (managerType) {
247 case EMBEDDED:
248 manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
249 break;
250 case AVRO:
251 manager = FlumeAvroManager.getManager(name, getAgents(agents, hosts), batchCount, delayMillis, retries, connectTimeoutMillis, reqTimeoutMillis);
252 break;
253 case PERSISTENT:
254 manager = FlumePersistentManager.getManager(name, getAgents(agents, hosts), properties, batchCount, retries,
255 connectTimeoutMillis, reqTimeoutMillis, delayMillis, lockTimeoutRetryCount, dataDir);
256 break;
257 default:
258 LOGGER.debug("No manager type specified. Defaulting to AVRO");
259 manager = FlumeAvroManager.getManager(name, getAgents(agents, hosts), batchCount, delayMillis, retries, connectTimeoutMillis, reqTimeoutMillis);
260 }
261
262 if (manager == null) {
263 return null;
264 }
265
266 return new FlumeAppender(name, filter, layout, ignoreExceptions, includes,
267 excludes, required, mdcPrefix, eventPrefix, compress, factory, manager);
268 }
269
270 private static Agent[] getAgents(Agent[] agents, final String hosts) {
271 if (agents == null || agents.length == 0) {
272 if (hosts != null && !hosts.isEmpty()) {
273 LOGGER.debug("Parsing agents from hosts parameter");
274 final String[] hostports = hosts.split(",");
275 agents = new Agent[hostports.length];
276 for(int i = 0; i < hostports.length; ++i) {
277 final String[] h = hostports[i].split(":");
278 agents[i] = Agent.createAgent(h[0], h.length > 1 ? h[1] : null);
279 }
280 } else {
281 LOGGER.debug("No agents provided, using defaults");
282 agents = new Agent[] {Agent.createAgent(null, null)};
283 }
284 }
285
286 LOGGER.debug("Using agents {}", agents);
287 return agents;
288 }
289 }