1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.appender.mom.kafka;
19
20 import org.apache.logging.log4j.core.Filter;
21 import org.apache.logging.log4j.core.Layout;
22 import org.apache.logging.log4j.core.LogEvent;
23 import org.apache.logging.log4j.core.appender.AbstractAppender;
24 import org.apache.logging.log4j.core.appender.AppenderLoggingException;
25 import org.apache.logging.log4j.core.config.Property;
26 import org.apache.logging.log4j.core.config.plugins.Plugin;
27 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
28 import org.apache.logging.log4j.core.config.plugins.PluginElement;
29 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
30 import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
31 import org.apache.logging.log4j.core.layout.SerializedLayout;
32 import org.apache.logging.log4j.core.util.StringEncoder;
33
34 import java.io.Serializable;
35 import java.nio.charset.StandardCharsets;
36
37
38
39
40 @Plugin(name = "Kafka", category = "Core", elementType = "appender", printObject = true)
41 public final class KafkaAppender extends AbstractAppender {
42
43
44
45
46 private static final long serialVersionUID = 1L;
47 @PluginFactory
48 public static KafkaAppender createAppender(
49 @PluginElement("Layout") final Layout<? extends Serializable> layout,
50 @PluginElement("Filter") final Filter filter,
51 @Required(message = "No name provided for KafkaAppender") @PluginAttribute("name") final String name,
52 @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
53 @Required(message = "No topic provided for KafkaAppender") @PluginAttribute("topic") final String topic,
54 @PluginElement("Properties") final Property[] properties) {
55 final KafkaManager kafkaManager = new KafkaManager(name, topic, properties);
56 return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager);
57 }
58
59 private final KafkaManager manager;
60
61 private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter, final boolean ignoreExceptions, final KafkaManager manager) {
62 super(name, filter, layout, ignoreExceptions);
63 this.manager = manager;
64 }
65
66 @Override
67 public void append(final LogEvent event) {
68 if (event.getLoggerName().startsWith("org.apache.kafka")) {
69 LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
70 } else {
71 try {
72 Layout<? extends Serializable> layout = getLayout();
73 byte[] data;
74 if (layout != null) {
75 if (layout instanceof SerializedLayout) {
76 byte[] header = layout.getHeader();
77 byte[] body = layout.toByteArray(event);
78 data = new byte[header.length + body.length];
79 System.arraycopy(header, 0, data, 0, header.length);
80 System.arraycopy(body, 0, data, header.length, body.length);
81 } else {
82 data = layout.toByteArray(event);
83 }
84 } else {
85 data = StringEncoder.toBytes(event.getMessage().getFormattedMessage(), StandardCharsets.UTF_8);
86 }
87 manager.send(data);
88 } catch (final Exception e) {
89 LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e);
90 throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e);
91 }
92 }
93 }
94
95 @Override
96 public void start() {
97 super.start();
98 manager.startup();
99 }
100
101 @Override
102 public void stop() {
103 super.stop();
104 manager.release();
105 }
106
107 }