1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import org.apache.flume.event.SimpleEvent;
20 import org.apache.logging.log4j.Level;
21 import org.apache.logging.log4j.LoggingException;
22 import org.apache.logging.log4j.Marker;
23 import org.apache.logging.log4j.ThreadContext;
24 import org.apache.logging.log4j.core.LogEvent;
25 import org.apache.logging.log4j.core.helpers.UUIDUtil;
26 import org.apache.logging.log4j.message.MapMessage;
27 import org.apache.logging.log4j.message.Message;
28 import org.apache.logging.log4j.message.StructuredDataId;
29 import org.apache.logging.log4j.message.StructuredDataMessage;
30
31 import java.io.ByteArrayOutputStream;
32 import java.io.IOException;
33 import java.util.ArrayList;
34 import java.util.HashMap;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.zip.GZIPOutputStream;
38
39
40
41
42 public class FlumeEvent extends SimpleEvent implements LogEvent {
43
44
45
46
47 private static final long serialVersionUID = -8988674608627854140L;
48
49 private static final String DEFAULT_MDC_PREFIX = "mdc:";
50
51 private static final String DEFAULT_EVENT_PREFIX = "";
52
53 private static final String EVENT_TYPE = "eventType";
54
55 private static final String EVENT_ID = "eventId";
56
57 static final String GUID = "guId";
58
59 private static final String TIMESTAMP = "timeStamp";;
60
61 private final LogEvent event;
62
63 private final Map<String, String> ctx = new HashMap<String, String>();
64
65 private final boolean compress;
66
67
68
69
70
71
72
73
74
75
76
77 public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required,
78 String mdcPrefix, String eventPrefix, final boolean compress) {
79 this.event = event;
80 this.compress = compress;
81 final Map<String, String> headers = getHeaders();
82 headers.put(TIMESTAMP, Long.toString(event.getMillis()));
83 if (mdcPrefix == null) {
84 mdcPrefix = DEFAULT_MDC_PREFIX;
85 }
86 if (eventPrefix == null) {
87 eventPrefix = DEFAULT_EVENT_PREFIX;
88 }
89 final Map<String, String> mdc = event.getContextMap();
90 if (includes != null) {
91 final String[] array = includes.split(",");
92 if (array.length > 0) {
93 for (String str : array) {
94 str = str.trim();
95 if (mdc.containsKey(str)) {
96 ctx.put(str, mdc.get(str));
97 }
98 }
99 }
100 } else if (excludes != null) {
101 final String[] array = excludes.split(",");
102 if (array.length > 0) {
103 final List<String> list = new ArrayList<String>(array.length);
104 for (final String value : array) {
105 list.add(value.trim());
106 }
107 for (final Map.Entry<String, String> entry : mdc.entrySet()) {
108 if (!list.contains(entry.getKey())) {
109 ctx.put(entry.getKey(), entry.getValue());
110 }
111 }
112 }
113 } else {
114 ctx.putAll(mdc);
115 }
116
117 if (required != null) {
118 final String[] array = required.split(",");
119 if (array.length > 0) {
120 for (String str : array) {
121 str = str.trim();
122 if (!mdc.containsKey(str)) {
123 throw new LoggingException("Required key " + str + " is missing from the MDC");
124 }
125 }
126 }
127 }
128 final String guid = UUIDUtil.getTimeBasedUUID().toString();
129 final Message message = event.getMessage();
130 if (message instanceof MapMessage) {
131 ((MapMessage) message).put(GUID, guid);
132 if (message instanceof StructuredDataMessage) {
133 addStructuredData(eventPrefix, headers, (StructuredDataMessage) message);
134 }
135 addMapData(eventPrefix, headers, (MapMessage) message);
136 }
137
138 addContextData(mdcPrefix, headers, ctx);
139 }
140
141 protected void addStructuredData(final String prefix, final Map<String, String> fields,
142 final StructuredDataMessage msg) {
143 fields.put(prefix + EVENT_TYPE, msg.getType());
144 final StructuredDataId id = msg.getId();
145 fields.put(prefix + EVENT_ID, id.getName());
146 }
147
148 protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage msg) {
149 final Map<String, String> data = msg.getData();
150 for (final Map.Entry<String, String> entry : data.entrySet()) {
151 fields.put(prefix + entry.getKey(), entry.getValue());
152 }
153 }
154
155 protected void addContextData(final String prefix, final Map<String, String> fields,
156 final Map<String, String> context) {
157 for (final Map.Entry<String, String> entry : context.entrySet()) {
158 if (entry.getKey() != null && entry.getValue() != null) {
159 fields.put(prefix + entry.getKey(), entry.getValue());
160 }
161 }
162 }
163
164
165
166
167
168 @Override
169 public void setBody(final byte[] body) {
170 if (body == null || body.length == 0) {
171 super.setBody(new byte[0]);
172 return;
173 }
174 if (compress) {
175 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
176 try {
177 final GZIPOutputStream os = new GZIPOutputStream(baos);
178 os.write(body);
179 os.close();
180 } catch (final IOException ioe) {
181 throw new LoggingException("Unable to compress message", ioe);
182 }
183 super.setBody(baos.toByteArray());
184 } else {
185 super.setBody(body);
186 }
187 }
188
189
190
191
192
193 public String getFQCN() {
194 return event.getFQCN();
195 }
196
197
198
199
200
201 public Level getLevel() {
202 return event.getLevel();
203 }
204
205
206
207
208
209 public String getLoggerName() {
210 return event.getLoggerName();
211 }
212
213
214
215
216
217 public StackTraceElement getSource() {
218 return event.getSource();
219 }
220
221
222
223
224
225 public Message getMessage() {
226 return event.getMessage();
227 }
228
229
230
231
232
233 public Marker getMarker() {
234 return event.getMarker();
235 }
236
237
238
239
240
241 public String getThreadName() {
242 return event.getThreadName();
243 }
244
245
246
247
248
249 public long getMillis() {
250 return event.getMillis();
251 }
252
253
254
255
256
257 public Throwable getThrown() {
258 return event.getThrown();
259 }
260
261
262
263
264
265 public Map<String, String> getContextMap() {
266 return ctx;
267 }
268
269
270
271
272
273 public ThreadContext.ContextStack getContextStack() {
274 return event.getContextStack();
275 }
276
277 @Override
278 public boolean isIncludeLocation() {
279 return event.isIncludeLocation();
280 }
281
282 @Override
283 public void setIncludeLocation(boolean includeLocation) {
284 event.setIncludeLocation(includeLocation);
285 }
286
287 @Override
288 public boolean isEndOfBatch() {
289 return event.isEndOfBatch();
290 }
291
292 @Override
293 public void setEndOfBatch(boolean endOfBatch) {
294 event.setEndOfBatch(endOfBatch);
295 }
296 }