/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache license, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the license for the specific language governing permissions and
 * limitations under the License.
 */

18 package org.apache.logging.log4j.core.appender.mom.kafka;
19
20 import java.util.Objects;
21 import java.util.Properties;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Future;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26
27 import org.apache.kafka.clients.producer.Callback;
28 import org.apache.kafka.clients.producer.Producer;
29 import org.apache.kafka.clients.producer.ProducerRecord;
30 import org.apache.kafka.clients.producer.RecordMetadata;
31 import org.apache.logging.log4j.core.LoggerContext;
32 import org.apache.logging.log4j.core.appender.AbstractManager;
33 import org.apache.logging.log4j.core.config.Property;
34 import org.apache.logging.log4j.core.util.Log4jThread;
35
36 public class KafkaManager extends AbstractManager {
37
38 public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
39
40
41
42
43 static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory();
44
45 private final Properties config = new Properties();
46 private Producer<byte[], byte[]> producer;
47 private final int timeoutMillis;
48
49 private final String topic;
50 private final boolean syncSend;
51
52 public KafkaManager(final LoggerContext loggerContext, final String name, final String topic, final boolean syncSend, final Property[] properties) {
53 super(loggerContext, name);
54 this.topic = Objects.requireNonNull(topic, "topic");
55 this.syncSend = syncSend;
56 config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
57 config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
58 config.setProperty("batch.size", "0");
59 for (final Property property : properties) {
60 config.setProperty(property.getName(), property.getValue());
61 }
62 this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS));
63 }
64
65 @Override
66 public boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
67 if (timeout > 0) {
68 closeProducer(timeout, timeUnit);
69 } else {
70 closeProducer(timeoutMillis, TimeUnit.MILLISECONDS);
71 }
72 return true;
73 }
74
75 private void closeProducer(final long timeout, final TimeUnit timeUnit) {
76 if (producer != null) {
77
78 final Thread closeThread = new Log4jThread(new Runnable() {
79 @Override
80 public void run() {
81 if (producer != null) {
82 producer.close();
83 }
84 }
85 }, "KafkaManager-CloseThread");
86 closeThread.setDaemon(true);
87 closeThread.start();
88 try {
89 closeThread.join(timeUnit.toMillis(timeout));
90 } catch (final InterruptedException ignore) {
91 Thread.currentThread().interrupt();
92
93 }
94 }
95 }
96
97 public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
98 if (producer != null) {
99 final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, msg);
100 if (syncSend) {
101 final Future<RecordMetadata> response = producer.send(newRecord);
102 response.get(timeoutMillis, TimeUnit.MILLISECONDS);
103 } else {
104 producer.send(newRecord, new Callback() {
105 @Override
106 public void onCompletion(final RecordMetadata metadata, final Exception e) {
107 if (e != null) {
108 LOGGER.error("Unable to write to Kafka in appender [" + getName() + "]", e);
109 }
110 }
111 });
112 }
113 }
114 }
115
116 public void startup() {
117 producer = producerFactory.newKafkaProducer(config);
118 }
119
120 public String getTopic() {
121 return topic;
122 }
123
124 }