View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  
18  package org.apache.logging.log4j.core.appender.mom.kafka;
19  
20  import java.nio.charset.StandardCharsets;
21  import java.util.Objects;
22  import java.util.Properties;
23  import java.util.concurrent.ExecutionException;
24  import java.util.concurrent.Future;
25  import java.util.concurrent.TimeUnit;
26  import java.util.concurrent.TimeoutException;
27  
28  import org.apache.kafka.clients.producer.Callback;
29  import org.apache.kafka.clients.producer.Producer;
30  import org.apache.kafka.clients.producer.ProducerRecord;
31  import org.apache.kafka.clients.producer.RecordMetadata;
32  import org.apache.logging.log4j.core.LoggerContext;
33  import org.apache.logging.log4j.core.appender.AbstractManager;
34  import org.apache.logging.log4j.core.appender.ManagerFactory;
35  import org.apache.logging.log4j.core.config.Property;
36  import org.apache.logging.log4j.core.util.Log4jThread;
37  
38  public class KafkaManager extends AbstractManager {
39  
40  	public static final String DEFAULT_TIMEOUT_MILLIS = "30000";
41  
42  	/**
43  	 * package-private access for testing.
44  	 */
45  	static KafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory();
46  
47  	private final Properties config = new Properties();
48  	private Producer<byte[], byte[]> producer;
49  	private final int timeoutMillis;
50  
51  	private final String topic;
52  	private final String key;
53  	private final boolean syncSend;
54  	private static final KafkaManagerFactory factory = new KafkaManagerFactory();
55  
56  	/*
57  	 * The Constructor should have been declared private as all Managers are create
58  	 * by the internal factory;
59  	 */
60  	public KafkaManager(final LoggerContext loggerContext, final String name, final String topic,
61  			final boolean syncSend, final Property[] properties, final String key) {
62  		super(loggerContext, name);
63  		this.topic = Objects.requireNonNull(topic, "topic");
64  		this.syncSend = syncSend;
65  
66  		config.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
67  		config.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
68  		config.setProperty("batch.size", "0");
69  
70  		for (final Property property : properties) {
71  			config.setProperty(property.getName(), property.getValue());
72  		}
73  
74  		this.key = key;
75  
76  		this.timeoutMillis = Integer.parseInt(config.getProperty("timeout.ms", DEFAULT_TIMEOUT_MILLIS));
77  	}
78  
79  	@Override
80  	public boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
81  		if (timeout > 0) {
82  			closeProducer(timeout, timeUnit);
83  		} else {
84  			closeProducer(timeoutMillis, TimeUnit.MILLISECONDS);
85  		}
86  		return true;
87  	}
88  
89  	private void closeProducer(final long timeout, final TimeUnit timeUnit) {
90  		if (producer != null) {
91  			// This thread is a workaround for this Kafka issue:
92  			// https://issues.apache.org/jira/browse/KAFKA-1660
93  			final Thread closeThread = new Log4jThread(new Runnable() {
94  				@Override
95  				public void run() {
96  					if (producer != null) {
97  						producer.close();
98  					}
99  				}
100 			}, "KafkaManager-CloseThread");
101 			closeThread.setDaemon(true); // avoid blocking JVM shutdown
102 			closeThread.start();
103 			try {
104 				closeThread.join(timeUnit.toMillis(timeout));
105 			} catch (final InterruptedException ignore) {
106 				Thread.currentThread().interrupt();
107 				// ignore
108 			}
109 		}
110 	}
111 
112 	public void send(final byte[] msg) throws ExecutionException, InterruptedException, TimeoutException {
113 		if (producer != null) {
114 			byte[] newKey = null;
115 
116 			if (key != null && key.contains("${")) {
117 				newKey = getLoggerContext().getConfiguration().getStrSubstitutor().replace(key)
118 						.getBytes(StandardCharsets.UTF_8);
119 			} else if (key != null) {
120 				newKey = key.getBytes(StandardCharsets.UTF_8);
121 			}
122 
123 			final ProducerRecord<byte[], byte[]> newRecord = new ProducerRecord<>(topic, newKey, msg);
124 			if (syncSend) {
125 				final Future<RecordMetadata> response = producer.send(newRecord);
126 				response.get(timeoutMillis, TimeUnit.MILLISECONDS);
127 			} else {
128 				producer.send(newRecord, new Callback() {
129 					@Override
130 					public void onCompletion(final RecordMetadata metadata, final Exception e) {
131 						if (e != null) {
132 							LOGGER.error("Unable to write to Kafka in appender [" + getName() + "]", e);
133 						}
134 					}
135 				});
136 			}
137 		}
138 	}
139 
140 	public void startup() {
141 		producer = producerFactory.newKafkaProducer(config);
142 	}
143 
144 	public String getTopic() {
145 		return topic;
146 	}
147 
148 	public static KafkaManager getManager(final LoggerContext loggerContext, final String name, final String topic,
149 			final boolean syncSend, final Property[] properties, final String key) {
150 		StringBuilder sb = new StringBuilder(name);
151 		for (Property prop : properties) {
152 			sb.append(" ").append(prop.getName()).append("=").append(prop.getValue());
153 		}
154 		return getManager(sb.toString(), factory, new FactoryData(loggerContext, topic, syncSend, properties, key));
155 	}
156 
157 	private static class FactoryData {
158 		private final LoggerContext loggerContext;
159 		private final String topic;
160 		private final boolean syncSend;
161 		private final Property[] properties;
162 		private final String key;
163 
164 		public FactoryData(final LoggerContext loggerContext, final String topic, final boolean syncSend,
165 				final Property[] properties, final String key) {
166 			this.loggerContext = loggerContext;
167 			this.topic = topic;
168 			this.syncSend = syncSend;
169 			this.properties = properties;
170 			this.key = key;
171 		}
172 
173 	}
174 
175 	private static class KafkaManagerFactory implements ManagerFactory<KafkaManager, FactoryData> {
176 		@Override
177 		public KafkaManager createManager(String name, FactoryData data) {
178 			return new KafkaManager(data.loggerContext, name, data.topic, data.syncSend, data.properties, data.key);
179 		}
180 	}
181 
182 }