1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import java.io.ByteArrayInputStream;
20  import java.io.ByteArrayOutputStream;
21  import java.io.DataInputStream;
22  import java.io.DataOutputStream;
23  import java.io.File;
24  import java.nio.charset.Charset;
25  import java.util.HashMap;
26  import java.util.Map;
27  import java.util.concurrent.Callable;
28  import java.util.concurrent.ExecutorService;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.Future;
31  import java.util.concurrent.ThreadFactory;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.atomic.AtomicInteger;
34  import java.util.concurrent.atomic.AtomicLong;
35  import javax.crypto.Cipher;
36  import javax.crypto.SecretKey;
37  
38  import com.sleepycat.je.LockConflictException;
39  import org.apache.flume.Event;
40  import org.apache.flume.event.SimpleEvent;
41  import org.apache.logging.log4j.LoggingException;
42  import org.apache.logging.log4j.core.appender.ManagerFactory;
43  import org.apache.logging.log4j.core.config.Property;
44  import org.apache.logging.log4j.core.config.plugins.PluginManager;
45  import org.apache.logging.log4j.core.config.plugins.PluginType;
46  import org.apache.logging.log4j.core.helpers.FileUtils;
47  import org.apache.logging.log4j.core.helpers.SecretKeyProvider;
48  import org.apache.logging.log4j.core.helpers.Strings;
49  
50  import com.sleepycat.je.Cursor;
51  import com.sleepycat.je.CursorConfig;
52  import com.sleepycat.je.Database;
53  import com.sleepycat.je.DatabaseConfig;
54  import com.sleepycat.je.DatabaseEntry;
55  import com.sleepycat.je.Environment;
56  import com.sleepycat.je.EnvironmentConfig;
57  import com.sleepycat.je.LockMode;
58  import com.sleepycat.je.OperationStatus;
59  import com.sleepycat.je.StatsConfig;
60  import com.sleepycat.je.Transaction;
61  
62  /**
63   * Manager that persists data to Berkeley DB before passing it on to Flume.
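     *
     * Events are first written to a local Berkeley DB JE database, optionally encrypted using a
     * configured SecretKeyProvider. A background WriterThread then delivers them to the Flume
     * agents, in batches when a batch size greater than one is configured, and removes each event
     * from the database once it has been sent.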
64   */
65  public class FlumePersistentManager extends FlumeAvroManager {
66  
67      /** Attribute name for the key provider. */
68      public static final String KEY_PROVIDER = "keyProvider";
69  
70      private static final Charset UTF8 = Charset.forName("UTF-8");
71  
72      private static final String SHUTDOWN = "Shutdown";
73  
74      private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
75  
76      private static final int SHUTDOWN_WAIT = 60;
77  
78      private static final int MILLIS_PER_SECOND = 1000;
79  
80      private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;
81  
82      private static BDBManagerFactory factory = new BDBManagerFactory();
83  
84      private final Database database;
85  
86      private final Environment environment;
87  
88      private final WriterThread worker;
89  
90      private final Gate gate = new Gate();
91  
92      private final SecretKey secretKey;
93  
94      private final int delay;
95  
96      private final int lockTimeoutRetryCount;
97  
98      private final ExecutorService threadPool;
99  
100     private final AtomicLong dbCount = new AtomicLong();
101 
102     /**
103      * Constructor.
104      * @param name The unique name of this manager.
105      * @param shortName Original name for the Manager.
106      * @param agents An array of Agents.
107      * @param batchSize The number of events to include in a batch.
108      * @param retries The number of times to retry connecting before giving up.
109      * @param connectionTimeout The amount of time to wait for a connection to be established.
110      * @param requestTimeout The amount of time to wait for a response to a request.
111      * @param delay The amount of time to delay before delivering a batch.
112      * @param database The database to write to.
113      * @param environment The database environment.
114      * @param secretKey The SecretKey to use for encryption.
115      * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
116      */
117     protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
118                                      final int batchSize, final int retries, final int connectionTimeout,
119                                      final int requestTimeout, final int delay, final Database database,
120                                      final Environment environment, final SecretKey secretKey,
121                                      final int lockTimeoutRetryCount) {
122         super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
123         this.delay = delay;
124         this.database = database;
125         this.environment = environment;
126         dbCount.set(database.count());
127         this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
128             lockTimeoutRetryCount);
129         this.worker.start();
130         this.secretKey = secretKey;
131         this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
132         this.lockTimeoutRetryCount = lockTimeoutRetryCount;
133     }
134 
135 
136     /**
137      * Returns a FlumePersistentManager.
138      * @param name The name of the manager.
139      * @param agents The agents to use.
140      * @param properties Properties to pass to the Manager.
141      * @param batchSize The number of events to include in a batch.
142      * @param retries The number of times to retry connecting before giving up.
143      * @param connectionTimeout The amount of time to wait to establish a connection.
144      * @param requestTimeout The amount of time to wait for a response to a request.
145      * @param delay Amount of time to delay before delivering a batch.
         * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
146      * @param dataDir The location of the Berkeley database.
147      * @return A FlumePersistentManager.
148      */
149     public static FlumePersistentManager getManager(final String name, final Agent[] agents,
150                                                     final Property[] properties, int batchSize, final int retries,
151                                                     final int connectionTimeout, final int requestTimeout,
152                                                     final int delay, final int lockTimeoutRetryCount,
153                                                     final String dataDir) {
154         if (agents == null || agents.length == 0) {
155             throw new IllegalArgumentException("At least one agent is required");
156         }
157 
158         if (batchSize <= 0) {
159             batchSize = 1;
160         }
161         final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
162 
163         final StringBuilder sb = new StringBuilder("FlumePersistent[");
164         boolean first = true;
165         for (final Agent agent : agents) {
166             if (!first) {
167                 sb.append(",");
168             }
169             sb.append(agent.getHost()).append(":").append(agent.getPort());
170             first = false;
171         }
172         sb.append("]");
173         sb.append(" ").append(dataDirectory);
174         return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
175             connectionTimeout, requestTimeout, delay, lockTimeoutRetryCount, dataDirectory, properties));
176     }
177 
178     @Override
179     public void send(final Event event)  {
180         if (worker.isShutdown()) {
181             throw new LoggingException("Unable to record event");
182         }
183 
184         final Map<String, String> headers = event.getHeaders();
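            // The event's GUID header keys the database record so it can be deleted after delivery.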
185         final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
186         try {
187             final ByteArrayOutputStream baos = new ByteArrayOutputStream();
188             final DataOutputStream daos = new DataOutputStream(baos);
189             daos.writeInt(event.getBody().length);
190             daos.write(event.getBody(), 0, event.getBody().length);
191             daos.writeInt(event.getHeaders().size());
192             for (final Map.Entry<String, String> entry : headers.entrySet()) {
193                 daos.writeUTF(entry.getKey());
194                 daos.writeUTF(entry.getValue());
195             }
196             byte[] eventData = baos.toByteArray();
197             if (secretKey != null) {
198                 final Cipher cipher = Cipher.getInstance("AES");
199                 cipher.init(Cipher.ENCRYPT_MODE, secretKey);
200                 eventData = cipher.doFinal(eventData);
201             }
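            // Delegate the write to a pooled daemon thread and wait on the Future so that an
            // interrupt of the logging thread cannot close the Berkeley DB database mid-write.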
202             final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
203                 gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
204             boolean interrupted = false;
205             int count = 0;
206             do {
207                 try {
208                     future.get();
                        interrupted = false;
209                 } catch (final InterruptedException ie) {
210                     interrupted = true;
211                     ++count;
212                 }
213             } while (interrupted && count <= 1);
214 
215         } catch (final Exception ex) {
216             throw new LoggingException("Exception occurred writing log event", ex);
217         }
218     }
219 
220     @Override
221     protected void releaseSub() {
222         LOGGER.debug("Shutting down FlumePersistentManager");
223         worker.shutdown();
224         try {
225             worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
226         } catch (final InterruptedException ie) {
227             // Ignore the exception and continue shutting down.
228         }
229         threadPool.shutdown();
230         try {
231             threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
232         } catch (final InterruptedException ie) {
233             LOGGER.warn("PersistentManager Thread pool failed to shut down");
234         }
235         try {
236             worker.join();
237         } catch (final InterruptedException ex) {
238             LOGGER.debug("Interrupted while waiting for worker to complete");
239         }
240         try {
241             LOGGER.debug("FlumePersistentManager database status: {}", database.getStats(new StatsConfig()));
242             database.close();
243         } catch (final Exception ex) {
244             LOGGER.warn("Failed to close database", ex);
245         }
246         try {
247             environment.cleanLog();
248             environment.close();
249         } catch (final Exception ex) {
250             LOGGER.warn("Failed to close environment", ex);
251         }
252         super.releaseSub();
253     }
254 
255     private void doSend(final SimpleEvent event) {
256         LOGGER.debug("Sending event to Flume");
257         super.send(event);
258     }
259 
260     /**
261      * Callable that writes to Berkeley DB on a pooled daemon thread so that interrupts cannot close the database.
262      */
263     private static class BDBWriter implements Callable<Integer> {
264         private final byte[] eventData;
265         private final byte[] keyData;
266         private final Environment environment;
267         private final Database database;
268         private final Gate gate;
269         private final AtomicLong dbCount;
270         private final long batchSize;
271         private final int lockTimeoutRetryCount;
272 
273         public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
274                          final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
275                          final int lockTimeoutRetryCount) {
276             this.keyData = keyData;
277             this.eventData = eventData;
278             this.environment = environment;
279             this.database = database;
280             this.gate = gate;
281             this.dbCount = dbCount;
282             this.batchSize = batchSize;
283             this.lockTimeoutRetryCount = lockTimeoutRetryCount;
284         }
285 
286         @Override
287         public Integer call() throws Exception {
288             final DatabaseEntry key = new DatabaseEntry(keyData);
289             final DatabaseEntry data = new DatabaseEntry(eventData);
290             Exception exception = null;
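                // Retry only on lock conflicts, up to lockTimeoutRetryCount times; any other
                // failure aborts the transaction and is rethrown.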
291             for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
292                 Transaction txn = null;
293                 try {
294                     txn = environment.beginTransaction(null, null);
295                     try {
296                         database.put(txn, key, data);
297                         txn.commit();
298                         txn = null;
299                         if (dbCount.incrementAndGet() >= batchSize) {
300                             gate.open();
301                         }
302                         exception = null;
303                         break;
304                     } catch (final LockConflictException lce) {
305                         exception = lce;
306                         // Fall through and retry.
307                     } catch (final Exception ex) {
308                         if (txn != null) {
309                             txn.abort();
310                         }
311                         throw ex;
312                     } finally {
313                         if (txn != null) {
314                             txn.abort();
315                             txn = null;
316                         }
317                     }
318                 } catch (LockConflictException lce) {
319                     exception = lce;
320                     if (txn != null) {
321                         try {
322                             txn.abort();
323                             txn = null;
324                         } catch (Exception ex) {
325                             // Ignore exception
326                         }
327                     }
328 
329                 }
330                 try {
331                     Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
332                 } catch (InterruptedException ie) {
333                     // Ignore the error
334                 }
335             }
336             if (exception != null) {
337                 throw exception;
338             }
339             return eventData.length;
340         }
341     }
342 
343     /**
344      * Factory data.
345      */
346     private static class FactoryData {
347         private final String name;
348         private final Agent[] agents;
349         private final int batchSize;
350         private final String dataDir;
351         private final int retries;
352         private final int connectionTimeout;
353         private final int requestTimeout;
354         private final int delay;
355         private final int lockTimeoutRetryCount;
356         private final Property[] properties;
357 
358         /**
359          * Constructor.
360          * @param name The name of the Appender.
361          * @param agents The agents.
362          * @param batchSize The number of events to include in a batch.
363          * @param dataDir The directory for data.
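             * @param retries The number of times to retry connecting before giving up.
             * @param connectionTimeout The amount of time to wait to establish a connection.
             * @param requestTimeout The amount of time to wait for a response to a request.
             * @param delay The amount of time to delay before delivering a batch.
             * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
             * @param properties The properties to pass to the Manager.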
364          */
365         public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
366                            final int connectionTimeout, final int requestTimeout, final int delay,
367                            final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
368             this.name = name;
369             this.agents = agents;
370             this.batchSize = batchSize;
371             this.dataDir = dataDir;
372             this.retries = retries;
373             this.connectionTimeout = connectionTimeout;
374             this.requestTimeout = requestTimeout;
375             this.delay = delay;
376             this.lockTimeoutRetryCount = lockTimeoutRetryCount;
377             this.properties = properties;
378         }
379     }
380 
381     /**
382      * Berkeley DB Manager Factory.
383      */
384     private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
385 
386         /**
387          * Create the FlumePersistentManager.
388          * @param name The name of the entity to manage.
389          * @param data The data required to create the entity.
390          * @return The FlumePersistentManager.
391          */
392         @Override
393         public FlumePersistentManager createManager(final String name, final FactoryData data) {
394             SecretKey secretKey = null;
395 
396             Database database;
397             Environment environment;
398 
399             final Map<String, String> properties = new HashMap<String, String>();
400             if (data.properties != null) {
401                 for (final Property property : data.properties) {
402                     properties.put(property.getName(), property.getValue());
403                 }
404             }
405 
406             try {
407 
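                    // Create or open the Berkeley DB environment and database used to queue events
                    // in the configured data directory.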
408                 final File dir = new File(data.dataDir);
409                 FileUtils.mkdir(dir, true);
410                 final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
411                 dbEnvConfig.setTransactional(true);
412                 dbEnvConfig.setAllowCreate(true);
413                 dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
414                 environment = new Environment(dir, dbEnvConfig);
415                 final DatabaseConfig dbConfig = new DatabaseConfig();
416                 dbConfig.setTransactional(true);
417                 dbConfig.setAllowCreate(true);
418                 database = environment.openDatabase(null, name, dbConfig);
419             } catch (final Exception ex) {
420                 LOGGER.error("Could not create FlumePersistentManager", ex);
421                 return null;
422             }
423 
424             try {
425                 String key = null;
426                 for (final Map.Entry<String, String> entry : properties.entrySet()) {
427                     if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
428                         key = entry.getValue();
429                         break;
430                     }
431                 }
432                 if (key != null) {
433                     final PluginManager manager = new PluginManager("KeyProvider", SecretKeyProvider.class);
434                     manager.collectPlugins();
435                     final Map<String, PluginType<?>> plugins = manager.getPlugins();
436                     if (plugins != null) {
437                         boolean found = false;
438                         for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
439                             if (entry.getKey().equalsIgnoreCase(key)) {
440                                 found = true;
441                                 final Class<?> cl = entry.getValue().getPluginClass();
442                                 try {
443                                     final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
444                                     secretKey = provider.getSecretKey();
445                                     LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
446                                 } catch (final Exception ex) {
447                                     LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
448                                         cl.getName());
449                                 }
450                                 break;
451                             }
452                         }
453                         if (!found) {
454                             LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
455                         }
456                     } else {
457                         LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
458                     }
459                 }
460             } catch (final Exception ex) {
461                 LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
462             }
463             return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
464                 data.connectionTimeout, data.requestTimeout, data.delay, database, environment, secretKey,
465                 data.lockTimeoutRetryCount);
466         }
467     }
468 
469     /**
470      * Thread that pulls events from Berkeley DB and sends them to Flume.
471      */
472     private static class WriterThread extends Thread  {
473         private volatile boolean shutdown = false;
474         private final Database database;
475         private final Environment environment;
476         private final FlumePersistentManager manager;
477         private final Gate gate;
478         private final SecretKey secretKey;
479         private final int batchSize;
480         private final AtomicLong dbCounter;
481         private final int lockTimeoutRetryCount;
482 
483         public WriterThread(final Database database, final Environment environment,
484                             final FlumePersistentManager manager, final Gate gate, final int batchsize,
485                             final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
486             this.database = database;
487             this.environment = environment;
488             this.manager = manager;
489             this.gate = gate;
490             this.batchSize = batchsize;
491             this.secretKey = secretKey;
492             this.setDaemon(true);
493             this.dbCounter = dbCount;
494             this.lockTimeoutRetryCount = lockTimeoutRetryCount;
495         }
496 
497         public void shutdown() {
498             LOGGER.debug("Writer thread shutting down");
499             this.shutdown = true;
500             gate.open();
501         }
502 
503         public boolean isShutdown() {
504             return shutdown;
505         }
506 
507         @Override
508         public void run() {
509             LOGGER.trace("WriterThread started - batch size = {}, delay = {}", batchSize, manager.delay);
510             long nextBatch = System.currentTimeMillis() + manager.delay;
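                // Drain the database whenever at least batchSize events are queued, or when events
                // are pending and the delay has expired; otherwise wait on the gate.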
511             while (!shutdown) {
512                 long now = System.currentTimeMillis();
513                 long dbCount = database.count();
514                 dbCounter.set(dbCount);
515                 if (dbCount >= batchSize || (dbCount > 0 && nextBatch <= now)) {
516                     nextBatch = now + manager.delay;
517                     try {
518                         boolean errors = false;
519                         DatabaseEntry key = new DatabaseEntry();
520                         final DatabaseEntry data = new DatabaseEntry();
521 
522                         gate.close();
523                         OperationStatus status;
524                         if (batchSize > 1) {
525                             try {
526                                 errors = sendBatch(key, data);
527                             } catch (final Exception ex) {
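                                    // sendBatch handles its own logging and sets shutdown on fatal errors; exit the main loop.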
528                                 break;
529                             }
530                         } else {
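                                // batchSize == 1: send events one at a time, deleting each record after it is delivered.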
531                             Exception exception = null;
532                             for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
533                                 exception = null;
534                                 Transaction txn = null;
535                                 Cursor cursor = null;
536                                 try {
537                                     txn = environment.beginTransaction(null, null);
538                                     cursor = database.openCursor(txn, null);
539                                     try {
540                                         status = cursor.getFirst(key, data, LockMode.RMW);
541                                         while (status == OperationStatus.SUCCESS) {
542                                             final SimpleEvent event = createEvent(data);
543                                             if (event != null) {
544                                                 try {
545                                                     manager.doSend(event);
546                                                 } catch (final Exception ioe) {
547                                                     errors = true;
548                                                     LOGGER.error("Error sending event", ioe);
549                                                     break;
550                                                 }
551                                                 try {
552                                                     cursor.delete();
553                                                 } catch (final Exception ex) {
554                                                     LOGGER.error("Unable to delete event", ex);
555                                                 }
556                                             }
557                                             status = cursor.getNext(key, data, LockMode.RMW);
558                                         }
559                                         if (cursor != null) {
560                                             cursor.close();
561                                             cursor = null;
562                                         }
563                                         txn.commit();
564                                         txn = null;
565                                         dbCounter.decrementAndGet();
566                                         exception = null;
567                                         break;
568                                     } catch (final LockConflictException lce) {
569                                         exception = lce;
570                                         // Fall through and retry.
571                                     } catch (final Exception ex) {
572                                         LOGGER.error("Error reading or writing to database", ex);
573                                         shutdown = true;
574                                         break;
575                                     } finally {
576                                         if (cursor != null) {
577                                             cursor.close();
578                                             cursor = null;
579                                         }
580                                         if (txn != null) {
581                                             txn.abort();
582                                             txn = null;
583                                         }
584                                     }
585                                 } catch (LockConflictException lce) {
586                                     exception = lce;
587                                     if (cursor != null) {
588                                         try {
589                                             cursor.close();
590                                             cursor = null;
591                                         } catch (Exception ex) {
592                                             // Ignore exception
593                                         }
594                                     }
595                                     if (txn != null) {
596                                         try {
597                                             txn.abort();
598                                             txn = null;
599                                         } catch (Exception ex) {
600                                             // Ignore exception
601                                         }
602                                     }
603                                 }
604                                 try {
605                                     Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
606                                 } catch (InterruptedException ie) {
607                                     // Ignore the error
608                                 }
609                             }
610                             if (exception != null) {
611                                 LOGGER.error("Unable to read or update the database", exception);
612                             }
613                         }
614                         if (errors) {
615                             Thread.sleep(manager.delay);
616                             continue;
617                         }
618                     } catch (final Exception ex) {
619                         LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
620                     }
621                 } else {
622                     if (nextBatch <= now) {
623                         nextBatch = now + manager.delay;
624                     }
625                     try {
626                         final long interval = nextBatch - now;
627                         gate.waitForOpen(interval);
628                     } catch (final InterruptedException ie) {
629                         LOGGER.warn("WriterThread interrupted, continuing");
630                     } catch (final Exception ex) {
631                         LOGGER.error("WriterThread encountered an exception waiting for work", ex);
632                         break;
633                     }
634                 }
635             }
636 
637             if (batchSize > 1 && database.count() > 0) {
638                 DatabaseEntry key = new DatabaseEntry();
639                 final DatabaseEntry data = new DatabaseEntry();
640                 try {
641                     sendBatch(key, data);
642                 } catch (final Exception ex) {
643                     LOGGER.warn("Unable to write final batch", ex);
644                 }
645             }
646             LOGGER.trace("WriterThread exiting");
647         }
648 
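            /**
             * Reads up to batchSize events from the database, sends them to Flume as a single
             * batch, and deletes them once the send succeeds. Returns true if the send failed.
             */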
649         private boolean sendBatch(DatabaseEntry key, DatabaseEntry data) throws Exception {
650             boolean errors = false;
651             OperationStatus status;
652             Cursor cursor = database.openCursor(null, CursorConfig.DEFAULT);
653             try {
654                 status = cursor.getFirst(key, data, null);
655 
656                 final BatchEvent batch = new BatchEvent();
657                 for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
658                     final SimpleEvent event = createEvent(data);
659                     if (event != null) {
660                         batch.addEvent(event);
661                     }
662                     status = cursor.getNext(key, data, null);
663                 }
664                 try {
665                     manager.send(batch);
666                 } catch (final Exception ioe) {
667                     LOGGER.error("Error sending events", ioe);
668                     errors = true;
669                 }
670                 if (!errors) {
671                     cursor.close();
672                     cursor = null;
673                     Transaction txn = null;
674                     Exception exception = null;
675                     for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
676                         try {
677                             txn = environment.beginTransaction(null, null);
678                             try {
679                                 for (final Event event : batch.getEvents()) {
680                                     try {
681                                         final Map<String, String> headers = event.getHeaders();
682                                         key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
683                                         database.delete(txn, key);
684                                     } catch (final Exception ex) {
685                                         LOGGER.error("Error deleting key from database", ex);
686                                     }
687                                 }
688                                 txn.commit();
689                                 long count = dbCounter.get();
690                                 while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
691                                     count = dbCounter.get();
692                                 }
693                                 exception = null;
694                                 break;
695                             } catch (final LockConflictException lce) {
696                                 exception = lce;
697                                 if (cursor != null) {
698                                     try {
699                                         cursor.close();
700                                         cursor = null;
701                                     } catch (Exception ex) {
702                                         // Ignore exception
703                                     }
704                                 }
705                                 if (txn != null) {
706                                     try {
707                                         txn.abort();
708                                         txn = null;
709                                     } catch (Exception ex) {
710                                         // Ignore exception
711                                     }
712                                 }
713                             } catch (final Exception ex) {
714                                 LOGGER.error("Unable to commit transaction", ex);
715                                 if (txn != null) {
716                                     txn.abort();
717                                 }
718                             }
719                         } catch (LockConflictException lce) {
720                             exception = lce;
721                             if (cursor != null) {
722                                 try {
723                                     cursor.close();
724                                     cursor = null;
725                                 } catch (Exception ex) {
726                                     // Ignore exception
727                                 }
728                             }
729                             if (txn != null) {
730                                 try {
731                                     txn.abort();
732                                     txn = null;
733                                 } catch (Exception ex) {
734                                     // Ignore exception
735                                 }
736                             }
737                         } finally {
738                             if (cursor != null) {
739                                 cursor.close();
740                                 cursor = null;
741                             }
742                             if (txn != null) {
743                                 txn.abort();
744                                 txn = null;
745                             }
746                         }
747                         try {
748                             Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
749                         } catch (InterruptedException ie) {
750                             // Ignore the error
751                         }
752                     }
753                     if (exception != null) {
754                         LOGGER.error("Unable to delete events from the database", exception);
755                     }
756                 }
757             } catch (final Exception ex) {
758                 LOGGER.error("Error reading database", ex);
759                 shutdown = true;
760                 throw ex;
761             } finally {
762                 if (cursor != null) {
763                     cursor.close();
764                 }
765             }
766 
767             return errors;
768         }
769 
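            /**
             * Rebuilds a Flume SimpleEvent from a database record, decrypting it first when a
             * SecretKey is configured. Returns null if the record cannot be decoded.
             */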
770         private SimpleEvent createEvent(final DatabaseEntry data) {
771             final SimpleEvent event = new SimpleEvent();
772             try {
773                 byte[] eventData = data.getData();
774                 if (secretKey != null) {
775                     final Cipher cipher = Cipher.getInstance("AES");
776                     cipher.init(Cipher.DECRYPT_MODE, secretKey);
777                     eventData = cipher.doFinal(eventData);
778                 }
779                 final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
780                 final DataInputStream dais = new DataInputStream(bais);
781                 int length = dais.readInt();
782                 final byte[] bytes = new byte[length];
783                 dais.readFully(bytes, 0, length);
784                 event.setBody(bytes);
785                 length = dais.readInt();
786                 final Map<String, String> map = new HashMap<String, String>(length);
787                 for (int i = 0; i < length; ++i) {
788                     final String headerKey = dais.readUTF();
789                     final String value = dais.readUTF();
790                     map.put(headerKey, value);
791                 }
792                 event.setHeaders(map);
793                 return event;
794             } catch (final Exception ex) {
795                 LOGGER.error("Error retrieving event", ex);
796                 return null;
797             }
798         }
799 
800     }
801 
802     /**
803      * Factory that creates daemon threads so they do not prevent the application from shutting down.
804      */
805     private static class DaemonThreadFactory implements ThreadFactory {
806         private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
807         private final ThreadGroup group;
808         private final AtomicInteger threadNumber = new AtomicInteger(1);
809         private final String namePrefix;
810 
811         public DaemonThreadFactory() {
812             final SecurityManager securityManager = System.getSecurityManager();
813             group = (securityManager != null) ? securityManager.getThreadGroup() :
814                 Thread.currentThread().getThreadGroup();
815             namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
816         }
817 
818         @Override
819         public Thread newThread(final Runnable r) {
820             final Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
821             thread.setDaemon(true);
822             if (thread.getPriority() != Thread.NORM_PRIORITY) {
823                 thread.setPriority(Thread.NORM_PRIORITY);
824             }
825             return thread;
826         }
827     }
828 
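    /**
     * Simple latch the WriterThread waits on. It is opened when enough events are queued to fill
     * a batch or when shutdown is requested, and closed before each pass over the database.
     */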
829     private static class Gate {
830 
831         private volatile boolean isOpen = false;
832 
833         public boolean isOpen() {
834             return isOpen;
835         }
836 
837         public synchronized void open() {
838             isOpen = true;
839             notifyAll();
840         }
841 
842         public synchronized void close() {
843             isOpen = false;
844         }
845 
846         public synchronized void waitForOpen(final long timeout) throws InterruptedException {
847             wait(timeout);
848         }
849     }
850 }