View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  
18  package org.apache.logging.log4j.core.appender.mom.kafka;
19  
20  import java.io.Serializable;
21  import java.nio.charset.StandardCharsets;
22  
23  import org.apache.logging.log4j.core.Filter;
24  import org.apache.logging.log4j.core.Layout;
25  import org.apache.logging.log4j.core.LogEvent;
26  import org.apache.logging.log4j.core.appender.AbstractAppender;
27  import org.apache.logging.log4j.core.appender.AppenderLoggingException;
28  import org.apache.logging.log4j.core.config.Property;
29  import org.apache.logging.log4j.core.config.plugins.Plugin;
30  import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
31  import org.apache.logging.log4j.core.config.plugins.PluginElement;
32  import org.apache.logging.log4j.core.config.plugins.PluginFactory;
33  import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
34  
35  /**
36   * Sends log events to an Apache Kafka topic.
37   */
38  @Plugin(name = "Kafka", category = "Core", elementType = "appender", printObject = true)
39  public final class KafkaAppender extends AbstractAppender {
40  
41      /**
42       * 
43       */
44      private static final long serialVersionUID = 1L;
45      @PluginFactory
46      public static KafkaAppender createAppender(
47              @PluginElement("Layout") final Layout<? extends Serializable> layout,
48              @PluginElement("Filter") final Filter filter,
49              @Required(message = "No name provided for KafkaAppender") @PluginAttribute("name") final String name,
50              @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
51              @Required(message = "No topic provided for KafkaAppender") @PluginAttribute("topic") final String topic,
52              @PluginElement("Properties") final Property[] properties) {
53          final KafkaManager kafkaManager = new KafkaManager(name, topic, properties);
54          return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager);
55      }
56  
57      private final KafkaManager manager;
58  
59      private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter, final boolean ignoreExceptions, final KafkaManager manager) {
60          super(name, filter, layout, ignoreExceptions);
61          this.manager = manager;
62      }
63  
64      @Override
65      public void append(final LogEvent event) {
66          if (event.getLoggerName().startsWith("org.apache.kafka")) {
67              LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
68          } else {
69              try {
70                  if (getLayout() != null) {
71                      manager.send(getLayout().toByteArray(event));
72                  } else {
73                      manager.send(event.getMessage().getFormattedMessage().getBytes(StandardCharsets.UTF_8));
74                  }
75              } catch (final Exception e) {
76                  LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e);
77                  throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e);
78              }
79          }
80      }
81  
82      @Override
83      public void start() {
84          super.start();
85          manager.startup();
86      }
87  
88      @Override
89      public void stop() {
90          super.stop();
91          manager.release();
92      }
93  
94  }