1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.appender.mom.kafka;
19
20 import java.util.Properties;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.TimeoutException;
24
25 import org.apache.kafka.clients.producer.Producer;
26 import org.apache.kafka.clients.producer.ProducerRecord;
27 import org.apache.logging.log4j.core.LoggerContext;
28 import org.apache.logging.log4j.core.appender.AbstractManager;
29 import org.apache.logging.log4j.core.config.Property;
30
31 public class KafkaManager extends AbstractManager {
32
33 public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
34
35
36
37
38 static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory();
39
40 private final Properties config = new Properties();
41 private Producer<byte[], byte[]> producer;
42 private final int timeoutMillis;
43
44 private final String topic;
45
46 public KafkaManager(final LoggerContext loggerContext, final String name, final String topic, final Property[] properties) {
47 super(loggerContext, name);
48 this.topic = topic;
49 config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
50 config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
51 config.setProperty("batch.size", "0");
52 for (final Property property : properties) {
53 config.setProperty(property.getName(), property.getValue());
54 }
55 this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS));
56 }
57
58 @Override
59 public boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
60 if (timeout > 0) {
61 closeProducer(timeout, timeUnit);
62 } else {
63 closeProducer(timeoutMillis, TimeUnit.MILLISECONDS);
64 }
65 return true;
66 }
67
68 private void closeProducer(final long timeout, final TimeUnit timeUnit) {
69 if (producer != null) {
70
71 final Runnable task = new Runnable() {
72 @Override
73 public void run() {
74 if (producer != null) {
75 producer.close();
76 }
77 }
78 };
79 try {
80 getLoggerContext().submitDaemon(task).get(timeout, timeUnit);
81 } catch (InterruptedException | ExecutionException | TimeoutException e) {
82
83 }
84 }
85 }
86
87 public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
88 if (producer != null) {
89 producer.send(new ProducerRecord<byte[], byte[]>(topic, msg)).get(timeoutMillis, TimeUnit.MILLISECONDS);
90 }
91 }
92
93 public void startup() {
94 producer = producerFactory.newKafkaProducer(config);
95 }
96
97 public String getTopic() {
98 return topic;
99 }
100
101 }