1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.appender.mom.kafka;
19
20 import java.io.Serializable;
21 import java.nio.charset.StandardCharsets;
22
23 import org.apache.logging.log4j.core.Appender;
24 import org.apache.logging.log4j.core.Filter;
25 import org.apache.logging.log4j.core.Layout;
26 import org.apache.logging.log4j.core.LogEvent;
27 import org.apache.logging.log4j.core.appender.AbstractAppender;
28 import org.apache.logging.log4j.core.appender.AppenderLoggingException;
29 import org.apache.logging.log4j.core.config.Node;
30 import org.apache.logging.log4j.core.config.Property;
31 import org.apache.logging.log4j.core.config.plugins.Plugin;
32 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
33 import org.apache.logging.log4j.core.config.plugins.PluginElement;
34 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
35 import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
36 import org.apache.logging.log4j.core.layout.SerializedLayout;
37 import org.apache.logging.log4j.core.util.StringEncoder;
38
39
40
41
42 @Plugin(name = "Kafka", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
43 public final class KafkaAppender extends AbstractAppender {
44
45 @PluginFactory
46 public static KafkaAppender createAppender(
47 @PluginElement("Layout") final Layout<? extends Serializable> layout,
48 @PluginElement("Filter") final Filter filter,
49 @Required(message = "No name provided for KafkaAppender") @PluginAttribute("name") final String name,
50 @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
51 @Required(message = "No topic provided for KafkaAppender") @PluginAttribute("topic") final String topic,
52 @PluginElement("Properties") final Property[] properties) {
53 final KafkaManager kafkaManager = new KafkaManager(name, topic, properties);
54 return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager);
55 }
56
57 private final KafkaManager manager;
58
59 private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter, final boolean ignoreExceptions, final KafkaManager manager) {
60 super(name, filter, layout, ignoreExceptions);
61 this.manager = manager;
62 }
63
64 @Override
65 public void append(final LogEvent event) {
66 if (event.getLoggerName().startsWith("org.apache.kafka")) {
67 LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
68 } else {
69 try {
70 final Layout<? extends Serializable> layout = getLayout();
71 byte[] data;
72 if (layout != null) {
73 if (layout instanceof SerializedLayout) {
74 final byte[] header = layout.getHeader();
75 final byte[] body = layout.toByteArray(event);
76 data = new byte[header.length + body.length];
77 System.arraycopy(header, 0, data, 0, header.length);
78 System.arraycopy(body, 0, data, header.length, body.length);
79 } else {
80 data = layout.toByteArray(event);
81 }
82 } else {
83 data = StringEncoder.toBytes(event.getMessage().getFormattedMessage(), StandardCharsets.UTF_8);
84 }
85 manager.send(data);
86 } catch (final Exception e) {
87 LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e);
88 throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e);
89 }
90 }
91 }
92
93 @Override
94 public void start() {
95 super.start();
96 manager.startup();
97 }
98
99 @Override
100 public void stop() {
101 super.stop();
102 manager.release();
103 }
104
105 @Override
106 public String toString() {
107 return "KafkaAppender{" +
108 "name=" + getName() +
109 ", state=" + getState() +
110 ", topic=" + manager.getTopic() +
111 '}';
112 }
113 }