View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  
18  package org.apache.logging.log4j.core.appender.mom.kafka;
19  
20  import java.util.Properties;
21  import java.util.concurrent.ExecutionException;
22  import java.util.concurrent.TimeUnit;
23  import java.util.concurrent.TimeoutException;
24  
25  import org.apache.kafka.clients.producer.Producer;
26  import org.apache.kafka.clients.producer.ProducerRecord;
27  import org.apache.logging.log4j.core.appender.AbstractManager;
28  import org.apache.logging.log4j.core.config.Property;
29  
30  public class KafkaManager extends AbstractManager {
31  
32      public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
33  
34      /**
35       * package-private access for testing.
36       */
37      static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory();
38  
39      private final Properties config = new Properties();
40      private Producer<byte[], byte[]> producer = null;
41      private final int timeoutMillis;
42  
43      private final String topic;
44  
45      public KafkaManager(final String name, final String topic, final Property[] properties) {
46          super(name);
47          this.topic = topic;
48          config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
49          config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
50          config.setProperty("batch.size", "0");
51          for (final Property property : properties) {
52              config.setProperty(property.getName(), property.getValue());
53          }
54          this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS));
55      }
56  
57      @Override
58      public void releaseSub() {
59          if (producer != null) {
60              // This thread is a workaround for this Kafka issue: https://issues.apache.org/jira/browse/KAFKA-1660
61              final Thread closeThread = new Thread(new Runnable() {
62                  @Override
63                  public void run() {
64                      producer.close();
65                  }
66              });
67              closeThread.setName("KafkaManager-CloseThread");
68              closeThread.setDaemon(true); // avoid blocking JVM shutdown
69              closeThread.start();
70              try {
71                  closeThread.join(timeoutMillis);
72              } catch (final InterruptedException ignore) {
73                  // ignore
74              }
75          }
76      }
77  
78      public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
79          if (producer != null) {
80              producer.send(new ProducerRecord<byte[], byte[]>(topic, msg)).get(timeoutMillis, TimeUnit.MILLISECONDS);
81          }
82      }
83  
84      public void startup() {
85          producer = producerFactory.newKafkaProducer(config);
86      }
87  
88  }