1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import java.io.ByteArrayInputStream;
20 import java.io.ByteArrayOutputStream;
21 import java.io.DataInputStream;
22 import java.io.DataOutputStream;
23 import java.io.File;
24 import java.nio.charset.Charset;
25 import java.nio.charset.StandardCharsets;
26 import java.util.HashMap;
27 import java.util.Map;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.Future;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicLong;
34 import javax.crypto.Cipher;
35 import javax.crypto.SecretKey;
36
37 import com.sleepycat.je.Cursor;
38 import com.sleepycat.je.CursorConfig;
39 import com.sleepycat.je.Database;
40 import com.sleepycat.je.DatabaseConfig;
41 import com.sleepycat.je.DatabaseEntry;
42 import com.sleepycat.je.Environment;
43 import com.sleepycat.je.EnvironmentConfig;
44 import com.sleepycat.je.LockConflictException;
45 import com.sleepycat.je.LockMode;
46 import com.sleepycat.je.OperationStatus;
47 import com.sleepycat.je.StatsConfig;
48 import com.sleepycat.je.Transaction;
49 import org.apache.flume.Event;
50 import org.apache.flume.event.SimpleEvent;
51 import org.apache.logging.log4j.LoggingException;
52 import org.apache.logging.log4j.core.appender.ManagerFactory;
53 import org.apache.logging.log4j.core.config.Property;
54 import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
55 import org.apache.logging.log4j.core.config.plugins.util.PluginType;
56 import org.apache.logging.log4j.core.util.ExecutorServices;
57 import org.apache.logging.log4j.core.util.FileUtils;
58 import org.apache.logging.log4j.core.util.Log4jThread;
59 import org.apache.logging.log4j.core.util.Log4jThreadFactory;
60 import org.apache.logging.log4j.core.util.SecretKeyProvider;
61 import org.apache.logging.log4j.util.Strings;
62
63
64
65
66 public class FlumePersistentManager extends FlumeAvroManager {
67
68
69 public static final String KEY_PROVIDER = "keyProvider";
70
71 private static final Charset UTF8 = StandardCharsets.UTF_8;
72
73 private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
74
75 private static final long SHUTDOWN_WAIT_MILLIS = 60000;
76
77 private static final long LOCK_TIMEOUT_SLEEP_MILLIS = 500;
78
79 private static BDBManagerFactory factory = new BDBManagerFactory();
80
81 private final Database database;
82
83 private final Environment environment;
84
85 private final WriterThread worker;
86
87 private final Gate gate = new Gate();
88
89 private final SecretKey secretKey;
90
91 private final int lockTimeoutRetryCount;
92
93 private final ExecutorService threadPool;
94
95 private final AtomicLong dbCount = new AtomicLong();
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112 protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
113 final int batchSize, final int retries, final int connectionTimeout,
114 final int requestTimeout, final int delay, final Database database,
115 final Environment environment, final SecretKey secretKey,
116 final int lockTimeoutRetryCount) {
117 super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
118 this.database = database;
119 this.environment = environment;
120 dbCount.set(database.count());
121 this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
122 lockTimeoutRetryCount);
123 this.worker.start();
124 this.secretKey = secretKey;
125 this.threadPool = Executors.newCachedThreadPool(Log4jThreadFactory.createDaemonThreadFactory("Flume"));
126 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
127 }
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144 public static FlumePersistentManager getManager(final String name, final Agent[] agents,
145 final Property[] properties, int batchSize, final int retries,
146 final int connectionTimeout, final int requestTimeout,
147 final int delayMillis, final int lockTimeoutRetryCount,
148 final String dataDir) {
149 if (agents == null || agents.length == 0) {
150 throw new IllegalArgumentException("At least one agent is required");
151 }
152
153 if (batchSize <= 0) {
154 batchSize = 1;
155 }
156 final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
157
158 final StringBuilder sb = new StringBuilder("FlumePersistent[");
159 boolean first = true;
160 for (final Agent agent : agents) {
161 if (!first) {
162 sb.append(',');
163 }
164 sb.append(agent.getHost()).append(':').append(agent.getPort());
165 first = false;
166 }
167 sb.append(']');
168 sb.append(' ').append(dataDirectory);
169 return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
170 connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDir, properties));
171 }
172
173 @Override
174 public void send(final Event event) {
175 if (worker.isShutdown()) {
176 throw new LoggingException("Unable to record event");
177 }
178
179 final Map<String, String> headers = event.getHeaders();
180 final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
181 try {
182 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
183 final DataOutputStream daos = new DataOutputStream(baos);
184 daos.writeInt(event.getBody().length);
185 daos.write(event.getBody(), 0, event.getBody().length);
186 daos.writeInt(event.getHeaders().size());
187 for (final Map.Entry<String, String> entry : headers.entrySet()) {
188 daos.writeUTF(entry.getKey());
189 daos.writeUTF(entry.getValue());
190 }
191 byte[] eventData = baos.toByteArray();
192 if (secretKey != null) {
193 final Cipher cipher = Cipher.getInstance("AES");
194 cipher.init(Cipher.ENCRYPT_MODE, secretKey);
195 eventData = cipher.doFinal(eventData);
196 }
197 final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
198 gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
199 boolean interrupted = false;
200 int ieCount = 0;
201 do {
202 try {
203 future.get();
204 } catch (final InterruptedException ie) {
205 interrupted = true;
206 ++ieCount;
207 }
208 } while (interrupted && ieCount <= 1);
209
210 } catch (final Exception ex) {
211 throw new LoggingException("Exception occurred writing log event", ex);
212 }
213 }
214
215 @Override
216 protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
217 boolean closed = true;
218 LOGGER.debug("Shutting down FlumePersistentManager");
219 worker.shutdown();
220 final long requestedTimeoutMillis = timeUnit.toMillis(timeout);
221 final long shutdownWaitMillis = requestedTimeoutMillis > 0 ? requestedTimeoutMillis : SHUTDOWN_WAIT_MILLIS;
222 try {
223 worker.join(shutdownWaitMillis);
224 } catch (final InterruptedException ie) {
225
226 }
227 ExecutorServices.shutdown(threadPool, shutdownWaitMillis, TimeUnit.MILLISECONDS, toString());
228 try {
229 worker.join();
230 } catch (final InterruptedException ex) {
231 logDebug("interrupted while waiting for worker to complete", ex);
232 }
233 try {
234 LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
235 database.close();
236 } catch (final Exception ex) {
237 logWarn("Failed to close database", ex);
238 closed = false;
239 }
240 try {
241 environment.cleanLog();
242 environment.close();
243 } catch (final Exception ex) {
244 logWarn("Failed to close environment", ex);
245 closed = false;
246 }
247 return closed && super.releaseSub(timeout, timeUnit);
248 }
249
250 private void doSend(final SimpleEvent event) {
251 LOGGER.debug("Sending event to Flume");
252 super.send(event);
253 }
254
255
256
257
258 private static class BDBWriter implements Callable<Integer> {
259 private final byte[] eventData;
260 private final byte[] keyData;
261 private final Environment environment;
262 private final Database database;
263 private final Gate gate;
264 private final AtomicLong dbCount;
265 private final long batchSize;
266 private final int lockTimeoutRetryCount;
267
268 public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
269 final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
270 final int lockTimeoutRetryCount) {
271 this.keyData = keyData;
272 this.eventData = eventData;
273 this.environment = environment;
274 this.database = database;
275 this.gate = gate;
276 this.dbCount = dbCount;
277 this.batchSize = batchSize;
278 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
279 }
280
281 @Override
282 public Integer call() throws Exception {
283 final DatabaseEntry key = new DatabaseEntry(keyData);
284 final DatabaseEntry data = new DatabaseEntry(eventData);
285 Exception exception = null;
286 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
287 Transaction txn = null;
288 try {
289 txn = environment.beginTransaction(null, null);
290 try {
291 database.put(txn, key, data);
292 txn.commit();
293 txn = null;
294 if (dbCount.incrementAndGet() >= batchSize) {
295 gate.open();
296 }
297 exception = null;
298 break;
299 } catch (final LockConflictException lce) {
300 exception = lce;
301
302 } catch (final Exception ex) {
303 if (txn != null) {
304 txn.abort();
305 }
306 throw ex;
307 } finally {
308 if (txn != null) {
309 txn.abort();
310 txn = null;
311 }
312 }
313 } catch (final LockConflictException lce) {
314 exception = lce;
315 if (txn != null) {
316 try {
317 txn.abort();
318 txn = null;
319 } catch (final Exception ex) {
320 LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
321 }
322 }
323
324 }
325 try {
326 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
327 } catch (final InterruptedException ie) {
328
329 }
330 }
331 if (exception != null) {
332 throw exception;
333 }
334 return eventData.length;
335 }
336 }
337
338
339
340
341 private static class FactoryData {
342 private final String name;
343 private final Agent[] agents;
344 private final int batchSize;
345 private final String dataDir;
346 private final int retries;
347 private final int connectionTimeout;
348 private final int requestTimeout;
349 private final int delayMillis;
350 private final int lockTimeoutRetryCount;
351 private final Property[] properties;
352
353
354
355
356
357
358
359
360 public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
361 final int connectionTimeout, final int requestTimeout, final int delayMillis,
362 final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
363 this.name = name;
364 this.agents = agents;
365 this.batchSize = batchSize;
366 this.dataDir = dataDir;
367 this.retries = retries;
368 this.connectionTimeout = connectionTimeout;
369 this.requestTimeout = requestTimeout;
370 this.delayMillis = delayMillis;
371 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
372 this.properties = properties;
373 }
374 }
375
376
377
378
379 private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
380
381
382
383
384
385
386
387 @Override
388 public FlumePersistentManager createManager(final String name, final FactoryData data) {
389 SecretKey secretKey = null;
390 Database database = null;
391 Environment environment = null;
392
393 final Map<String, String> properties = new HashMap<>();
394 if (data.properties != null) {
395 for (final Property property : data.properties) {
396 properties.put(property.getName(), property.getValue());
397 }
398 }
399
400 try {
401 final File dir = new File(data.dataDir);
402 FileUtils.mkdir(dir, true);
403 final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
404 dbEnvConfig.setTransactional(true);
405 dbEnvConfig.setAllowCreate(true);
406 dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
407 environment = new Environment(dir, dbEnvConfig);
408 final DatabaseConfig dbConfig = new DatabaseConfig();
409 dbConfig.setTransactional(true);
410 dbConfig.setAllowCreate(true);
411 database = environment.openDatabase(null, name, dbConfig);
412 } catch (final Exception ex) {
413 LOGGER.error("Could not create FlumePersistentManager", ex);
414
415
416
417 if (database != null) {
418 database.close();
419 database = null;
420 }
421 if (environment != null) {
422 environment.close();
423 environment = null;
424 }
425 return null;
426 }
427
428 try {
429 String key = null;
430 for (final Map.Entry<String, String> entry : properties.entrySet()) {
431 if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
432 key = entry.getValue();
433 break;
434 }
435 }
436 if (key != null) {
437 final PluginManager manager = new PluginManager("KeyProvider");
438 manager.collectPlugins();
439 final Map<String, PluginType<?>> plugins = manager.getPlugins();
440 if (plugins != null) {
441 boolean found = false;
442 for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
443 if (entry.getKey().equalsIgnoreCase(key)) {
444 found = true;
445 final Class<?> cl = entry.getValue().getPluginClass();
446 try {
447 final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
448 secretKey = provider.getSecretKey();
449 LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
450 } catch (final Exception ex) {
451 LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
452 cl.getName());
453 }
454 break;
455 }
456 }
457 if (!found) {
458 LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
459 }
460 } else {
461 LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
462 }
463 }
464 } catch (final Exception ex) {
465 LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
466 }
467 return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
468 data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
469 data.lockTimeoutRetryCount);
470 }
471 }
472
473
474
475
476 private static class WriterThread extends Log4jThread {
477 private volatile boolean shutdown = false;
478 private final Database database;
479 private final Environment environment;
480 private final FlumePersistentManager manager;
481 private final Gate gate;
482 private final SecretKey secretKey;
483 private final int batchSize;
484 private final AtomicLong dbCounter;
485 private final int lockTimeoutRetryCount;
486
487 public WriterThread(final Database database, final Environment environment,
488 final FlumePersistentManager manager, final Gate gate, final int batchsize,
489 final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
490 super("FlumePersistentManager-Writer");
491 this.database = database;
492 this.environment = environment;
493 this.manager = manager;
494 this.gate = gate;
495 this.batchSize = batchsize;
496 this.secretKey = secretKey;
497 this.setDaemon(true);
498 this.dbCounter = dbCount;
499 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
500 }
501
502 public void shutdown() {
503 LOGGER.debug("Writer thread shutting down");
504 this.shutdown = true;
505 gate.open();
506 }
507
508 public boolean isShutdown() {
509 return shutdown;
510 }
511
512 @Override
513 public void run() {
514 LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis());
515 long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
516 while (!shutdown) {
517 final long nowMillis = System.currentTimeMillis();
518 final long dbCount = database.count();
519 dbCounter.set(dbCount);
520 if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
521 nextBatchMillis = nowMillis + manager.getDelayMillis();
522 try {
523 boolean errors = false;
524 final DatabaseEntry key = new DatabaseEntry();
525 final DatabaseEntry data = new DatabaseEntry();
526
527 gate.close();
528 OperationStatus status;
529 if (batchSize > 1) {
530 try {
531 errors = sendBatch(key, data);
532 } catch (final Exception ex) {
533 break;
534 }
535 } else {
536 Exception exception = null;
537 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
538 exception = null;
539 Transaction txn = null;
540 Cursor cursor = null;
541 try {
542 txn = environment.beginTransaction(null, null);
543 cursor = database.openCursor(txn, null);
544 try {
545 status = cursor.getFirst(key, data, LockMode.RMW);
546 while (status == OperationStatus.SUCCESS) {
547 final SimpleEvent event = createEvent(data);
548 if (event != null) {
549 try {
550 manager.doSend(event);
551 } catch (final Exception ioe) {
552 errors = true;
553 LOGGER.error("Error sending event", ioe);
554 break;
555 }
556 try {
557 cursor.delete();
558 } catch (final Exception ex) {
559 LOGGER.error("Unable to delete event", ex);
560 }
561 }
562 status = cursor.getNext(key, data, LockMode.RMW);
563 }
564 if (cursor != null) {
565 cursor.close();
566 cursor = null;
567 }
568 txn.commit();
569 txn = null;
570 dbCounter.decrementAndGet();
571 exception = null;
572 break;
573 } catch (final LockConflictException lce) {
574 exception = lce;
575
576 } catch (final Exception ex) {
577 LOGGER.error("Error reading or writing to database", ex);
578 shutdown = true;
579 break;
580 } finally {
581 if (cursor != null) {
582 cursor.close();
583 cursor = null;
584 }
585 if (txn != null) {
586 txn.abort();
587 txn = null;
588 }
589 }
590 } catch (final LockConflictException lce) {
591 exception = lce;
592 if (cursor != null) {
593 try {
594 cursor.close();
595 cursor = null;
596 } catch (final Exception ex) {
597 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
598 }
599 }
600 if (txn != null) {
601 try {
602 txn.abort();
603 txn = null;
604 } catch (final Exception ex) {
605 LOGGER.trace("Ignored exception aborting tx during lock conflict.");
606 }
607 }
608 }
609 try {
610 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
611 } catch (final InterruptedException ie) {
612
613 }
614 }
615 if (exception != null) {
616 LOGGER.error("Unable to read or update data base", exception);
617 }
618 }
619 if (errors) {
620 Thread.sleep(manager.getDelayMillis());
621 continue;
622 }
623 } catch (final Exception ex) {
624 LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
625 }
626 } else {
627 if (nextBatchMillis <= nowMillis) {
628 nextBatchMillis = nowMillis + manager.getDelayMillis();
629 }
630 try {
631 final long interval = nextBatchMillis - nowMillis;
632 gate.waitForOpen(interval);
633 } catch (final InterruptedException ie) {
634 LOGGER.warn("WriterThread interrupted, continuing");
635 } catch (final Exception ex) {
636 LOGGER.error("WriterThread encountered an exception waiting for work", ex);
637 break;
638 }
639 }
640 }
641
642 if (batchSize > 1 && database.count() > 0) {
643 final DatabaseEntry key = new DatabaseEntry();
644 final DatabaseEntry data = new DatabaseEntry();
645 try {
646 sendBatch(key, data);
647 } catch (final Exception ex) {
648 LOGGER.warn("Unable to write final batch");
649 }
650 }
651 LOGGER.trace("WriterThread exiting");
652 }
653
654 private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
655 boolean errors = false;
656 OperationStatus status;
657 Cursor cursor = null;
658 try {
659 final BatchEvent batch = new BatchEvent();
660 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
661 try {
662 cursor = database.openCursor(null, CursorConfig.DEFAULT);
663 status = cursor.getFirst(key, data, null);
664
665 for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
666 final SimpleEvent event = createEvent(data);
667 if (event != null) {
668 batch.addEvent(event);
669 }
670 status = cursor.getNext(key, data, null);
671 }
672 break;
673 } catch (final LockConflictException lce) {
674 if (cursor != null) {
675 try {
676 cursor.close();
677 cursor = null;
678 } catch (final Exception ex) {
679 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
680 }
681 }
682 }
683 }
684
685 try {
686 manager.send(batch);
687 } catch (final Exception ioe) {
688 LOGGER.error("Error sending events", ioe);
689 errors = true;
690 }
691 if (!errors) {
692 if (cursor != null) {
693 cursor.close();
694 cursor = null;
695 }
696 Transaction txn = null;
697 Exception exception = null;
698 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
699 try {
700 txn = environment.beginTransaction(null, null);
701 try {
702 for (final Event event : batch.getEvents()) {
703 try {
704 final Map<String, String> headers = event.getHeaders();
705 key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
706 database.delete(txn, key);
707 } catch (final Exception ex) {
708 LOGGER.error("Error deleting key from database", ex);
709 }
710 }
711 txn.commit();
712 long count = dbCounter.get();
713 while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
714 count = dbCounter.get();
715 }
716 exception = null;
717 break;
718 } catch (final LockConflictException lce) {
719 exception = lce;
720 if (cursor != null) {
721 try {
722 cursor.close();
723 cursor = null;
724 } catch (final Exception ex) {
725 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
726 }
727 }
728 if (txn != null) {
729 try {
730 txn.abort();
731 txn = null;
732 } catch (final Exception ex) {
733 LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
734 }
735 }
736 } catch (final Exception ex) {
737 LOGGER.error("Unable to commit transaction", ex);
738 if (txn != null) {
739 txn.abort();
740 }
741 }
742 } catch (final LockConflictException lce) {
743 exception = lce;
744 if (cursor != null) {
745 try {
746 cursor.close();
747 cursor = null;
748 } catch (final Exception ex) {
749 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
750 }
751 }
752 if (txn != null) {
753 try {
754 txn.abort();
755 txn = null;
756 } catch (final Exception ex) {
757 LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
758 }
759 }
760 } finally {
761 if (cursor != null) {
762 cursor.close();
763 cursor = null;
764 }
765 if (txn != null) {
766 txn.abort();
767 txn = null;
768 }
769 }
770 try {
771 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
772 } catch (final InterruptedException ie) {
773
774 }
775 }
776 if (exception != null) {
777 LOGGER.error("Unable to delete events from data base", exception);
778 }
779 }
780 } catch (final Exception ex) {
781 LOGGER.error("Error reading database", ex);
782 shutdown = true;
783 throw ex;
784 } finally {
785 if (cursor != null) {
786 cursor.close();
787 }
788 }
789
790 return errors;
791 }
792
793 private SimpleEvent createEvent(final DatabaseEntry data) {
794 final SimpleEvent event = new SimpleEvent();
795 try {
796 byte[] eventData = data.getData();
797 if (secretKey != null) {
798 final Cipher cipher = Cipher.getInstance("AES");
799 cipher.init(Cipher.DECRYPT_MODE, secretKey);
800 eventData = cipher.doFinal(eventData);
801 }
802 final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
803 final DataInputStream dais = new DataInputStream(bais);
804 int length = dais.readInt();
805 final byte[] bytes = new byte[length];
806 dais.read(bytes, 0, length);
807 event.setBody(bytes);
808 length = dais.readInt();
809 final Map<String, String> map = new HashMap<>(length);
810 for (int i = 0; i < length; ++i) {
811 final String headerKey = dais.readUTF();
812 final String value = dais.readUTF();
813 map.put(headerKey, value);
814 }
815 event.setHeaders(map);
816 return event;
817 } catch (final Exception ex) {
818 LOGGER.error("Error retrieving event", ex);
819 return null;
820 }
821 }
822
823 }
824
825
826
827
828 private static class Gate {
829
830 private boolean isOpen = false;
831
832 public boolean isOpen() {
833 return isOpen;
834 }
835
836 public synchronized void open() {
837 isOpen = true;
838 notifyAll();
839 }
840
841 public synchronized void close() {
842 isOpen = false;
843 }
844
845 public synchronized void waitForOpen(final long timeout) throws InterruptedException {
846 wait(timeout);
847 }
848 }
849 }