View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import com.sleepycat.je.Cursor;
20  import com.sleepycat.je.Database;
21  import com.sleepycat.je.DatabaseConfig;
22  import com.sleepycat.je.DatabaseEntry;
23  import com.sleepycat.je.Environment;
24  import com.sleepycat.je.EnvironmentConfig;
25  import com.sleepycat.je.LockMode;
26  import com.sleepycat.je.OperationStatus;
27  import com.sleepycat.je.StatsConfig;
28  import org.apache.flume.Event;
29  import org.apache.flume.event.SimpleEvent;
30  import org.apache.logging.log4j.LoggingException;
31  import org.apache.logging.log4j.core.appender.ManagerFactory;
32  import org.apache.logging.log4j.core.config.Property;
33  import org.apache.logging.log4j.core.config.plugins.PluginManager;
34  import org.apache.logging.log4j.core.config.plugins.PluginType;
35  import org.apache.logging.log4j.core.helpers.FileUtils;
36  import org.apache.logging.log4j.core.helpers.SecretKeyProvider;
37  
38  import javax.crypto.Cipher;
39  import javax.crypto.SecretKey;
40  import java.io.ByteArrayInputStream;
41  import java.io.ByteArrayOutputStream;
42  import java.io.DataInputStream;
43  import java.io.DataOutputStream;
44  import java.io.File;
45  import java.nio.charset.Charset;
46  import java.util.HashMap;
47  import java.util.Map;
48  import java.util.concurrent.LinkedBlockingQueue;
49  import java.util.concurrent.TimeUnit;
50  
51  /**
52   *
53   */
54  public class FlumePersistentManager extends FlumeAvroManager {
55  
56      public static final String KEY_PROVIDER = "keyProvider";
57  
58      private static final Charset UTF8 = Charset.forName("UTF-8");
59  
60      private static final String SHUTDOWN = "Shutdown";
61  
62      private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
63  
64      private static BDBManagerFactory factory = new BDBManagerFactory();
65  
66      private Database database;
67  
68      private final WriterThread worker;
69  
70      private final LinkedBlockingQueue<byte []> queue = new LinkedBlockingQueue<byte[]>();
71  
72      private final SecretKey secretKey;
73  
74      private final int delay;
75  
76      /**
77       * Constructor
78       * @param name The unique name of this manager.
79       * @param agents An array of Agents.
80       * @param batchSize The number of events to include in a batch.
81       * @param database The database to write to.
82       */
83      protected FlumePersistentManager(final String name, final String shortName, final Agent[] agents,
84                                       final int batchSize, final int retries, final int connectionTimeout,
85                                       final int requestTimeout, final int delay, final Database database,
86                                       SecretKey secretKey) {
87          super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
88          this.delay = delay;
89          this.database = database;
90          this.worker = new WriterThread(database, this, queue, batchSize, secretKey);
91          this.worker.start();
92          this.secretKey = secretKey;
93      }
94  
95  
96      /**
97       * Returns a FlumeAvroManager.
98       * @param name The name of the manager.
99       * @param agents The agents to use.
100      * @param batchSize The number of events to include in a batch.
101      * @return A FlumeAvroManager.
102      */
103     public static FlumePersistentManager getManager(final String name, final Agent[] agents, Property[] properties,
104                                                     int batchSize, final int retries, final int connectionTimeout,
105                                                     final int requestTimeout, final int delay, final String dataDir) {
106         if (agents == null || agents.length == 0) {
107             throw new IllegalArgumentException("At least one agent is required");
108         }
109 
110         if (batchSize <= 0) {
111             batchSize = 1;
112         }
113         String dataDirectory = dataDir == null || dataDir.length() == 0 ? DEFAULT_DATA_DIR : dataDir;
114 
115         final StringBuilder sb = new StringBuilder("FlumePersistent[");
116         boolean first = true;
117         for (final Agent agent : agents) {
118             if (!first) {
119                 sb.append(",");
120             }
121             sb.append(agent.getHost()).append(":").append(agent.getPort());
122             first = false;
123         }
124         sb.append("]");
125         sb.append(" ").append(dataDirectory);
126         return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries,
127             connectionTimeout, requestTimeout, delay, dataDir, properties));
128     }
129 
130     @Override
131     public synchronized void send(final Event event)  {
132         if (worker.isShutdown()) {
133             throw new LoggingException("Unable to record event");
134         }
135 
136         Map<String, String> headers = event.getHeaders();
137         byte[] keyData = headers.get(FlumeEvent.GUID).getBytes(UTF8);
138         try {
139             ByteArrayOutputStream baos = new ByteArrayOutputStream();
140             DataOutputStream daos = new DataOutputStream(baos);
141             daos.writeInt(event.getBody().length);
142             daos.write(event.getBody(), 0, event.getBody().length);
143             daos.writeInt(event.getHeaders().size());
144             for (Map.Entry<String, String> entry : headers.entrySet()) {
145                 daos.writeUTF(entry.getKey());
146                 daos.writeUTF(entry.getValue());
147             }
148             byte[] eventData = baos.toByteArray();
149             if (secretKey != null) {
150                 Cipher cipher = Cipher.getInstance("AES");
151                 cipher.init(Cipher.ENCRYPT_MODE, secretKey);
152                 eventData = cipher.doFinal(eventData);
153             }
154             final DatabaseEntry key = new DatabaseEntry(keyData);
155             final DatabaseEntry data = new DatabaseEntry(eventData);
156             database.put(null, key, data);
157             queue.add(keyData);
158         } catch (Exception ex) {
159             throw new LoggingException("Exception occurred writing log event", ex);
160         }
161     }
162 
163     @Override
164     protected void releaseSub() {
165         LOGGER.debug("Shutting down FlumePersistentManager");
166         worker.shutdown();
167         try {
168             worker.join();
169         } catch (InterruptedException ex) {
170             LOGGER.debug("Interrupted while waiting for worker to complete");
171         }
172         try {
173             LOGGER.debug("FlumePersistenceManager dataset status: {}", database.getStats(new StatsConfig()));
174             database.close();
175         } catch (final Exception ex) {
176             LOGGER.warn("Failed to close database", ex);
177         }
178         super.releaseSub();
179     }
180 
181     private void doSend(final SimpleEvent event) {
182         LOGGER.debug("Sending event to Flume");
183         super.send(event);
184     }
185 
186     /**
187      * Factory data.
188      */
189     private static class FactoryData {
190         private final String name;
191         private final Agent[] agents;
192         private final int batchSize;
193         private final String dataDir;
194         private final int retries;
195         private final int connectionTimeout;
196         private final int requestTimeout;
197         private final int delay;
198         private final Property[] properties;
199 
200         /**
201          * Constructor.
202          * @param name The name of the Appender.
203          * @param agents The agents.
204          * @param batchSize The number of events to include in a batch.
205          * @param dataDir The directory for data.
206          */
207         public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
208                            final int connectionTimeout, final int requestTimeout, final int delay,
209                            final String dataDir, final Property[] properties) {
210             this.name = name;
211             this.agents = agents;
212             this.batchSize = batchSize;
213             this.dataDir = dataDir;
214             this.retries = retries;
215             this.connectionTimeout = connectionTimeout;
216             this.requestTimeout = requestTimeout;
217             this.delay = delay;
218             this.properties = properties;
219         }
220     }
221 
222     /**
223      * Avro Manager Factory.
224      */
225     private static class BDBManagerFactory implements ManagerFactory<FlumePersistentManager, FactoryData> {
226 
227         /**
228          * Create the FlumeKratiManager.
229          * @param name The name of the entity to manage.
230          * @param data The data required to create the entity.
231          * @return The FlumeKratiManager.
232          */
233         public FlumePersistentManager createManager(final String name, final FactoryData data) {
234             SecretKey secretKey = null;
235 
236             Database database;
237 
238             Map<String, String> properties = new HashMap<String, String>();
239             if (data.properties != null) {
240                 for (Property property : data.properties) {
241                     properties.put(property.getName(), property.getValue());
242                 }
243             }
244 
245             try {
246 
247                 File dir = new File(data.dataDir);
248                 FileUtils.mkdir(dir, true);
249                 final EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
250                 dbEnvConfig.setTransactional(false);
251                 dbEnvConfig.setAllowCreate(true);
252                 final Environment environment = new Environment(dir, dbEnvConfig);
253                 final DatabaseConfig dbConfig = new DatabaseConfig();
254                 dbConfig.setTransactional(false);
255                 dbConfig.setAllowCreate(true);
256                 database = environment.openDatabase(null, name, dbConfig);
257             } catch (final Exception ex) {
258                 LOGGER.error("Could not create FlumePersistentManager", ex);
259                 return null;
260             }
261 
262             try {
263                 String key = null;
264                 for (Map.Entry<String, String> entry : properties.entrySet()) {
265                     if (entry.getKey().equalsIgnoreCase(KEY_PROVIDER)) {
266                         key = entry.getValue();
267                     }
268                 }
269                 if (key != null) {
270                     final PluginManager manager = new PluginManager("KeyProvider", SecretKeyProvider.class);
271                     manager.collectPlugins();
272                     final Map<String, PluginType> plugins = manager.getPlugins();
273                     if (plugins != null) {
274                         boolean found = false;
275                         for (Map.Entry<String, PluginType> entry : plugins.entrySet()) {
276                             if (entry.getKey().equalsIgnoreCase(key)) {
277                                 found = true;
278                                 Class cl = entry.getValue().getPluginClass();
279                                 try {
280                                     SecretKeyProvider provider = (SecretKeyProvider) cl.newInstance();
281                                     secretKey = provider.getSecretKey();
282                                     LOGGER.debug("Persisting events using SecretKeyProvider {}", cl.getName());
283                                 } catch (Exception ex) {
284                                     LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled",
285                                         cl.getName());
286                                 }
287                                 break;
288                             }
289                         }
290                         if (!found) {
291                             LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
292                         }
293                     } else {
294                         LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", key);
295                     }
296                 }
297             } catch (Exception ex) {
298                 LOGGER.warn("Error setting up encryption - encryption will be disabled", ex);
299             }
300             return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries,
301                 data.connectionTimeout, data.requestTimeout, data.delay, database, secretKey);
302         }
303     }
304 
305     private static class WriterThread extends Thread  {
306         private volatile boolean shutdown = false;
307         private final Database database;
308         private final FlumePersistentManager manager;
309         private final LinkedBlockingQueue<byte[]> queue;
310         private final SecretKey secretKey;
311         private final int batchSize;
312 
313         public WriterThread(Database database, FlumePersistentManager manager, LinkedBlockingQueue<byte[]> queue,
314                             int batchsize, SecretKey secretKey) {
315             this.database = database;
316             this.manager = manager;
317             this.queue = queue;
318             this.batchSize = batchsize;
319             this.secretKey = secretKey;
320             this.setDaemon(true);
321         }
322 
323         public void shutdown() {
324             LOGGER.debug("Writer thread shutting down");
325             this.shutdown = true;
326             if (queue.size() == 0) {
327                 queue.add(SHUTDOWN.getBytes(UTF8));
328             }
329         }
330 
331         public boolean isShutdown() {
332             return shutdown;
333         }
334 
335         @Override
336         public void run() {
337             LOGGER.trace("WriterThread started");
338             long lastBatch = System.currentTimeMillis();
339             while (!shutdown) {
340                 if (database.count() >= batchSize ||
341                     database.count() > 0 && lastBatch + manager.delay > System.currentTimeMillis()) {
342                     lastBatch = System.currentTimeMillis();
343                     try {
344                         boolean errors = false;
345                         DatabaseEntry key = new DatabaseEntry();
346                         final DatabaseEntry data = new DatabaseEntry();
347                         final Cursor cursor = database.openCursor(null, null);
348                         try {
349                             queue.clear();
350                             OperationStatus status;
351                             try {
352                                 status = cursor.getFirst(key, data, LockMode.RMW);
353                                 if (batchSize > 1) {
354                                     BatchEvent batch = new BatchEvent();
355                                     for (int i = 0; status == OperationStatus.SUCCESS && i < batchSize; ++i) {
356                                         SimpleEvent event = createEvent(data);
357                                         if (event != null) {
358                                             batch.addEvent(event);
359                                         }
360                                         status = cursor.getNext(key, data, LockMode.RMW);
361                                     }
362                                     try {
363                                         manager.send(batch);
364                                     } catch (Exception ioe) {
365                                         LOGGER.error("Error sending events", ioe);
366                                         break;
367                                     }
368                                     for (Event event : batch.getEvents()) {
369                                         try {
370                                             Map<String, String> headers = event.getHeaders();
371                                             key = new DatabaseEntry(headers.get(FlumeEvent.GUID).getBytes(UTF8));
372                                             database.delete(null, key);
373                                         } catch (Exception ex) {
374                                             LOGGER.error("Error deleting key from database", ex);
375                                         }
376                                     }
377                                 } else {
378                                     while (status == OperationStatus.SUCCESS) {
379                                         SimpleEvent event = createEvent(data);
380                                         if (event != null) {
381                                             try {
382                                                 manager.doSend(event);
383                                             } catch (Exception ioe) {
384                                                 errors = true;
385                                                 LOGGER.error("Error sending event", ioe);
386                                                 break;
387                                             }
388                                             if (!errors) {
389                                                 try {
390                                                     cursor.delete();
391                                                 } catch (Exception ex) {
392                                                     LOGGER.error("Unable to delete event", ex);
393                                                 }
394                                             }
395                                         }
396                                         status = cursor.getNext(key, data, LockMode.RMW);
397                                     }
398                                 }
399                             } catch (Exception ex) {
400                                 LOGGER.error("Error reading database", ex);
401                                 shutdown = true;
402                                 break;
403                             }
404 
405                         } finally {
406                             cursor.close();
407                         }
408                         if (errors) {
409                             Thread.sleep(manager.delay);
410                             continue;
411                         }
412                     } catch (Exception ex) {
413                         LOGGER.warn("WriterThread encountered an exception. Continuing.", ex);
414                     }
415                 } else {
416                     try {
417                         if (database.count() >= batchSize) {
418                             continue;
419                         }
420                         queue.poll(manager.delay, TimeUnit.MILLISECONDS);
421                         LOGGER.debug("WriterThread notified of work");
422                     } catch (InterruptedException ie) {
423                         LOGGER.warn("WriterThread interrupted, continuing");
424                     } catch (Exception ex) {
425                         LOGGER.error("WriterThread encountered an exception waiting for work", ex);
426                         break;
427                     }
428                 }
429             }
430             LOGGER.trace("WriterThread exiting");
431         }
432 
433         private SimpleEvent createEvent(DatabaseEntry data) {
434             SimpleEvent event = new SimpleEvent();
435             try {
436                 byte[] eventData = data.getData();
437                 if (secretKey != null) {
438                     Cipher cipher = Cipher.getInstance("AES");
439                     cipher.init(Cipher.DECRYPT_MODE, secretKey);
440                     eventData = cipher.doFinal(eventData);
441                 }
442                 ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
443                 DataInputStream dais = new DataInputStream(bais);
444                 int length = dais.readInt();
445                 byte[] bytes = new byte[length];
446                 dais.read(bytes, 0, length);
447                 event.setBody(bytes);
448                 length = dais.readInt();
449                 Map<String, String> map = new HashMap<String, String>(length);
450                 for (int i = 0; i < length; ++i) {
451                     String headerKey = dais.readUTF();
452                     String value = dais.readUTF();
453                     map.put(headerKey, value);
454                 }
455                 event.setHeaders(map);
456                 return event;
457             } catch (Exception ex) {
458                 LOGGER.error("Error retrieving event", ex);
459                 return null;
460             }
461         }
462 
463     }
464 }