1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.appender.mom.kafka;
19
20 import java.io.Serializable;
21 import java.util.Objects;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25
26 import org.apache.logging.log4j.core.AbstractLifeCycle;
27 import org.apache.logging.log4j.core.Appender;
28 import org.apache.logging.log4j.core.Filter;
29 import org.apache.logging.log4j.core.Layout;
30 import org.apache.logging.log4j.core.LogEvent;
31 import org.apache.logging.log4j.core.appender.AbstractAppender;
32 import org.apache.logging.log4j.core.config.Configuration;
33 import org.apache.logging.log4j.core.config.Node;
34 import org.apache.logging.log4j.core.config.Property;
35 import org.apache.logging.log4j.core.config.plugins.Plugin;
36 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
37 import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
38 import org.apache.logging.log4j.core.config.plugins.PluginElement;
39 import org.apache.logging.log4j.core.layout.SerializedLayout;
40
41
42
43
44 @Plugin(name = "Kafka", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
45 public final class KafkaAppender extends AbstractAppender {
46
47
48
49
50
51 public static class Builder<B extends Builder<B>> extends AbstractAppender.Builder<B>
52 implements org.apache.logging.log4j.core.util.Builder<KafkaAppender> {
53
54 @PluginAttribute("topic")
55 private String topic;
56
57 @PluginAttribute("key")
58 private String key;
59
60 @PluginAttribute(value = "syncSend", defaultBoolean = true)
61 private boolean syncSend;
62
63 @PluginElement("Properties")
64 private Property[] properties;
65
66 @SuppressWarnings("resource")
67 @Override
68 public KafkaAppender build() {
69 final Layout<? extends Serializable> layout = getLayout();
70 if (layout == null) {
71 AbstractLifeCycle.LOGGER.error("No layout provided for KafkaAppender");
72 return null;
73 }
74 final KafkaManager kafkaManager =
75 new KafkaManager(getConfiguration().getLoggerContext(), getName(), topic, syncSend, properties, key);
76 return new KafkaAppender(getName(), layout, getFilter(), isIgnoreExceptions(), kafkaManager);
77 }
78
79 public String getTopic() {
80 return topic;
81 }
82
83 public boolean isSyncSend() {
84 return syncSend;
85 }
86
87 public Property[] getProperties() {
88 return properties;
89 }
90
91 public B setTopic(final String topic) {
92 this.topic = topic;
93 return asBuilder();
94 }
95
96 public B setSyncSend(final boolean syncSend) {
97 this.syncSend = syncSend;
98 return asBuilder();
99 }
100
101 public B setProperties(final Property[] properties) {
102 this.properties = properties;
103 return asBuilder();
104 }
105 }
106
107 @Deprecated
108 public static KafkaAppender createAppender(
109 final Layout<? extends Serializable> layout,
110 final Filter filter,
111 final String name,
112 final boolean ignoreExceptions,
113 final String topic,
114 final Property[] properties,
115 final Configuration configuration,
116 final String key) {
117
118 if (layout == null) {
119 AbstractLifeCycle.LOGGER.error("No layout provided for KafkaAppender");
120 return null;
121 }
122 final KafkaManager kafkaManager =
123 new KafkaManager(configuration.getLoggerContext(), name, topic, true, properties, key);
124 return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager);
125 }
126
127
128
129
130
131 @PluginBuilderFactory
132 public static <B extends Builder<B>> B newBuilder() {
133 return new Builder<B>().asBuilder();
134 }
135
136 private final KafkaManager manager;
137
138 private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter,
139 final boolean ignoreExceptions, final KafkaManager manager) {
140 super(name, filter, layout, ignoreExceptions);
141 this.manager = Objects.requireNonNull(manager, "manager");
142 }
143
144 @Override
145 public void append(final LogEvent event) {
146 if (event.getLoggerName() != null && event.getLoggerName().startsWith("org.apache.kafka")) {
147 LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
148 } else {
149 try {
150 tryAppend(event);
151 } catch (final Exception e) {
152 error("Unable to write to Kafka in appender [" + getName() + "]", event, e);
153 }
154 }
155 }
156
157 private void tryAppend(final LogEvent event) throws ExecutionException, InterruptedException, TimeoutException {
158 final Layout<? extends Serializable> layout = getLayout();
159 byte[] data;
160 if (layout instanceof SerializedLayout) {
161 final byte[] header = layout.getHeader();
162 final byte[] body = layout.toByteArray(event);
163 data = new byte[header.length + body.length];
164 System.arraycopy(header, 0, data, 0, header.length);
165 System.arraycopy(body, 0, data, header.length, body.length);
166 } else {
167 data = layout.toByteArray(event);
168 }
169 manager.send(data);
170 }
171
172 @Override
173 public void start() {
174 super.start();
175 manager.startup();
176 }
177
178 @Override
179 public boolean stop(final long timeout, final TimeUnit timeUnit) {
180 setStopping();
181 boolean stopped = super.stop(timeout, timeUnit, false);
182 stopped &= manager.stop(timeout, timeUnit);
183 setStopped();
184 return stopped;
185 }
186
187 @Override
188 public String toString() {
189 return "KafkaAppender{" +
190 "name=" + getName() +
191 ", state=" + getState() +
192 ", topic=" + manager.getTopic() +
193 '}';
194 }
195 }