1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.appender.mom.kafka;
19
20 import java.nio.charset.StandardCharsets;
21 import java.util.Objects;
22 import java.util.Properties;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27
28 import org.apache.kafka.clients.producer.Callback;
29 import org.apache.kafka.clients.producer.Producer;
30 import org.apache.kafka.clients.producer.ProducerRecord;
31 import org.apache.kafka.clients.producer.RecordMetadata;
32 import org.apache.logging.log4j.core.LoggerContext;
33 import org.apache.logging.log4j.core.appender.AbstractManager;
34 import org.apache.logging.log4j.core.config.Property;
35 import org.apache.logging.log4j.core.util.Log4jThread;
36
37 public class KafkaManager extends AbstractManager {
38
39 public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
40
41
42
43
44 static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory();
45
46 private final Properties config = new Properties();
47 private Producer<byte[], byte[]> producer;
48 private final int timeoutMillis;
49
50 private final String topic;
51 private final String key;
52 private final boolean syncSend;
53
54 public KafkaManager(final LoggerContext loggerContext, final String name, final String topic, final boolean syncSend,
55 final Property[] properties, final String key) {
56 super(loggerContext, name);
57 this.topic = Objects.requireNonNull(topic, "topic");
58 this.syncSend = syncSend;
59 config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
60 config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
61 config.setProperty("batch.size", "0");
62 for (final Property property : properties) {
63 config.setProperty(property.getName(), property.getValue());
64 }
65
66 this.key = key;
67
68 this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS));
69 }
70
71 @Override
72 public boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
73 if (timeout > 0) {
74 closeProducer(timeout, timeUnit);
75 } else {
76 closeProducer(timeoutMillis, TimeUnit.MILLISECONDS);
77 }
78 return true;
79 }
80
81 private void closeProducer(final long timeout, final TimeUnit timeUnit) {
82 if (producer != null) {
83
84 final Thread closeThread = new Log4jThread(new Runnable() {
85 @Override
86 public void run() {
87 if (producer != null) {
88 producer.close();
89 }
90 }
91 }, "KafkaManager-CloseThread");
92 closeThread.setDaemon(true);
93 closeThread.start();
94 try {
95 closeThread.join(timeUnit.toMillis(timeout));
96 } catch (final InterruptedException ignore) {
97 Thread.currentThread().interrupt();
98
99 }
100 }
101 }
102
103 public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
104 if (producer != null) {
105 byte[] newKey = null;
106
107 if(key != null && key.contains("${")) {
108 newKey = getLoggerContext().getConfiguration().getStrSubstitutor().replace(key).getBytes(StandardCharsets.UTF_8);
109 } else if (key != null) {
110 newKey = key.getBytes(StandardCharsets.UTF_8);
111 }
112
113 final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, newKey, msg);
114 if (syncSend) {
115 final Future<RecordMetadata> response = producer.send(newRecord);
116 response.get(timeoutMillis, TimeUnit.MILLISECONDS);
117 } else {
118 producer.send(newRecord, new Callback() {
119 @Override
120 public void onCompletion(final RecordMetadata metadata, final Exception e) {
121 if (e != null) {
122 LOGGER.error("Unable to write to Kafka in appender [" + getName() + "]", e);
123 }
124 }
125 });
126 }
127 }
128 }
129
130 public void startup() {
131 producer = producerFactory.newKafkaProducer(config);
132 }
133
134 public String getTopic() {
135 return topic;
136 }
137
138 }