View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import java.util.Properties;
20  
21  import org.apache.flume.Event;
22  import org.apache.flume.api.RpcClient;
23  import org.apache.flume.api.RpcClientFactory;
24  import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
25  import org.apache.logging.log4j.core.appender.ManagerFactory;
26  
27  /**
28   * Manager for FlumeAvroAppenders.
29   */
30  public class FlumeAvroManager extends AbstractFlumeManager {
31  
32      private static final int MAX_RECONNECTS = 3;
33      private static final int MINIMUM_TIMEOUT = 1000;
34  
35      private static AvroManagerFactory factory = new AvroManagerFactory();
36  
37      private final Agent[] agents;
38  
39      private final int batchSize;
40  
41      private final int retries;
42  
43      private final int connectTimeout;
44  
45      private final int requestTimeout;
46  
47      private final int current = 0;
48  
49      private RpcClient rpcClient = null;
50  
51      /**
52       * Constructor
53       * @param name The unique name of this manager.
54       * @param agents An array of Agents.
55       * @param batchSize The number of events to include in a batch.
56       * @param retries The number of times to retry connecting before giving up.
57       * @param connectTimeout The connection timeout in ms.
58       * @param requestTimeout The request timeout in ms.
59       *
60       */
61      protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
62                                 final int retries, final int connectTimeout, final int requestTimeout) {
63          super(name);
64          this.agents = agents;
65          this.batchSize = batchSize;
66          this.retries = retries;
67          this.connectTimeout = connectTimeout;
68          this.requestTimeout = requestTimeout;
69          this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
70      }
71  
72      /**
73       * Returns a FlumeAvroManager.
74       * @param name The name of the manager.
75       * @param agents The agents to use.
76       * @param batchSize The number of events to include in a batch.
77       * @param retries The number of times to retry connecting before giving up.
78       * @param connectTimeout The connection timeout in ms.
79       * @param requestTimeout The request timeout in ms.
80       * @return A FlumeAvroManager.
81       */
82      public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize,
83                                                final int retries, final int connectTimeout, final int requestTimeout) {
84          if (agents == null || agents.length == 0) {
85              throw new IllegalArgumentException("At least one agent is required");
86          }
87  
88          if (batchSize <= 0) {
89              batchSize = 1;
90          }
91  
92          final StringBuilder sb = new StringBuilder("FlumeAvro[");
93          boolean first = true;
94          for (final Agent agent : agents) {
95              if (!first) {
96                  sb.append(",");
97              }
98              sb.append(agent.getHost()).append(":").append(agent.getPort());
99              first = false;
100         }
101         sb.append("]");
102         return getManager(sb.toString(), factory,
103                 new FactoryData(name, agents, batchSize, retries, connectTimeout, requestTimeout));
104     }
105 
106     /**
107      * Returns the agents.
108      * @return The agent array.
109      */
110     public Agent[] getAgents() {
111         return agents;
112     }
113 
114     /**
115      * Returns the index of the current agent.
116      * @return The index for the current agent.
117      */
118     public int getCurrent() {
119         return current;
120     }
121 
122     public int getRetries() {
123         return retries;
124     }
125 
126     public int getConnectTimeout() {
127         return connectTimeout;
128     }
129 
130     public int getRequestTimeout() {
131         return requestTimeout;
132     }
133 
134     public synchronized void send(final BatchEvent events) {
135         if (rpcClient == null) {
136             rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
137         }
138 
139         if (rpcClient != null) {
140             try {
141                 LOGGER.trace("Sending batch of {} events", events.getEvents().size());
142                 rpcClient.appendBatch(events.getEvents());
143             } catch (final Exception ex) {
144                 rpcClient.close();
145                 rpcClient = null;
146                 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" +
147                     agents[current].getPort();
148                 LOGGER.warn(msg, ex);
149                 throw new AppenderRuntimeException("No Flume agents are available");
150             }
151         }  else {
152             final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" +
153                 agents[current].getPort();
154             LOGGER.warn(msg);
155             throw new AppenderRuntimeException("No Flume agents are available");
156         }
157     }
158 
159     @Override
160     public synchronized void send(final Event event)  {
161         if (rpcClient == null) {
162             rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
163         }
164 
165         if (rpcClient != null) {
166             try {
167                 rpcClient.append(event);
168             } catch (final Exception ex) {
169                 rpcClient.close();
170                 rpcClient = null;
171                 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" +
172                     agents[current].getPort();
173                 LOGGER.warn(msg, ex);
174                 throw new AppenderRuntimeException("No Flume agents are available");
175             }
176         } else {
177             final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" +
178                 agents[current].getPort();
179             LOGGER.warn(msg);
180             throw new AppenderRuntimeException("No Flume agents are available");
181         }
182     }
183 
184     /**
185      * There is a very good chance that this will always return the first agent even if it isn't available.
186      * @param agents The list of agents to choose from
187      * @return The FlumeEventAvroServer.
188      */
189 
190     private RpcClient connect(final Agent[] agents, int retries, final int connectTimeout, final int requestTimeout) {
191         try {
192             final Properties props = new Properties();
193 
194             props.put("client.type", agents.length > 1 ? "default_failover" : "default");
195 
196             int count = 1;
197             final StringBuilder sb = new StringBuilder();
198             for (final Agent agent : agents) {
199                 if (sb.length() > 0) {
200                     sb.append(" ");
201                 }
202                 final String hostName = "host" + count++;
203                 props.put("hosts." + hostName, agent.getHost() + ":" + agent.getPort());
204                 sb.append(hostName);
205             }
206             props.put("hosts", sb.toString());
207             if (batchSize > 0) {
208                 props.put("batch-size", Integer.toString(batchSize));
209             }
210             if (retries > 1) {
211                 if (retries > MAX_RECONNECTS) {
212                     retries = MAX_RECONNECTS;
213                 }
214                 props.put("max-attempts", Integer.toString(retries * agents.length));
215             }
216             if (requestTimeout >= MINIMUM_TIMEOUT) {
217                 props.put("request-timeout", Integer.toString(requestTimeout));
218             }
219             if (connectTimeout >= MINIMUM_TIMEOUT) {
220                 props.put("connect-timeout", Integer.toString(connectTimeout));
221             }
222             return RpcClientFactory.getInstance(props);
223         } catch (final Exception ex) {
224             LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage());
225             return null;
226         }
227     }
228 
229     @Override
230     protected void releaseSub() {
231         if (rpcClient != null) {
232             try {
233                 rpcClient.close();
234             } catch (final Exception ex) {
235                 LOGGER.error("Attempt to close RPC client failed", ex);
236             }
237         }
238         rpcClient = null;
239     }
240 
241     /**
242      * Factory data.
243      */
244     private static class FactoryData {
245         private final String name;
246         private final Agent[] agents;
247         private final int batchSize;
248         private final int retries;
249         private final int conntectTimeout;
250         private final int requestTimeout;
251 
252         /**
253          * Constructor.
254          * @param name The name of the Appender.
255          * @param agents The agents.
256          * @param batchSize The number of events to include in a batch.
257          */
258         public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
259                            final int connectTimeout, final int requestTimeout) {
260             this.name = name;
261             this.agents = agents;
262             this.batchSize = batchSize;
263             this.retries = retries;
264             this.conntectTimeout = connectTimeout;
265             this.requestTimeout = requestTimeout;
266         }
267     }
268 
269     /**
270      * Avro Manager Factory.
271      */
272     private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
273 
274         /**
275          * Create the FlumeAvroManager.
276          * @param name The name of the entity to manage.
277          * @param data The data required to create the entity.
278          * @return The FlumeAvroManager.
279          */
280         @Override
281         public FlumeAvroManager createManager(final String name, final FactoryData data) {
282             try {
283 
284                 return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.retries,
285                     data.conntectTimeout, data.requestTimeout);
286             } catch (final Exception ex) {
287                 LOGGER.error("Could not create FlumeAvroManager", ex);
288             }
289             return null;
290         }
291     }
292 
293 }