1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import java.io.ByteArrayOutputStream;
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.zip.GZIPOutputStream;
26
27 import org.apache.flume.event.SimpleEvent;
28 import org.apache.logging.log4j.Level;
29 import org.apache.logging.log4j.LoggingException;
30 import org.apache.logging.log4j.Marker;
31 import org.apache.logging.log4j.ThreadContext;
32 import org.apache.logging.log4j.core.LogEvent;
33 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
34 import org.apache.logging.log4j.core.impl.ThrowableProxy;
35 import org.apache.logging.log4j.core.util.Patterns;
36 import org.apache.logging.log4j.core.util.UuidUtil;
37 import org.apache.logging.log4j.message.MapMessage;
38 import org.apache.logging.log4j.message.Message;
39 import org.apache.logging.log4j.message.StructuredDataId;
40 import org.apache.logging.log4j.message.StructuredDataMessage;
41 import org.apache.logging.log4j.util.ReadOnlyStringMap;
42 import org.apache.logging.log4j.util.Strings;
43
44
45
46
47 public class FlumeEvent extends SimpleEvent implements LogEvent {
48
49 static final String GUID = "guId";
50
51
52
53 private static final long serialVersionUID = -8988674608627854140L;
54
55 private static final String DEFAULT_MDC_PREFIX = Strings.EMPTY;
56
57 private static final String DEFAULT_EVENT_PREFIX = Strings.EMPTY;
58
59 private static final String EVENT_TYPE = "eventType";
60
61 private static final String EVENT_ID = "eventId";
62
63 private static final String TIMESTAMP = "timeStamp";
64
65 private final LogEvent event;
66
67 private final Map<String, String> contextMap = new HashMap<>();
68
69 private final boolean compress;
70
71
72
73
74
75
76
77
78
79
80
81 public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required,
82 String mdcPrefix, String eventPrefix, final boolean compress) {
83 this.event = event;
84 this.compress = compress;
85 final Map<String, String> headers = getHeaders();
86 headers.put(TIMESTAMP, Long.toString(event.getTimeMillis()));
87 if (mdcPrefix == null) {
88 mdcPrefix = DEFAULT_MDC_PREFIX;
89 }
90 if (eventPrefix == null) {
91 eventPrefix = DEFAULT_EVENT_PREFIX;
92 }
93 final Map<String, String> mdc = event.getContextData().toMap();
94 if (includes != null) {
95 final String[] array = includes.split(Patterns.COMMA_SEPARATOR);
96 if (array.length > 0) {
97 for (String str : array) {
98 str = str.trim();
99 if (mdc.containsKey(str)) {
100 contextMap.put(str, mdc.get(str));
101 }
102 }
103 }
104 } else if (excludes != null) {
105 final String[] array = excludes.split(Patterns.COMMA_SEPARATOR);
106 if (array.length > 0) {
107 final List<String> list = new ArrayList<>(array.length);
108 for (final String value : array) {
109 list.add(value.trim());
110 }
111 for (final Map.Entry<String, String> entry : mdc.entrySet()) {
112 if (!list.contains(entry.getKey())) {
113 contextMap.put(entry.getKey(), entry.getValue());
114 }
115 }
116 }
117 } else {
118 contextMap.putAll(mdc);
119 }
120
121 if (required != null) {
122 final String[] array = required.split(Patterns.COMMA_SEPARATOR);
123 if (array.length > 0) {
124 for (String str : array) {
125 str = str.trim();
126 if (!mdc.containsKey(str)) {
127 throw new LoggingException("Required key " + str + " is missing from the MDC");
128 }
129 }
130 }
131 }
132 final String guid = UuidUtil.getTimeBasedUuid().toString();
133 final Message message = event.getMessage();
134 if (message instanceof MapMessage) {
135
136 @SuppressWarnings("unchecked")
137 final
138 MapMessage<?, String> stringMapMessage = (MapMessage<?, String>) message;
139 stringMapMessage.put(GUID, guid);
140 if (message instanceof StructuredDataMessage) {
141 addStructuredData(eventPrefix, headers, (StructuredDataMessage) message);
142 }
143 addMapData(eventPrefix, headers, stringMapMessage);
144 } else {
145 headers.put(GUID, guid);
146 }
147
148 addContextData(mdcPrefix, headers, contextMap);
149 }
150
151 protected void addStructuredData(final String prefix, final Map<String, String> fields,
152 final StructuredDataMessage msg) {
153 fields.put(prefix + EVENT_TYPE, msg.getType());
154 final StructuredDataId id = msg.getId();
155 fields.put(prefix + EVENT_ID, id.getName());
156 }
157
158 protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage<?, String> msg) {
159 final Map<String, String> data = msg.getData();
160 for (final Map.Entry<String, String> entry : data.entrySet()) {
161 fields.put(prefix + entry.getKey(), entry.getValue());
162 }
163 }
164
165 protected void addContextData(final String prefix, final Map<String, String> fields,
166 final Map<String, String> context) {
167 final Map<String, String> map = new HashMap<>();
168 for (final Map.Entry<String, String> entry : context.entrySet()) {
169 if (entry.getKey() != null && entry.getValue() != null) {
170 fields.put(prefix + entry.getKey(), entry.getValue());
171 map.put(prefix + entry.getKey(), entry.getValue());
172 }
173 }
174 context.clear();
175 context.putAll(map);
176 }
177
178 @Override
179 public LogEvent toImmutable() {
180 return Log4jLogEvent.createMemento(this);
181 }
182
183
184
185
186
187 @Override
188 public void setBody(final byte[] body) {
189 if (body == null || body.length == 0) {
190 super.setBody(new byte[0]);
191 return;
192 }
193 if (compress) {
194 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
195 try (GZIPOutputStream os = new GZIPOutputStream(baos)) {
196 os.write(body);
197 } catch (final IOException ioe) {
198 throw new LoggingException("Unable to compress message", ioe);
199 }
200 super.setBody(baos.toByteArray());
201 } else {
202 super.setBody(body);
203 }
204 }
205
206
207
208
209
210 @Override
211 public String getLoggerFqcn() {
212 return event.getLoggerFqcn();
213 }
214
215
216
217
218
219 @Override
220 public Level getLevel() {
221 return event.getLevel();
222 }
223
224
225
226
227
228 @Override
229 public String getLoggerName() {
230 return event.getLoggerName();
231 }
232
233
234
235
236
237 @Override
238 public StackTraceElement getSource() {
239 return event.getSource();
240 }
241
242
243
244
245
246 @Override
247 public Message getMessage() {
248 return event.getMessage();
249 }
250
251
252
253
254
255 @Override
256 public Marker getMarker() {
257 return event.getMarker();
258 }
259
260
261
262
263
264 @Override
265 public long getThreadId() {
266 return event.getThreadId();
267 }
268
269
270
271
272
273 @Override
274 public int getThreadPriority() {
275 return event.getThreadPriority();
276 }
277
278
279
280
281
282 @Override
283 public String getThreadName() {
284 return event.getThreadName();
285 }
286
287
288
289
290
291 @Override
292 public long getTimeMillis() {
293 return event.getTimeMillis();
294 }
295
296
297
298
299
300
301 @Override
302 public long getNanoTime() {
303 return event.getNanoTime();
304 }
305
306
307
308
309
310 @Override
311 public Throwable getThrown() {
312 return event.getThrown();
313 }
314
315
316
317
318
319 @Override
320 public ThrowableProxy getThrownProxy() {
321 return event.getThrownProxy();
322 }
323
324
325
326
327
328 @Override
329 public Map<String, String> getContextMap() {
330 return contextMap;
331 }
332
333
334
335
336
337 @Override
338 public ReadOnlyStringMap getContextData() {
339 return event.getContextData();
340 }
341
342
343
344
345
346 @Override
347 public ThreadContext.ContextStack getContextStack() {
348 return event.getContextStack();
349 }
350
351 @Override
352 public boolean isIncludeLocation() {
353 return event.isIncludeLocation();
354 }
355
356 @Override
357 public void setIncludeLocation(final boolean includeLocation) {
358 event.setIncludeLocation(includeLocation);
359 }
360
361 @Override
362 public boolean isEndOfBatch() {
363 return event.isEndOfBatch();
364 }
365
366 @Override
367 public void setEndOfBatch(final boolean endOfBatch) {
368 event.setEndOfBatch(endOfBatch);
369 }
370
371 }