1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.appender.mom.kafka;
19
20 import java.nio.charset.StandardCharsets;
21 import java.util.Objects;
22 import java.util.Properties;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27
28 import org.apache.kafka.clients.producer.Callback;
29 import org.apache.kafka.clients.producer.Producer;
30 import org.apache.kafka.clients.producer.ProducerRecord;
31 import org.apache.kafka.clients.producer.RecordMetadata;
32 import org.apache.logging.log4j.core.LoggerContext;
33 import org.apache.logging.log4j.core.appender.AbstractManager;
34 import org.apache.logging.log4j.core.appender.ManagerFactory;
35 import org.apache.logging.log4j.core.config.Property;
36 import org.apache.logging.log4j.core.util.Log4jThread;
37
38 public class KafkaManager extends AbstractManager {
39
40 public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
41
42
43
44
45 static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory();
46
47 private final Properties config = new Properties();
48 private Producer<byte[], byte[]> producer;
49 private final int timeoutMillis;
50
51 private final String topic;
52 private final String key;
53 private final boolean syncSend;
54 private static final KafkaManagerFactory factory = new KafkaManagerFactory();
55
56
57
58
59
60 public KafkaManager(final LoggerContext loggerContext, final String name, final String topic,
61 final boolean syncSend, final Property[] properties, final String key) {
62 super(loggerContext, name);
63 this.topic = Objects.requireNonNull(topic, "topic");
64 this.syncSend = syncSend;
65
66 config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
67 config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
68 config.setProperty("batch.size", "0");
69
70 for (final Property property : properties) {
71 config.setProperty(property.getName(), property.getValue());
72 }
73
74 this.key = key;
75
76 this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS));
77 }
78
79 @Override
80 public boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
81 if (timeout > 0) {
82 closeProducer(timeout, timeUnit);
83 } else {
84 closeProducer(timeoutMillis, TimeUnit.MILLISECONDS);
85 }
86 return true;
87 }
88
89 private void closeProducer(final long timeout, final TimeUnit timeUnit) {
90 if (producer != null) {
91
92
93 final Thread closeThread = new Log4jThread(new Runnable() {
94 @Override
95 public void run() {
96 if (producer != null) {
97 producer.close();
98 }
99 }
100 }, "KafkaManager-CloseThread");
101 closeThread.setDaemon(true);
102 closeThread.start();
103 try {
104 closeThread.join(timeUnit.toMillis(timeout));
105 } catch (final InterruptedException ignore) {
106 Thread.currentThread().interrupt();
107
108 }
109 }
110 }
111
112 public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
113 if (producer != null) {
114 byte[] newKey = null;
115
116 if (key != null && key.contains("${")) {
117 newKey = getLoggerContext().getConfiguration().getStrSubstitutor().replace(key)
118 .getBytes(StandardCharsets.UTF_8);
119 } else if (key != null) {
120 newKey = key.getBytes(StandardCharsets.UTF_8);
121 }
122
123 final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, newKey, msg);
124 if (syncSend) {
125 final Future<RecordMetadata> response = producer.send(newRecord);
126 response.get(timeoutMillis, TimeUnit.MILLISECONDS);
127 } else {
128 producer.send(newRecord, new Callback() {
129 @Override
130 public void onCompletion(final RecordMetadata metadata, final Exception e) {
131 if (e != null) {
132 LOGGER.error("Unable to write to Kafka in appender [" + getName() + "]", e);
133 }
134 }
135 });
136 }
137 }
138 }
139
140 public void startup() {
141 producer = producerFactory.newKafkaProducer(config);
142 }
143
144 public String getTopic() {
145 return topic;
146 }
147
148 public static KafkaManager getManager(final LoggerContext loggerContext, final String name, final String topic,
149 final boolean syncSend, final Property[] properties, final String key) {
150 StringBuilder sb = new StringBuilder(name);
151 for (Property prop : properties) {
152 sb.append(" ").append(prop.getName()).append("=").append(prop.getValue());
153 }
154 return getManager(sb.toString(), factory, new FactoryData(loggerContext, topic, syncSend, properties, key));
155 }
156
157 private static class FactoryData {
158 private final LoggerContext loggerContext;
159 private final String topic;
160 private final boolean syncSend;
161 private final Property[] properties;
162 private final String key;
163
164 public FactoryData(final LoggerContext loggerContext, final String topic, final boolean syncSend,
165 final Property[] properties, final String key) {
166 this.loggerContext = loggerContext;
167 this.topic = topic;
168 this.syncSend = syncSend;
169 this.properties = properties;
170 this.key = key;
171 }
172
173 }
174
175 private static class KafkaManagerFactory implements ManagerFactory<KafkaManager, FactoryData> {
176 @Override
177 public KafkaManager createManager(String name, FactoryData data) {
178 return new KafkaManager(data.loggerContext, name, data.topic, data.syncSend, data.properties, data.key);
179 }
180 }
181
182 }