View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  
18  package org.apache.logging.log4j.core.appender.mom.kafka;
19  
20  import java.util.Properties;
21  import java.util.concurrent.ExecutionException;
22  import java.util.concurrent.TimeUnit;
23  import java.util.concurrent.TimeoutException;
24  
25  import org.apache.kafka.clients.producer.Producer;
26  import org.apache.kafka.clients.producer.ProducerRecord;
27  import org.apache.logging.log4j.core.LoggerContext;
28  import org.apache.logging.log4j.core.appender.AbstractManager;
29  import org.apache.logging.log4j.core.config.Property;
30  
31  public class KafkaManager extends AbstractManager {
32  
33      public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
34  
35      /**
36       * package-private access for testing.
37       */
38      static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory();
39  
40      private final Properties config = new Properties();
41      private Producer<byte[], byte[]> producer;
42      private final int timeoutMillis;
43  
44      private final String topic;
45  
46      public KafkaManager(final LoggerContext loggerContext, final String name, final String topic, final Property[] properties) {
47          super(loggerContext, name);
48          this.topic = topic;
49          config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
50          config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
51          config.setProperty("batch.size", "0");
52          for (final Property property : properties) {
53              config.setProperty(property.getName(), property.getValue());
54          }
55          this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS));
56      }
57  
58      @Override
59      public boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
60          if (timeout > 0) {
61              closeProducer(timeout, timeUnit);
62          } else {
63              closeProducer(timeoutMillis, TimeUnit.MILLISECONDS);
64          }
65          return true;
66      }
67  
68      private void closeProducer(final long timeout, final TimeUnit timeUnit) {
69          if (producer != null) {
70              // This thread is a workaround for this Kafka issue: https://issues.apache.org/jira/browse/KAFKA-1660
71              final Runnable task = new Runnable() {
72                  @Override
73                  public void run() {
74                      if (producer != null) {
75                          producer.close();
76                      }
77                  }
78              };
79              try {
80                  getLoggerContext().submitDaemon(task).get(timeout, timeUnit);
81              } catch (InterruptedException | ExecutionException | TimeoutException e) {
82                  // ignore
83              }
84          }
85      }
86  
87      public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
88          if (producer != null) {
89              producer.send(new ProducerRecord<byte[], byte[]>(topic, msg)).get(timeoutMillis, TimeUnit.MILLISECONDS);
90          }
91      }
92  
93      public void startup() {
94          producer = producerFactory.newKafkaProducer(config);
95      }
96  
97      public String getTopic() {
98          return topic;
99      }
100 
101 }