View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  
18  package org.apache.logging.log4j.core.appender.mom.kafka;
19  
20  import java.io.Serializable;
21  import java.util.Objects;
22  import java.util.concurrent.ExecutionException;
23  import java.util.concurrent.TimeUnit;
24  import java.util.concurrent.TimeoutException;
25  
26  import org.apache.logging.log4j.core.AbstractLifeCycle;
27  import org.apache.logging.log4j.core.Appender;
28  import org.apache.logging.log4j.core.Filter;
29  import org.apache.logging.log4j.core.Layout;
30  import org.apache.logging.log4j.core.LogEvent;
31  import org.apache.logging.log4j.core.appender.AbstractAppender;
32  import org.apache.logging.log4j.core.config.Configuration;
33  import org.apache.logging.log4j.core.config.Node;
34  import org.apache.logging.log4j.core.config.Property;
35  import org.apache.logging.log4j.core.config.plugins.Plugin;
36  import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
37  import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
38  import org.apache.logging.log4j.core.layout.SerializedLayout;
39  
40  /**
41   * Sends log events to an Apache Kafka topic.
42   */
43  @Plugin(name = "Kafka", category = Node.CATEGORY, elementType = Appender.ELEMENT_TYPE, printObject = true)
44  public final class KafkaAppender extends AbstractAppender {
45  
46  	private final Integer retryCount;
47  
48  	/**
49  	 * Builds KafkaAppender instances.
50  	 * 
51  	 * @param <B> The type to build
52  	 */
53  	public static class Builder<B extends Builder<B>> extends AbstractAppender.Builder<B>
54  			implements org.apache.logging.log4j.core.util.Builder<KafkaAppender> {
55  
56  		@PluginAttribute("retryCount")
57  		private String retryCount;
58  
59  		@PluginAttribute("topic")
60  		private String topic;
61  
62  		@PluginAttribute("key")
63  		private String key;
64  
65  		@PluginAttribute(value = "syncSend", defaultBoolean = true)
66  		private boolean syncSend;
67  
68  		@SuppressWarnings("resource")
69  		@Override
70  		public KafkaAppender build() {
71  			final Layout<? extends Serializable> layout = getLayout();
72  			if (layout == null) {
73  				AbstractLifeCycle.LOGGER.error("No layout provided for KafkaAppender");
74  				return null;
75  			}
76  			final KafkaManager kafkaManager = KafkaManager.getManager(getConfiguration().getLoggerContext(), getName(),
77  					topic, syncSend, getPropertyArray(), key);
78  			return new KafkaAppender(getName(), layout, getFilter(), isIgnoreExceptions(), kafkaManager,
79  					getPropertyArray(), getRetryCount());
80  		}
81  
82  		public String getTopic() {
83  			return topic;
84  		}
85  
86  		public boolean isSyncSend() {
87  			return syncSend;
88  		}
89  
90  		public B setTopic(final String topic) {
91  			this.topic = topic;
92  			return asBuilder();
93  		}
94  
95  		public B setSyncSend(final boolean syncSend) {
96  			this.syncSend = syncSend;
97  			return asBuilder();
98  		}
99  
100 		public B setKey(final String key) {
101 			this.key = key;
102 			return asBuilder();
103 		}
104 
105 		public Integer getRetryCount() {
106 			Integer intRetryCount = null;
107 			try {
108 				intRetryCount = Integer.valueOf(retryCount);
109 			} catch (NumberFormatException e) {
110 
111 			}
112 			return intRetryCount;
113 
114 		}
115 
116 	}
117 
118 	@Deprecated
119 	public static KafkaAppender createAppender(final Layout<? extends Serializable> layout, final Filter filter,
120 			final String name, final boolean ignoreExceptions, final String topic, final Property[] properties,
121             final Configuration configuration,
122             final String key) {
123 
124 		if (layout == null) {
125 			AbstractLifeCycle.LOGGER.error("No layout provided for KafkaAppender");
126 			return null;
127 		}
128 		final KafkaManager kafkaManager = KafkaManager.getManager(configuration.getLoggerContext(), name, topic, true,
129 				properties, key);
130 		return new KafkaAppender(name, layout, filter, ignoreExceptions, kafkaManager, null, null);
131 	}
132 
133 	/**
134 	 * Creates a builder for a KafkaAppender.
135 	 * 
136 	 * @return a builder for a KafkaAppender.
137 	 */
138 	@PluginBuilderFactory
139 	public static <B extends Builder<B>> B newBuilder() {
140 		return new Builder<B>().asBuilder();
141 	}
142 
143 	private final KafkaManager manager;
144 
145 	private KafkaAppender(final String name, final Layout<? extends Serializable> layout, final Filter filter,
146 			final boolean ignoreExceptions, final KafkaManager manager, final Property[] properties,
147 			final Integer retryCount) {
148 		super(name, filter, layout, ignoreExceptions, properties);
149 		this.manager = Objects.requireNonNull(manager, "manager");
150 		this.retryCount = retryCount;
151 	}
152 
153 	@Override
154 	public void append(final LogEvent event) {
155 		if (event.getLoggerName() != null && event.getLoggerName().startsWith("org.apache.kafka")) {
156 			LOGGER.warn("Recursive logging from [{}] for appender [{}].", event.getLoggerName(), getName());
157 		} else {
158 			try {
159 				tryAppend(event);
160 			} catch (final Exception e) {
161 
162 				if (this.retryCount != null) {
163 					int currentRetryAttempt = 0;
164 					while (currentRetryAttempt < this.retryCount) {
165 						currentRetryAttempt++;
166 						try {
167 							tryAppend(event);
168 							break;
169 						} catch (Exception e1) {
170 
171 						}
172 					}
173 				}
174 				error("Unable to write to Kafka in appender [" + getName() + "]", event, e);
175 			}
176 		}
177 	}
178 
179 	private void tryAppend(final LogEvent event) throws ExecutionException, InterruptedException, TimeoutException {
180 		final Layout<? extends Serializable> layout = getLayout();
181 		byte[] data;
182 		if (layout instanceof SerializedLayout) {
183 			final byte[] header = layout.getHeader();
184 			final byte[] body = layout.toByteArray(event);
185 			data = new byte[header.length + body.length];
186 			System.arraycopy(header, 0, data, 0, header.length);
187 			System.arraycopy(body, 0, data, header.length, body.length);
188 		} else {
189 			data = layout.toByteArray(event);
190 		}
191 		manager.send(data);
192 	}
193 
194 	@Override
195 	public void start() {
196 		super.start();
197 		manager.startup();
198 	}
199 
200 	@Override
201 	public boolean stop(final long timeout, final TimeUnit timeUnit) {
202 		setStopping();
203 		boolean stopped = super.stop(timeout, timeUnit, false);
204 		stopped &= manager.stop(timeout, timeUnit);
205 		setStopped();
206 		return stopped;
207 	}
208 
209 	@Override
210 	public String toString() {
211 		return "KafkaAppender{" + "name=" + getName() + ", state=" + getState() + ", topic=" + manager.getTopic() + '}';
212 	}
213 }