View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  
18  package org.apache.logging.log4j.core.appender.mom.kafka;
19  
20  import java.io.Serializable;
21  import java.nio.charset.StandardCharsets;
22  import java.util.concurrent.TimeUnit;
23  
24  import org.apache.logging.log4j.core.Appender;
25  import org.apache.logging.log4j.core.Filter;
26  import org.apache.logging.log4j.core.Layout;
27  import org.apache.logging.log4j.core.LogEvent;
28  import org.apache.logging.log4j.core.appender.AbstractAppender;
29  import org.apache.logging.log4j.core.appender.AppenderLoggingException;
30  import org.apache.logging.log4j.core.config.Configuration;
31  import org.apache.logging.log4j.core.config.Node;
32  import org.apache.logging.log4j.core.config.Property;
33  import org.apache.logging.log4j.core.config.plugins.Plugin;
34  import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
35  import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
36  import org.apache.logging.log4j.core.config.plugins.PluginElement;
37  import org.apache.logging.log4j.core.config.plugins.PluginFactory;
38  import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
39  import org.apache.logging.log4j.core.layout.SerializedLayout;
40  import org.apache.logging.log4j.core.util.StringEncoder;
41  
42  /**
43   * Sends log events to an Apache Kafka topic.
44   */
45  @Plugin(name = "Kafka", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
46  public final class KafkaAppender extends AbstractAppender {
47  
48      @PluginFactory
49      public static KafkaAppender createAppender(
50              @PluginElement("Layout") final Layout<? extends Serializable> layout,
51              @PluginElement("Filter") final Filter filter,
52              @Required(message = "No name provided for KafkaAppender") @PluginAttribute("name") final String name,
53              @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
54              @Required(message = "No topic provided for KafkaAppender") @PluginAttribute("topic") final String topic,
55              @PluginElement("Properties") final Property[] properties,
56              @PluginConfiguration final Configuration configuration) {
57          final KafkaManager kafkaManager = new KafkaManager(configuration.getLoggerContext(), name, topic, properties);
58          return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager);
59      }
60  
61      private final KafkaManager manager;
62  
63      private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter, final boolean ignoreExceptions, final KafkaManager manager) {
64          super(name, filter, layout, ignoreExceptions);
65          this.manager = manager;
66      }
67  
68      @Override
69      public void append(final LogEvent event) {
70          if (event.getLoggerName().startsWith("org.apache.kafka")) {
71              LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
72          } else {
73              try {
74                  final Layout<? extends Serializable> layout = getLayout();
75                  byte[] data;
76                  if (layout != null) {
77                      if (layout instanceof SerializedLayout) {
78                          final byte[] header = layout.getHeader();
79                          final byte[] body = layout.toByteArray(event);
80                          data = new byte[header.length + body.length];
81                          System.arraycopy(header, 0, data, 0, header.length);
82                          System.arraycopy(body, 0, data, header.length, body.length);
83                      } else {
84                          data = layout.toByteArray(event);
85                      }
86                  } else {
87                      data = StringEncoder.toBytes(event.getMessage().getFormattedMessage(), StandardCharsets.UTF_8);
88                  }
89                  manager.send(data);
90              } catch (final Exception e) {
91                  LOGGER.error("Unable to write to Kafka [{}] for appender [{}].", manager.getName(), getName(), e);
92                  throw new AppenderLoggingException("Unable to write to Kafka in appender: " + e.getMessage(), e);
93              }
94          }
95      }
96  
97      @Override
98      public void start() {
99          super.start();
100         manager.startup();
101     }
102 
103     @Override
104     public boolean stop(final long timeout, final TimeUnit timeUnit) {
105         setStopping();
106         boolean stopped = super.stop(timeout, timeUnit, false);
107         stopped &= manager.stop(timeout, timeUnit);
108         setStopped();
109         return stopped;
110     }
111 
112     @Override
113     public String toString() {
114         return "KafkaAppender{" +
115             "name=" + getName() +
116             ", state=" + getState() +
117             ", topic=" + manager.getTopic() +
118             '}';
119     }
120 }