View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import java.io.ByteArrayOutputStream;
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.HashMap;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.zip.GZIPOutputStream;
26  
27  import org.apache.flume.event.SimpleEvent;
28  import org.apache.logging.log4j.Level;
29  import org.apache.logging.log4j.LoggingException;
30  import org.apache.logging.log4j.Marker;
31  import org.apache.logging.log4j.ThreadContext;
32  import org.apache.logging.log4j.core.LogEvent;
33  import org.apache.logging.log4j.core.impl.Log4jLogEvent;
34  import org.apache.logging.log4j.core.impl.ThrowableProxy;
35  import org.apache.logging.log4j.core.util.Patterns;
36  import org.apache.logging.log4j.core.util.UuidUtil;
37  import org.apache.logging.log4j.message.MapMessage;
38  import org.apache.logging.log4j.message.Message;
39  import org.apache.logging.log4j.message.StructuredDataId;
40  import org.apache.logging.log4j.message.StructuredDataMessage;
41  import org.apache.logging.log4j.util.ReadOnlyStringMap;
42  import org.apache.logging.log4j.util.Strings;
43  
44  /**
45   * Class that is both a Flume and Log4j Event.
46   */
47  public class FlumeEvent extends SimpleEvent implements LogEvent {
48  
49      static final String GUID = "guId";
50      /**
51       * Generated serial version ID.
52       */
53      private static final long serialVersionUID = -8988674608627854140L;
54  
55      private static final String DEFAULT_MDC_PREFIX = Strings.EMPTY;
56  
57      private static final String DEFAULT_EVENT_PREFIX = Strings.EMPTY;
58  
59      private static final String EVENT_TYPE = "eventType";
60  
61      private static final String EVENT_ID = "eventId";
62  
63      private static final String TIMESTAMP = "timeStamp";
64  
65      private final LogEvent event;
66  
67      private final Map<String, String> contextMap = new HashMap<>();
68  
69      private final boolean compress;
70  
71      /**
72       * Construct the FlumeEvent.
73       * @param event The Log4j LogEvent.
74       * @param includes A comma separated list of MDC elements to include.
75       * @param excludes A comma separated list of MDC elements to exclude.
76       * @param required A comma separated list of MDC elements that are required to be defined.
77       * @param mdcPrefix The value to prefix to MDC keys.
78       * @param eventPrefix The value to prefix to event keys.
79       * @param compress If true the event body should be compressed.
80       */
81      public FlumeEvent(final LogEvent event, final String includes, final String excludes, final String required,
82                        String mdcPrefix, String eventPrefix, final boolean compress) {
83          this.event = event;
84          this.compress = compress;
85          final Map<String, String> headers = getHeaders();
86          headers.put(TIMESTAMP, Long.toString(event.getTimeMillis()));
87          if (mdcPrefix == null) {
88              mdcPrefix = DEFAULT_MDC_PREFIX;
89          }
90          if (eventPrefix == null) {
91              eventPrefix = DEFAULT_EVENT_PREFIX;
92          }
93          final Map<String, String> mdc = event.getContextData().toMap();
94          if (includes != null) {
95              final String[] array = includes.split(Patterns.COMMA_SEPARATOR);
96              if (array.length > 0) {
97                  for (String str : array) {
98                      str = str.trim();
99                      if (mdc.containsKey(str)) {
100                         contextMap.put(str, mdc.get(str));
101                     }
102                 }
103             }
104         } else if (excludes != null) {
105             final String[] array = excludes.split(Patterns.COMMA_SEPARATOR);
106             if (array.length > 0) {
107                 final List<String> list = new ArrayList<>(array.length);
108                 for (final String value : array) {
109                     list.add(value.trim());
110                 }
111                 for (final Map.Entry<String, String> entry : mdc.entrySet()) {
112                     if (!list.contains(entry.getKey())) {
113                         contextMap.put(entry.getKey(), entry.getValue());
114                     }
115                 }
116             }
117         } else {
118             contextMap.putAll(mdc);
119         }
120 
121         if (required != null) {
122             final String[] array = required.split(Patterns.COMMA_SEPARATOR);
123             if (array.length > 0) {
124                 for (String str : array) {
125                     str = str.trim();
126                     if (!mdc.containsKey(str)) {
127                         throw new LoggingException("Required key " + str + " is missing from the MDC");
128                     }
129                 }
130             }
131         }
132         final String guid =  UuidUtil.getTimeBasedUuid().toString();
133         final Message message = event.getMessage();
134         if (message instanceof MapMessage) {
135             // Add the guid to the Map so that it can be included in the Layout.
136         	@SuppressWarnings("unchecked")
137             final
138 			MapMessage<?, String> stringMapMessage = (MapMessage<?, String>) message;
139         	stringMapMessage.put(GUID, guid);
140             if (message instanceof StructuredDataMessage) {
141                 addStructuredData(eventPrefix, headers, (StructuredDataMessage) message);
142             }
143             addMapData(eventPrefix, headers, stringMapMessage);
144         } else {
145             headers.put(GUID, guid);
146         }
147 
148         addContextData(mdcPrefix, headers, contextMap);
149     }
150 
151     protected void addStructuredData(final String prefix, final Map<String, String> fields,
152                                      final StructuredDataMessage msg) {
153         fields.put(prefix + EVENT_TYPE, msg.getType());
154         final StructuredDataId id = msg.getId();
155         fields.put(prefix + EVENT_ID, id.getName());
156     }
157 
158     protected void addMapData(final String prefix, final Map<String, String> fields, final MapMessage<?, String> msg) {
159         final Map<String, String> data = msg.getData();
160         for (final Map.Entry<String, String> entry : data.entrySet()) {
161             fields.put(prefix + entry.getKey(), entry.getValue());
162         }
163     }
164 
165     protected void addContextData(final String prefix, final Map<String, String> fields,
166                                   final Map<String, String> context) {
167         final Map<String, String> map = new HashMap<>();
168         for (final Map.Entry<String, String> entry : context.entrySet()) {
169             if (entry.getKey() != null && entry.getValue() != null) {
170                 fields.put(prefix + entry.getKey(), entry.getValue());
171                 map.put(prefix + entry.getKey(), entry.getValue());
172             }
173         }
174         context.clear();
175         context.putAll(map);
176     }
177 
178 	@Override
179 	public LogEvent toImmutable() {
180 		return Log4jLogEvent.createMemento(this);
181 	}
182 
183     /**
184      * Set the body in the event.
185      * @param body The body to add to the event.
186      */
187     @Override
188     public void setBody(final byte[] body) {
189         if (body == null || body.length == 0) {
190             super.setBody(new byte[0]);
191             return;
192         }
193         if (compress) {
194             final ByteArrayOutputStream baos = new ByteArrayOutputStream();
195             try (GZIPOutputStream os = new GZIPOutputStream(baos)) {
196                 os.write(body);
197             } catch (final IOException ioe) {
198                 throw new LoggingException("Unable to compress message", ioe);
199             }
200             super.setBody(baos.toByteArray());
201         } else {
202             super.setBody(body);
203         }
204     }
205 
206     /**
207      * Get the Frequently Qualified Class Name.
208      * @return the FQCN String.
209      */
210     @Override
211     public String getLoggerFqcn() {
212         return event.getLoggerFqcn();
213     }
214 
215     /**
216      * Returns the logging Level.
217      * @return the Level.
218      */
219     @Override
220     public Level getLevel() {
221         return event.getLevel();
222     }
223 
224     /**
225      * Returns the logger name.
226      * @return the logger name.
227      */
228     @Override
229     public String getLoggerName() {
230         return event.getLoggerName();
231     }
232 
233     /**
234      * Returns the StackTraceElement for the caller of the logging API.
235      * @return the StackTraceElement of the caller.
236      */
237     @Override
238     public StackTraceElement getSource() {
239         return event.getSource();
240     }
241 
242     /**
243      * Returns the Message.
244      * @return the Message.
245      */
246     @Override
247     public Message getMessage() {
248         return event.getMessage();
249     }
250 
251     /**
252      * Returns the Marker.
253      * @return the Marker.
254      */
255     @Override
256     public Marker getMarker() {
257         return event.getMarker();
258     }
259 
260     /**
261      * Returns the ID of the Thread.
262      * @return the ID of the Thread.
263      */
264     @Override
265     public long getThreadId() {
266         return event.getThreadId();
267     }
268 
269     /**
270      * Returns the priority of the Thread.
271      * @return the priority of the Thread.
272      */
273     @Override
274     public int getThreadPriority() {
275         return event.getThreadPriority();
276     }
277 
278     /**
279      * Returns the name of the Thread.
280      * @return the name of the Thread.
281      */
282     @Override
283     public String getThreadName() {
284         return event.getThreadName();
285     }
286 
287     /**
288      * Returns the event timestamp.
289      * @return the event timestamp.
290      */
291     @Override
292     public long getTimeMillis() {
293         return event.getTimeMillis();
294     }
295 
296     /**
297      * Returns the value of the running Java Virtual Machine's high-resolution time source when this event was created,
298      * or a dummy value if it is known that this value will not be used downstream.
299      * @return the event nanosecond timestamp.
300      */
301     @Override
302     public long getNanoTime() {
303         return event.getNanoTime();
304     }
305 
306     /**
307      * Returns the Throwable associated with the event, if any.
308      * @return the Throwable.
309      */
310     @Override
311     public Throwable getThrown() {
312         return event.getThrown();
313     }
314 
315     /**
316      * Returns the Throwable associated with the event, if any.
317      * @return the Throwable.
318      */
319     @Override
320     public ThrowableProxy getThrownProxy() {
321         return event.getThrownProxy();
322     }
323 
324     /**
325      * Returns a copy of the context Map.
326      * @return a copy of the context Map.
327      */
328     @Override
329     public Map<String, String> getContextMap() {
330         return contextMap;
331     }
332 
333     /**
334      * Returns the context data of the {@code LogEvent} that this {@code FlumeEvent} was constructed with.
335      * @return the context data of the {@code LogEvent} that this {@code FlumeEvent} was constructed with.
336      */
337     @Override
338     public ReadOnlyStringMap getContextData() {
339         return event.getContextData();
340     }
341 
342     /**
343      * Returns a copy of the context stack.
344      * @return a copy of the context stack.
345      */
346     @Override
347     public ThreadContext.ContextStack getContextStack() {
348         return event.getContextStack();
349     }
350 
351     @Override
352     public boolean isIncludeLocation() {
353         return event.isIncludeLocation();
354     }
355 
356     @Override
357     public void setIncludeLocation(final boolean includeLocation) {
358         event.setIncludeLocation(includeLocation);
359     }
360 
361     @Override
362     public boolean isEndOfBatch() {
363         return event.isEndOfBatch();
364     }
365 
366     @Override
367     public void setEndOfBatch(final boolean endOfBatch) {
368         event.setEndOfBatch(endOfBatch);
369     }
370 
371 }