1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import java.io.ByteArrayInputStream;
20  import java.io.ByteArrayOutputStream;
21  import java.io.DataInputStream;
22  import java.io.DataOutputStream;
23  import java.io.File;
24  import java.nio.charset.Charset;
25  import java.nio.charset.StandardCharsets;
26  import java.util.HashMap;
27  import java.util.Map;
28  import java.util.concurrent.Callable;
29  import java.util.concurrent.ExecutorService;
30  import java.util.concurrent.Executors;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.ThreadFactory;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicInteger;
35  import java.util.concurrent.atomic.AtomicLong;
36  
37  import javax.crypto.Cipher;
38  import javax.crypto.SecretKey;
39  
40  import org.apache.flume.Event;
41  import org.apache.flume.event.SimpleEvent;
42  import org.apache.logging.log4j.LoggingException;
43  import org.apache.logging.log4j.core.appender.ManagerFactory;
44  import org.apache.logging.log4j.core.config.Property;
45  import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
46  import org.apache.logging.log4j.core.config.plugins.util.PluginType;
47  import org.apache.logging.log4j.core.util.FileUtils;
48  import org.apache.logging.log4j.core.util.Log4jThread;
49  import org.apache.logging.log4j.core.util.SecretKeyProvider;
50  import org.apache.logging.log4j.util.Strings;
51  
52  import com.sleepycat.je.Cursor;
53  import com.sleepycat.je.CursorConfig;
54  import com.sleepycat.je.Database;
55  import com.sleepycat.je.DatabaseConfig;
56  import com.sleepycat.je.DatabaseEntry;
57  import com.sleepycat.je.Environment;
58  import com.sleepycat.je.EnvironmentConfig;
59  import com.sleepycat.je.LockConflictException;
60  import com.sleepycat.je.LockMode;
61  import com.sleepycat.je.OperationStatus;
62  import com.sleepycat.je.StatsConfig;
63  import com.sleepycat.je.Transaction;
64  
65  /**
66   * Manager that persists data to Berkeley DB before passing it on to Flume.
67   */
68  public class FlumePersistentManager extends FlumeAvroManager {
69  
70      /** Attribute name for the key provider. */
71      public static final String KEY_PROVIDER = "keyProvider";
72  
73      private static final Charset UTF8 = StandardCharsets.UTF_8;
74  
75      private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
76  
77      private static final int SHUTDOWN_WAIT = 60;
78  
79      private static final int MILLIS_PER_SECOND = 1000;
80  
81      private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;
82  
83      private static BDBManagerFactory factory = new BDBManagerFactory();
84  
85      private final Database database;
86  
87      private final Environment environment;
88  
89      private final WriterThread worker;
90  
91      private final Gate gate = new Gate();
92  
93      private final SecretKey secretKey;
94  
95      private final int lockTimeoutRetryCount;
96  
97      private final ExecutorService threadPool;
98  
99      private final AtomicLong dbCount = new AtomicLong();
100 
101     /**
102      * Constructor
103      * @param name The unique name of this manager.
104      * @param shortName Original name for the Manager.
105      * @param agents An array of Agents.
106      * @param batchSize The number of events to include in a batch.
107      * @param retries The number of times to retry connecting before giving up.
108      * @param connectionTimeout The amount of time to wait for a connection to be established.
109      * @param requestTimeout The amount of time to wait for a response to a request.
110      * @param delay The amount of time to wait between retries.
111      * @param database The database to write to.
112      * @param environment The database environment.
113      * @param secretKey The SecretKey to use for encryption.
114      * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
115      */
116     protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
117                                      final int batchSize, final int retries, final int connectionTimeout,
118                                      final int requestTimeout, final int delay, final Database database,
119                                      final Environment environment, final SecretKey secretKey,
120                                      final int lockTimeoutRetryCount) {
121         super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
122         this.database = database;
123         this.environment = environment;
124         dbCount.set(database.count());
125         this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
126             lockTimeoutRetryCount);
127         this.worker.start();
128         this.secretKey = secretKey;
129         this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
130         this.lockTimeoutRetryCount = lockTimeoutRetryCount;
131     }
132 
133 
134     /**
135      * Returns a FlumePersistentManager.
136      * @param name The name of the manager.
137      * @param agents The agents to use.
138      * @param properties Properties to pass to the Manager.
139      * @param batchSize The number of events to include in a batch.
140      * @param retries The number of times to retry connecting before giving up.
141      * @param connectionTimeout The amount of time to wait to establish a connection.
142      * @param requestTimeout The amount of time to wait for a response to a request.
143      * @param delayMillis Amount of time to delay before delivering a batch.
144      * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
145      * @param dataDir The location of the Berkeley database.
146      * @return A FlumePersistentManager.
147      */
148     public static FlumePersistentManager getManager(final String name, final Agent[] agents,
149                                                     final Property[] properties, int batchSize, final int retries,
150                                                     final int connectionTimeout, final int requestTimeout,
151                                                     final int delayMillis, final int lockTimeoutRetryCount,
152                                                     final String dataDir) {
153         if (agents == null || agents.length == 0) {
154             throw new IllegalArgumentException("At least one agent is required");
155         }
156 
157         if (batchSize <= 0) {
158             batchSize = 1;
159         }
160         final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
161 
162         final StringBuilder sb = new StringBuilder("FlumePersistent[");
163         boolean first = true;
164         for (final Agent agent : agents) {
165             if (!first) {
166                 sb.append(',');
167             }
168             sb.append(agent.getHost()).append(':').append(agent.getPort());
169             first = false;
170         }
171         sb.append(']');
172         sb.append(' ').append(dataDirectory);
173         return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
174             connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDirectory, properties));
175     }
176 
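    /**
     * Persists the event to Berkeley DB before it is forwarded to Flume. The body length, body bytes,
     * header count and header key/value pairs are written with a DataOutputStream, the record is
     * optionally encrypted with the configured SecretKey, and the database write is performed on the
     * daemon thread pool so that an interrupt of the logging thread cannot close the database.
     * @param event The event to persist and forward.
     */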
177     @Override
178     public void send(final Event event)  {
179         if (worker.isShutdown()) {
180             throw new LoggingException("Unable to record event");
181         }
182 
183         final Map<String, String> headers = event.getHeaders();
184         final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
185         try {
186             final ByteArrayOutputStream baos = new ByteArrayOutputStream();
187             final DataOutputStream daos = new DataOutputStream(baos);
188             daos.writeInt(event.getBody().length);
189             daos.write(event.getBody(), 0, event.getBody().length);
190             daos.writeInt(event.getHeaders().size());
191             for (final Map.Entry<String, String> entry : headers.entrySet()) {
192                 daos.writeUTF(entry.getKey());
193                 daos.writeUTF(entry.getValue());
194             }
195             byte[] eventData = baos.toByteArray();
196             if (secretKey != null) {
197                 final Cipher cipher = Cipher.getInstance("AES");
198                 cipher.init(Cipher.ENCRYPT_MODE, secretKey);
199                 eventData = cipher.doFinal(eventData);
200             }
201             final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
202                 gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
203             boolean interrupted = false;
204             int ieCount = 0;
205             do {
206                 try {
207                     future.get();
                        interrupted = false; // reset so a successful retry exits the loop
208                 } catch (final InterruptedException ie) {
209                     interrupted = true;
210                     ++ieCount;
211                 }
212             } while (interrupted && ieCount <= 1);
213 
214         } catch (final Exception ex) {
215             throw new LoggingException("Exception occurred writing log event", ex);
216         }
217     }
218 
219     @Override
220     protected void releaseSub() {
221         LOGGER.debug("Shutting down FlumePersistentManager");
222         worker.shutdown();
223         try {
224             worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
225         } catch (final InterruptedException ie) {
226             // Ignore the exception and shutdown.
227         }
228         threadPool.shutdown();
229         try {
230             threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
231         } catch (final InterruptedException e) {
232             logWarn("PersistentManager Thread pool failed to shut down", e);
233         }
234         try {
235             worker.join();
236         } catch (final InterruptedException ex) {
237             logDebug("interrupted while waiting for worker to complete", ex);
238         }
239         try {
240             LOGGER.debug("FlumePersistentManager dataset status: {}", database.getStats(new StatsConfig()));
241             database.close();
242         } catch (final Exception ex) {
243             logWarn("failed to close database", ex);
244         }
245         try {
246             environment.cleanLog();
247             environment.close();
248         } catch (final Exception ex) {
249             logWarn("failed to close environment", ex);
250         }
251         super.releaseSub();
252     }
253 
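    /**
     * Sends a single event that the writer thread has read back from Berkeley DB on to Flume.
     * @param event The event to send.
     */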
254     private void doSend(final SimpleEvent event) {
255         LOGGER.debug("Sending event to Flume");
256         super.send(event);
257     }
258 
259     /**
260      * Writes an event to Berkeley DB from the daemon thread pool so that an interrupt of the logging thread cannot close the database.
261      */
262     private static class BDBWriter implements Callable<Integer> {
263         private final byte[] eventData;
264         private final byte[] keyData;
265         private final Environment environment;
266         private final Database database;
267         private final Gate gate;
268         private final AtomicLong dbCount;
269         private final long batchSize;
270         private final int lockTimeoutRetryCount;
271 
272         public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
273                          final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
274                          final int lockTimeoutRetryCount) {
275             this.keyData = keyData;
276             this.eventData = eventData;
277             this.environment = environment;
278             this.database = database;
279             this.gate = gate;
280             this.dbCount = dbCount;
281             this.batchSize = batchSize;
282             this.lockTimeoutRetryCount = lockTimeoutRetryCount;
283         }
284 
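            /**
             * Stores the event in its own transaction, retrying up to lockTimeoutRetryCount times when a
             * LockConflictException occurs, and opens the gate once the stored count reaches the batch size.
             * @return The length of the stored event data.
             */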
285         @Override
286         public Integer call() throws Exception {
287             final DatabaseEntry key = new DatabaseEntry(keyData);
288             final DatabaseEntry data = new DatabaseEntry(eventData);
289             Exception exception = null;
290             for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
291                 Transaction txn = null;
292                 try {
293                     txn = environment.beginTransaction(null, null);
294                     try {
295                         database.put(txn, key, data);
296                         txn.commit();
297                         txn = null;
298                         if (dbCount.incrementAndGet() >= batchSize) {
299                             gate.open();
300                         }
301                         exception = null;
302                         break;
303                     } catch (final LockConflictException lce) {
304                         exception = lce;
305                         // Fall through and retry.
306                     } catch (final Exception ex) {
307                         if (txn != null) {
308                             txn.abort();
309                         }
310                         throw ex;
311                     } finally {
312                         if (txn != null) {
313                             txn.abort();
314                             txn = null;
315                         }
316                     }
317                 } catch (final LockConflictException lce) {
318                     exception = lce;
319                     if (txn != null) {
320                         try {
321                             txn.abort();
322                             txn = null;
323                         } catch (final Exception ex) {
324                             LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
325                         }
326                     }
327 
328                 }
329                 try {
330                     Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
331                 } catch (final InterruptedException ie) {
332                     // Ignore the error
333                 }
334             }
335             if (exception != null) {
336                 throw exception;
337             }
338             return eventData.length;
339         }
340     }
341 
342     /**
343      * Factory data.
344      */
345     private static class FactoryData {
346         private final String name;
347         private final Agent[] agents;
348         private final int batchSize;
349         private final String dataDir;
350         private final int retries;
351         private final int connectionTimeout;
352         private final int requestTimeout;
353         private final int delayMillis;
354         private final int lockTimeoutRetryCount;
355         private final Property[] properties;
356 
357         /**
358          * Constructor.
359          * @param name The name of the Appender.
360          * @param agents The agents.
361          * @param batchSize The number of events to include in a batch.
362          * @param dataDir The directory for data.
363          */
364         public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
365                            final int connectionTimeout, final int requestTimeout, final int delayMillis,
366                            final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
367             this.name = name;
368             this.agents = agents;
369             this.batchSize = batchSize;
370             this.dataDir = dataDir;
371             this.retries = retries;
372             this.connectionTimeout = connectionTimeout;
373             this.requestTimeout = requestTimeout;
374             this.delayMillis = delayMillis;
375             this.lockTimeoutRetryCount = lockTimeoutRetryCount;
376             this.properties = properties;
377         }
378     }
379 
380     /**
381      * Factory for FlumePersistentManager instances backed by a local Berkeley DB.
382      */
383     private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
384 
385         /**
386          * Create the FlumePersistentManager.
387          * @param name The name of the entity to manage.
388          * @param data The data required to create the entity.
389          * @return The FlumePersistentManager.
390          */
391         @Override
392         public FlumePersistentManager createManager(final String name, final FactoryData data) {
393             SecretKey secretKey = null;
394             Database database = null;
395             Environment environment = null;
396 
397             final Map<String, String> properties = new HashMap<>();
398             if (data.properties != null) {
399                 for (final Property property : data.properties) {
400                     properties.put(property.getName(), property.getValue());
401                 }
402             }
403 
404             try {
405                 final File dir = new File(data.dataDir);
406                 FileUtils.mkdir(dir, true);
407                 final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
408                 dbEnvConfig.setTransactional(true);
409                 dbEnvConfig.setAllowCreate(true);
410                 dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
411                 environment = new Environment(dir, dbEnvConfig);
412                 final DatabaseConfig dbConfig = new DatabaseConfig();
413                 dbConfig.setTransactional(true);
414                 dbConfig.setAllowCreate(true);
415                 database = environment.openDatabase(null, name, dbConfig);
416             } catch (final Exception ex) {
417                 LOGGER.error("Could not create FlumePersistentManager", ex);
418                 // For consistency, close the database as well as the environment. The database should still be
419                 // null here because opening it is the last statement in the try block, but this guards against a
420                 // future change (such as added debug logging) that could fail after the database has been opened.
421                 if (database != null) {
422                     database.close();
423                     database = null;
424                 }
425                 if (environment != null) {
426                     environment.close();
427                     environment = null;
428                 }
429                 return null;
430             }
431 
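                // Look up the optional SecretKeyProvider plugin named by the keyProvider property. If it cannot
                // be found or instantiated, events are stored unencrypted.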
432             try {
433                 String key = null;
434                 for (final Map.Entry<String, String> entry : properties.entrySet()) {
435                     if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
436                         key = entry.getValue();
437                         break;
438                     }
439                 }
440                 if (key != null) {
441                     final PluginManager manager = new PluginManager("KeyProvider");
442                     manager.collectPlugins();
443                     final Map<String, PluginType<?>> plugins = manager.getPlugins();
444                     if (plugins != null) {
445                         boolean found = false;
446                         for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
447                             if (entry.getKey().equalsIgnoreCase(key)) {
448                                 found = true;
449                                 final Class<?> cl = entry.getValue().getPluginClass();
450                                 try {
451                                     final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
452                                     secretKey = provider.getSecretKey();
453                                     LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
454                                 } catch (final Exception ex) {
455                                     LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
456                                         cl.getName());
457                                 }
458                                 break;
459                             }
460                         }
461                         if (!found) {
462                             LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
463                         }
464                     } else {
465                         LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
466                     }
467                 }
468             } catch (final Exception ex) {
469                 LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
470             }
471             return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
472                 data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
473                 data.lockTimeoutRetryCount);
474         }
475     }
476 
477     /**
478      * Thread that sends data to Flume and pulls it from Berkeley DB.
479      */
480     private static class WriterThread extends Thread  {
481         private volatile boolean shutdown = false;
482         private final Database database;
483         private final Environment environment;
484         private final FlumePersistentManager manager;
485         private final Gate gate;
486         private final SecretKey secretKey;
487         private final int batchSize;
488         private final AtomicLong dbCounter;
489         private final int lockTimeoutRetryCount;
490 
491         public WriterThread(final Database database, final Environment environment,
492                             final FlumePersistentManager manager, final Gate gate, final int batchsize,
493                             final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
494             this.database = database;
495             this.environment = environment;
496             this.manager = manager;
497             this.gate = gate;
498             this.batchSize = batchsize;
499             this.secretKey = secretKey;
500             this.setDaemon(true);
501             this.dbCounter = dbCount;
502             this.lockTimeoutRetryCount = lockTimeoutRetryCount;
503         }
504 
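            /**
             * Signals the thread to stop and opens the gate so that a waiting run() loop wakes up immediately.
             */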
505         public void shutdown() {
506             LOGGER.debug("Writer thread shutting down");
507             this.shutdown = true;
508             gate.open();
509         }
510 
511         public boolean isShutdown() {
512             return shutdown;
513         }
514 
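            /**
             * Waits on the gate until the number of stored events reaches the batch size or the delay expires,
             * then reads events from Berkeley DB, forwards them to Flume (singly or as a batch) and deletes
             * them from the database, retrying on lock conflicts.
             */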
515         @Override
516         public void run() {
517             LOGGER.trace("WriterThread started - batch size = {}, delayMillis = {}", batchSize, manager.getDelayMillis());
518             long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
519             while (!shutdown) {
520                 final long nowMillis = System.currentTimeMillis();
521                 final long dbCount = database.count();
522                 dbCounter.set(dbCount);
523                 if (dbCount >= batchSize || (dbCount > 0 && nextBatchMillis <= nowMillis)) {
524                     nextBatchMillis = nowMillis + manager.getDelayMillis();
525                     try {
526                         boolean errors = false;
527                         final DatabaseEntry key = new DatabaseEntry();
528                         final DatabaseEntry data = new DatabaseEntry();
529 
530                         gate.close();
531                         OperationStatus status;
532                         if (batchSize > 1) {
533                             try {
534                                 errors = sendBatch(key, data);
535                             } catch (final Exception ex) {
536                                 break;
537                             }
538                         } else {
539                             Exception exception = null;
540                             for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
541                                 exception = null;
542                                 Transaction txn = null;
543                                 Cursor cursor = null;
544                                 try {
545                                     txn = environment.beginTransaction(null, null);
546                                     cursor = database.openCursor(txn, null);
547                                     try {
548                                         status = cursor.getFirst(key, data, LockMode.RMW);
549                                         while (status == OperationStatus.SUCCESS) {
550                                             final SimpleEvent event = createEvent(data);
551                                             if (event != null) {
552                                                 try {
553                                                     manager.doSend(event);
554                                                 } catch (final Exception ioe) {
555                                                     errors = true;
556                                                     LOGGER.error("Error sending event", ioe);
557                                                     break;
558                                                 }
559                                                 try {
560                                                     cursor.delete();
561                                                 } catch (final Exception ex) {
562                                                     LOGGER.error("Unable to delete event", ex);
563                                                 }
564                                             }
565                                             status = cursor.getNext(key, data, LockMode.RMW);
566                                         }
567                                         if (cursor != null) {
568                                             cursor.close();
569                                             cursor = null;
570                                         }
571                                         txn.commit();
572                                         txn = null;
573                                         dbCounter.decrementAndGet();
574                                         exception = null;
575                                         break;
576                                     } catch (final LockConflictException lce) {
577                                         exception = lce;
578                                         // Fall through and retry.
579                                     } catch (final Exception ex) {
580                                         LOGGER.error("Error reading or writing to database", ex);
581                                         shutdown = true;
582                                         break;
583                                     } finally {
584                                         if (cursor != null) {
585                                             cursor.close();
586                                             cursor = null;
587                                         }
588                                         if (txn != null) {
589                                             txn.abort();
590                                             txn = null;
591                                         }
592                                     }
593                                 } catch (final LockConflictException lce) {
594                                     exception = lce;
595                                     if (cursor != null) {
596                                         try {
597                                             cursor.close();
598                                             cursor = null;
599                                         } catch (final Exception ex) {
600                                             LOGGER.trace("Ignored exception closing cursor during lock conflict.");
601                                         }
602                                     }
603                                     if (txn != null) {
604                                         try {
605                                             txn.abort();
606                                             txn = null;
607                                         } catch (final Exception ex) {
608                                             LOGGER.trace("Ignored exception aborting tx during lock conflict.");
609                                         }
610                                     }
611                                 }
612                                 try {
613                                     Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
614                                 } catch (final InterruptedException ie) {
615                                     // Ignore the error
616                                 }
617                             }
618                             if (exception != null) {
619                                 LOGGER.error("Unable to read or update database", exception);
620                             }
621                         }
622                         if (errors) {
623                             Thread.sleep(manager.getDelayMillis());
624                             continue;
625                         }
626                     } catch (final Exception ex) {
627                         LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
628                     }
629                 } else {
630                     if (nextBatchMillis <= nowMillis) {
631                         nextBatchMillis = nowMillis + manager.getDelayMillis();
632                     }
633                     try {
634                         final long interval = nextBatchMillis - nowMillis;
635                         gate.waitForOpen(interval);
636                     } catch (final InterruptedException ie) {
637                         LOGGER.warn("WriterThread interrupted, continuing");
638                     } catch (final Exception ex) {
639                         LOGGER.error("WriterThread encountered an exception waiting for work", ex);
640                         break;
641                     }
642                 }
643             }
644 
645             if (batchSize > 1 && database.count() > 0) {
646                 final DatabaseEntry key = new DatabaseEntry();
647                 final DatabaseEntry data = new DatabaseEntry();
648                 try {
649                     sendBatch(key, data);
650                 } catch (final Exception ex) {
651                     LOGGER.warn("Unable to write final batch", ex);
652                 }
653             }
654             LOGGER.trace("WriterThread exiting");
655         }
656 
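            /**
             * Reads up to batchSize events from the database, sends them to Flume as a single batch and, if the
             * send succeeds, deletes them inside a transaction, retrying on lock conflicts.
             * @return true if an error occurred while sending the batch, false otherwise.
             */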
657         private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
658             boolean errors = false;
659             OperationStatus status;
660             Cursor cursor = null;
661             try {
662                 final BatchEvent batch = new BatchEvent();
663                 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
664                     try {
665                         cursor = database.openCursor(null, CursorConfig.DEFAULT);
666                         status = cursor.getFirst(key, data, null);
667 
668                         for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
669                             final SimpleEvent event = createEvent(data);
670                             if (event != null) {
671                                 batch.addEvent(event);
672                             }
673                             status = cursor.getNext(key, data, null);
674                         }
675                         break;
676                     } catch (final LockConflictException lce) {
677                         if (cursor != null) {
678                             try {
679                                 cursor.close();
680                                 cursor = null;
681                             } catch (final Exception ex) {
682                                 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
683                             }
684                         }
685                     }
686                 }
687 
688                 try {
689                     manager.send(batch);
690                 } catch (final Exception ioe) {
691                     LOGGER.error("Error sending events", ioe);
692                     errors = true;
693                 }
694                 if (!errors) {
695                     if (cursor != null) {
696                         cursor.close();
697                         cursor = null;
698                     }
699                     Transaction txn = null;
700                     Exception exception = null;
701                     for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
702                         try {
703                             txn = environment.beginTransaction(null, null);
704                             try {
705                                 for (final Event event : batch.getEvents()) {
706                                     try {
707                                         final Map<String, String> headers = event.getHeaders();
708                                         key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
709                                         database.delete(txn, key);
710                                     } catch (final Exception ex) {
711                                         LOGGER.error("Error deleting key from database", ex);
712                                     }
713                                 }
714                                 txn.commit();
715                                 long count = dbCounter.get();
716                                 while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
717                                     count = dbCounter.get();
718                                 }
719                                 exception = null;
720                                 break;
721                             } catch (final LockConflictException lce) {
722                                 exception = lce;
723                                 if (cursor != null) {
724                                     try {
725                                         cursor.close();
726                                         cursor = null;
727                                     } catch (final Exception ex) {
728                                         LOGGER.trace("Ignored exception closing cursor during lock conflict.");
729                                     }
730                                 }
731                                 if (txn != null) {
732                                     try {
733                                         txn.abort();
734                                         txn = null;
735                                     } catch (final Exception ex) {
736                                         LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
737                                     }
738                                 }
739                             } catch (final Exception ex) {
740                                 LOGGER.error("Unable to commit transaction", ex);
741                                 if (txn != null) {
742                                     txn.abort();
743                                 }
744                             }
745                         } catch (final LockConflictException lce) {
746                             exception = lce;
747                             if (cursor != null) {
748                                 try {
749                                     cursor.close();
750                                     cursor = null;
751                                 } catch (final Exception ex) {
752                                     LOGGER.trace("Ignored exception closing cursor during lock conflict.");
753                                 }
754                             }
755                             if (txn != null) {
756                                 try {
757                                     txn.abort();
758                                     txn = null;
759                                 } catch (final Exception ex) {
760                                     LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
761                                 }
762                             }
763                         } finally {
764                             if (cursor != null) {
765                                 cursor.close();
766                                 cursor = null;
767                             }
768                             if (txn != null) {
769                                 txn.abort();
770                                 txn = null;
771                             }
772                         }
773                         try {
774                             Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
775                         } catch (final InterruptedException ie) {
776                             // Ignore the error
777                         }
778                     }
779                     if (exception != null) {
780                         LOGGER.error("Unable to delete events from database", exception);
781                     }
782                 }
783             } catch (final Exception ex) {
784                 LOGGER.error("Error reading database", ex);
785                 shutdown = true;
786                 throw ex;
787             } finally {
788                 if (cursor != null) {
789                     cursor.close();
790                 }
791             }
792 
793             return errors;
794         }
795 
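            /**
             * Decrypts the stored record when a SecretKey is configured and deserializes it back into a
             * SimpleEvent.
             * @return The reconstructed event, or null if the record could not be read.
             */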
796         private SimpleEvent createEvent(final DatabaseEntry data) {
797             final SimpleEvent event = new SimpleEvent();
798             try {
799                 byte[] eventData = data.getData();
800                 if (secretKey != null) {
801                     final Cipher cipher = Cipher.getInstance("AES");
802                     cipher.init(Cipher.DECRYPT_MODE, secretKey);
803                     eventData = cipher.doFinal(eventData);
804                 }
805                 final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
806                 final DataInputStream dais = new DataInputStream(bais);
807                 int length = dais.readInt();
808                 final byte[] bytes = new byte[length];
809                 dais.readFully(bytes, 0, length);
810                 event.setBody(bytes);
811                 length = dais.readInt();
812                 final Map<String, String> map = new HashMap<>(length);
813                 for (int i = 0; i < length; ++i) {
814                     final String headerKey = dais.readUTF();
815                     final String value = dais.readUTF();
816                     map.put(headerKey, value);
817                 }
818                 event.setHeaders(map);
819                 return event;
820             } catch (final Exception ex) {
821                 LOGGER.error("Error retrieving event", ex);
822                 return null;
823             }
824         }
825 
826     }
827 
828     /**
829      * Factory that creates Daemon threads that can be properly shut down.
830      */
831     private static class DaemonThreadFactory implements ThreadFactory {
832         private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
833         private final ThreadGroup group;
834         private final AtomicInteger threadNumber = new AtomicInteger(1);
835         private final String namePrefix;
836 
837         public DaemonThreadFactory() {
838             final SecurityManager securityManager = System.getSecurityManager();
839             group = securityManager != null ? securityManager.getThreadGroup() :
840                 Thread.currentThread().getThreadGroup();
841             namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
842         }
843 
844         @Override
845         public Thread newThread(final Runnable r) {
846             final Thread thread = new Log4jThread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
847             thread.setDaemon(true);
848             if (thread.getPriority() != Thread.NORM_PRIORITY) {
849                 thread.setPriority(Thread.NORM_PRIORITY);
850             }
851             return thread;
852         }
853     }
854 
855     /**
856      * Simple gate on which the writer thread waits until enough events have been persisted to send a batch.
857      */
858     private static class Gate {
859 
860         private boolean isOpen = false;
861 
862         public boolean isOpen() {
863             return isOpen;
864         }
865 
866         public synchronized void open() {
867             isOpen = true;
868             notifyAll();
869         }
870 
871         public synchronized void close() {
872             isOpen = false;
873         }
874 
875         public synchronized void waitForOpen(final long timeout) throws InterruptedException {
876             wait(timeout);
877         }
878     }
879 }