View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import java.util.Properties;
20  import java.util.concurrent.TimeUnit;
21  
22  import org.apache.flume.Event;
23  import org.apache.flume.api.RpcClient;
24  import org.apache.flume.api.RpcClientFactory;
25  import org.apache.logging.log4j.core.appender.AppenderLoggingException;
26  import org.apache.logging.log4j.core.appender.ManagerFactory;
27  
28  /**
29   * Manager for FlumeAvroAppenders.
30   */
31  public class FlumeAvroManager extends AbstractFlumeManager {
32  
33      private static final int MAX_RECONNECTS = 3;
34      private static final int MINIMUM_TIMEOUT = 1000;
35  
36      private static AvroManagerFactory factory = new AvroManagerFactory();
37  
38      private final Agent[] agents;
39  
40      private final int batchSize;
41  
42      private final long delayNanos;
43      private final int delayMillis;
44  
45      private final int retries;
46  
47      private final int connectTimeoutMillis;
48  
49      private final int requestTimeoutMillis;
50  
51      private final int current = 0;
52  
53      private RpcClient rpcClient = null;
54  
55      private BatchEvent batchEvent = new BatchEvent();
56      private long nextSend = 0;
57  
58      /**
59       * Constructor
60       * @param name The unique name of this manager.
61       * @param agents An array of Agents.
62       * @param batchSize The number of events to include in a batch.
63       * @param retries The number of times to retry connecting before giving up.
64       * @param connectTimeout The connection timeout in ms.
65       * @param requestTimeout The request timeout in ms.
66       *
67       */
68      protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
69                                 final int delayMillis, final int retries, final int connectTimeout, final int requestTimeout) {
70          super(name);
71          this.agents = agents;
72          this.batchSize = batchSize;
73          this.delayMillis = delayMillis;
74          this.delayNanos = TimeUnit.MILLISECONDS.toNanos(delayMillis);
75          this.retries = retries;
76          this.connectTimeoutMillis = connectTimeout;
77          this.requestTimeoutMillis = requestTimeout;
78          this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
79      }
80  
81      /**
82       * Returns a FlumeAvroManager.
83       * @param name The name of the manager.
84       * @param agents The agents to use.
85       * @param batchSize The number of events to include in a batch.
86       * @param delayMillis The number of milliseconds to wait before sending an incomplete batch.
87       * @param retries The number of times to retry connecting before giving up.
88       * @param connectTimeoutMillis The connection timeout in ms.
89       * @param requestTimeoutMillis The request timeout in ms.
90       * @return A FlumeAvroManager.
91       */
92      public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize, final int delayMillis,
93                                                final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
94          if (agents == null || agents.length == 0) {
95              throw new IllegalArgumentException("At least one agent is required");
96          }
97  
98          if (batchSize <= 0) {
99              batchSize = 1;
100         }
101 
102         final StringBuilder sb = new StringBuilder("FlumeAvro[");
103         boolean first = true;
104         for (final Agent agent : agents) {
105             if (!first) {
106                 sb.append(',');
107             }
108             sb.append(agent.getHost()).append(':').append(agent.getPort());
109             first = false;
110         }
111         sb.append(']');
112         return getManager(sb.toString(), factory,
113                 new FactoryData(name, agents, batchSize, delayMillis, retries, connectTimeoutMillis, requestTimeoutMillis));
114     }
115 
116     /**
117      * Returns the agents.
118      * @return The agent array.
119      */
120     public Agent[] getAgents() {
121         return agents;
122     }
123 
124     /**
125      * Returns the index of the current agent.
126      * @return The index for the current agent.
127      */
128     public int getCurrent() {
129         return current;
130     }
131 
132     public int getRetries() {
133         return retries;
134     }
135 
136     public int getConnectTimeoutMillis() {
137         return connectTimeoutMillis;
138     }
139 
140     public int getRequestTimeoutMillis() {
141         return requestTimeoutMillis;
142     }
143 
144     public int getBatchSize() {
145         return batchSize;
146     }
147 
148     public int getDelayMillis() {
149         return delayMillis;
150     }
151 
152     public synchronized void send(final BatchEvent events) {
153         if (rpcClient == null) {
154             rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
155         }
156 
157         if (rpcClient != null) {
158             try {
159                 LOGGER.trace("Sending batch of {} events", events.getEvents().size());
160                 rpcClient.appendBatch(events.getEvents());
161             } catch (final Exception ex) {
162                 rpcClient.close();
163                 rpcClient = null;
164                 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
165                     agents[current].getPort();
166                 LOGGER.warn(msg, ex);
167                 throw new AppenderLoggingException("No Flume agents are available");
168             }
169         }  else {
170             final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
171                 agents[current].getPort();
172             LOGGER.warn(msg);
173             throw new AppenderLoggingException("No Flume agents are available");
174         }
175     }
176 
177     @Override
178     public synchronized void send(final Event event)  {
179         if (batchSize == 1) {
180             if (rpcClient == null) {
181                 rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
182             }
183 
184             if (rpcClient != null) {
185                 try {
186                     rpcClient.append(event);
187                 } catch (final Exception ex) {
188                     rpcClient.close();
189                     rpcClient = null;
190                     final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
191                             agents[current].getPort();
192                     LOGGER.warn(msg, ex);
193                     throw new AppenderLoggingException("No Flume agents are available");
194                 }
195             } else {
196                 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
197                         agents[current].getPort();
198                 LOGGER.warn(msg);
199                 throw new AppenderLoggingException("No Flume agents are available");
200             }
201         } else {
202             batchEvent.addEvent(event);
203             final int eventCount = batchEvent.getEvents().size();
204             if (eventCount == 1) {
205                 nextSend = System.nanoTime() + delayNanos;
206             }
207             if (eventCount >= batchSize || System.nanoTime() >= nextSend) {
208                 send(batchEvent);
209                 batchEvent = new BatchEvent();
210             }
211         }
212     }
213 
214     /**
215      * There is a very good chance that this will always return the first agent even if it isn't available.
216      * @param agents The list of agents to choose from
217      * @return The FlumeEventAvroServer.
218      */
219     private RpcClient connect(final Agent[] agents, int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
220         try {
221             final Properties props = new Properties();
222 
223             props.put("client.type", "default_failover");
224 
225             int agentCount = 1;
226             final StringBuilder sb = new StringBuilder();
227             for (final Agent agent : agents) {
228                 if (sb.length() > 0) {
229                     sb.append(' ');
230                 }
231                 final String hostName = "host" + agentCount++;
232                 props.put("hosts." + hostName, agent.getHost() + ':' + agent.getPort());
233                 sb.append(hostName);
234             }
235             props.put("hosts", sb.toString());
236             if (batchSize > 0) {
237                 props.put("batch-size", Integer.toString(batchSize));
238             }
239             if (retries > 1) {
240                 if (retries > MAX_RECONNECTS) {
241                     retries = MAX_RECONNECTS;
242                 }
243                 props.put("max-attempts", Integer.toString(retries * agents.length));
244             }
245             if (requestTimeoutMillis >= MINIMUM_TIMEOUT) {
246                 props.put("request-timeout", Integer.toString(requestTimeoutMillis));
247             }
248             if (connectTimeoutMillis >= MINIMUM_TIMEOUT) {
249                 props.put("connect-timeout", Integer.toString(connectTimeoutMillis));
250             }
251             return RpcClientFactory.getInstance(props);
252         } catch (final Exception ex) {
253             LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage());
254             return null;
255         }
256     }
257 
258     @Override
259     protected boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
260     	boolean closed = true;
261         if (rpcClient != null) {
262             try {
263                 synchronized(this) {
264                     try {
265                         if (batchSize > 1 && batchEvent.getEvents().size() > 0) {
266                             send(batchEvent);
267                         }
268                     } catch (final Exception ex) {
269                         LOGGER.error("Error sending final batch: {}", ex.getMessage());
270                         closed = false;
271                     }
272                 }
273                 rpcClient.close();
274             } catch (final Exception ex) {
275                 LOGGER.error("Attempt to close RPC client failed", ex);
276                 closed = false;
277             }
278         }
279         rpcClient = null;
280         return closed;
281     }
282 
283     /**
284      * Factory data.
285      */
286     private static class FactoryData {
287         private final String name;
288         private final Agent[] agents;
289         private final int batchSize;
290         private final int delayMillis;
291         private final int retries;
292         private final int conntectTimeoutMillis;
293         private final int requestTimeoutMillis;
294 
295         /**
296          * Constructor.
297          * @param name The name of the Appender.
298          * @param agents The agents.
299          * @param batchSize The number of events to include in a batch.
300          */
301         public FactoryData(final String name, final Agent[] agents, final int batchSize, final int delayMillis,
302                 final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
303             this.name = name;
304             this.agents = agents;
305             this.batchSize = batchSize;
306             this.delayMillis = delayMillis;
307             this.retries = retries;
308             this.conntectTimeoutMillis = connectTimeoutMillis;
309             this.requestTimeoutMillis = requestTimeoutMillis;
310         }
311     }
312 
313     /**
314      * Avro Manager Factory.
315      */
316     private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
317 
318         /**
319          * Create the FlumeAvroManager.
320          * @param name The name of the entity to manage.
321          * @param data The data required to create the entity.
322          * @return The FlumeAvroManager.
323          */
324         @Override
325         public FlumeAvroManager createManager(final String name, final FactoryData data) {
326             try {
327 
328                 return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.delayMillis,
329                         data.retries, data.conntectTimeoutMillis, data.requestTimeoutMillis);
330             } catch (final Exception ex) {
331                 LOGGER.error("Could not create FlumeAvroManager", ex);
332             }
333             return null;
334         }
335     }
336 
337 }