1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.appender.mom.kafka;
19
20 import java.io.Serializable;
21 import java.nio.charset.StandardCharsets;
22
23 import org.apache.logging.log4j.core.Filter;
24 import org.apache.logging.log4j.core.Layout;
25 import org.apache.logging.log4j.core.LogEvent;
26 import org.apache.logging.log4j.core.appender.AbstractAppender;
27 import org.apache.logging.log4j.core.appender.AppenderLoggingException;
28 import org.apache.logging.log4j.core.config.Property;
29 import org.apache.logging.log4j.core.config.plugins.Plugin;
30 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
31 import org.apache.logging.log4j.core.config.plugins.PluginElement;
32 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
33 import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
34
35
36
37
38 @Plugin(name = "Kafka", category = "Core", elementType = "appender", printObject = true)
39 public final class KafkaAppender extends AbstractAppender {
40
41
42
43
44 private static final long serialVersionUID = 1L;
45 @PluginFactory
46 public static KafkaAppender createAppender(
47 @PluginElement("Layout") final Layout<? extends Serializable> layout,
48 @PluginElement("Filter") final Filter filter,
49 @Required(message = "No name provided for KafkaAppender") @PluginAttribute("name") final String name,
50 @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
51 @Required(message = "No topic provided for KafkaAppender") @PluginAttribute("topic") final String topic,
52 @PluginElement("Properties") final Property[] properties) {
53 final KafkaManager kafkaManager = new KafkaManager(name, topic, properties);
54 return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager);
55 }
56
57 private final KafkaManager manager;
58
59 private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter, final boolean ignoreExceptions, final KafkaManager manager) {
60 super(name, filter, layout, ignoreExceptions);
61 this.manager = manager;
62 }
63
64 @Override
65 public void append(final LogEvent event) {
66 if (event.getLoggerName().startsWith("org.apache.kafka")) {
67 LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
68 } else {
69 try {
70 if (getLayout() != null) {
71 manager.send(getLayout().toByteArray(event));
72 } else {
73 manager.send(event.getMessage().getFormattedMessage().getBytes(StandardCharsets.UTF_8));
74 }
75 } catch (final Exception e) {
76 LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e);
77 throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e);
78 }
79 }
80 }
81
82 @Override
83 public void start() {
84 super.start();
85 manager.startup();
86 }
87
88 @Override
89 public void stop() {
90 super.stop();
91 manager.release();
92 }
93
94 }