View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import java.util.HashMap;
20  import java.util.Locale;
21  import java.util.Map;
22  
23  import org.apache.flume.Event;
24  import org.apache.flume.EventDeliveryException;
25  import org.apache.flume.agent.embedded.EmbeddedAgent;
26  import org.apache.logging.log4j.LoggingException;
27  import org.apache.logging.log4j.core.appender.ManagerFactory;
28  import org.apache.logging.log4j.core.config.ConfigurationException;
29  import org.apache.logging.log4j.core.config.Property;
30  import org.apache.logging.log4j.core.util.NameUtil;
31  import org.apache.logging.log4j.util.PropertiesUtil;
32  import org.apache.logging.log4j.util.Strings;
33  
34  /**
35   *
36   */
37  public class FlumeEmbeddedManager extends AbstractFlumeManager {
38  
39      private static final String FILE_SEP = PropertiesUtil.getProperties().getStringProperty("file.separator");
40  
41      private static final String IN_MEMORY = "InMemory";
42  
43      private static FlumeManagerFactory factory = new FlumeManagerFactory();
44  
45      private final EmbeddedAgent agent;
46  
47      private final String shortName;
48  
49  
50      /**
51       * Constructor
52       * @param name The unique name of this manager.
53       * @param shortName The short version of the agent name.
54       * @param agent The embedded agent.
55       */
56      protected FlumeEmbeddedManager(final String name, final String shortName, final EmbeddedAgent agent) {
57          super(name);
58          this.agent = agent;
59          this.shortName = shortName;
60      }
61  
62      /**
63       * Returns a FlumeEmbeddedManager.
64       * @param name The name of the manager.
65       * @param agents The agents to use.
66       * @param properties Properties for the embedded manager.
67       * @param batchSize The number of events to include in a batch.
68       * @param dataDir The directory where the Flume FileChannel should write to.
69       * @return A FlumeAvroManager.
70       */
71      public static FlumeEmbeddedManager getManager(final String name, final Agent[] agents, final Property[] properties,
72                                                    int batchSize, final String dataDir) {
73  
74          if (batchSize <= 0) {
75              batchSize = 1;
76          }
77  
78          if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
79              throw new IllegalArgumentException("Either an Agent or properties are required");
80          } else if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
81              throw new IllegalArgumentException("Cannot configure both Agents and Properties.");
82          }
83  
84          final StringBuilder sb = new StringBuilder();
85          boolean first = true;
86  
87          if (agents != null && agents.length > 0) {
88              sb.append(name).append('[');
89              for (final Agent agent : agents) {
90                  if (!first) {
91                      sb.append('_');
92                  }
93                  sb.append(agent.getHost()).append('-').append(agent.getPort());
94                  first = false;
95              }
96              sb.append(']');
97          } else {
98              String sep = Strings.EMPTY;
99              sb.append(name).append('-');
100             final StringBuilder props = new StringBuilder();
101             for (final Property prop : properties) {
102                 props.append(sep);
103                 props.append(prop.getName()).append('=').append(prop.getValue());
104                 sep = "_";
105             }
106             sb.append(NameUtil.md5(props.toString()));
107         }
108         return getManager(sb.toString(), factory,
109                 new FactoryData(name, agents, properties, batchSize, dataDir));
110     }
111 
112     @Override
113     public void send(final Event event) {
114         try {
115             agent.put(event);
116         } catch (final EventDeliveryException ex) {
117             throw new LoggingException("Unable to deliver event to Flume Appender " + shortName, ex);
118         }
119     }
120 
121     @Override
122     protected void releaseSub() {
123         agent.stop();
124     }
125 
126     /**
127      * Factory data.
128      */
129     private static class FactoryData {
130         private final Agent[] agents;
131         private final Property[] properties;
132         private final int batchSize;
133         private final String dataDir;
134         private final String name;
135 
136         /**
137          * Constructor.
138          * @param name The name of the Appender.
139          * @param agents The agents.
140          * @param properties The Flume configuration properties.
141          * @param batchSize The number of events to include in a batch.
142          * @param dataDir The directory where Flume should write to.
143          */
144         public FactoryData(final String name, final Agent[] agents, final Property[] properties, final int batchSize,
145                            final String dataDir) {
146             this.name = name;
147             this.agents = agents;
148             this.batchSize = batchSize;
149             this.properties = properties;
150             this.dataDir = dataDir;
151         }
152     }
153 
154     /**
155      * Avro Manager Factory.
156      */
157     private static class FlumeManagerFactory implements ManagerFactory<FlumeEmbeddedManager, FactoryData> {
158 
159         /**
160          * Create the FlumeAvroManager.
161          * @param name The name of the entity to manage.
162          * @param data The data required to create the entity.
163          * @return The FlumeAvroManager.
164          */
165         @Override
166         public FlumeEmbeddedManager createManager(final String name, final FactoryData data) {
167             try {
168                 final Map<String, String> props = createProperties(data.name, data.agents, data.properties,
169                     data.batchSize, data.dataDir);
170                 final EmbeddedAgent agent = new EmbeddedAgent(name);
171                 agent.configure(props);
172                 agent.start();
173                 LOGGER.debug("Created Agent " + name);
174                 return new FlumeEmbeddedManager(name, data.name, agent);
175             } catch (final Exception ex) {
176                 LOGGER.error("Could not create FlumeEmbeddedManager", ex);
177             }
178             return null;
179         }
180 
181         private Map<String, String> createProperties(final String name, final Agent[] agents,
182                                                      final Property[] properties, final int batchSize, String dataDir) {
183             final Map<String, String> props = new HashMap<>();
184 
185             if ((agents == null || agents.length == 0) && (properties == null || properties.length == 0)) {
186                 LOGGER.error("No Flume configuration provided");
187                 throw new ConfigurationException("No Flume configuration provided");
188             }
189 
190             if (agents != null && agents.length > 0 && properties != null && properties.length > 0) {
191                 LOGGER.error("Agents and Flume configuration cannot both be specified");
192                 throw new ConfigurationException("Agents and Flume configuration cannot both be specified");
193             }
194 
195             if (agents != null && agents.length > 0) {
196 
197                 if (Strings.isNotEmpty(dataDir)) {
198                     if (dataDir.equals(IN_MEMORY)) {
199                         props.put("channel.type", "memory");
200                     } else {
201                         props.put("channel.type", "file");
202 
203                         if (!dataDir.endsWith(FILE_SEP)) {
204                             dataDir = dataDir + FILE_SEP;
205                         }
206 
207                         props.put("channel.checkpointDir", dataDir + "checkpoint");
208                         props.put("channel.dataDirs", dataDir + "data");
209                     }
210 
211                 } else {
212                     props.put("channel.type", "file");
213                 }
214 
215                 final StringBuilder sb = new StringBuilder();
216                 String leading = Strings.EMPTY;
217                 final int priority = agents.length;
218                 for (int i = 0; i < priority; ++i) {
219                     sb.append(leading).append("agent").append(i);
220                     leading = " ";
221                     final String prefix = "agent" + i;
222                     props.put(prefix + ".type", "avro");
223                     props.put(prefix + ".hostname", agents[i].getHost());
224                     props.put(prefix + ".port", Integer.toString(agents[i].getPort()));
225                     props.put(prefix + ".batch-size", Integer.toString(batchSize));
226                     props.put("processor.priority." + prefix, Integer.toString(agents.length - i));
227                 }
228                 props.put("sinks", sb.toString());
229                 props.put("processor.type", "failover");
230             } else {
231                 String[] sinks = null;
232 
233                 for (final Property property : properties) {
234                     final String key = property.getName();
235 
236                     if (Strings.isEmpty(key)) {
237                         final String msg = "A property name must be provided";
238                         LOGGER.error(msg);
239                         throw new ConfigurationException(msg);
240                     }
241 
242                     final String upperKey = key.toUpperCase(Locale.ENGLISH);
243 
244                     if (upperKey.startsWith(name.toUpperCase(Locale.ENGLISH))) {
245                         final String msg =
246                             "Specification of the agent name is not allowed in Flume Appender configuration: " + key;
247                         LOGGER.error(msg);
248                         throw new ConfigurationException(msg);
249                     }
250 
251                     final String value = property.getValue();
252                     if (Strings.isEmpty(value)) {
253                         final String msg = "A value for property " + key + " must be provided";
254                         LOGGER.error(msg);
255                         throw new ConfigurationException(msg);
256                     }
257 
258                     if (upperKey.equals("SINKS")) {
259                         sinks = value.trim().split(" ");
260                     }
261 
262                     props.put(key, value);
263                 }
264 
265                 if (sinks == null || sinks.length == 0) {
266                     final String msg = "At least one Sink must be specified";
267                     LOGGER.error(msg);
268                     throw new ConfigurationException(msg);
269                 }
270             }
271             return props;
272         }
273 
274     }
275 
276 }