1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.appender.mom.kafka;
19
20 import java.util.Properties;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.TimeoutException;
24
25 import org.apache.kafka.clients.producer.Producer;
26 import org.apache.kafka.clients.producer.ProducerRecord;
27 import org.apache.logging.log4j.core.appender.AbstractManager;
28 import org.apache.logging.log4j.core.config.Property;
29
30 public class KafkaManager extends AbstractManager {
31
32 public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
33
34
35
36
37 static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory();
38
39 private final Properties config = new Properties();
40 private Producer<byte[], byte[]> producer = null;
41 private final int timeoutMillis;
42
43 private final String topic;
44
45 public KafkaManager(final String name, final String topic, final Property[] properties) {
46 super(name);
47 this.topic = topic;
48 config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
49 config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
50 config.setProperty("batch.size", "0");
51 for (final Property property : properties) {
52 config.setProperty(property.getName(), property.getValue());
53 }
54 this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS));
55 }
56
57 @Override
58 public void releaseSub() {
59 if (producer != null) {
60
61 final Thread closeThread = new Thread(new Runnable() {
62 @Override
63 public void run() {
64 producer.close();
65 }
66 });
67 closeThread.setName("KafkaManager-CloseThread");
68 closeThread.setDaemon(true);
69 closeThread.start();
70 try {
71 closeThread.join(timeoutMillis);
72 } catch (final InterruptedException ignore) {
73
74 }
75 }
76 }
77
78 public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
79 if (producer != null) {
80 producer.send(new ProducerRecord<byte[], byte[]>(topic, msg)).get(timeoutMillis, TimeUnit.MILLISECONDS);
81 }
82 }
83
84 public void startup() {
85 producer = producerFactory.newKafkaProducer(config);
86 }
87
88 }