1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import java.io.ByteArrayInputStream;
20 import java.io.ByteArrayOutputStream;
21 import java.io.DataInputStream;
22 import java.io.DataOutputStream;
23 import java.io.File;
24 import java.nio.charset.Charset;
25 import java.util.HashMap;
26 import java.util.Map;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.ThreadFactory;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicLong;
35 import javax.crypto.Cipher;
36 import javax.crypto.SecretKey;
37
38 import com.sleepycat.je.LockConflictException;
39 import org.apache.flume.Event;
40 import org.apache.flume.event.SimpleEvent;
41 import org.apache.logging.log4j.LoggingException;
42 import org.apache.logging.log4j.core.appender.ManagerFactory;
43 import org.apache.logging.log4j.core.config.Property;
44 import org.apache.logging.log4j.core.config.plugins.PluginManager;
45 import org.apache.logging.log4j.core.config.plugins.PluginType;
46 import org.apache.logging.log4j.core.helpers.FileUtils;
47 import org.apache.logging.log4j.core.helpers.SecretKeyProvider;
48 import org.apache.logging.log4j.core.helpers.Strings;
49
50 import com.sleepycat.je.Cursor;
51 import com.sleepycat.je.CursorConfig;
52 import com.sleepycat.je.Database;
53 import com.sleepycat.je.DatabaseConfig;
54 import com.sleepycat.je.DatabaseEntry;
55 import com.sleepycat.je.Environment;
56 import com.sleepycat.je.EnvironmentConfig;
57 import com.sleepycat.je.LockMode;
58 import com.sleepycat.je.OperationStatus;
59 import com.sleepycat.je.StatsConfig;
60 import com.sleepycat.je.Transaction;
61
62
63
64
65 public class FlumePersistentManager extends FlumeAvroManager {
66
67
68 public static final String KEY_PROVIDER = "keyProvider";
69
70 private static final Charset UTF8 = Charset.forName("UTF-8");
71
72 private static final String SHUTDOWN = "Shutdown";
73
74 private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
75
76 private static final int SHUTDOWN_WAIT = 60;
77
78 private static final int MILLIS_PER_SECOND = 1000;
79
80 private static final int LOCK_TIMEOUT_SLEEP_MILLIS = 500;
81
82 private static BDBManagerFactory factory = new BDBManagerFactory();
83
84 private final Database database;
85
86 private final Environment environment;
87
88 private final WriterThread worker;
89
90 private final Gate gate = new Gate();
91
92 private final SecretKey secretKey;
93
94 private final int delay;
95
96 private final int lockTimeoutRetryCount;
97
98 private final ExecutorService threadPool;
99
100 private AtomicLong dbCount = new AtomicLong();
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117 protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
118 final int batchSize, final int retries, final int connectionTimeout,
119 final int requestTimeout, final int delay, final Database database,
120 final Environment environment, final SecretKey secretKey,
121 final int lockTimeoutRetryCount) {
122 super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
123 this.delay = delay;
124 this.database = database;
125 this.environment = environment;
126 dbCount.set(database.count());
127 this.worker = new WriterThread(database, environment, this, gate, batchSize, secretKey, dbCount,
128 lockTimeoutRetryCount);
129 this.worker.start();
130 this.secretKey = secretKey;
131 this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
132 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
133 }
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149 public static FlumePersistentManager getManager(final String name, final Agent[] agents,
150 final Property[] properties, int batchSize, final int retries,
151 final int connectionTimeout, final int requestTimeout,
152 final int delay, final int lockTimeoutRetryCount,
153 final String dataDir) {
154 if (agents == null || agents.length == 0) {
155 throw new IllegalArgumentException("At least one agent is required");
156 }
157
158 if (batchSize <= 0) {
159 batchSize = 1;
160 }
161 final String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
162
163 final StringBuilder sb = new StringBuilder("FlumePersistent[");
164 boolean first = true;
165 for (final Agent agent : agents) {
166 if (!first) {
167 sb.append(",");
168 }
169 sb.append(agent.getHost()).append(":").append(agent.getPort());
170 first = false;
171 }
172 sb.append("]");
173 sb.append(" ").append(dataDirectory);
174 return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
175 connectionTimeout, requestTimeout, delay, lockTimeoutRetryCount, dataDir, properties));
176 }
177
178 @Override
179 public void send(final Event event) {
180 if (worker.isShutdown()) {
181 throw new LoggingException("Unable to record event");
182 }
183
184 final Map<String, String> headers = event.getHeaders();
185 final byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
186 try {
187 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
188 final DataOutputStream daos = new DataOutputStream(baos);
189 daos.writeInt(event.getBody().length);
190 daos.write(event.getBody(), 0, event.getBody().length);
191 daos.writeInt(event.getHeaders().size());
192 for (final Map.Entry<String, String> entry : headers.entrySet()) {
193 daos.writeUTF(entry.getKey());
194 daos.writeUTF(entry.getValue());
195 }
196 byte[] eventData = baos.toByteArray();
197 if (secretKey != null) {
198 final Cipher cipher = Cipher.getInstance("AES");
199 cipher.init(Cipher.ENCRYPT_MODE, secretKey);
200 eventData = cipher.doFinal(eventData);
201 }
202 final Future<Integer> future = threadPool.submit(new BDBWriter(keyData, eventData, environment, database,
203 gate, dbCount, getBatchSize(), lockTimeoutRetryCount));
204 boolean interrupted = false;
205 int count = 0;
206 do {
207 try {
208 future.get();
209 } catch (final InterruptedException ie) {
210 interrupted = true;
211 ++count;
212 }
213 } while (interrupted && count <= 1);
214
215 } catch (final Exception ex) {
216 throw new LoggingException("Exception occurred writing log event", ex);
217 }
218 }
219
220 @Override
221 protected void releaseSub() {
222 LOGGER.debug("Shutting down FlumePersistentManager");
223 worker.shutdown();
224 try {
225 worker.join(SHUTDOWN_WAIT * MILLIS_PER_SECOND);
226 } catch(InterruptedException ie) {
227
228 }
229 threadPool.shutdown();
230 try {
231 threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS);
232 } catch (final InterruptedException ie) {
233 LOGGER.warn("PersistentManager Thread pool failed to shut down");
234 }
235 try {
236 worker.join();
237 } catch (final InterruptedException ex) {
238 LOGGER.debug("Interrupted while waiting for worker to complete");
239 }
240 try {
241 LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
242 database.close();
243 } catch (final Exception ex) {
244 LOGGER.warn("Failed to close database", ex);
245 }
246 try {
247 environment.cleanLog();
248 environment.close();
249 } catch (final Exception ex) {
250 LOGGER.warn("Failed to close environment", ex);
251 }
252 super.releaseSub();
253 }
254
255 private void doSend(final SimpleEvent event) {
256 LOGGER.debug("Sending event to Flume");
257 super.send(event);
258 }
259
260
261
262
263 private static class BDBWriter implements Callable<Integer> {
264 private final byte[] eventData;
265 private final byte[] keyData;
266 private final Environment environment;
267 private final Database database;
268 private final Gate gate;
269 private final AtomicLong dbCount;
270 private final long batchSize;
271 private final int lockTimeoutRetryCount;
272
273 public BDBWriter(final byte[] keyData, final byte[] eventData, final Environment environment,
274 final Database database, final Gate gate, final AtomicLong dbCount, final long batchSize,
275 final int lockTimeoutRetryCount) {
276 this.keyData = keyData;
277 this.eventData = eventData;
278 this.environment = environment;
279 this.database = database;
280 this.gate = gate;
281 this.dbCount = dbCount;
282 this.batchSize = batchSize;
283 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
284 }
285
286 @Override
287 public Integer call() throws Exception {
288 final DatabaseEntry key = new DatabaseEntry(keyData);
289 final DatabaseEntry data = new DatabaseEntry(eventData);
290 Exception exception = null;
291 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
292 Transaction txn = null;
293 try {
294 txn = environment.beginTransaction(null, null);
295 try {
296 database.put(txn, key, data);
297 txn.commit();
298 txn = null;
299 if (dbCount.incrementAndGet() >= batchSize) {
300 gate.open();
301 }
302 exception = null;
303 break;
304 } catch (final LockConflictException lce) {
305 exception = lce;
306
307 } catch (final Exception ex) {
308 if (txn != null) {
309 txn.abort();
310 }
311 throw ex;
312 } finally {
313 if (txn != null) {
314 txn.abort();
315 txn = null;
316 }
317 }
318 } catch (LockConflictException lce) {
319 exception = lce;
320 if (txn != null) {
321 try {
322 txn.abort();
323 txn = null;
324 } catch (Exception ex) {
325
326 }
327 }
328
329 }
330 try {
331 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
332 } catch (InterruptedException ie) {
333
334 }
335 }
336 if (exception != null) {
337 throw exception;
338 }
339 return eventData.length;
340 }
341 }
342
343
344
345
346 private static class FactoryData {
347 private final String name;
348 private final Agent[] agents;
349 private final int batchSize;
350 private final String dataDir;
351 private final int retries;
352 private final int connectionTimeout;
353 private final int requestTimeout;
354 private final int delay;
355 private final int lockTimeoutRetryCount;
356 private final Property[] properties;
357
358
359
360
361
362
363
364
365 public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
366 final int connectionTimeout, final int requestTimeout, final int delay,
367 final int lockTimeoutRetryCount, final String dataDir, final Property[] properties) {
368 this.name = name;
369 this.agents = agents;
370 this.batchSize = batchSize;
371 this.dataDir = dataDir;
372 this.retries = retries;
373 this.connectionTimeout = connectionTimeout;
374 this.requestTimeout = requestTimeout;
375 this.delay = delay;
376 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
377 this.properties = properties;
378 }
379 }
380
381
382
383
384 private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
385
386
387
388
389
390
391
392 @Override
393 public FlumePersistentManager createManager(final String name, final FactoryData data) {
394 SecretKey secretKey = null;
395
396 Database database;
397 Environment environment;
398
399 final Map<String, String> properties = new HashMap<String, String>();
400 if (data.properties != null) {
401 for (final Property property : data.properties) {
402 properties.put(property.getName(), property.getValue());
403 }
404 }
405
406 try {
407
408 final File dir = new File(data.dataDir);
409 FileUtils.mkdir(dir, true);
410 final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
411 dbEnvConfig.setTransactional(true);
412 dbEnvConfig.setAllowCreate(true);
413 dbEnvConfig.setLockTimeout(5, TimeUnit.SECONDS);
414 environment = new Environment(dir, dbEnvConfig);
415 final DatabaseConfig dbConfig = new DatabaseConfig();
416 dbConfig.setTransactional(true);
417 dbConfig.setAllowCreate(true);
418 database = environment.openDatabase(null, name, dbConfig);
419 } catch (final Exception ex) {
420 LOGGER.error("Could not create FlumePersistentManager", ex);
421 return null;
422 }
423
424 try {
425 String key = null;
426 for (final Map.Entry<String, String> entry : properties.entrySet()) {
427 if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
428 key = entry.getValue();
429 break;
430 }
431 }
432 if (key != null) {
433 final PluginManager manager = new PluginManager("KeyProvider", SecretKeyProvider.class);
434 manager.collectPlugins();
435 final Map<String, PluginType<?>> plugins = manager.getPlugins();
436 if (plugins != null) {
437 boolean found = false;
438 for (final Map.Entry<String, PluginType<?>> entry : plugins.entrySet()) {
439 if (entry.getKey().equalsIgnoreCase(key)) {
440 found = true;
441 final Class<?> cl = entry.getValue().getPluginClass();
442 try {
443 final SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
444 secretKey = provider.getSecretKey();
445 LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
446 } catch (final Exception ex) {
447 LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
448 cl.getName());
449 }
450 break;
451 }
452 }
453 if (!found) {
454 LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
455 }
456 } else {
457 LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
458 }
459 }
460 } catch (final Exception ex) {
461 LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
462 }
463 return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
464 data.connectionTimeout, data.requestTimeout, data.delay, database, environment, secretKey,
465 data.lockTimeoutRetryCount);
466 }
467 }
468
469
470
471
472 private static class WriterThread extends Thread {
473 private volatile boolean shutdown = false;
474 private final Database database;
475 private final Environment environment;
476 private final FlumePersistentManager manager;
477 private final Gate gate;
478 private final SecretKey secretKey;
479 private final int batchSize;
480 private final AtomicLong dbCounter;
481 private final int lockTimeoutRetryCount;
482
483 public WriterThread(final Database database, final Environment environment,
484 final FlumePersistentManager manager, final Gate gate, final int batchsize,
485 final SecretKey secretKey, final AtomicLong dbCount, final int lockTimeoutRetryCount) {
486 this.database = database;
487 this.environment = environment;
488 this.manager = manager;
489 this.gate = gate;
490 this.batchSize = batchsize;
491 this.secretKey = secretKey;
492 this.setDaemon(true);
493 this.dbCounter = dbCount;
494 this.lockTimeoutRetryCount = lockTimeoutRetryCount;
495 }
496
497 public void shutdown() {
498 LOGGER.debug("Writer thread shutting down");
499 this.shutdown = true;
500 gate.open();
501 }
502
503 public boolean isShutdown() {
504 return shutdown;
505 }
506
507 @Override
508 public void run() {
509 LOGGER.trace("WriterThread started - batch size = " + batchSize + ", delay = " + manager.delay);
510 long nextBatch = System.currentTimeMillis() + manager.delay;
511 while (!shutdown) {
512 long now = System.currentTimeMillis();
513 long dbCount = database.count();
514 dbCounter.set(dbCount);
515 if (dbCount >= batchSize || (dbCount > 0 && nextBatch <= now)) {
516 nextBatch = now + manager.delay;
517 try {
518 boolean errors = false;
519 DatabaseEntry key = new DatabaseEntry();
520 final DatabaseEntry data = new DatabaseEntry();
521
522 gate.close();
523 OperationStatus status;
524 if (batchSize > 1) {
525 try {
526 errors = sendBatch(key, data);
527 } catch (final Exception ex) {
528 break;
529 }
530 } else {
531 Exception exception = null;
532 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
533 exception = null;
534 Transaction txn = null;
535 Cursor cursor = null;
536 try {
537 txn = environment.beginTransaction(null, null);
538 cursor = database.openCursor(txn, null);
539 try {
540 status = cursor.getFirst(key, data, LockMode.RMW);
541 while (status == OperationStatus.SUCCESS) {
542 final SimpleEvent event = createEvent(data);
543 if (event != null) {
544 try {
545 manager.doSend(event);
546 } catch (final Exception ioe) {
547 errors = true;
548 LOGGER.error("Error sending event", ioe);
549 break;
550 }
551 try {
552 cursor.delete();
553 } catch (final Exception ex) {
554 LOGGER.error("Unable to delete event", ex);
555 }
556 }
557 status = cursor.getNext(key, data, LockMode.RMW);
558 }
559 if (cursor != null) {
560 cursor.close();
561 cursor = null;
562 }
563 txn.commit();
564 txn = null;
565 dbCounter.decrementAndGet();
566 exception = null;
567 break;
568 } catch (final LockConflictException lce) {
569 exception = lce;
570
571 } catch (final Exception ex) {
572 LOGGER.error("Error reading or writing to database", ex);
573 shutdown = true;
574 break;
575 } finally {
576 if (cursor != null) {
577 cursor.close();
578 cursor = null;
579 }
580 if (txn != null) {
581 txn.abort();
582 txn = null;
583 }
584 }
585 } catch (LockConflictException lce) {
586 exception = lce;
587 if (cursor != null) {
588 try {
589 cursor.close();
590 cursor = null;
591 } catch (Exception ex) {
592
593 }
594 }
595 if (txn != null) {
596 try {
597 txn.abort();
598 txn = null;
599 } catch (Exception ex) {
600
601 }
602 }
603 }
604 try {
605 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
606 } catch (InterruptedException ie) {
607
608 }
609 }
610 if (exception != null) {
611 LOGGER.error("Unable to read or update data base", exception);
612 }
613 }
614 if (errors) {
615 Thread.sleep(manager.delay);
616 continue;
617 }
618 } catch (final Exception ex) {
619 LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
620 }
621 } else {
622 if (nextBatch <= now) {
623 nextBatch = now + manager.delay;
624 }
625 try {
626 final long interval = nextBatch - now;
627 gate.waitForOpen(interval);
628 } catch (final InterruptedException ie) {
629 LOGGER.warn("WriterThread interrupted, continuing");
630 } catch (final Exception ex) {
631 LOGGER.error("WriterThread encountered an exception waiting for work", ex);
632 break;
633 }
634 }
635 }
636
637 if (batchSize > 1 && database.count() > 0) {
638 DatabaseEntry key = new DatabaseEntry();
639 final DatabaseEntry data = new DatabaseEntry();
640 try {
641 sendBatch(key, data);
642 } catch (final Exception ex) {
643 LOGGER.warn("Unable to write final batch");
644 }
645 }
646 LOGGER.trace("WriterThread exiting");
647 }
648
649 private boolean sendBatch(DatabaseEntry key, DatabaseEntry data) throws Exception {
650 boolean errors = false;
651 OperationStatus status;
652 Cursor cursor = database.openCursor(null, CursorConfig.DEFAULT);
653 try {
654 status = cursor.getFirst(key, data, null);
655
656 final BatchEvent batch = new BatchEvent();
657 for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
658 final SimpleEvent event = createEvent(data);
659 if (event != null) {
660 batch.addEvent(event);
661 }
662 status = cursor.getNext(key, data, null);
663 }
664 try {
665 manager.send(batch);
666 } catch (final Exception ioe) {
667 LOGGER.error("Error sending events", ioe);
668 errors = true;
669 }
670 if (!errors) {
671 cursor.close();
672 cursor = null;
673 Transaction txn = null;
674 Exception exception = null;
675 for (int retryIndex = 0; retryIndex < lockTimeoutRetryCount; ++retryIndex) {
676 try {
677 txn = environment.beginTransaction(null, null);
678 try {
679 for (final Event event : batch.getEvents()) {
680 try {
681 final Map<String, String> headers = event.getHeaders();
682 key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
683 database.delete(txn, key);
684 } catch (final Exception ex) {
685 LOGGER.error("Error deleting key from database", ex);
686 }
687 }
688 txn.commit();
689 long count = dbCounter.get();
690 while (!dbCounter.compareAndSet(count, count - batch.getEvents().size())) {
691 count = dbCounter.get();
692 }
693 exception = null;
694 break;
695 } catch (final LockConflictException lce) {
696 exception = lce;
697 if (cursor != null) {
698 try {
699 cursor.close();
700 cursor = null;
701 } catch (Exception ex) {
702
703 }
704 }
705 if (txn != null) {
706 try {
707 txn.abort();
708 txn = null;
709 } catch (Exception ex) {
710
711 }
712 }
713 } catch (final Exception ex) {
714 LOGGER.error("Unable to commit transaction", ex);
715 if (txn != null) {
716 txn.abort();
717 }
718 }
719 } catch (LockConflictException lce) {
720 exception = lce;
721 if (cursor != null) {
722 try {
723 cursor.close();
724 cursor = null;
725 } catch (Exception ex) {
726
727 }
728 }
729 if (txn != null) {
730 try {
731 txn.abort();
732 txn = null;
733 } catch (Exception ex) {
734
735 }
736 }
737 } finally {
738 if (cursor != null) {
739 cursor.close();
740 cursor = null;
741 }
742 if (txn != null) {
743 txn.abort();
744 txn = null;
745 }
746 }
747 try {
748 Thread.sleep(LOCK_TIMEOUT_SLEEP_MILLIS);
749 } catch (InterruptedException ie) {
750
751 }
752 }
753 if (exception != null) {
754 LOGGER.error("Unable to delete events from data base", exception);
755 }
756 }
757 } catch (final Exception ex) {
758 LOGGER.error("Error reading database", ex);
759 shutdown = true;
760 throw ex;
761 } finally {
762 if (cursor != null) {
763 cursor.close();
764 }
765 }
766
767 return errors;
768 }
769
770 private SimpleEvent createEvent(final DatabaseEntry data) {
771 final SimpleEvent event = new SimpleEvent();
772 try {
773 byte[] eventData = data.getData();
774 if (secretKey != null) {
775 final Cipher cipher = Cipher.getInstance("AES");
776 cipher.init(Cipher.DECRYPT_MODE, secretKey);
777 eventData = cipher.doFinal(eventData);
778 }
779 final ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
780 final DataInputStream dais = new DataInputStream(bais);
781 int length = dais.readInt();
782 final byte[] bytes = new byte[length];
783 dais.read(bytes, 0, length);
784 event.setBody(bytes);
785 length = dais.readInt();
786 final Map<String, String> map = new HashMap<String, String>(length);
787 for (int i = 0; i < length; ++i) {
788 final String headerKey = dais.readUTF();
789 final String value = dais.readUTF();
790 map.put(headerKey, value);
791 }
792 event.setHeaders(map);
793 return event;
794 } catch (final Exception ex) {
795 LOGGER.error("Error retrieving event", ex);
796 return null;
797 }
798 }
799
800 }
801
802
803
804
805 private static class DaemonThreadFactory implements ThreadFactory {
806 private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
807 private final ThreadGroup group;
808 private final AtomicInteger threadNumber = new AtomicInteger(1);
809 private final String namePrefix;
810
811 public DaemonThreadFactory() {
812 final SecurityManager securityManager = System.getSecurityManager();
813 group = (securityManager != null) ? securityManager.getThreadGroup() :
814 Thread.currentThread().getThreadGroup();
815 namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
816 }
817
818 @Override
819 public Thread newThread(final Runnable r) {
820 final Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
821 thread.setDaemon(true);
822 if (thread.getPriority() != Thread.NORM_PRIORITY) {
823 thread.setPriority(Thread.NORM_PRIORITY);
824 }
825 return thread;
826 }
827 }
828
829 private static class Gate {
830
831 private boolean isOpen = false;
832
833 public boolean isOpen() {
834 return isOpen;
835 }
836
837 public synchronized void open() {
838 isOpen = true;
839 notifyAll();
840 }
841
842 public synchronized void close() {
843 isOpen = false;
844 }
845
846 public synchronized void waitForOpen(long timeout) throws InterruptedException {
847 wait(timeout);
848 }
849 }
850 }