View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  
18  package org.apache.logging.log4j.core.appender.mom.kafka;
19  
20  import java.io.Serializable;
21  import java.util.Objects;
22  import java.util.concurrent.ExecutionException;
23  import java.util.concurrent.TimeUnit;
24  import java.util.concurrent.TimeoutException;
25  
26  import org.apache.logging.log4j.core.AbstractLifeCycle;
27  import org.apache.logging.log4j.core.Appender;
28  import org.apache.logging.log4j.core.Filter;
29  import org.apache.logging.log4j.core.Layout;
30  import org.apache.logging.log4j.core.LogEvent;
31  import org.apache.logging.log4j.core.appender.AbstractAppender;
32  import org.apache.logging.log4j.core.config.Configuration;
33  import org.apache.logging.log4j.core.config.Node;
34  import org.apache.logging.log4j.core.config.Property;
35  import org.apache.logging.log4j.core.config.plugins.Plugin;
36  import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
37  import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
38  import org.apache.logging.log4j.core.config.plugins.PluginElement;
39  import org.apache.logging.log4j.core.layout.SerializedLayout;
40  
41  /**
42   * Sends log events to an Apache Kafka topic.
43   */
44  @Plugin(name = "Kafka", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
45  public final class KafkaAppender extends AbstractAppender {
46  
47      /**
48       * Builds KafkaAppender instances.
49       * @param <B> The type to build
50       */
51      public static class Builder<B extends Builder<B>> extends AbstractAppender.Builder<B>
52              implements org.apache.logging.log4j.core.util.Builder<KafkaAppender> {
53  
54          @PluginAttribute("topic") 
55          private String topic;
56  
57          @PluginAttribute("key")
58          private String key;
59          
60          @PluginAttribute(value = "syncSend", defaultBoolean = true)
61          private boolean syncSend;
62  
63          @PluginElement("Properties") 
64          private Property[] properties;
65  
66          @SuppressWarnings("resource")
67          @Override
68          public KafkaAppender build() {
69              final Layout<? extends Serializable> layout = getLayout();
70              if (layout == null) {
71                  AbstractLifeCycle.LOGGER.error("No layout provided for KafkaAppender");
72                  return null;
73              }
74              final KafkaManager kafkaManager =
75                      new KafkaManager(getConfiguration().getLoggerContext(), getName(), topic, syncSend, properties, key);
76              return new KafkaAppender(getName(), layout, getFilter(), isIgnoreExceptions(), kafkaManager);
77          }
78  
79          public String getTopic() {
80              return topic;
81          }
82  
83          public boolean isSyncSend() {
84              return syncSend;
85          }
86  
87          public Property[] getProperties() {
88              return properties;
89          }
90  
91          public B setTopic(final String topic) {
92              this.topic = topic;
93              return asBuilder();
94          }
95  
96          public B setSyncSend(final boolean syncSend) {
97              this.syncSend = syncSend;
98              return asBuilder();
99          }
100 
101         public B setProperties(final Property[] properties) {
102             this.properties = properties;
103             return asBuilder();
104         }
105     }
106     
107     @Deprecated
108     public static KafkaAppender createAppender(
109             final Layout<? extends Serializable> layout,
110             final Filter filter,
111             final String name,
112             final boolean ignoreExceptions,
113             final String topic,
114             final Property[] properties,
115             final Configuration configuration,
116             final String key) {
117 
118         if (layout == null) {
119             AbstractLifeCycle.LOGGER.error("No layout provided for KafkaAppender");
120             return null;
121         }
122         final KafkaManager kafkaManager =
123                 new KafkaManager(configuration.getLoggerContext(), name, topic, true, properties, key);
124         return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager);
125     }
126 
127     /**
128      * Creates a builder for a KafkaAppender.
129      * @return a builder for a KafkaAppender.
130      */
131     @PluginBuilderFactory
132     public static <B extends Builder<B>> B newBuilder() {
133         return new Builder<B>().asBuilder();
134     }
135 
136     private final KafkaManager manager;
137 
138     private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter,
139             final boolean ignoreExceptions, final KafkaManager manager) {
140         super(name, filter, layout, ignoreExceptions);
141         this.manager = Objects.requireNonNull(manager, "manager");
142     }
143 
144     @Override
145     public void append(final LogEvent event) {
146         if (event.getLoggerName() != null && event.getLoggerName().startsWith("org.apache.kafka")) {
147             LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
148         } else {
149             try {
150                 tryAppend(event);
151             } catch (final Exception e) {
152                 error("Unable to write to Kafka in appender [" + getName() + "]", event, e);
153             }
154         }
155     }
156 
157     private void tryAppend(final LogEvent event) throws ExecutionException, InterruptedException, TimeoutException {
158         final Layout<? extends Serializable> layout = getLayout();
159         byte[] data;
160         if (layout instanceof SerializedLayout) {
161             final byte[] header = layout.getHeader();
162             final byte[] body = layout.toByteArray(event);
163             data = new byte[header.length + body.length];
164             System.arraycopy(header, 0, data, 0, header.length);
165             System.arraycopy(body, 0, data, header.length, body.length);
166         } else {
167             data = layout.toByteArray(event);
168         }
169         manager.send(data);
170     }
171 
172     @Override
173     public void start() {
174         super.start();
175         manager.startup();
176     }
177 
178     @Override
179     public boolean stop(final long timeout, final TimeUnit timeUnit) {
180         setStopping();
181         boolean stopped = super.stop(timeout, timeUnit, false);
182         stopped &= manager.stop(timeout, timeUnit);
183         setStopped();
184         return stopped;
185     }
186 
187     @Override
188     public String toString() {
189         return "KafkaAppender{" +
190             "name=" + getName() +
191             ", state=" + getState() +
192             ", topic=" + manager.getTopic() +
193             '}';
194     }
195 }