View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import java.io.ByteArrayInputStream;
20  import java.io.ByteArrayOutputStream;
21  import java.io.DataInputStream;
22  import java.io.DataOutputStream;
23  import java.io.File;
24  import java.nio.charset.Charset;
25  import java.nio.charset.StandardCharsets;
26  import java.util.HashMap;
27  import java.util.Map;
28  import java.util.concurrent.Callable;
29  import java.util.concurrent.ExecutorService;
30  import java.util.concurrent.Executors;
31  import java.util.concurrent.Future;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.atomic.AtomicLong;
34  import javax.crypto.Cipher;
35  import javax.crypto.SecretKey;
36  
37  import com.sleepycat.je.Cursor;
38  import com.sleepycat.je.CursorConfig;
39  import com.sleepycat.je.Database;
40  import com.sleepycat.je.DatabaseConfig;
41  import com.sleepycat.je.DatabaseEntry;
42  import com.sleepycat.je.Environment;
43  import com.sleepycat.je.EnvironmentConfig;
44  import com.sleepycat.je.LockConflictException;
45  import com.sleepycat.je.LockMode;
46  import com.sleepycat.je.OperationStatus;
47  import com.sleepycat.je.StatsConfig;
48  import com.sleepycat.je.Transaction;
49  import org.apache.flume.Event;
50  import org.apache.flume.event.SimpleEvent;
51  import org.apache.logging.log4j.LoggingException;
52  import org.apache.logging.log4j.core.appender.ManagerFactory;
53  import org.apache.logging.log4j.core.config.Property;
54  import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
55  import org.apache.logging.log4j.core.config.plugins.util.PluginType;
56  import org.apache.logging.log4j.core.util.ExecutorServices;
57  import org.apache.logging.log4j.core.util.FileUtils;
58  import org.apache.logging.log4j.core.util.Log4jThread;
59  import org.apache.logging.log4j.core.util.Log4jThreadFactory;
60  import org.apache.logging.log4j.core.util.SecretKeyProvider;
61  import org.apache.logging.log4j.util.Strings;
62  
63  /**
64   * Manager that persists data to Berkeley DB before passing it on to Flume.
65   */
66  public class FlumePersistentManager extends FlumeAvroManager {
67  
68      /** Attribute name for the key provider. */
69      public static final String KEY_PROVIDER = "keyProvider";
70  
71      private static final Charset UTF8 = StandardCharsets.UTF_8;
72  
73      private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
74  
75      private static final long SHUTDOWN_WAIT_MILLIS = 60000;
76  
77      private static final long LOCK_TIMEOUT_SLEEP_MILLIS = 500;
78  
79      private static BDBManagerFactory factory = new BDBManagerFactory();
80  
81      private final Database database;
82  
83      private final Environment environment;
84  
85      private final WriterThread worker;
86  
87      private final Gate gate = new Gate();
88  
89      private final SecretKey secretKey;
90  
91      private final int lockTimeoutRetryCount;
92  
93      private final ExecutorService threadPool;
94  
95      private final AtomicLong dbCount = new AtomicLong();
96  
97      /**
98       * Constructor
99       * @param name The unique name of this manager.
100      * @param shortName Original name for the Manager.
101      * @param agents An array of Agents.
102      * @param batchSize The number of events to include in a batch.
103      * @param retries The number of times to retry connecting before giving up.
104      * @param connectionTimeout The amount of time to wait for a connection to be established.
105      * @param requestTimeout The amount of time to wair for a response to a request.
106      * @param delay The amount of time to wait between retries.
107      * @param database The database to write to.
108      * @param environment The database environment.
109      * @param secretKey The SecretKey to use for encryption.
110      * @param lockTimeoutRetryCount The number of times to retry a lock timeout.
111      */
112     protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
113                                      final int batchSize, final int retries, final int connectionTimeout,
114                                      final int requestTimeout, final int delay, final Database database,
115                                      final Environment environment, final SecretKey secretKey,
116                                      final int lockTimeoutRetryCount) {
117         super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
118         this.database = database;
119         this.environment = environment;
120         dbCount.set(database.count());
121         this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
122             lockTimeoutRetryCount);
123         this.worker.start();
124         this.secretKey = secretKey;
125         this.threadPool = Executors.newCachedThreadPool(Log4jThreadFactory.createDaemonThreadFactory("Flume"));
126         this.lockTimeoutRetryCount = lockTimeoutRetryCount;
127     }
128 
129 
130     /**
131      * Returns a FlumeAvroManager.
132      * @param name The name of the manager.
133      * @param agents The agents to use.
134      * @param properties Properties to pass to the Manager.
135      * @param batchSize The number of events to include in a batch.
136      * @param retries The number of times to retry connecting before giving up.
137      * @param connectionTimeout The amount of time to wait to establish a connection.
138      * @param requestTimeout The amount of time to wait for a response to a request.
139      * @param delayMillis Amount of time to delay before delivering a batch.
140      * @param lockTimeoutRetryCount The number of times to retry after a lock timeout.
141      * @param dataDir The location of the Berkeley database.
142      * @return A FlumeAvroManager.
143      */
144     public static FlumePersistentManager getManager(final String name, final Agent[] agents,
145                                                     final Property[] properties, int batchSize, final int retries,
146                                                     final int connectionTimeout, final int requestTimeout,
147                                                     final int delayMillis, final int lockTimeoutRetryCount,
148                                                     final String dataDir) {
149         if (agents == null || agents.length == 0) {
150             throw new IllegalArgumentException("At least one agent is required");
151         }
152 
153         if (batchSize <= 0) {
154             batchSize = 1;
155         }
156         final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
157 
158         final StringBuilder sb = new StringBuilder("FlumePersistent[");
159         boolean first = true;
160         for (final Agent agent : agents) {
161             if (!first) {
162                 sb.append(',');
163             }
164             sb.append(agent.getHost()).append(':').append(agent.getPort());
165             first = false;
166         }
167         sb.append(']');
168         sb.append(' ').append(dataDirectory);
169         return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
170             connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDir, properties));
171     }
172 
173     @Override
174     public void send(final Event event)  {
175         if (worker.isShutdown()) {
176             throw new LoggingException("Unable to record event");
177         }
178 
179         final Map<String, String> headers = event.getHeaders();
180         final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
181         try {
182             final ByteArrayOutputStream baos = new ByteArrayOutputStream();
183             final DataOutputStream daos = new DataOutputStream(baos);
184             daos.writeInt(event.getBody().length);
185             daos.write(event.getBody(), 0, event.getBody().length);
186             daos.writeInt(event.getHeaders().size());
187             for (final Map.Entry<String, String> entry : headers.entrySet()) {
188                 daos.writeUTF(entry.getKey());
189                 daos.writeUTF(entry.getValue());
190             }
191             byte[] eventData = baos.toByteArray();
192             if (secretKey != null) {
193                 final Cipher cipher = Cipher.getInstance("AES");
194                 cipher.init(Cipher.ENCRYPT_MODE, secretKey);
195                 eventData = cipher.doFinal(eventData);
196             }
197             final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
198                 gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
199             boolean interrupted = false;
200             int ieCount = 0;
201             do {
202                 try {
203                     future.get();
204                 } catch (final InterruptedException ie) {
205                     interrupted = true;
206                     ++ieCount;
207                 }
208             } while (interrupted && ieCount <= 1);
209 
210         } catch (final Exception ex) {
211             throw new LoggingException("Exception occurred writing log event", ex);
212         }
213     }
214 
215     @Override
216     protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
217     	boolean closed = true;
218         LOGGER.debug("Shutting down FlumePersistentManager");
219         worker.shutdown();
220         final long requestedTimeoutMillis = timeUnit.toMillis(timeout);
221         final long shutdownWaitMillis = requestedTimeoutMillis > 0 ? requestedTimeoutMillis : SHUTDOWN_WAIT_MILLIS;
222 		try {
223             worker.join(shutdownWaitMillis);
224         } catch (final InterruptedException ie) {
225             // Ignore the exception and shutdown.
226         }
227         ExecutorServices.shutdown(threadPool, shutdownWaitMillis, TimeUnit.MILLISECONDS, toString());
228         try {
229             worker.join();
230         } catch (final InterruptedException ex) {
231             logDebug("interrupted while waiting for worker to complete", ex);
232         }
233         try {
234             LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
235             database.close();
236         } catch (final Exception ex) {
237             logWarn("Failed to close database", ex);
238             closed = false;
239         }
240         try {
241             environment.cleanLog();
242             environment.close();
243         } catch (final Exception ex) {
244             logWarn("Failed to close environment", ex);
245             closed = false;
246         }
247         return closed && super.releaseSub(timeout, timeUnit);
248     }
249 
250     private void doSend(final SimpleEvent event) {
251         LOGGER.debug("Sending event to Flume");
252         super.send(event);
253     }
254 
255     /**
256      * Thread for writing to Berkeley DB to avoid having interrupts close the database.
257      */
258     private static class BDBWriter implements Callable<Integer> {
259         private final byte[] eventData;
260         private final byte[] keyData;
261         private final Environment environment;
262         private final Database database;
263         private final Gate gate;
264         private final AtomicLong dbCount;
265         private final long batchSize;
266         private final int lockTimeoutRetryCount;
267 
268         public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
269                          final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
270                          final int lockTimeoutRetryCount) {
271             this.keyData = keyData;
272             this.eventData = eventData;
273             this.environment = environment;
274             this.database = database;
275             this.gate = gate;
276             this.dbCount = dbCount;
277             this.batchSize = batchSize;
278             this.lockTimeoutRetryCount = lockTimeoutRetryCount;
279         }
280 
281         @Override
282         public Integer call() throws Exception {
283             final DatabaseEntry key = new DatabaseEntry(keyData);
284             final DatabaseEntry data = new DatabaseEntry(eventData);
285             Exception exception = null;
286             for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
287                 Transaction txn = null;
288                 try {
289                     txn = environment.beginTransaction(null, null);
290                     try {
291                         database.put(txn, key, data);
292                         txn.commit();
293                         txn = null;
294                         if (dbCount.incrementAndGet() >= batchSize) {
295                             gate.open();
296                         }
297                         exception = null;
298                         break;
299                     } catch (final LockConflictException lce) {
300                         exception = lce;
301                         // Fall through and retry.
302                     } catch (final Exception ex) {
303                         if (txn != null) {
304                             txn.abort();
305                         }
306                         throw ex;
307                     } finally {
308                         if (txn != null) {
309                             txn.abort();
310                             txn = null;
311                         }
312                     }
313                 } catch (final LockConflictException lce) {
314                     exception = lce;
315                     if (txn != null) {
316                         try {
317                             txn.abort();
318                             txn = null;
319                         } catch (final Exception ex) {
320                             LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
321                         }
322                     }
323 
324                 }
325                 try {
326                     Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
327                 } catch (final InterruptedException ie) {
328                     // Ignore the error
329                 }
330             }
331             if (exception != null) {
332                 throw exception;
333             }
334             return eventData.length;
335         }
336     }
337 
338     /**
339      * Factory data.
340      */
341     private static class FactoryData {
342         private final String name;
343         private final Agent[] agents;
344         private final int batchSize;
345         private final String dataDir;
346         private final int retries;
347         private final int connectionTimeout;
348         private final int requestTimeout;
349         private final int delayMillis;
350         private final int lockTimeoutRetryCount;
351         private final Property[] properties;
352 
353         /**
354          * Constructor.
355          * @param name The name of the Appender.
356          * @param agents The agents.
357          * @param batchSize The number of events to include in a batch.
358          * @param dataDir The directory for data.
359          */
360         public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
361                            final int connectionTimeout, final int requestTimeout, final int delayMillis,
362                            final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
363             this.name = name;
364             this.agents = agents;
365             this.batchSize = batchSize;
366             this.dataDir = dataDir;
367             this.retries = retries;
368             this.connectionTimeout = connectionTimeout;
369             this.requestTimeout = requestTimeout;
370             this.delayMillis = delayMillis;
371             this.lockTimeoutRetryCount = lockTimeoutRetryCount;
372             this.properties = properties;
373         }
374     }
375 
376     /**
377      * Avro Manager Factory.
378      */
379     private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
380 
381         /**
382          * Create the FlumeKratiManager.
383          * @param name The name of the entity to manage.
384          * @param data The data required to create the entity.
385          * @return The FlumeKratiManager.
386          */
387         @Override
388         public FlumePersistentManager createManager(final String name, final FactoryData data) {
389             SecretKey secretKey = null;
390             Database database = null;
391             Environment environment = null;
392 
393             final Map<String, String> properties = new HashMap<>();
394             if (data.properties != null) {
395                 for (final Property property : data.properties) {
396                     properties.put(property.getName(), property.getValue());
397                 }
398             }
399 
400             try {
401                 final File dir = new File(data.dataDir);
402                 FileUtils.mkdir(dir, true);
403                 final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
404                 dbEnvConfig.setTransactional(true);
405                 dbEnvConfig.setAllowCreate(true);
406                 dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
407                 environment = new Environment(dir, dbEnvConfig);
408                 final DatabaseConfig dbConfig = new DatabaseConfig();
409                 dbConfig.setTransactional(true);
410                 dbConfig.setAllowCreate(true);
411                 database = environment.openDatabase(null, name, dbConfig);
412             } catch (final Exception ex) {
413                 LOGGER.error("Could not create FlumePersistentManager", ex);
414                 // For consistency, close database as well as environment even though it should never happen since the
415                 // database is that last thing in the block above, but this does guard against a future line being
416                 // inserted at the end that would bomb (like some debug logging).
417                 if (database != null) {
418                     database.close();
419                     database = null;
420                 }
421                 if (environment != null) {
422                     environment.close();
423                     environment = null;
424                 }
425                 return null;
426             }
427 
428             try {
429                 String key = null;
430                 for (final Map.Entry<String, String> entry : properties.entrySet()) {
431                     if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
432                         key = entry.getValue();
433                         break;
434                     }
435                 }
436                 if (key != null) {
437                     final PluginManager manager = new PluginManager("KeyProvider");
438                     manager.collectPlugins();
439                     final Map<String, PluginType<?>> plugins = manager.getPlugins();
440                     if (plugins != null) {
441                         boolean found = false;
442                         for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
443                             if (entry.getKey().equalsIgnoreCase(key)) {
444                                 found = true;
445                                 final Class<?> cl = entry.getValue().getPluginClass();
446                                 try {
447                                     final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
448                                     secretKey = provider.getSecretKey();
449                                     LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
450                                 } catch (final Exception ex) {
451                                     LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
452                                         cl.getName());
453                                 }
454                                 break;
455                             }
456                         }
457                         if (!found) {
458                             LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
459                         }
460                     } else {
461                         LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
462                     }
463                 }
464             } catch (final Exception ex) {
465                 LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
466             }
467             return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
468                 data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
469                 data.lockTimeoutRetryCount);
470         }
471     }
472 
473     /**
474      * Thread that sends data to Flume and pulls it from Berkeley DB.
475      */
476     private static class WriterThread extends Log4jThread  {
477         private volatile boolean shutdown = false;
478         private final Database database;
479         private final Environment environment;
480         private final FlumePersistentManager manager;
481         private final Gate gate;
482         private final SecretKey secretKey;
483         private final int batchSize;
484         private final AtomicLong dbCounter;
485         private final int lockTimeoutRetryCount;
486 
487         public WriterThread(final Database database, final Environment environment,
488                             final FlumePersistentManager manager, final Gate gate, final int batchsize,
489                             final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
490             super("FlumePersistentManager-Writer");
491             this.database = database;
492             this.environment = environment;
493             this.manager = manager;
494             this.gate = gate;
495             this.batchSize = batchsize;
496             this.secretKey = secretKey;
497             this.setDaemon(true);
498             this.dbCounter = dbCount;
499             this.lockTimeoutRetryCount = lockTimeoutRetryCount;
500         }
501 
502         public void shutdown() {
503             LOGGER.debug("Writer thread shutting down");
504             this.shutdown = true;
505             gate.open();
506         }
507 
508         public boolean isShutdown() {
509             return shutdown;
510         }
511 
512         @Override
513         public void run() {
514             LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis());
515             long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
516             while (!shutdown) {
517                 final long nowMillis = System.currentTimeMillis();
518                 final long dbCount = database.count();
519                 dbCounter.set(dbCount);
520                 if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
521                     nextBatchMillis = nowMillis + manager.getDelayMillis();
522                     try {
523                         boolean errors = false;
524                         final DatabaseEntry key = new DatabaseEntry();
525                         final DatabaseEntry data = new DatabaseEntry();
526 
527                         gate.close();
528                         OperationStatus status;
529                         if (batchSize > 1) {
530                             try {
531                                 errors = sendBatch(key, data);
532                             } catch (final Exception ex) {
533                                 break;
534                             }
535                         } else {
536                             Exception exception = null;
537                             for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
538                                 exception = null;
539                                 Transaction txn = null;
540                                 Cursor cursor = null;
541                                 try {
542                                     txn = environment.beginTransaction(null, null);
543                                     cursor = database.openCursor(txn, null);
544                                     try {
545                                         status = cursor.getFirst(key, data, LockMode.RMW);
546                                         while (status == OperationStatus.SUCCESS) {
547                                             final SimpleEvent event = createEvent(data);
548                                             if (event != null) {
549                                                 try {
550                                                     manager.doSend(event);
551                                                 } catch (final Exception ioe) {
552                                                     errors = true;
553                                                     LOGGER.error("Error sending event", ioe);
554                                                     break;
555                                                 }
556                                                 try {
557                                                     cursor.delete();
558                                                 } catch (final Exception ex) {
559                                                     LOGGER.error("Unable to delete event", ex);
560                                                 }
561                                             }
562                                             status = cursor.getNext(key, data, LockMode.RMW);
563                                         }
564                                         if (cursor != null) {
565                                             cursor.close();
566                                             cursor = null;
567                                         }
568                                         txn.commit();
569                                         txn = null;
570                                         dbCounter.decrementAndGet();
571                                         exception = null;
572                                         break;
573                                     } catch (final LockConflictException lce) {
574                                         exception = lce;
575                                         // Fall through and retry.
576                                     } catch (final Exception ex) {
577                                         LOGGER.error("Error reading or writing to database", ex);
578                                         shutdown = true;
579                                         break;
580                                     } finally {
581                                         if (cursor != null) {
582                                             cursor.close();
583                                             cursor = null;
584                                         }
585                                         if (txn != null) {
586                                             txn.abort();
587                                             txn = null;
588                                         }
589                                     }
590                                 } catch (final LockConflictException lce) {
591                                     exception = lce;
592                                     if (cursor != null) {
593                                         try {
594                                             cursor.close();
595                                             cursor = null;
596                                         } catch (final Exception ex) {
597                                             LOGGER.trace("Ignored exception closing cursor during lock conflict.");
598                                         }
599                                     }
600                                     if (txn != null) {
601                                         try {
602                                             txn.abort();
603                                             txn = null;
604                                         } catch (final Exception ex) {
605                                             LOGGER.trace("Ignored exception aborting tx during lock conflict.");
606                                         }
607                                     }
608                                 }
609                                 try {
610                                     Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
611                                 } catch (final InterruptedException ie) {
612                                     // Ignore the error
613                                 }
614                             }
615                             if (exception != null) {
616                                 LOGGER.error("Unable to read or update data base", exception);
617                             }
618                         }
619                         if (errors) {
620                             Thread.sleep(manager.getDelayMillis());
621                             continue;
622                         }
623                     } catch (final Exception ex) {
624                         LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
625                     }
626                 } else {
627                     if (nextBatchMillis <= nowMillis) {
628                         nextBatchMillis = nowMillis + manager.getDelayMillis();
629                     }
630                     try {
631                         final long interval = nextBatchMillis - nowMillis;
632                         gate.waitForOpen(interval);
633                     } catch (final InterruptedException ie) {
634                         LOGGER.warn("WriterThread interrupted, continuing");
635                     } catch (final Exception ex) {
636                         LOGGER.error("WriterThread encountered an exception waiting for work", ex);
637                         break;
638                     }
639                 }
640             }
641 
642             if (batchSize > 1 && database.count() > 0) {
643                 final DatabaseEntry key = new DatabaseEntry();
644                 final DatabaseEntry data = new DatabaseEntry();
645                 try {
646                     sendBatch(key, data);
647                 } catch (final Exception ex) {
648                     LOGGER.warn("Unable to write final batch");
649                 }
650             }
651             LOGGER.trace("WriterThread exiting");
652         }
653 
654         private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
655             boolean errors = false;
656             OperationStatus status;
657             Cursor cursor = null;
658             try {
659             	final BatchEvent batch = new BatchEvent();
660             	for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
661             		try {
662             			cursor = database.openCursor(null, CursorConfig.DEFAULT);
663             			status = cursor.getFirst(key, data, null);
664 
665             			for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
666             				final SimpleEvent event = createEvent(data);
667             				if (event != null) {
668             					batch.addEvent(event);
669             				}
670             				status = cursor.getNext(key, data, null);
671             			}
672             			break;
673             		} catch (final LockConflictException lce) {
674             			if (cursor != null) {
675             				try {
676                                 cursor.close();
677                                 cursor = null;
678                             } catch (final Exception ex) {
679                                 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
680                             }
681                         }
682                     }
683             	}
684 
685                 try {
686                     manager.send(batch);
687                 } catch (final Exception ioe) {
688                     LOGGER.error("Error sending events", ioe);
689                     errors = true;
690                 }
691                 if (!errors) {
692                 	if (cursor != null) {
693 	                    cursor.close();
694 	                    cursor = null;
695                 	}
696                     Transaction txn = null;
697                     Exception exception = null;
698                     for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
699                         try {
700                             txn = environment.beginTransaction(null, null);
701                             try {
702                                 for (final Event event : batch.getEvents()) {
703                                     try {
704                                         final Map<String, String> headers = event.getHeaders();
705                                         key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
706                                         database.delete(txn, key);
707                                     } catch (final Exception ex) {
708                                         LOGGER.error("Error deleting key from database", ex);
709                                     }
710                                 }
711                                 txn.commit();
712                                 long count = dbCounter.get();
713                                 while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
714                                     count = dbCounter.get();
715                                 }
716                                 exception = null;
717                                 break;
718                             } catch (final LockConflictException lce) {
719                                 exception = lce;
720                                 if (cursor != null) {
721                                     try {
722                                         cursor.close();
723                                         cursor = null;
724                                     } catch (final Exception ex) {
725                                         LOGGER.trace("Ignored exception closing cursor during lock conflict.");
726                                     }
727                                 }
728                                 if (txn != null) {
729                                     try {
730                                         txn.abort();
731                                         txn = null;
732                                     } catch (final Exception ex) {
733                                         LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
734                                     }
735                                 }
736                             } catch (final Exception ex) {
737                                 LOGGER.error("Unable to commit transaction", ex);
738                                 if (txn != null) {
739                                     txn.abort();
740                                 }
741                             }
742                         } catch (final LockConflictException lce) {
743                             exception = lce;
744                             if (cursor != null) {
745                                 try {
746                                     cursor.close();
747                                     cursor = null;
748                                 } catch (final Exception ex) {
749                                     LOGGER.trace("Ignored exception closing cursor during lock conflict.");
750                                 }
751                             }
752                             if (txn != null) {
753                                 try {
754                                     txn.abort();
755                                     txn = null;
756                                 } catch (final Exception ex) {
757                                     LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
758                                 }
759                             }
760                         } finally {
761                             if (cursor != null) {
762                                 cursor.close();
763                                 cursor = null;
764                             }
765                             if (txn != null) {
766                                 txn.abort();
767                                 txn = null;
768                             }
769                         }
770                         try {
771                             Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
772                         } catch (final InterruptedException ie) {
773                             // Ignore the error
774                         }
775                     }
776                     if (exception != null) {
777                         LOGGER.error("Unable to delete events from data base", exception);
778                     }
779                 }
780             } catch (final Exception ex) {
781                 LOGGER.error("Error reading database", ex);
782                 shutdown = true;
783                 throw ex;
784             } finally {
785                 if (cursor != null) {
786                     cursor.close();
787                 }
788             }
789 
790             return errors;
791         }
792 
793         private SimpleEvent createEvent(final DatabaseEntry data) {
794             final SimpleEvent event = new SimpleEvent();
795             try {
796                 byte[] eventData = data.getData();
797                 if (secretKey != null) {
798                     final Cipher cipher = Cipher.getInstance("AES");
799                     cipher.init(Cipher.DECRYPT_MODE, secretKey);
800                     eventData = cipher.doFinal(eventData);
801                 }
802                 final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
803                 final DataInputStream dais = new DataInputStream(bais);
804                 int length = dais.readInt();
805                 final byte[] bytes = new byte[length];
806                 dais.read(bytes, 0, length);
807                 event.setBody(bytes);
808                 length = dais.readInt();
809                 final Map<String, String> map = new HashMap<>(length);
810                 for (int i = 0; i < length; ++i) {
811                     final String headerKey = dais.readUTF();
812                     final String value = dais.readUTF();
813                     map.put(headerKey, value);
814                 }
815                 event.setHeaders(map);
816                 return event;
817             } catch (final Exception ex) {
818                 LOGGER.error("Error retrieving event", ex);
819                 return null;
820             }
821         }
822 
823     }
824 
825     /**
826      * An internal class.
827      */
828     private static class Gate {
829 
830         private boolean isOpen = false;
831 
832         public boolean isOpen() {
833             return isOpen;
834         }
835 
836         public synchronized void open() {
837             isOpen = true;
838             notifyAll();
839         }
840 
841         public synchronized void close() {
842             isOpen = false;
843         }
844 
845         public synchronized void waitForOpen(final long timeout) throws InterruptedException {
846             wait(timeout);
847         }
848     }
849 }