View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import org.apache.logging.log4j.core.Filter;
20  import org.apache.logging.log4j.core.Layout;
21  import org.apache.logging.log4j.core.LogEvent;
22  import org.apache.logging.log4j.core.appender.AbstractAppender;
23  import org.apache.logging.log4j.core.config.Property;
24  import org.apache.logging.log4j.core.config.plugins.Plugin;
25  import org.apache.logging.log4j.core.config.plugins.PluginAttr;
26  import org.apache.logging.log4j.core.config.plugins.PluginElement;
27  import org.apache.logging.log4j.core.config.plugins.PluginFactory;
28  import org.apache.logging.log4j.core.layout.RFC5424Layout;
29  
30  import java.io.Serializable;
31  import java.util.Locale;
32  
33  /**
34   * An Appender that uses the Avro protocol to route events to Flume.
35   */
36  @Plugin(name = "Flume", category = "Core", elementType = "appender", printObject = true)
37  public final class FlumeAppender<T extends Serializable> extends AbstractAppender<T> implements FlumeEventFactory {
38  
39      private final AbstractFlumeManager manager;
40  
41      private final String mdcIncludes;
42      private final String mdcExcludes;
43      private final String mdcRequired;
44  
45      private final String eventPrefix;
46  
47      private final String mdcPrefix;
48  
49      private final boolean compressBody;
50  
51      private final FlumeEventFactory factory;
52  
53      private enum ManagerType {
54          AVRO, EMBEDDED, PERSISTENT;
55  
56          public static ManagerType getType(String type) {
57              return valueOf(type.toUpperCase(Locale.US));
58          }
59      }
60  
61      private FlumeAppender(final String name, final Filter filter, final Layout<T> layout, final boolean handleException,
62                            final String includes, final String excludes, final String required, final String mdcPrefix,
63                            final String eventPrefix, final boolean compress,
64                            final FlumeEventFactory factory, final AbstractFlumeManager manager) {
65          super(name, filter, layout, handleException);
66          this.manager = manager;
67          this.mdcIncludes = includes;
68          this.mdcExcludes = excludes;
69          this.mdcRequired = required;
70          this.eventPrefix = eventPrefix;
71          this.mdcPrefix = mdcPrefix;
72          this.compressBody = compress;
73          this.factory = factory == null ? this : factory;
74      }
75  
76      /**
77       * Publish the event.
78       * @param event The LogEvent.
79       */
80      public void append(final LogEvent event) {
81  
82          final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
83              eventPrefix, compressBody);
84          flumeEvent.setBody(getLayout().toByteArray(flumeEvent));
85          manager.send(flumeEvent);
86      }
87  
88      @Override
89      public void stop() {
90          super.stop();
91          manager.release();
92      }
93  
94      /**
95       * Create a Flume event.
96       * @param event The Log4j LogEvent.
97       * @param includes comma separated list of mdc elements to include.
98       * @param excludes comma separated list of mdc elements to exclude.
99       * @param required comma separated list of mdc elements that must be present with a value.
100      * @param mdcPrefix The prefix to add to MDC key names.
101      * @param eventPrefix The prefix to add to event fields.
102      * @param compress If true the body will be compressed.
103      * @return A Flume Event.
104      */
105     public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes,
106                                   final String required, final String mdcPrefix, final String eventPrefix,
107                                   final boolean compress) {
108         return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
109             eventPrefix, compressBody);
110     }
111 
112     /**
113      * Create a Flume Avro Appender.
114      * @param agents An array of Agents.
115      * @param properties Properties to pass to the embedded agent.
116      * @param embedded true if the embedded agent manager should be used. otherwise the Avro manager will be used.
117      * <b>Note: </b><i>The embedded attribute is deprecated in favor of specifying the type attribute.</i>
118      * @param type Avro (default), Embedded, or Persistent.
119      * @param dataDir The directory where the Flume FileChannel should write its data.
120      * @param connectionTimeout The amount of time in milliseconds to wait before a connection times out. Minimum is
121      *                          1000.
122      * @param requestTimeout The amount of time in milliseconds to wait before a request times out. Minimum is 1000.
123      * @param agentRetries The number of times to retry an agent before failing to the next agent.
124      * @param maxDelay The maximum number of seconds to wait for a complete batch.
125      * @param name The name of the Appender.
126      * @param suppress If true exceptions will be handled in the appender.
127      * @param excludes A comma separated list of MDC elements to exclude.
128      * @param includes A comma separated list of MDC elements to include.
129      * @param required A comma separated list of MDC elements that are required.
130      * @param mdcPrefix The prefix to add to MDC key names.
131      * @param eventPrefix The prefix to add to event key names.
132      * @param compressBody If true the event body will be compressed.
133      * @param batchSize Number of events to include in a batch. Defaults to 1.
134      * @param factory The factory to use to create Flume events.
135      * @param layout The layout to format the event.
136      * @param filter A Filter to filter events.
137      * @return A Flume Avro Appender.
138      */
139     @PluginFactory
140     public static <S extends Serializable> FlumeAppender<S> createAppender(@PluginElement("agents") Agent[] agents,
141                                                    @PluginElement("properties") final Property[] properties,
142                                                    @PluginAttr("embedded") final String embedded,
143                                                    @PluginAttr("type") final String type,
144                                                    @PluginAttr("dataDir") final String dataDir,
145                                                    @PluginAttr("connectTimeout") final String connectionTimeout,
146                                                    @PluginAttr("requestTimeout") final String requestTimeout,
147                                                    @PluginAttr("agentRetries") final String agentRetries,
148                                                    @PluginAttr("maxDelay") final String maxDelay,
149                                                    @PluginAttr("name") final String name,
150                                                    @PluginAttr("suppressExceptions") final String suppress,
151                                                    @PluginAttr("mdcExcludes") final String excludes,
152                                                    @PluginAttr("mdcIncludes") final String includes,
153                                                    @PluginAttr("mdcRequired") final String required,
154                                                    @PluginAttr("mdcPrefix") final String mdcPrefix,
155                                                    @PluginAttr("eventPrefix") final String eventPrefix,
156                                                    @PluginAttr("compress") final String compressBody,
157                                                    @PluginAttr("batchSize") final String batchSize,
158                                                    @PluginElement("flumeEventFactory") final FlumeEventFactory factory,
159                                                    @PluginElement("layout") Layout<S> layout,
160                                                    @PluginElement("filters") final Filter filter) {
161 
162         final boolean embed = embedded != null ? Boolean.valueOf(embedded) :
163             (agents == null || agents.length == 0) && properties != null && properties.length > 0;
164         final boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
165         final boolean compress = compressBody == null ? true : Boolean.valueOf(compressBody);
166         ManagerType managerType;
167         if (type != null) {
168             if (embed && embedded != null) {
169                 try {
170                     managerType = ManagerType.getType(type);
171                     LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type);
172                 } catch (Exception ex) {
173                     LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type + " is invalid.");
174                     managerType = ManagerType.EMBEDDED;
175                 }
176             } else {
177                 try {
178                     managerType = ManagerType.getType(type);
179                 } catch (Exception ex) {
180                     LOGGER.warn("Type " + type + " is invalid.");
181                     managerType = ManagerType.EMBEDDED;
182                 }
183             }
184         }  else if (embed) {
185            managerType = ManagerType.EMBEDDED;
186         }  else {
187            managerType = ManagerType.AVRO;
188         }
189 
190         final int batchCount = batchSize == null ? 1 : Integer.parseInt(batchSize);
191         final int connectTimeout = connectionTimeout == null ? 0 : Integer.parseInt(connectionTimeout);
192         final int reqTimeout = requestTimeout == null ? 0 : Integer.parseInt(requestTimeout);
193         final int retries = agentRetries == null ? 0 : Integer.parseInt(agentRetries);
194         final int delay = maxDelay == null ? 60000 : Integer.parseInt(maxDelay);
195 
196         if (layout == null) {
197             @SuppressWarnings({"unchecked", "UnnecessaryLocalVariable"})
198             Layout<S> l = (Layout<S>)RFC5424Layout.createLayout(null, null, null, "True", null, mdcPrefix, eventPrefix,
199                     null, null, null, excludes, includes, required, null, null, null, null);
200             layout = l;
201         }
202 
203         if (name == null) {
204             LOGGER.error("No name provided for Appender");
205             return null;
206         }
207 
208         AbstractFlumeManager manager;
209 
210         switch (managerType) {
211             case EMBEDDED:
212                 manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir);
213                 break;
214             case AVRO:
215                 if (agents == null || agents.length == 0) {
216                     LOGGER.debug("No agents provided, using defaults");
217                     agents = new Agent[] {Agent.createAgent(null, null)};
218                 }
219                 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
220                 break;
221             case PERSISTENT:
222                 if (agents == null || agents.length == 0) {
223                     LOGGER.debug("No agents provided, using defaults");
224                     agents = new Agent[] {Agent.createAgent(null, null)};
225                 }
226                 manager = FlumePersistentManager.getManager(name, agents, properties, batchCount, retries,
227                     connectTimeout, reqTimeout, delay, dataDir);
228                 break;
229             default:
230                 LOGGER.debug("No manager type specified. Defaulting to AVRO");
231                 if (agents == null || agents.length == 0) {
232                     LOGGER.debug("No agents provided, using defaults");
233                     agents = new Agent[] {Agent.createAgent(null, null)};
234                 }
235                 manager = FlumeAvroManager.getManager(name, agents, batchCount, retries, connectTimeout, reqTimeout);
236         }
237 
238         if (manager == null) {
239             return null;
240         }
241 
242         return new FlumeAppender<S>(name, filter, layout,  handleExceptions, includes,
243             excludes, required, mdcPrefix, eventPrefix, compress, factory, manager);
244     }
245 }