1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import org.apache.logging.log4j.core.Filter;
20 import org.apache.logging.log4j.core.Layout;
21 import org.apache.logging.log4j.core.LogEvent;
22 import org.apache.logging.log4j.core.appender.AbstractAppender;
23 import org.apache.logging.log4j.core.config.Property;
24 import org.apache.logging.log4j.core.config.plugins.Plugin;
25 import org.apache.logging.log4j.core.config.plugins.PluginAttr;
26 import org.apache.logging.log4j.core.config.plugins.PluginElement;
27 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
28 import org.apache.logging.log4j.core.layout.RFC5424Layout;
29
30 import java.io.Serializable;
31 import java.util.Locale;
32
33
34
35
36 @Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true)
37 public final class FlumeAppender<T extends Serializable> extends AbstractAppender<T> implements FlumeEventFactory {
38
39 private final AbstractFlumeManager manager;
40
41 private final String mdcIncludes;
42 private final String mdcExcludes;
43 private final String mdcRequired;
44
45 private final String eventPrefix;
46
47 private final String mdcPrefix;
48
49 private final boolean compressBody;
50
51 private final FlumeEventFactory factory;
52
53 private enum ManagerType {
54 AVRO, EMBEDDED, PERSISTENT;
55
56 public static ManagerType getType(String type) {
57 return valueOf(type.toUpperCase(Locale.US));
58 }
59 }
60
61 private FlumeAppender(final String name, final Filter filter, final Layout<T> layout, final boolean handleException,
62 final String includes, final String excludes, final String required, final String mdcPrefix,
63 final String eventPrefix, final boolean compress,
64 final FlumeEventFactory factory, final AbstractFlumeManager manager) {
65 super(name, filter, layout, handleException);
66 this.manager = manager;
67 this.mdcIncludes = includes;
68 this.mdcExcludes = excludes;
69 this.mdcRequired = required;
70 this.eventPrefix = eventPrefix;
71 this.mdcPrefix = mdcPrefix;
72 this.compressBody = compress;
73 this.factory = factory == null ? this : factory;
74 }
75
76
77
78
79
80 public void append(final LogEvent event) {
81
82 final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
83 eventPrefix, compressBody);
84 flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
85 manager.send(flumeEvent);
86 }
87
88 @Override
89 public void stop() {
90 super.stop();
91 manager.release();
92 }
93
94
95
96
97
98
99
100
101
102
103
104
105 public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes,
106 final String required, final String mdcPrefix, final String eventPrefix,
107 final boolean compress) {
108 return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
109 eventPrefix, compressBody);
110 }
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139 @PluginFactory
140 public static <S extends Serializable> FlumeAppender<S> createAppender(@PluginElement("agents") Agent[] agents,
141 @PluginElement("properties") final Property[] properties,
142 @PluginAttr("embedded") final String embedded,
143 @PluginAttr("type") final String type,
144 @PluginAttr("dataDir") final String dataDir,
145 @PluginAttr("connectTimeout") final String connectionTimeout,
146 @PluginAttr("requestTimeout") final String requestTimeout,
147 @PluginAttr("agentRetries") final String agentRetries,
148 @PluginAttr("maxDelay") final String maxDelay,
149 @PluginAttr("name") final String name,
150 @PluginAttr("suppressExceptions") final String suppress,
151 @PluginAttr("mdcExcludes") final String excludes,
152 @PluginAttr("mdcIncludes") final String includes,
153 @PluginAttr("mdcRequired") final String required,
154 @PluginAttr("mdcPrefix") final String mdcPrefix,
155 @PluginAttr("eventPrefix") final String eventPrefix,
156 @PluginAttr("compress") final String compressBody,
157 @PluginAttr("batchSize") final String batchSize,
158 @PluginElement("flumeEventFactory") final FlumeEventFactory factory,
159 @PluginElement("layout") Layout<S> layout,
160 @PluginElement("filters") final Filter filter) {
161
162 final boolean embed = embedded != null ? Boolean.valueOf(embedded) :
163 (agents == null || agents.length == 0) && properties != null && properties.length > 0;
164 final boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
165 final boolean compress = compressBody == null ? true : Boolean.valueOf(compressBody);
166 ManagerType managerType;
167 if (type != null) {
168 if (embed && embedded != null) {
169 try {
170 managerType = ManagerType.getType(type);
171 LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type);
172 } catch (Exception ex) {
173 LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type + " is invalid.");
174 managerType = ManagerType.EMBEDDED;
175 }
176 } else {
177 try {
178 managerType = ManagerType.getType(type);
179 } catch (Exception ex) {
180 LOGGER.warn("Type " + type + " is invalid.");
181 managerType = ManagerType.EMBEDDED;
182 }
183 }
184 } else if (embed) {
185 managerType = ManagerType.EMBEDDED;
186 } else {
187 managerType = ManagerType.AVRO;
188 }
189
190 final int batchCount = batchSize == null ? 1 : Integer.parseInt(batchSize);
191 final int connectTimeout = connectionTimeout == null ? 0 : Integer.parseInt(connectionTimeout);
192 final int reqTimeout = requestTimeout == null ? 0 : Integer.parseInt(requestTimeout);
193 final int retries = agentRetries == null ? 0 : Integer.parseInt(agentRetries);
194 final int delay = maxDelay == null ? 60000 : Integer.parseInt(maxDelay);
195
196 if (layout == null) {
197 @SuppressWarnings({"unchecked", "UnnecessaryLocalVariable"})
198 Layout<S> l = (Layout<S>)RFC5424Layout.createLayout(null, null, null, "True", null, mdcPrefix, eventPrefix,
199 null, null, null, excludes, includes, required, null, null, null, null);
200 layout = l;
201 }
202
203 if (name == null) {
204 LOGGER.error("No name provided for Appender");
205 return null;
206 }
207
208 AbstractFlumeManager manager;
209
210 switch (managerType) {
211 case EMBEDDED:
212 manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
213 break;
214 case AVRO:
215 if (agents == null || agents.length == 0) {
216 LOGGER.debug("No agents provided, using defaults");
217 agents = new Agent[] {Agent.createAgent(null, null)};
218 }
219 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
220 break;
221 case PERSISTENT:
222 if (agents == null || agents.length == 0) {
223 LOGGER.debug("No agents provided, using defaults");
224 agents = new Agent[] {Agent.createAgent(null, null)};
225 }
226 manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries,
227 connectTimeout, reqTimeout, delay, dataDir);
228 break;
229 default:
230 LOGGER.debug("No manager type specified. Defaulting to AVRO");
231 if (agents == null || agents.length == 0) {
232 LOGGER.debug("No agents provided, using defaults");
233 agents = new Agent[] {Agent.createAgent(null, null)};
234 }
235 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
236 }
237
238 if (manager == null) {
239 return null;
240 }
241
242 return new FlumeAppender<S>(name, filter, layout, handleExceptions, includes,
243 excludes, required, mdcPrefix, eventPrefix, compress, factory, manager);
244 }
245 }