1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import java.io.ByteArrayInputStream;
20 import java.io.ByteArrayOutputStream;
21 import java.io.DataInputStream;
22 import java.io.DataOutputStream;
23 import java.io.File;
24 import java.nio.charset.Charset;
25 import java.nio.charset.StandardCharsets;
26 import java.util.HashMap;
27 import java.util.Map;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.Future;
32 import java.util.concurrent.ThreadFactory;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.concurrent.atomic.AtomicLong;
36
37 import javax.crypto.Cipher;
38 import javax.crypto.SecretKey;
39
40 import org.apache.flume.Event;
41 import org.apache.flume.event.SimpleEvent;
42 import org.apache.logging.log4j.LoggingException;
43 import org.apache.logging.log4j.core.appender.ManagerFactory;
44 import org.apache.logging.log4j.core.config.Property;
45 import org.apache.logging.log4j.core.config.plugins.util.PluginManager;
46 import org.apache.logging.log4j.core.config.plugins.util.PluginType;
47 import org.apache.logging.log4j.core.util.FileUtils;
48 import org.apache.logging.log4j.core.util.Log4jThread;
49 import org.apache.logging.log4j.core.util.SecretKeyProvider;
50 import org.apache.logging.log4j.util.Strings;
51
52 import com.sleepycat.je.Cursor;
53 import com.sleepycat.je.CursorConfig;
54 import com.sleepycat.je.Database;
55 import com.sleepycat.je.DatabaseConfig;
56 import com.sleepycat.je.DatabaseEntry;
57 import com.sleepycat.je.Environment;
58 import com.sleepycat.je.EnvironmentConfig;
59 import com.sleepycat.je.LockConflictException;
60 import com.sleepycat.je.LockMode;
61 import com.sleepycat.je.OperationStatus;
62 import com.sleepycat.je.StatsConfig;
63 import com.sleepycat.je.Transaction;
64
65
66
67
68 public class FlumePersistentManager extends FlumeAvroManager {
69
70
71 public static final String KEY_PROVIDER = "keyProvider";
72
73 private static final Charset UTF8 = StandardCharsets.UTF_8;
74
75 private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
76
77 private static final int SHUTDOWN_WAIT = 60;
78
79 private static final int MILLIS_PER_SECOND = 1000;
80
81 private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;
82
83 private static BDBManagerFactory factory = new BDBManagerFactory();
84
85 private final Database database;
86
87 private final Environment environment;
88
89 private final WriterThread worker;
90
91 private final Gate gate = new Gate();
92
93 private final SecretKey secretKey;
94
95 private final int lockTimeoutRetryCount;
96
97 private final ExecutorService threadPool;
98
99 private final AtomicLong dbCount = new AtomicLong();
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116 protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
117 final int batchSize, final int retries, final int connectionTimeout,
118 final int requestTimeout, final int delay, final Database database,
119 final Environment environment, final SecretKey secretKey,
120 final int lockTimeoutRetryCount) {
121 super(name, shortName, agents, batchSize, delay, retries, connectionTimeout, requestTimeout);
122 this.database = database;
123 this.environment = environment;
124 dbCount.set(database.count());
125 this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
126 lockTimeoutRetryCount);
127 this.worker.start();
128 this.secretKey = secretKey;
129 this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
130 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
131 }
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148 public static FlumePersistentManager getManager(final String name, final Agent[] agents,
149 final Property[] properties, int batchSize, final int retries,
150 final int connectionTimeout, final int requestTimeout,
151 final int delayMillis, final int lockTimeoutRetryCount,
152 final String dataDir) {
153 if (agents == null || agents.length == 0) {
154 throw new IllegalArgumentException("At least one agent is required");
155 }
156
157 if (batchSize <= 0) {
158 batchSize = 1;
159 }
160 final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
161
162 final StringBuilder sb = new StringBuilder("FlumePersistent[");
163 boolean first = true;
164 for (final Agent agent : agents) {
165 if (!first) {
166 sb.append(',');
167 }
168 sb.append(agent.getHost()).append(':').append(agent.getPort());
169 first = false;
170 }
171 sb.append(']');
172 sb.append(' ').append(dataDirectory);
173 return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
174 connectionTimeout, requestTimeout, delayMillis, lockTimeoutRetryCount, dataDir, properties));
175 }
176
177 @Override
178 public void send(final Event event) {
179 if (worker.isShutdown()) {
180 throw new LoggingException("Unable to record event");
181 }
182
183 final Map<String, String> headers = event.getHeaders();
184 final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
185 try {
186 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
187 final DataOutputStream daos = new DataOutputStream(baos);
188 daos.writeInt(event.getBody().length);
189 daos.write(event.getBody(), 0, event.getBody().length);
190 daos.writeInt(event.getHeaders().size());
191 for (final Map.Entry<String, String> entry : headers.entrySet()) {
192 daos.writeUTF(entry.getKey());
193 daos.writeUTF(entry.getValue());
194 }
195 byte[] eventData = baos.toByteArray();
196 if (secretKey != null) {
197 final Cipher cipher = Cipher.getInstance("AES");
198 cipher.init(Cipher.ENCRYPT_MODE, secretKey);
199 eventData = cipher.doFinal(eventData);
200 }
201 final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
202 gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
203 boolean interrupted = false;
204 int ieCount = 0;
205 do {
206 try {
207 future.get();
208 } catch (final InterruptedException ie) {
209 interrupted = true;
210 ++ieCount;
211 }
212 } while (interrupted && ieCount <= 1);
213
214 } catch (final Exception ex) {
215 throw new LoggingException("Exception occurred writing log event", ex);
216 }
217 }
218
219 @Override
220 protected void releaseSub() {
221 LOGGER.debug("Shutting down FlumePersistentManager");
222 worker.shutdown();
223 try {
224 worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
225 } catch (final InterruptedException ie) {
226
227 }
228 threadPool.shutdown();
229 try {
230 threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
231 } catch (final InterruptedException e) {
232 logWarn("PersistentManager Thread pool failed to shut down", e);
233 }
234 try {
235 worker.join();
236 } catch (final InterruptedException ex) {
237 logDebug("interrupted while waiting for worker to complete", ex);
238 }
239 try {
240 LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
241 database.close();
242 } catch (final Exception ex) {
243 logWarn("failed to close database", ex);
244 }
245 try {
246 environment.cleanLog();
247 environment.close();
248 } catch (final Exception ex) {
249 logWarn("failed to close environment", ex);
250 }
251 super.releaseSub();
252 }
253
254 private void doSend(final SimpleEvent event) {
255 LOGGER.debug("Sending event to Flume");
256 super.send(event);
257 }
258
259
260
261
262 private static class BDBWriter implements Callable<Integer> {
263 private final byte[] eventData;
264 private final byte[] keyData;
265 private final Environment environment;
266 private final Database database;
267 private final Gate gate;
268 private final AtomicLong dbCount;
269 private final long batchSize;
270 private final int lockTimeoutRetryCount;
271
272 public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
273 final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
274 final int lockTimeoutRetryCount) {
275 this.keyData = keyData;
276 this.eventData = eventData;
277 this.environment = environment;
278 this.database = database;
279 this.gate = gate;
280 this.dbCount = dbCount;
281 this.batchSize = batchSize;
282 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
283 }
284
285 @Override
286 public Integer call() throws Exception {
287 final DatabaseEntry key = new DatabaseEntry(keyData);
288 final DatabaseEntry data = new DatabaseEntry(eventData);
289 Exception exception = null;
290 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
291 Transaction txn = null;
292 try {
293 txn = environment.beginTransaction(null, null);
294 try {
295 database.put(txn, key, data);
296 txn.commit();
297 txn = null;
298 if (dbCount.incrementAndGet() >= batchSize) {
299 gate.open();
300 }
301 exception = null;
302 break;
303 } catch (final LockConflictException lce) {
304 exception = lce;
305
306 } catch (final Exception ex) {
307 if (txn != null) {
308 txn.abort();
309 }
310 throw ex;
311 } finally {
312 if (txn != null) {
313 txn.abort();
314 txn = null;
315 }
316 }
317 } catch (final LockConflictException lce) {
318 exception = lce;
319 if (txn != null) {
320 try {
321 txn.abort();
322 txn = null;
323 } catch (final Exception ex) {
324 LOGGER.trace("Ignoring exception while aborting transaction during lock conflict.");
325 }
326 }
327
328 }
329 try {
330 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
331 } catch (final InterruptedException ie) {
332
333 }
334 }
335 if (exception != null) {
336 throw exception;
337 }
338 return eventData.length;
339 }
340 }
341
342
343
344
345 private static class FactoryData {
346 private final String name;
347 private final Agent[] agents;
348 private final int batchSize;
349 private final String dataDir;
350 private final int retries;
351 private final int connectionTimeout;
352 private final int requestTimeout;
353 private final int delayMillis;
354 private final int lockTimeoutRetryCount;
355 private final Property[] properties;
356
357
358
359
360
361
362
363
364 public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
365 final int connectionTimeout, final int requestTimeout, final int delayMillis,
366 final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
367 this.name = name;
368 this.agents = agents;
369 this.batchSize = batchSize;
370 this.dataDir = dataDir;
371 this.retries = retries;
372 this.connectionTimeout = connectionTimeout;
373 this.requestTimeout = requestTimeout;
374 this.delayMillis = delayMillis;
375 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
376 this.properties = properties;
377 }
378 }
379
380
381
382
383 private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
384
385
386
387
388
389
390
391 @Override
392 public FlumePersistentManager createManager(final String name, final FactoryData data) {
393 SecretKey secretKey = null;
394 Database database = null;
395 Environment environment = null;
396
397 final Map<String, String> properties = new HashMap<>();
398 if (data.properties != null) {
399 for (final Property property : data.properties) {
400 properties.put(property.getName(), property.getValue());
401 }
402 }
403
404 try {
405 final File dir = new File(data.dataDir);
406 FileUtils.mkdir(dir, true);
407 final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
408 dbEnvConfig.setTransactional(true);
409 dbEnvConfig.setAllowCreate(true);
410 dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
411 environment = new Environment(dir, dbEnvConfig);
412 final DatabaseConfig dbConfig = new DatabaseConfig();
413 dbConfig.setTransactional(true);
414 dbConfig.setAllowCreate(true);
415 database = environment.openDatabase(null, name, dbConfig);
416 } catch (final Exception ex) {
417 LOGGER.error("Could not create FlumePersistentManager", ex);
418
419
420
421 if (database != null) {
422 database.close();
423 database = null;
424 }
425 if (environment != null) {
426 environment.close();
427 environment = null;
428 }
429 return null;
430 }
431
432 try {
433 String key = null;
434 for (final Map.Entry<String, String> entry : properties.entrySet()) {
435 if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
436 key = entry.getValue();
437 break;
438 }
439 }
440 if (key != null) {
441 final PluginManager manager = new PluginManager("KeyProvider");
442 manager.collectPlugins();
443 final Map<String, PluginType<?>> plugins = manager.getPlugins();
444 if (plugins != null) {
445 boolean found = false;
446 for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
447 if (entry.getKey().equalsIgnoreCase(key)) {
448 found = true;
449 final Class<?> cl = entry.getValue().getPluginClass();
450 try {
451 final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
452 secretKey = provider.getSecretKey();
453 LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
454 } catch (final Exception ex) {
455 LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
456 cl.getName());
457 }
458 break;
459 }
460 }
461 if (!found) {
462 LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
463 }
464 } else {
465 LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
466 }
467 }
468 } catch (final Exception ex) {
469 LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
470 }
471 return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
472 data.connectionTimeout, data.requestTimeout, data.delayMillis, database, environment, secretKey,
473 data.lockTimeoutRetryCount);
474 }
475 }
476
477
478
479
480 private static class WriterThread extends Thread {
481 private volatile boolean shutdown = false;
482 private final Database database;
483 private final Environment environment;
484 private final FlumePersistentManager manager;
485 private final Gate gate;
486 private final SecretKey secretKey;
487 private final int batchSize;
488 private final AtomicLong dbCounter;
489 private final int lockTimeoutRetryCount;
490
491 public WriterThread(final Database database, final Environment environment,
492 final FlumePersistentManager manager, final Gate gate, final int batchsize,
493 final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
494 this.database = database;
495 this.environment = environment;
496 this.manager = manager;
497 this.gate = gate;
498 this.batchSize = batchsize;
499 this.secretKey = secretKey;
500 this.setDaemon(true);
501 this.dbCounter = dbCount;
502 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
503 }
504
505 public void shutdown() {
506 LOGGER.debug("Writer thread shutting down");
507 this.shutdown = true;
508 gate.open();
509 }
510
511 public boolean isShutdown() {
512 return shutdown;
513 }
514
515 @Override
516 public void run() {
517 LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delayMillis = " + manager.getDelayMillis());
518 long nextBatchMillis = System.currentTimeMillis() + manager.getDelayMillis();
519 while (!shutdown) {
520 final long nowMillis = System.currentTimeMillis();
521 final long dbCount = database.count();
522 dbCounter.set(dbCount);
523 if (dbCount >= batchSize || dbCount > 0 && nextBatchMillis <= nowMillis) {
524 nextBatchMillis = nowMillis + manager.getDelayMillis();
525 try {
526 boolean errors = false;
527 final DatabaseEntry key = new DatabaseEntry();
528 final DatabaseEntry data = new DatabaseEntry();
529
530 gate.close();
531 OperationStatus status;
532 if (batchSize > 1) {
533 try {
534 errors = sendBatch(key, data);
535 } catch (final Exception ex) {
536 break;
537 }
538 } else {
539 Exception exception = null;
540 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
541 exception = null;
542 Transaction txn = null;
543 Cursor cursor = null;
544 try {
545 txn = environment.beginTransaction(null, null);
546 cursor = database.openCursor(txn, null);
547 try {
548 status = cursor.getFirst(key, data, LockMode.RMW);
549 while (status == OperationStatus.SUCCESS) {
550 final SimpleEvent event = createEvent(data);
551 if (event != null) {
552 try {
553 manager.doSend(event);
554 } catch (final Exception ioe) {
555 errors = true;
556 LOGGER.error("Error sending event", ioe);
557 break;
558 }
559 try {
560 cursor.delete();
561 } catch (final Exception ex) {
562 LOGGER.error("Unable to delete event", ex);
563 }
564 }
565 status = cursor.getNext(key, data, LockMode.RMW);
566 }
567 if (cursor != null) {
568 cursor.close();
569 cursor = null;
570 }
571 txn.commit();
572 txn = null;
573 dbCounter.decrementAndGet();
574 exception = null;
575 break;
576 } catch (final LockConflictException lce) {
577 exception = lce;
578
579 } catch (final Exception ex) {
580 LOGGER.error("Error reading or writing to database", ex);
581 shutdown = true;
582 break;
583 } finally {
584 if (cursor != null) {
585 cursor.close();
586 cursor = null;
587 }
588 if (txn != null) {
589 txn.abort();
590 txn = null;
591 }
592 }
593 } catch (final LockConflictException lce) {
594 exception = lce;
595 if (cursor != null) {
596 try {
597 cursor.close();
598 cursor = null;
599 } catch (final Exception ex) {
600 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
601 }
602 }
603 if (txn != null) {
604 try {
605 txn.abort();
606 txn = null;
607 } catch (final Exception ex) {
608 LOGGER.trace("Ignored exception aborting tx during lock conflict.");
609 }
610 }
611 }
612 try {
613 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
614 } catch (final InterruptedException ie) {
615
616 }
617 }
618 if (exception != null) {
619 LOGGER.error("Unable to read or update data base", exception);
620 }
621 }
622 if (errors) {
623 Thread.sleep(manager.getDelayMillis());
624 continue;
625 }
626 } catch (final Exception ex) {
627 LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
628 }
629 } else {
630 if (nextBatchMillis <= nowMillis) {
631 nextBatchMillis = nowMillis + manager.getDelayMillis();
632 }
633 try {
634 final long interval = nextBatchMillis - nowMillis;
635 gate.waitForOpen(interval);
636 } catch (final InterruptedException ie) {
637 LOGGER.warn("WriterThread interrupted, continuing");
638 } catch (final Exception ex) {
639 LOGGER.error("WriterThread encountered an exception waiting for work", ex);
640 break;
641 }
642 }
643 }
644
645 if (batchSize > 1 && database.count() > 0) {
646 final DatabaseEntry key = new DatabaseEntry();
647 final DatabaseEntry data = new DatabaseEntry();
648 try {
649 sendBatch(key, data);
650 } catch (final Exception ex) {
651 LOGGER.warn("Unable to write final batch");
652 }
653 }
654 LOGGER.trace("WriterThread exiting");
655 }
656
657 private boolean sendBatch(DatabaseEntry key, final DatabaseEntry data) throws Exception {
658 boolean errors = false;
659 OperationStatus status;
660 Cursor cursor = null;
661 try {
662 final BatchEvent batch = new BatchEvent();
663 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
664 try {
665 cursor = database.openCursor(null, CursorConfig.DEFAULT);
666 status = cursor.getFirst(key, data, null);
667
668 for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
669 final SimpleEvent event = createEvent(data);
670 if (event != null) {
671 batch.addEvent(event);
672 }
673 status = cursor.getNext(key, data, null);
674 }
675 break;
676 } catch (final LockConflictException lce) {
677 if (cursor != null) {
678 try {
679 cursor.close();
680 cursor = null;
681 } catch (final Exception ex) {
682 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
683 }
684 }
685 }
686 }
687
688 try {
689 manager.send(batch);
690 } catch (final Exception ioe) {
691 LOGGER.error("Error sending events", ioe);
692 errors = true;
693 }
694 if (!errors) {
695 if (cursor != null) {
696 cursor.close();
697 cursor = null;
698 }
699 Transaction txn = null;
700 Exception exception = null;
701 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
702 try {
703 txn = environment.beginTransaction(null, null);
704 try {
705 for (final Event event : batch.getEvents()) {
706 try {
707 final Map<String, String> headers = event.getHeaders();
708 key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
709 database.delete(txn, key);
710 } catch (final Exception ex) {
711 LOGGER.error("Error deleting key from database", ex);
712 }
713 }
714 txn.commit();
715 long count = dbCounter.get();
716 while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
717 count = dbCounter.get();
718 }
719 exception = null;
720 break;
721 } catch (final LockConflictException lce) {
722 exception = lce;
723 if (cursor != null) {
724 try {
725 cursor.close();
726 cursor = null;
727 } catch (final Exception ex) {
728 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
729 }
730 }
731 if (txn != null) {
732 try {
733 txn.abort();
734 txn = null;
735 } catch (final Exception ex) {
736 LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
737 }
738 }
739 } catch (final Exception ex) {
740 LOGGER.error("Unable to commit transaction", ex);
741 if (txn != null) {
742 txn.abort();
743 }
744 }
745 } catch (final LockConflictException lce) {
746 exception = lce;
747 if (cursor != null) {
748 try {
749 cursor.close();
750 cursor = null;
751 } catch (final Exception ex) {
752 LOGGER.trace("Ignored exception closing cursor during lock conflict.");
753 }
754 }
755 if (txn != null) {
756 try {
757 txn.abort();
758 txn = null;
759 } catch (final Exception ex) {
760 LOGGER.trace("Ignored exception aborting transaction during lock conflict.");
761 }
762 }
763 } finally {
764 if (cursor != null) {
765 cursor.close();
766 cursor = null;
767 }
768 if (txn != null) {
769 txn.abort();
770 txn = null;
771 }
772 }
773 try {
774 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
775 } catch (final InterruptedException ie) {
776
777 }
778 }
779 if (exception != null) {
780 LOGGER.error("Unable to delete events from data base", exception);
781 }
782 }
783 } catch (final Exception ex) {
784 LOGGER.error("Error reading database", ex);
785 shutdown = true;
786 throw ex;
787 } finally {
788 if (cursor != null) {
789 cursor.close();
790 }
791 }
792
793 return errors;
794 }
795
796 private SimpleEvent createEvent(final DatabaseEntry data) {
797 final SimpleEvent event = new SimpleEvent();
798 try {
799 byte[] eventData = data.getData();
800 if (secretKey != null) {
801 final Cipher cipher = Cipher.getInstance("AES");
802 cipher.init(Cipher.DECRYPT_MODE, secretKey);
803 eventData = cipher.doFinal(eventData);
804 }
805 final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
806 final DataInputStream dais = new DataInputStream(bais);
807 int length = dais.readInt();
808 final byte[] bytes = new byte[length];
809 dais.read(bytes, 0, length);
810 event.setBody(bytes);
811 length = dais.readInt();
812 final Map<String, String> map = new HashMap<>(length);
813 for (int i = 0; i < length; ++i) {
814 final String headerKey = dais.readUTF();
815 final String value = dais.readUTF();
816 map.put(headerKey, value);
817 }
818 event.setHeaders(map);
819 return event;
820 } catch (final Exception ex) {
821 LOGGER.error("Error retrieving event", ex);
822 return null;
823 }
824 }
825
826 }
827
828
829
830
831 private static class DaemonThreadFactory implements ThreadFactory {
832 private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
833 private final ThreadGroup group;
834 private final AtomicInteger threadNumber = new AtomicInteger(1);
835 private final String namePrefix;
836
837 public DaemonThreadFactory() {
838 final SecurityManager securityManager = System.getSecurityManager();
839 group = securityManager != null ? securityManager.getThreadGroup() :
840 Thread.currentThread().getThreadGroup();
841 namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
842 }
843
844 @Override
845 public Thread newThread(final Runnable r) {
846 final Thread thread = new Log4jThread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
847 thread.setDaemon(true);
848 if (thread.getPriority() != Thread.NORM_PRIORITY) {
849 thread.setPriority(Thread.NORM_PRIORITY);
850 }
851 return thread;
852 }
853 }
854
855
856
857
858 private static class Gate {
859
860 private boolean isOpen = false;
861
862 public boolean isOpen() {
863 return isOpen;
864 }
865
866 public synchronized void open() {
867 isOpen = true;
868 notifyAll();
869 }
870
871 public synchronized void close() {
872 isOpen = false;
873 }
874
875 public synchronized void waitForOpen(final long timeout) throws InterruptedException {
876 wait(timeout);
877 }
878 }
879 }