View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import java.io.Serializable;
20  import java.util.Locale;
21  
22  import org.apache.logging.log4j.core.Filter;
23  import org.apache.logging.log4j.core.Layout;
24  import org.apache.logging.log4j.core.LogEvent;
25  import org.apache.logging.log4j.core.appender.AbstractAppender;
26  import org.apache.logging.log4j.core.config.Property;
27  import org.apache.logging.log4j.core.config.plugins.Plugin;
28  import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
29  import org.apache.logging.log4j.core.config.plugins.PluginElement;
30  import org.apache.logging.log4j.core.config.plugins.PluginFactory;
31  import org.apache.logging.log4j.core.layout.Rfc5424Layout;
32  import org.apache.logging.log4j.core.net.Facility;
33  import org.apache.logging.log4j.core.util.Booleans;
34  import org.apache.logging.log4j.core.util.Integers;
35  
36  /**
37   * An Appender that uses the Avro protocol to route events to Flume.
38   */
39  @Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true)
40  public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory {
41  
42      private static final String[] EXCLUDED_PACKAGES = {"org.apache.flume", "org.apache.avro"};
43      private static final int DEFAULT_MAX_DELAY = 60000;
44  
45      private static final int DEFAULT_LOCK_TIMEOUT_RETRY_COUNT = 5;
46  
47      private final AbstractFlumeManager manager;
48  
49      private final String mdcIncludes;
50      private final String mdcExcludes;
51      private final String mdcRequired;
52  
53      private final String eventPrefix;
54  
55      private final String mdcPrefix;
56  
57      private final boolean compressBody;
58  
59      private final FlumeEventFactory factory;
60  
61      /**
62       * Which Manager will be used by the appender instance.
63       */
64      private enum ManagerType {
65          AVRO, EMBEDDED, PERSISTENT;
66  
67          public static ManagerType getType(final String type) {
68              return valueOf(type.toUpperCase(Locale.US));
69          }
70      }
71  
72      private FlumeAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout,
73                            final boolean ignoreExceptions, final String includes, final String excludes,
74                            final String required, final String mdcPrefix, final String eventPrefix,
75                            final boolean compress, final FlumeEventFactory factory, final AbstractFlumeManager manager) {
76          super(name, filter, layout, ignoreExceptions);
77          this.manager = manager;
78          this.mdcIncludes = includes;
79          this.mdcExcludes = excludes;
80          this.mdcRequired = required;
81          this.eventPrefix = eventPrefix;
82          this.mdcPrefix = mdcPrefix;
83          this.compressBody = compress;
84          this.factory = factory == null ? this : factory;
85      }
86  
87      /**
88       * Publish the event.
89       * @param event The LogEvent.
90       */
91      @Override
92      public void append(final LogEvent event) {
93          final String name = event.getLoggerName();
94          if (name != null) {
95              for (final String pkg : EXCLUDED_PACKAGES) {
96                  if (name.startsWith(pkg)) {
97                      return;
98                  }
99              }
100         }
101         final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
102             eventPrefix, compressBody);
103         flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
104         manager.send(flumeEvent);
105     }
106 
107     @Override
108     public void stop() {
109         super.stop();
110         manager.release();
111     }
112 
113     /**
114      * Create a Flume event.
115      * @param event The Log4j LogEvent.
116      * @param includes comma separated list of mdc elements to include.
117      * @param excludes comma separated list of mdc elements to exclude.
118      * @param required comma separated list of mdc elements that must be present with a value.
119      * @param mdcPrefix The prefix to add to MDC key names.
120      * @param eventPrefix The prefix to add to event fields.
121      * @param compress If true the body will be compressed.
122      * @return A Flume Event.
123      */
124     @Override
125     public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes,
126                                   final String required, final String mdcPrefix, final String eventPrefix,
127                                   final boolean compress) {
128         return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
129             eventPrefix, compressBody);
130     }
131 
132     /**
133      * Create a Flume Avro Appender.
134      * @param agents An array of Agents.
135      * @param properties Properties to pass to the embedded agent.
136      * @param embedded true if the embedded agent manager should be used. otherwise the Avro manager will be used.
137      * <b>Note: </b><i>The embedded attribute is deprecated in favor of specifying the type attribute.</i>
138      * @param type Avro (default), Embedded, or Persistent.
139      * @param dataDir The directory where the Flume FileChannel should write its data.
140      * @param connectionTimeout The amount of time in milliseconds to wait before a connection times out. Minimum is
141      *                          1000.
142      * @param requestTimeout The amount of time in milliseconds to wait before a request times out. Minimum is 1000.
143      * @param agentRetries The number of times to retry an agent before failing to the next agent.
144      * @param maxDelay The maximum number of seconds to wait for a complete batch.
145      * @param name The name of the Appender.
146      * @param ignore If {@code "true"} (default) exceptions encountered when appending events are logged; otherwise
147      *               they are propagated to the caller.
148      * @param excludes A comma separated list of MDC elements to exclude.
149      * @param includes A comma separated list of MDC elements to include.
150      * @param required A comma separated list of MDC elements that are required.
151      * @param mdcPrefix The prefix to add to MDC key names.
152      * @param eventPrefix The prefix to add to event key names.
153      * @param compressBody If true the event body will be compressed.
154      * @param batchSize Number of events to include in a batch. Defaults to 1.
155      * @param lockTimeoutRetries Times to retry a lock timeout when writing to Berkeley DB.
156      * @param factory The factory to use to create Flume events.
157      * @param layout The layout to format the event.
158      * @param filter A Filter to filter events.
159      *
160      * @return A Flume Avro Appender.
161      */
162     @PluginFactory
163     public static FlumeAppender createAppender(@PluginElement("Agents") Agent[] agents,
164                                                @PluginElement("Properties") final Property[] properties,
165                                                @PluginAttribute("embedded") final String embedded,
166                                                @PluginAttribute("type") final String type,
167                                                @PluginAttribute("dataDir") final String dataDir,
168                                                @PluginAttribute("connectTimeout") final String connectionTimeout,
169                                                @PluginAttribute("requestTimeout") final String requestTimeout,
170                                                @PluginAttribute("agentRetries") final String agentRetries,
171                                                @PluginAttribute("maxDelay") final String maxDelay,
172                                                @PluginAttribute("name") final String name,
173                                                @PluginAttribute("ignoreExceptions") final String ignore,
174                                                @PluginAttribute("mdcExcludes") final String excludes,
175                                                @PluginAttribute("mdcIncludes") final String includes,
176                                                @PluginAttribute("mdcRequired") final String required,
177                                                @PluginAttribute("mdcPrefix") final String mdcPrefix,
178                                                @PluginAttribute("eventPrefix") final String eventPrefix,
179                                                @PluginAttribute("compress") final String compressBody,
180                                                @PluginAttribute("batchSize") final String batchSize,
181                                                @PluginAttribute("lockTimeoutRetries") final String lockTimeoutRetries,
182                                                @PluginElement("FlumeEventFactory") final FlumeEventFactory factory,
183                                                @PluginElement("Layout") Layout<? extends Serializable> layout,
184                                                @PluginElement("Filter") final Filter filter) {
185 
186         final boolean embed = embedded != null ? Boolean.parseBoolean(embedded) :
187             (agents == null || agents.length == 0) && properties != null && properties.length > 0;
188         final boolean ignoreExceptions = Booleans.parseBoolean(ignore, true);
189         final boolean compress = Booleans.parseBoolean(compressBody, true);
190         ManagerType managerType;
191         if (type != null) {
192             if (embed && embedded != null) {
193                 try {
194                     managerType = ManagerType.getType(type);
195                     LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type);
196                 } catch (final Exception ex) {
197                     LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type +
198                         " is invalid.");
199                     managerType = ManagerType.EMBEDDED;
200                 }
201             } else {
202                 try {
203                     managerType = ManagerType.getType(type);
204                 } catch (final Exception ex) {
205                     LOGGER.warn("Type " + type + " is invalid.");
206                     managerType = ManagerType.EMBEDDED;
207                 }
208             }
209         }  else if (embed) {
210            managerType = ManagerType.EMBEDDED;
211         }  else {
212            managerType = ManagerType.AVRO;
213         }
214 
215         final int batchCount = Integers.parseInt(batchSize, 1);
216         final int connectTimeout = Integers.parseInt(connectionTimeout, 0);
217         final int reqTimeout = Integers.parseInt(requestTimeout, 0);
218         final int retries = Integers.parseInt(agentRetries, 0);
219         final int lockTimeoutRetryCount = Integers.parseInt(lockTimeoutRetries, DEFAULT_LOCK_TIMEOUT_RETRY_COUNT);
220         final int delay = Integers.parseInt(maxDelay, DEFAULT_MAX_DELAY);
221 
222         if (layout == null) {
223             final int enterpriseNumber = Rfc5424Layout.DEFAULT_ENTERPRISE_NUMBER;
224             layout = Rfc5424Layout.createLayout(Facility.LOCAL0, null, enterpriseNumber, true, Rfc5424Layout.DEFAULT_MDCID,
225                     mdcPrefix, eventPrefix, false, null, null, null, excludes, includes, required, null, false, null,
226                     null);
227         }
228 
229         if (name == null) {
230             LOGGER.error("No name provided for Appender");
231             return null;
232         }
233 
234         AbstractFlumeManager manager;
235 
236         switch (managerType) {
237             case EMBEDDED:
238                 manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
239                 break;
240             case AVRO:
241                 if (agents == null || agents.length == 0) {
242                     LOGGER.debug("No agents provided, using defaults");
243                     agents = new Agent[] {Agent.createAgent(null, null)};
244                 }
245                 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
246                 break;
247             case PERSISTENT:
248                 if (agents == null || agents.length == 0) {
249                     LOGGER.debug("No agents provided, using defaults");
250                     agents = new Agent[] {Agent.createAgent(null, null)};
251                 }
252                 manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries,
253                     connectTimeout, reqTimeout, delay, lockTimeoutRetryCount, dataDir);
254                 break;
255             default:
256                 LOGGER.debug("No manager type specified. Defaulting to AVRO");
257                 if (agents == null || agents.length == 0) {
258                     LOGGER.debug("No agents provided, using defaults");
259                     agents = new Agent[] {Agent.createAgent(null, null)};
260                 }
261                 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
262         }
263 
264         if (manager == null) {
265             return null;
266         }
267 
268         return new FlumeAppender(name, filter, layout,  ignoreExceptions, includes,
269             excludes, required, mdcPrefix, eventPrefix, compress, factory, manager);
270     }
271 }