View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import java.util.Properties;
20  
21  import org.apache.flume.Event;
22  import org.apache.flume.api.RpcClient;
23  import org.apache.flume.api.RpcClientFactory;
24  import org.apache.logging.log4j.core.appender.AppenderLoggingException;
25  import org.apache.logging.log4j.core.appender.ManagerFactory;
26  
27  /**
28   * Manager for FlumeAvroAppenders.
29   */
30  public class FlumeAvroManager extends AbstractFlumeManager {
31  
32      private static final int MAX_RECONNECTS = 3;
33      private static final int MINIMUM_TIMEOUT = 1000;
34  
35      private static AvroManagerFactory factory = new AvroManagerFactory();
36  
37      private final Agent[] agents;
38  
39      private final int batchSize;
40  
41      private final long delayNanos;
42      private final int delayMillis;
43  
44      private final int retries;
45  
46      private final int connectTimeoutMillis;
47  
48      private final int requestTimeoutMillis;
49  
50      private final int current = 0;
51  
52      private RpcClient rpcClient = null;
53  
54      private BatchEvent batchEvent = new BatchEvent();
55      private long nextSend = 0;
56  
57      /**
58       * Constructor
59       * @param name The unique name of this manager.
60       * @param agents An array of Agents.
61       * @param batchSize The number of events to include in a batch.
62       * @param retries The number of times to retry connecting before giving up.
63       * @param connectTimeout The connection timeout in ms.
64       * @param requestTimeout The request timeout in ms.
65       *
66       */
67      protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
68                                 final int delayMillis, final int retries, final int connectTimeout, final int requestTimeout) {
69          super(name);
70          this.agents = agents;
71          this.batchSize = batchSize;
72          this.delayMillis = delayMillis;
73          this.delayNanos = delayMillis * 1000000;
74          this.retries = retries;
75          this.connectTimeoutMillis = connectTimeout;
76          this.requestTimeoutMillis = requestTimeout;
77          this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
78      }
79  
80      /**
81       * Returns a FlumeAvroManager.
82       * @param name The name of the manager.
83       * @param agents The agents to use.
84       * @param batchSize The number of events to include in a batch.
85       * @param delayMillis The number of milliseconds to wait before sending an incomplete batch.
86       * @param retries The number of times to retry connecting before giving up.
87       * @param connectTimeoutMillis The connection timeout in ms.
88       * @param requestTimeoutMillis The request timeout in ms.
89       * @return A FlumeAvroManager.
90       */
91      public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize, final int delayMillis,
92                                                final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
93          if (agents == null || agents.length == 0) {
94              throw new IllegalArgumentException("At least one agent is required");
95          }
96  
97          if (batchSize <= 0) {
98              batchSize = 1;
99          }
100 
101         final StringBuilder sb = new StringBuilder("FlumeAvro[");
102         boolean first = true;
103         for (final Agent agent : agents) {
104             if (!first) {
105                 sb.append(',');
106             }
107             sb.append(agent.getHost()).append(':').append(agent.getPort());
108             first = false;
109         }
110         sb.append(']');
111         return getManager(sb.toString(), factory,
112                 new FactoryData(name, agents, batchSize, delayMillis, retries, connectTimeoutMillis, requestTimeoutMillis));
113     }
114 
115     /**
116      * Returns the agents.
117      * @return The agent array.
118      */
119     public Agent[] getAgents() {
120         return agents;
121     }
122 
123     /**
124      * Returns the index of the current agent.
125      * @return The index for the current agent.
126      */
127     public int getCurrent() {
128         return current;
129     }
130 
131     public int getRetries() {
132         return retries;
133     }
134 
135     public int getConnectTimeoutMillis() {
136         return connectTimeoutMillis;
137     }
138 
139     public int getRequestTimeoutMillis() {
140         return requestTimeoutMillis;
141     }
142 
143     public int getBatchSize() {
144         return batchSize;
145     }
146 
147     public int getDelayMillis() {
148         return delayMillis;
149     }
150 
151     public synchronized void send(final BatchEvent events) {
152         if (rpcClient == null) {
153             rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
154         }
155 
156         if (rpcClient != null) {
157             try {
158                 LOGGER.trace("Sending batch of {} events", events.getEvents().size());
159                 rpcClient.appendBatch(events.getEvents());
160             } catch (final Exception ex) {
161                 rpcClient.close();
162                 rpcClient = null;
163                 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
164                     agents[current].getPort();
165                 LOGGER.warn(msg, ex);
166                 throw new AppenderLoggingException("No Flume agents are available");
167             }
168         }  else {
169             final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
170                 agents[current].getPort();
171             LOGGER.warn(msg);
172             throw new AppenderLoggingException("No Flume agents are available");
173         }
174     }
175 
176     @Override
177     public synchronized void send(final Event event)  {
178         if (batchSize == 1) {
179             if (rpcClient == null) {
180                 rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
181             }
182 
183             if (rpcClient != null) {
184                 try {
185                     rpcClient.append(event);
186                 } catch (final Exception ex) {
187                     rpcClient.close();
188                     rpcClient = null;
189                     final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
190                             agents[current].getPort();
191                     LOGGER.warn(msg, ex);
192                     throw new AppenderLoggingException("No Flume agents are available");
193                 }
194             } else {
195                 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
196                         agents[current].getPort();
197                 LOGGER.warn(msg);
198                 throw new AppenderLoggingException("No Flume agents are available");
199             }
200         } else {
201             batchEvent.addEvent(event);
202             final int eventCount = batchEvent.getEvents().size();
203             if (eventCount == 1) {
204                 nextSend = System.nanoTime() + delayNanos;
205             }
206             if (eventCount >= batchSize || System.nanoTime() >= nextSend) {
207                 send(batchEvent);
208                 batchEvent = new BatchEvent();
209             }
210         }
211     }
212 
213     /**
214      * There is a very good chance that this will always return the first agent even if it isn't available.
215      * @param agents The list of agents to choose from
216      * @return The FlumeEventAvroServer.
217      */
218     private RpcClient connect(final Agent[] agents, int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
219         try {
220             final Properties props = new Properties();
221 
222             props.put("client.type", "default_failover");
223 
224             int agentCount = 1;
225             final StringBuilder sb = new StringBuilder();
226             for (final Agent agent : agents) {
227                 if (sb.length() > 0) {
228                     sb.append(' ');
229                 }
230                 final String hostName = "host" + agentCount++;
231                 props.put("hosts." + hostName, agent.getHost() + ':' + agent.getPort());
232                 sb.append(hostName);
233             }
234             props.put("hosts", sb.toString());
235             if (batchSize > 0) {
236                 props.put("batch-size", Integer.toString(batchSize));
237             }
238             if (retries > 1) {
239                 if (retries > MAX_RECONNECTS) {
240                     retries = MAX_RECONNECTS;
241                 }
242                 props.put("max-attempts", Integer.toString(retries * agents.length));
243             }
244             if (requestTimeoutMillis >= MINIMUM_TIMEOUT) {
245                 props.put("request-timeout", Integer.toString(requestTimeoutMillis));
246             }
247             if (connectTimeoutMillis >= MINIMUM_TIMEOUT) {
248                 props.put("connect-timeout", Integer.toString(connectTimeoutMillis));
249             }
250             return RpcClientFactory.getInstance(props);
251         } catch (final Exception ex) {
252             LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage());
253             return null;
254         }
255     }
256 
257     @Override
258     protected void releaseSub() {
259         if (rpcClient != null) {
260             try {
261                 synchronized(this) {
262                     try {
263                         if (batchSize > 1 && batchEvent.getEvents().size() > 0) {
264                             send(batchEvent);
265                         }
266                     } catch (final Exception ex) {
267                         LOGGER.error("Error sending final batch: {}", ex.getMessage());
268                     }
269                 }
270                 rpcClient.close();
271             } catch (final Exception ex) {
272                 LOGGER.error("Attempt to close RPC client failed", ex);
273             }
274         }
275         rpcClient = null;
276     }
277 
278     /**
279      * Factory data.
280      */
281     private static class FactoryData {
282         private final String name;
283         private final Agent[] agents;
284         private final int batchSize;
285         private final int delayMillis;
286         private final int retries;
287         private final int conntectTimeoutMillis;
288         private final int requestTimeoutMillis;
289 
290         /**
291          * Constructor.
292          * @param name The name of the Appender.
293          * @param agents The agents.
294          * @param batchSize The number of events to include in a batch.
295          */
296         public FactoryData(final String name, final Agent[] agents, final int batchSize, final int delayMillis,
297                 final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
298             this.name = name;
299             this.agents = agents;
300             this.batchSize = batchSize;
301             this.delayMillis = delayMillis;
302             this.retries = retries;
303             this.conntectTimeoutMillis = connectTimeoutMillis;
304             this.requestTimeoutMillis = requestTimeoutMillis;
305         }
306     }
307 
308     /**
309      * Avro Manager Factory.
310      */
311     private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
312 
313         /**
314          * Create the FlumeAvroManager.
315          * @param name The name of the entity to manage.
316          * @param data The data required to create the entity.
317          * @return The FlumeAvroManager.
318          */
319         @Override
320         public FlumeAvroManager createManager(final String name, final FactoryData data) {
321             try {
322 
323                 return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.delayMillis,
324                         data.retries, data.conntectTimeoutMillis, data.requestTimeoutMillis);
325             } catch (final Exception ex) {
326                 LOGGER.error("Could not create FlumeAvroManager", ex);
327             }
328             return null;
329         }
330     }
331 
332 }