1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import com.sleepycat.je.Cursor;
20 import com.sleepycat.je.Database;
21 import com.sleepycat.je.DatabaseConfig;
22 import com.sleepycat.je.DatabaseEntry;
23 import com.sleepycat.je.Environment;
24 import com.sleepycat.je.EnvironmentConfig;
25 import com.sleepycat.je.LockMode;
26 import com.sleepycat.je.OperationStatus;
27 import com.sleepycat.je.StatsConfig;
28 import org.apache.flume.Event;
29 import org.apache.flume.event.SimpleEvent;
30 import org.apache.logging.log4j.LoggingException;
31 import org.apache.logging.log4j.core.appender.ManagerFactory;
32 import org.apache.logging.log4j.core.config.Property;
33 import org.apache.logging.log4j.core.config.plugins.PluginManager;
34 import org.apache.logging.log4j.core.config.plugins.PluginType;
35 import org.apache.logging.log4j.core.helpers.FileUtils;
36 import org.apache.logging.log4j.core.helpers.SecretKeyProvider;
37
38 import javax.crypto.Cipher;
39 import javax.crypto.SecretKey;
40 import java.io.ByteArrayInputStream;
41 import java.io.ByteArrayOutputStream;
42 import java.io.DataInputStream;
43 import java.io.DataOutputStream;
44 import java.io.File;
45 import java.nio.charset.Charset;
46 import java.util.HashMap;
47 import java.util.Map;
48 import java.util.concurrent.LinkedBlockingQueue;
49 import java.util.concurrent.TimeUnit;
50
51
52
53
54 public class FlumePersistentManager extends FlumeAvroManager {
55
56 public static final String KEY_PROVIDER = "keyProvider";
57
58 private static final Charset UTF8 = Charset.forName("UTF-8");
59
60 private static final String SHUTDOWN = "Shutdown";
61
62 private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
63
64 private static BDBManagerFactory factory = new BDBManagerFactory();
65
66 private Database database;
67
68 private final WriterThread worker;
69
70 private final LinkedBlockingQueue<byte []> queue = new LinkedBlockingQueue<byte[]>();
71
72 private final SecretKey secretKey;
73
74 private final int delay;
75
76
77
78
79
80
81
82
83 protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
84 final int batchSize, final int retries, final int connectionTimeout,
85 final int requestTimeout, final int delay, final Database database,
86 SecretKey secretKey) {
87 super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
88 this.delay = delay;
89 this.database = database;
90 this.worker = new WriterThread(database, this, queue, batchSize, secretKey);
91 this.worker.start();
92 this.secretKey = secretKey;
93 }
94
95
96
97
98
99
100
101
102
103 public static FlumePersistentManager getManager(final String name, final Agent[] agents, Property[] properties,
104 int batchSize, final int retries, final int connectionTimeout,
105 final int requestTimeout, final int delay, final String dataDir) {
106 if (agents == null || agents.length == 0) {
107 throw new IllegalArgumentException("At least one agent is required");
108 }
109
110 if (batchSize <= 0) {
111 batchSize = 1;
112 }
113 String dataDirectory = dataDir == null || dataDir.length() == 0 ? DEFAULT_DATA_DIR : dataDir;
114
115 final StringBuilder sb = new StringBuilder("FlumePersistent[");
116 boolean first = true;
117 for (final Agent agent : agents) {
118 if (!first) {
119 sb.append(",");
120 }
121 sb.append(agent.getHost()).append(":").append(agent.getPort());
122 first = false;
123 }
124 sb.append("]");
125 sb.append(" ").append(dataDirectory);
126 return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
127 connectionTimeout, requestTimeout, delay, dataDir, properties));
128 }
129
130 @Override
131 public synchronized void send(final Event event) {
132 if (worker.isShutdown()) {
133 throw new LoggingException("Unable to record event");
134 }
135
136 Map<String, String> headers = event.getHeaders();
137 byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
138 try {
139 ByteArrayOutputStream baos = new ByteArrayOutputStream();
140 DataOutputStream daos = new DataOutputStream(baos);
141 daos.writeInt(event.getBody().length);
142 daos.write(event.getBody(), 0, event.getBody().length);
143 daos.writeInt(event.getHeaders().size());
144 for (Map.Entry<String, String> entry : headers.entrySet()) {
145 daos.writeUTF(entry.getKey());
146 daos.writeUTF(entry.getValue());
147 }
148 byte[] eventData = baos.toByteArray();
149 if (secretKey != null) {
150 Cipher cipher = Cipher.getInstance("AES");
151 cipher.init(Cipher.ENCRYPT_MODE, secretKey);
152 eventData = cipher.doFinal(eventData);
153 }
154 final DatabaseEntry key = new DatabaseEntry(keyData);
155 final DatabaseEntry data = new DatabaseEntry(eventData);
156 database.put(null, key, data);
157 queue.add(keyData);
158 } catch (Exception ex) {
159 throw new LoggingException("Exception occurred writing log event", ex);
160 }
161 }
162
163 @Override
164 protected void releaseSub() {
165 LOGGER.debug("Shutting down FlumePersistentManager");
166 worker.shutdown();
167 try {
168 worker.join();
169 } catch (InterruptedException ex) {
170 LOGGER.debug("Interrupted while waiting for worker to complete");
171 }
172 try {
173 LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
174 database.close();
175 } catch (final Exception ex) {
176 LOGGER.warn("Failed to close database", ex);
177 }
178 super.releaseSub();
179 }
180
181 private void doSend(final SimpleEvent event) {
182 LOGGER.debug("Sending event to Flume");
183 super.send(event);
184 }
185
186
187
188
189 private static class FactoryData {
190 private final String name;
191 private final Agent[] agents;
192 private final int batchSize;
193 private final String dataDir;
194 private final int retries;
195 private final int connectionTimeout;
196 private final int requestTimeout;
197 private final int delay;
198 private final Property[] properties;
199
200
201
202
203
204
205
206
207 public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
208 final int connectionTimeout, final int requestTimeout, final int delay,
209 final String dataDir, final Property[] properties) {
210 this.name = name;
211 this.agents = agents;
212 this.batchSize = batchSize;
213 this.dataDir = dataDir;
214 this.retries = retries;
215 this.connectionTimeout = connectionTimeout;
216 this.requestTimeout = requestTimeout;
217 this.delay = delay;
218 this.properties = properties;
219 }
220 }
221
222
223
224
225 private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
226
227
228
229
230
231
232
233 public FlumePersistentManager createManager(final String name, final FactoryData data) {
234 SecretKey secretKey = null;
235
236 Database database;
237
238 Map<String, String> properties = new HashMap<String, String>();
239 if (data.properties != null) {
240 for (Property property : data.properties) {
241 properties.put(property.getName(), property.getValue());
242 }
243 }
244
245 try {
246
247 File dir = new File(data.dataDir);
248 FileUtils.mkdir(dir, true);
249 final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
250 dbEnvConfig.setTransactional(false);
251 dbEnvConfig.setAllowCreate(true);
252 final Environment environment = new Environment(dir, dbEnvConfig);
253 final DatabaseConfig dbConfig = new DatabaseConfig();
254 dbConfig.setTransactional(false);
255 dbConfig.setAllowCreate(true);
256 database = environment.openDatabase(null, name, dbConfig);
257 } catch (final Exception ex) {
258 LOGGER.error("Could not create FlumePersistentManager", ex);
259 return null;
260 }
261
262 try {
263 String key = null;
264 for (Map.Entry<String, String> entry : properties.entrySet()) {
265 if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
266 key = entry.getValue();
267 }
268 }
269 if (key != null) {
270 final PluginManager manager = new PluginManager("KeyProvider", SecretKeyProvider.class);
271 manager.collectPlugins();
272 final Map<String, PluginType> plugins = manager.getPlugins();
273 if (plugins != null) {
274 boolean found = false;
275 for (Map.Entry<String, PluginType> entry : plugins.entrySet()) {
276 if (entry.getKey().equalsIgnoreCase(key)) {
277 found = true;
278 Class cl = entry.getValue().getPluginClass();
279 try {
280 SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
281 secretKey = provider.getSecretKey();
282 LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
283 } catch (Exception ex) {
284 LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
285 cl.getName());
286 }
287 break;
288 }
289 }
290 if (!found) {
291 LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
292 }
293 } else {
294 LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
295 }
296 }
297 } catch (Exception ex) {
298 LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
299 }
300 return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
301 data.connectionTimeout, data.requestTimeout, data.delay, database, secretKey);
302 }
303 }
304
305 private static class WriterThread extends Thread {
306 private volatile boolean shutdown = false;
307 private final Database database;
308 private final FlumePersistentManager manager;
309 private final LinkedBlockingQueue<byte[]> queue;
310 private final SecretKey secretKey;
311 private final int batchSize;
312
313 public WriterThread(Database database, FlumePersistentManager manager, LinkedBlockingQueue<byte[]> queue,
314 int batchsize, SecretKey secretKey) {
315 this.database = database;
316 this.manager = manager;
317 this.queue = queue;
318 this.batchSize = batchsize;
319 this.secretKey = secretKey;
320 this.setDaemon(true);
321 }
322
323 public void shutdown() {
324 LOGGER.debug("Writer thread shutting down");
325 this.shutdown = true;
326 if (queue.size() == 0) {
327 queue.add(SHUTDOWN.getBytes(UTF8));
328 }
329 }
330
331 public boolean isShutdown() {
332 return shutdown;
333 }
334
335 @Override
336 public void run() {
337 LOGGER.trace("WriterThread started");
338 long lastBatch = System.currentTimeMillis();
339 while (!shutdown) {
340 if (database.count() >= batchSize ||
341 database.count() > 0 && lastBatch + manager.delay > System.currentTimeMillis()) {
342 lastBatch = System.currentTimeMillis();
343 try {
344 boolean errors = false;
345 DatabaseEntry key = new DatabaseEntry();
346 final DatabaseEntry data = new DatabaseEntry();
347 final Cursor cursor = database.openCursor(null, null);
348 try {
349 queue.clear();
350 OperationStatus status;
351 try {
352 status = cursor.getFirst(key, data, LockMode.RMW);
353 if (batchSize > 1) {
354 BatchEvent batch = new BatchEvent();
355 for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
356 SimpleEvent event = createEvent(data);
357 if (event != null) {
358 batch.addEvent(event);
359 }
360 status = cursor.getNext(key, data, LockMode.RMW);
361 }
362 try {
363 manager.send(batch);
364 } catch (Exception ioe) {
365 LOGGER.error("Error sending events", ioe);
366 break;
367 }
368 for (Event event : batch.getEvents()) {
369 try {
370 Map<String, String> headers = event.getHeaders();
371 key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
372 database.delete(null, key);
373 } catch (Exception ex) {
374 LOGGER.error("Error deleting key from database", ex);
375 }
376 }
377 } else {
378 while (status == OperationStatus.SUCCESS) {
379 SimpleEvent event = createEvent(data);
380 if (event != null) {
381 try {
382 manager.doSend(event);
383 } catch (Exception ioe) {
384 errors = true;
385 LOGGER.error("Error sending event", ioe);
386 break;
387 }
388 if (!errors) {
389 try {
390 cursor.delete();
391 } catch (Exception ex) {
392 LOGGER.error("Unable to delete event", ex);
393 }
394 }
395 }
396 status = cursor.getNext(key, data, LockMode.RMW);
397 }
398 }
399 } catch (Exception ex) {
400 LOGGER.error("Error reading database", ex);
401 shutdown = true;
402 break;
403 }
404
405 } finally {
406 cursor.close();
407 }
408 if (errors) {
409 Thread.sleep(manager.delay);
410 continue;
411 }
412 } catch (Exception ex) {
413 LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
414 }
415 } else {
416 try {
417 if (database.count() >= batchSize) {
418 continue;
419 }
420 queue.poll(manager.delay, TimeUnit.MILLISECONDS);
421 LOGGER.debug("WriterThread notified of work");
422 } catch (InterruptedException ie) {
423 LOGGER.warn("WriterThread interrupted, continuing");
424 } catch (Exception ex) {
425 LOGGER.error("WriterThread encountered an exception waiting for work", ex);
426 break;
427 }
428 }
429 }
430 LOGGER.trace("WriterThread exiting");
431 }
432
433 private SimpleEvent createEvent(DatabaseEntry data) {
434 SimpleEvent event = new SimpleEvent();
435 try {
436 byte[] eventData = data.getData();
437 if (secretKey != null) {
438 Cipher cipher = Cipher.getInstance("AES");
439 cipher.init(Cipher.DECRYPT_MODE, secretKey);
440 eventData = cipher.doFinal(eventData);
441 }
442 ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
443 DataInputStream dais = new DataInputStream(bais);
444 int length = dais.readInt();
445 byte[] bytes = new byte[length];
446 dais.read(bytes, 0, length);
447 event.setBody(bytes);
448 length = dais.readInt();
449 Map<String, String> map = new HashMap<String, String>(length);
450 for (int i = 0; i < length; ++i) {
451 String headerKey = dais.readUTF();
452 String value = dais.readUTF();
453 map.put(headerKey, value);
454 }
455 event.setHeaders(map);
456 return event;
457 } catch (Exception ex) {
458 LOGGER.error("Error retrieving event", ex);
459 return null;
460 }
461 }
462
463 }
464 }