/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache license, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the license for the specific language governing permissions and
 * limitations under the license.
 */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import java.util.Properties;
20  
21  import org.apache.flume.Event;
22  import org.apache.flume.api.RpcClient;
23  import org.apache.flume.api.RpcClientFactory;
24  import org.apache.logging.log4j.core.appender.AppenderLoggingException;
25  import org.apache.logging.log4j.core.appender.ManagerFactory;
26  
27  /**
28   * Manager for FlumeAvroAppenders.
29   */
30  public class FlumeAvroManager extends AbstractFlumeManager {
31  
32      private static final int MAX_RECONNECTS = 3;
33      private static final int MINIMUM_TIMEOUT = 1000;
34  
35      private static AvroManagerFactory factory = new AvroManagerFactory();
36  
37      private final Agent[] agents;
38  
39      private final int batchSize;
40  
41      private final int retries;
42  
43      private final int connectTimeout;
44  
45      private final int requestTimeout;
46  
47      private final int current = 0;
48  
49      private RpcClient rpcClient = null;
50  
51      /**
52       * Constructor
53       * @param name The unique name of this manager.
54       * @param agents An array of Agents.
55       * @param batchSize The number of events to include in a batch.
56       * @param retries The number of times to retry connecting before giving up.
57       * @param connectTimeout The connection timeout in ms.
58       * @param requestTimeout The request timeout in ms.
59       *
60       */
61      protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
62                                 final int retries, final int connectTimeout, final int requestTimeout) {
63          super(name);
64          this.agents = agents;
65          this.batchSize = batchSize;
66          this.retries = retries;
67          this.connectTimeout = connectTimeout;
68          this.requestTimeout = requestTimeout;
69          this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
70      }
71  
72      /**
73       * Returns a FlumeAvroManager.
74       * @param name The name of the manager.
75       * @param agents The agents to use.
76       * @param batchSize The number of events to include in a batch.
77       * @param retries The number of times to retry connecting before giving up.
78       * @param connectTimeout The connection timeout in ms.
79       * @param requestTimeout The request timeout in ms.
80       * @return A FlumeAvroManager.
81       */
82      public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize,
83                                                final int retries, final int connectTimeout, final int requestTimeout) {
84          if (agents == null || agents.length == 0) {
85              throw new IllegalArgumentException("At least one agent is required");
86          }
87  
88          if (batchSize <= 0) {
89              batchSize = 1;
90          }
91  
92          final StringBuilder sb = new StringBuilder("FlumeAvro[");
93          boolean first = true;
94          for (final Agent agent : agents) {
95              if (!first) {
96                  sb.append(',');
97              }
98              sb.append(agent.getHost()).append(':').append(agent.getPort());
99              first = false;
100         }
101         sb.append(']');
102         return getManager(sb.toString(), factory,
103                 new FactoryData(name, agents, batchSize, retries, connectTimeout, requestTimeout));
104     }
105 
106     /**
107      * Returns the agents.
108      * @return The agent array.
109      */
110     public Agent[] getAgents() {
111         return agents;
112     }
113 
114     /**
115      * Returns the index of the current agent.
116      * @return The index for the current agent.
117      */
118     public int getCurrent() {
119         return current;
120     }
121 
122     public int getRetries() {
123         return retries;
124     }
125 
126     public int getConnectTimeout() {
127         return connectTimeout;
128     }
129 
130     public int getRequestTimeout() {
131         return requestTimeout;
132     }
133 
134     public int getBatchSize() {
135         return batchSize;
136     }
137 
138     public synchronized void send(final BatchEvent events) {
139         if (rpcClient == null) {
140             rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
141         }
142 
143         if (rpcClient != null) {
144             try {
145                 LOGGER.trace("Sending batch of {} events", events.getEvents().size());
146                 rpcClient.appendBatch(events.getEvents());
147             } catch (final Exception ex) {
148                 rpcClient.close();
149                 rpcClient = null;
150                 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
151                     agents[current].getPort();
152                 LOGGER.warn(msg, ex);
153                 throw new AppenderLoggingException("No Flume agents are available");
154             }
155         }  else {
156             final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
157                 agents[current].getPort();
158             LOGGER.warn(msg);
159             throw new AppenderLoggingException("No Flume agents are available");
160         }
161     }
162 
163     @Override
164     public synchronized void send(final Event event)  {
165         if (rpcClient == null) {
166             rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
167         }
168 
169         if (rpcClient != null) {
170             try {
171                 rpcClient.append(event);
172             } catch (final Exception ex) {
173                 rpcClient.close();
174                 rpcClient = null;
175                 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
176                     agents[current].getPort();
177                 LOGGER.warn(msg, ex);
178                 throw new AppenderLoggingException("No Flume agents are available");
179             }
180         } else {
181             final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
182                 agents[current].getPort();
183             LOGGER.warn(msg);
184             throw new AppenderLoggingException("No Flume agents are available");
185         }
186     }
187 
188     /**
189      * There is a very good chance that this will always return the first agent even if it isn't available.
190      * @param agents The list of agents to choose from
191      * @return The FlumeEventAvroServer.
192      */
193 
194     private RpcClient connect(final Agent[] agents, int retries, final int connectTimeout, final int requestTimeout) {
195         try {
196             final Properties props = new Properties();
197 
198             props.put("client.type", "default_failover");
199 
200             int count = 1;
201             final StringBuilder sb = new StringBuilder();
202             for (final Agent agent : agents) {
203                 if (sb.length() > 0) {
204                     sb.append(' ');
205                 }
206                 final String hostName = "host" + count++;
207                 props.put("hosts." + hostName, agent.getHost() + ':' + agent.getPort());
208                 sb.append(hostName);
209             }
210             props.put("hosts", sb.toString());
211             if (batchSize > 0) {
212                 props.put("batch-size", Integer.toString(batchSize));
213             }
214             if (retries > 1) {
215                 if (retries > MAX_RECONNECTS) {
216                     retries = MAX_RECONNECTS;
217                 }
218                 props.put("max-attempts", Integer.toString(retries * agents.length));
219             }
220             if (requestTimeout >= MINIMUM_TIMEOUT) {
221                 props.put("request-timeout", Integer.toString(requestTimeout));
222             }
223             if (connectTimeout >= MINIMUM_TIMEOUT) {
224                 props.put("connect-timeout", Integer.toString(connectTimeout));
225             }
226             return RpcClientFactory.getInstance(props);
227         } catch (final Exception ex) {
228             LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage());
229             return null;
230         }
231     }
232 
233     @Override
234     protected void releaseSub() {
235         if (rpcClient != null) {
236             try {
237                 rpcClient.close();
238             } catch (final Exception ex) {
239                 LOGGER.error("Attempt to close RPC client failed", ex);
240             }
241         }
242         rpcClient = null;
243     }
244 
245     /**
246      * Factory data.
247      */
248     private static class FactoryData {
249         private final String name;
250         private final Agent[] agents;
251         private final int batchSize;
252         private final int retries;
253         private final int conntectTimeout;
254         private final int requestTimeout;
255 
256         /**
257          * Constructor.
258          * @param name The name of the Appender.
259          * @param agents The agents.
260          * @param batchSize The number of events to include in a batch.
261          */
262         public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
263                            final int connectTimeout, final int requestTimeout) {
264             this.name = name;
265             this.agents = agents;
266             this.batchSize = batchSize;
267             this.retries = retries;
268             this.conntectTimeout = connectTimeout;
269             this.requestTimeout = requestTimeout;
270         }
271     }
272 
273     /**
274      * Avro Manager Factory.
275      */
276     private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
277 
278         /**
279          * Create the FlumeAvroManager.
280          * @param name The name of the entity to manage.
281          * @param data The data required to create the entity.
282          * @return The FlumeAvroManager.
283          */
284         @Override
285         public FlumeAvroManager createManager(final String name, final FactoryData data) {
286             try {
287 
288                 return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.retries,
289                     data.conntectTimeout, data.requestTimeout);
290             } catch (final Exception ex) {
291                 LOGGER.error("Could not create FlumeAvroManager", ex);
292             }
293             return null;
294         }
295     }
296 
297 }