1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.appender.mom.kafka;
19
20 import java.io.Serializable;
21 import java.util.Objects;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25
26 import org.apache.logging.log4j.core.AbstractLifeCycle;
27 import org.apache.logging.log4j.core.Appender;
28 import org.apache.logging.log4j.core.Filter;
29 import org.apache.logging.log4j.core.Layout;
30 import org.apache.logging.log4j.core.LogEvent;
31 import org.apache.logging.log4j.core.appender.AbstractAppender;
32 import org.apache.logging.log4j.core.config.Configuration;
33 import org.apache.logging.log4j.core.config.Node;
34 import org.apache.logging.log4j.core.config.Property;
35 import org.apache.logging.log4j.core.config.plugins.Plugin;
36 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
37 import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
38 import org.apache.logging.log4j.core.layout.SerializedLayout;
39
40
41
42
43 @Plugin(name = "Kafka", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
44 public final class KafkaAppender extends AbstractAppender {
45
46 private final Integer retryCount;
47
48
49
50
51
52
53 public static class Builder<B extends Builder<B>> extends AbstractAppender.Builder<B>
54 implements org.apache.logging.log4j.core.util.Builder<KafkaAppender> {
55
56 @PluginAttribute("retryCount")
57 private String retryCount;
58
59 @PluginAttribute("topic")
60 private String topic;
61
62 @PluginAttribute("key")
63 private String key;
64
65 @PluginAttribute(value = "syncSend", defaultBoolean = true)
66 private boolean syncSend;
67
68 @SuppressWarnings("resource")
69 @Override
70 public KafkaAppender build() {
71 final Layout<? extends Serializable> layout = getLayout();
72 if (layout == null) {
73 AbstractLifeCycle.LOGGER.error("No layout provided for KafkaAppender");
74 return null;
75 }
76 final KafkaManager kafkaManager = KafkaManager.getManager(getConfiguration().getLoggerContext(), getName(),
77 topic, syncSend, getPropertyArray(), key);
78 return new KafkaAppender(getName(), layout, getFilter(), isIgnoreExceptions(), kafkaManager,
79 getPropertyArray(), getRetryCount());
80 }
81
82 public String getTopic() {
83 return topic;
84 }
85
86 public boolean isSyncSend() {
87 return syncSend;
88 }
89
90 public B setTopic(final String topic) {
91 this.topic = topic;
92 return asBuilder();
93 }
94
95 public B setSyncSend(final boolean syncSend) {
96 this.syncSend = syncSend;
97 return asBuilder();
98 }
99
100 public B setKey(final String key) {
101 this.key = key;
102 return asBuilder();
103 }
104
105 public Integer getRetryCount() {
106 Integer intRetryCount = null;
107 try {
108 intRetryCount = Integer.valueOf(retryCount);
109 } catch (NumberFormatException e) {
110
111 }
112 return intRetryCount;
113
114 }
115
116 }
117
118 @Deprecated
119 public static KafkaAppender createAppender(final Layout<? extends Serializable> layout, final Filter filter,
120 final String name, final boolean ignoreExceptions, final String topic, final Property[] properties,
121 final Configuration configuration,
122 final String key) {
123
124 if (layout == null) {
125 AbstractLifeCycle.LOGGER.error("No layout provided for KafkaAppender");
126 return null;
127 }
128 final KafkaManager kafkaManager = KafkaManager.getManager(configuration.getLoggerContext(), name, topic, true,
129 properties, key);
130 return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager, null, null);
131 }
132
133
134
135
136
137
138 @PluginBuilderFactory
139 public static <B extends Builder<B>> B newBuilder() {
140 return new Builder<B>().asBuilder();
141 }
142
143 private final KafkaManager manager;
144
145 private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter,
146 final boolean ignoreExceptions, final KafkaManager manager, final Property[] properties,
147 final Integer retryCount) {
148 super(name, filter, layout, ignoreExceptions, properties);
149 this.manager = Objects.requireNonNull(manager, "manager");
150 this.retryCount = retryCount;
151 }
152
153 @Override
154 public void append(final LogEvent event) {
155 if (event.getLoggerName() != null && event.getLoggerName().startsWith("org.apache.kafka")) {
156 LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
157 } else {
158 try {
159 tryAppend(event);
160 } catch (final Exception e) {
161
162 if (this.retryCount != null) {
163 int currentRetryAttempt = 0;
164 while (currentRetryAttempt < this.retryCount) {
165 currentRetryAttempt++;
166 try {
167 tryAppend(event);
168 break;
169 } catch (Exception e1) {
170
171 }
172 }
173 }
174 error("Unable to write to Kafka in appender [" + getName() + "]", event, e);
175 }
176 }
177 }
178
179 private void tryAppend(final LogEvent event) throws ExecutionException, InterruptedException, TimeoutException {
180 final Layout<? extends Serializable> layout = getLayout();
181 byte[] data;
182 if (layout instanceof SerializedLayout) {
183 final byte[] header = layout.getHeader();
184 final byte[] body = layout.toByteArray(event);
185 data = new byte[header.length + body.length];
186 System.arraycopy(header, 0, data, 0, header.length);
187 System.arraycopy(body, 0, data, header.length, body.length);
188 } else {
189 data = layout.toByteArray(event);
190 }
191 manager.send(data);
192 }
193
194 @Override
195 public void start() {
196 super.start();
197 manager.startup();
198 }
199
200 @Override
201 public boolean stop(final long timeout, final TimeUnit timeUnit) {
202 setStopping();
203 boolean stopped = super.stop(timeout, timeUnit, false);
204 stopped &= manager.stop(timeout, timeUnit);
205 setStopped();
206 return stopped;
207 }
208
209 @Override
210 public String toString() {
211 return "KafkaAppender{" + "name=" + getName() + ", state=" + getState() + ", topic=" + manager.getTopic() + '}';
212 }
213 }