1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.appender.mom.kafka;
19
20 import java.io.Serializable;
21 import java.nio.charset.StandardCharsets;
22 import java.util.concurrent.TimeUnit;
23
24 import org.apache.logging.log4j.core.Appender;
25 import org.apache.logging.log4j.core.Filter;
26 import org.apache.logging.log4j.core.Layout;
27 import org.apache.logging.log4j.core.LogEvent;
28 import org.apache.logging.log4j.core.appender.AbstractAppender;
29 import org.apache.logging.log4j.core.appender.AppenderLoggingException;
30 import org.apache.logging.log4j.core.config.Configuration;
31 import org.apache.logging.log4j.core.config.Node;
32 import org.apache.logging.log4j.core.config.Property;
33 import org.apache.logging.log4j.core.config.plugins.Plugin;
34 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
35 import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
36 import org.apache.logging.log4j.core.config.plugins.PluginElement;
37 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
38 import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
39 import org.apache.logging.log4j.core.layout.SerializedLayout;
40 import org.apache.logging.log4j.core.util.StringEncoder;
41
42
43
44
45 @Plugin(name = "Kafka", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
46 public final class KafkaAppender extends AbstractAppender {
47
48 @PluginFactory
49 public static KafkaAppender createAppender(
50 @PluginElement("Layout") final Layout<? extends Serializable> layout,
51 @PluginElement("Filter") final Filter filter,
52 @Required(message = "No name provided for KafkaAppender") @PluginAttribute("name") final String name,
53 @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
54 @Required(message = "No topic provided for KafkaAppender") @PluginAttribute("topic") final String topic,
55 @PluginElement("Properties") final Property[] properties,
56 @PluginConfiguration final Configuration configuration) {
57 final KafkaManager kafkaManager = new KafkaManager(configuration.getLoggerContext(), name, topic, properties);
58 return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager);
59 }
60
61 private final KafkaManager manager;
62
63 private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter, final boolean ignoreExceptions, final KafkaManager manager) {
64 super(name, filter, layout, ignoreExceptions);
65 this.manager = manager;
66 }
67
68 @Override
69 public void append(final LogEvent event) {
70 if (event.getLoggerName().startsWith("org.apache.kafka")) {
71 LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
72 } else {
73 try {
74 final Layout<? extends Serializable> layout = getLayout();
75 byte[] data;
76 if (layout != null) {
77 if (layout instanceof SerializedLayout) {
78 final byte[] header = layout.getHeader();
79 final byte[] body = layout.toByteArray(event);
80 data = new byte[header.length + body.length];
81 System.arraycopy(header, 0, data, 0, header.length);
82 System.arraycopy(body, 0, data, header.length, body.length);
83 } else {
84 data = layout.toByteArray(event);
85 }
86 } else {
87 data = StringEncoder.toBytes(event.getMessage().getFormattedMessage(), StandardCharsets.UTF_8);
88 }
89 manager.send(data);
90 } catch (final Exception e) {
91 LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e);
92 throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e);
93 }
94 }
95 }
96
97 @Override
98 public void start() {
99 super.start();
100 manager.startup();
101 }
102
103 @Override
104 public boolean stop(final long timeout, final TimeUnit timeUnit) {
105 setStopping();
106 boolean stopped = super.stop(timeout, timeUnit, false);
107 stopped &= manager.stop(timeout, timeUnit);
108 setStopped();
109 return stopped;
110 }
111
112 @Override
113 public String toString() {
114 return "KafkaAppender{" +
115 "name=" + getName() +
116 ", state=" + getState() +
117 ", topic=" + manager.getTopic() +
118 '}';
119 }
120 }