/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache license, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the license for the specific language governing permissions and
 * limitations under the license.
 */
17  
18  package org.apache.logging.log4j.core.appender.mom.kafka;
19  
20  import java.util.Objects;
21  import java.util.Properties;
22  import java.util.concurrent.ExecutionException;
23  import java.util.concurrent.Future;
24  import java.util.concurrent.TimeUnit;
25  import java.util.concurrent.TimeoutException;
26  
27  import org.apache.kafka.clients.producer.Callback;
28  import org.apache.kafka.clients.producer.Producer;
29  import org.apache.kafka.clients.producer.ProducerRecord;
30  import org.apache.kafka.clients.producer.RecordMetadata;
31  import org.apache.logging.log4j.core.LoggerContext;
32  import org.apache.logging.log4j.core.appender.AbstractManager;
33  import org.apache.logging.log4j.core.config.Property;
34  import org.apache.logging.log4j.core.util.Log4jThread;
35  
36  public class KafkaManager extends AbstractManager {
37  
38      public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
39  
40      /**
41       * package-private access for testing.
42       */
43      static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory();
44  
45      private final Properties config = new Properties();
46      private Producer<byte[], byte[]> producer;
47      private final int timeoutMillis;
48  
49      private final String topic;
50      private final boolean syncSend;
51  
52      public KafkaManager(final LoggerContext loggerContext, final String name, final String topic, final boolean syncSend, final Property[] properties) {
53          super(loggerContext, name);
54          this.topic = Objects.requireNonNull(topic, "topic");
55          this.syncSend = syncSend;
56          config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
57          config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
58          config.setProperty("batch.size", "0");
59          for (final Property property : properties) {
60              config.setProperty(property.getName(), property.getValue());
61          }
62          this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS));
63      }
64  
65      @Override
66      public boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
67          if (timeout > 0) {
68              closeProducer(timeout, timeUnit);
69          } else {
70              closeProducer(timeoutMillis, TimeUnit.MILLISECONDS);
71          }
72          return true;
73      }
74  
75      private void closeProducer(final long timeout, final TimeUnit timeUnit) {
76          if (producer != null) {
77              // This thread is a workaround for this Kafka issue: https://issues.apache.org/jira/browse/KAFKA-1660
78             final Thread closeThread = new Log4jThread(new Runnable() {
79                  @Override
80                  public void run() {
81                      if (producer != null) {
82                          producer.close();
83                      }
84                  }
85              }, "KafkaManager-CloseThread");
86              closeThread.setDaemon(true); // avoid blocking JVM shutdown
87              closeThread.start();
88              try {
89                  closeThread.join(timeUnit.toMillis(timeout));
90              } catch (final InterruptedException ignore) {
91                  Thread.currentThread().interrupt();
92                  // ignore
93              }
94          }
95      }
96  
97      public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
98          if (producer != null) {
99              final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, msg);
100             if (syncSend) {
101                 final Future<RecordMetadata> response = producer.send(newRecord);
102                 response.get(timeoutMillis, TimeUnit.MILLISECONDS);
103             } else {
104                 producer.send(newRecord, new Callback() {
105                     @Override
106                     public void onCompletion(final RecordMetadata metadata, final Exception e) {
107                         if (e != null) {
108                             LOGGER.error("Unable to write to Kafka in appender [" + getName() + "]", e);
109                         }
110                     }
111                 });
112             }
113         }
114     }
115 
116     public void startup() {
117         producer = producerFactory.newKafkaProducer(config);
118     }
119 
120     public String getTopic() {
121         return topic;
122     }
123 
124 }