View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.flume.appender;
18  
19  import org.apache.avro.AvroRemoteException;
20  import org.apache.avro.ipc.NettyTransceiver;
21  import org.apache.avro.ipc.Transceiver;
22  import org.apache.avro.ipc.specific.SpecificRequestor;
23  import org.apache.flume.source.avro.AvroFlumeEvent;
24  import org.apache.flume.source.avro.AvroSourceProtocol;
25  import org.apache.flume.source.avro.Status;
26  import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
27  import org.apache.logging.log4j.core.appender.ManagerFactory;
28  
29  import java.io.IOException;
30  import java.net.InetSocketAddress;
31  import java.nio.ByteBuffer;
32  import java.util.ArrayList;
33  import java.util.HashMap;
34  import java.util.List;
35  import java.util.Map;
36  
37  /**
38   * Manager for FlumeAvroAppenders.
39   */
40  public class FlumeAvroManager extends AbstractFlumeManager {
41  
42      /**
43        The default reconnection delay (500 milliseconds or .5 seconds).
44       */
45      public static final int DEFAULT_RECONNECTION_DELAY   = 500;
46  
47      private static final int DEFAULT_RECONNECTS = 3;
48  
49      private static ManagerFactory factory = new AvroManagerFactory();
50  
51      private AvroSourceProtocol client;
52  
53      private final Agent[] agents;
54  
55      private final int batchSize;
56  
57      private final EventList events = new EventList();
58  
59      private int current = 0;
60  
61      private Transceiver transceiver;
62  
63      /**
64       * Constructor
65       * @param name The unique name of this manager.
66       * @param agents An array of Agents.
67       * @param batchSize The number of evetns to include in a batch.
68       */
69      protected FlumeAvroManager(String name, String shortName, Agent[] agents, int batchSize) {
70          super(name);
71          this.agents = agents;
72          this.batchSize = batchSize;
73          this.client = connect(agents);
74      }
75  
76      /**
77       * Returns a FlumeAvroManager.
78       * @param agents The agents to use.
79       * @param batchSize The number of events to include in a batch.
80       * @return A FlumeAvroManager.
81       */
82      public static FlumeAvroManager getManager(String name, Agent[] agents, int batchSize) {
83          if (agents == null || agents.length == 0) {
84              throw new IllegalArgumentException("At least one agent is required");
85          }
86  
87          if (batchSize <= 0) {
88              batchSize = 1;
89          }
90  
91          StringBuilder sb = new StringBuilder("FlumeAvro[");
92          boolean first = true;
93          for (Agent agent : agents) {
94              if (!first) {
95                  sb.append(",");
96              }
97              sb.append(agent.getHost()).append(":").append(agent.getPort());
98              first = false;
99          }
100         sb.append("]");
101         return (FlumeAvroManager) getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize));
102     }
103 
104     /**
105      * Returns the agents.
106      * @return The agent array.
107      */
108     public Agent[] getAgents() {
109         return agents;
110     }
111 
112     /**
113      * Returns the index of the current agent.
114      * @return The index for the current agent.
115      */
116     public int getCurrent() {
117         return current;
118     }
119 
120     @Override
121     public synchronized void send(FlumeEvent event, int delay, int retries)  {
122         if (delay == 0) {
123             delay = DEFAULT_RECONNECTION_DELAY;
124         }
125         if (retries == 0) {
126             retries = DEFAULT_RECONNECTS;
127         }
128         AvroFlumeEvent avroEvent = new AvroFlumeEvent();
129         avroEvent.body = ByteBuffer.wrap(event.getBody());
130         avroEvent.headers = new HashMap<CharSequence, CharSequence>();
131 
132         for (Map.Entry<String, String> entry : event.getHeaders().entrySet()) {
133           avroEvent.headers.put(entry.getKey(), entry.getValue());
134         }
135 
136         List<AvroFlumeEvent> batch = batchSize > 1 ? events.addAndGet(avroEvent, batchSize) : null;
137         if (batch == null && batchSize > 1) {
138             return;
139         }
140 
141         int i = 0;
142 
143         String msg = "Error writing to " + getName();
144 
145         do {
146             try {
147                 Status status = (batch == null) ? client.append(avroEvent) : client.appendBatch(batch);
148                 if (!status.equals(Status.OK)) {
149                     throw new AvroRemoteException("RPC communication failed to " + agents[current].getHost() +
150                         ":" + agents[current].getPort());
151                 }
152                 return;
153             } catch (Exception ex) {
154                 if (i == retries - 1) {
155                     msg = "Error writing to " + getName() + " at " + agents[current].getHost() + ":" +
156                         agents[current].getPort();
157                     LOGGER.warn(msg, ex);
158                     break;
159                 }
160                 sleep(delay);
161             }
162         } while (++i < retries);
163 
164         for (int index = 0; index < agents.length; ++index) {
165             if (index == current) {
166                 continue;
167             }
168             Agent agent = agents[index];
169             i = 0;
170             do {
171                 try {
172                     transceiver = null;
173                     AvroSourceProtocol c = connect(agent.getHost(), agent.getPort());
174                     Status status = (batch == null) ? c.append(avroEvent) : c.appendBatch(batch);
175                     if (!status.equals(Status.OK)) {
176                         if (i == retries - 1) {
177                             String warnMsg = "RPC communication failed to " + getName() + " at " +
178                                 agent.getHost() + ":" + agent.getPort();
179                             LOGGER.warn(warnMsg);
180                         }
181                         continue;
182                     }
183                     client = c;
184                     current = i;
185                     return;
186                 } catch (Exception ex) {
187                     if (i == retries - 1) {
188                         String warnMsg = "Error writing to " + getName() + " at " + agent.getHost() + ":" +
189                             agent.getPort();
190                         LOGGER.warn(warnMsg, ex);
191                         break;
192                     }
193                     sleep(delay);
194                 }
195             } while (++i < retries);
196         }
197 
198         throw new AppenderRuntimeException(msg);
199 
200     }
201 
202     private void sleep(int delay) {
203         try {
204             Thread.sleep(delay);
205         } catch (InterruptedException ex) {
206             Thread.currentThread().interrupt();
207         }
208     }
209 
210     /**
211      * There is a very good chance that this will always return the first agent even if it isn't available.
212      * @param agents The list of agents to choose from
213      * @return The FlumeEventAvroServer.
214      */
215     private AvroSourceProtocol connect(Agent[] agents) {
216         int i = 0;
217         for (Agent agent : agents) {
218             AvroSourceProtocol server = connect(agent.getHost(), agent.getPort());
219             if (server != null) {
220                 current = i;
221                 return server;
222             }
223             ++i;
224         }
225         throw new AppenderRuntimeException("Unable to connect to any agents");
226     }
227 
228     private AvroSourceProtocol connect(String hostname, int port) {
229         try {
230             if (transceiver == null) {
231                 transceiver = new NettyTransceiver(new InetSocketAddress(hostname, port));
232             }
233         } catch (IOException ioe) {
234             LOGGER.error("Unable to create transceiver", ioe);
235             return null;
236         }
237         try {
238             return SpecificRequestor.getClient(AvroSourceProtocol.class, transceiver);
239         } catch (IOException ioe) {
240             LOGGER.error("Unable to create Avro client");
241             return null;
242         }
243     }
244 
245     @Override
246     protected void releaseSub() {
247         if (transceiver != null) {
248             try {
249                 transceiver.close();
250             } catch (IOException ioe) {
251                 LOGGER.error("Attempt to clean up Avro transceiver failed", ioe);
252             }
253         }
254         client = null;
255     }
256 
257     /**
258      * Thread-safe List management of a batch.
259      */
260     private static class EventList extends ArrayList<AvroFlumeEvent> {
261 
262         public synchronized List<AvroFlumeEvent> addAndGet(AvroFlumeEvent event, int batchSize) {
263             super.add(event);
264             if (this.size() >= batchSize) {
265                 List<AvroFlumeEvent> events = new ArrayList<AvroFlumeEvent>();
266                 events.addAll(this);
267                 clear();
268                 return events;
269             } else {
270                 return null;
271             }
272         }
273     }
274 
275     /**
276      * Factory data.
277      */
278     private static class FactoryData {
279         private String name;
280         private Agent[] agents;
281         private int batchSize;
282 
283         /**
284          * Constructor.
285          * @param name The name of the Appender.
286          * @param agents The agents.
287          * @param batchSize The number of events to include in a batch.
288          */
289         public FactoryData(String name, Agent[] agents, int batchSize) {
290             this.name = name;
291             this.agents = agents;
292             this.batchSize = batchSize;
293         }
294     }
295 
296     /**
297      * Avro Manager Factory.
298      */
299     private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
300 
301         /**
302          * Create the FlumeAvroManager.
303          * @param name The name of the entity to manage.
304          * @param data The data required to create the entity.
305          * @return The FlumeAvroManager.
306          */
307         public FlumeAvroManager createManager(String name, FactoryData data) {
308             try {
309 
310                 return new FlumeAvroManager(name, data.name, data.agents, data.batchSize);
311             } catch (Exception ex) {
312                 LOGGER.error("Could not create FlumeAvroManager", ex);
313             }
314             return null;
315         }
316     }
317 
318 }