1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import org.apache.avro.AvroRemoteException;
20 import org.apache.avro.ipc.NettyTransceiver;
21 import org.apache.avro.ipc.Transceiver;
22 import org.apache.avro.ipc.specific.SpecificRequestor;
23 import org.apache.flume.source.avro.AvroFlumeEvent;
24 import org.apache.flume.source.avro.AvroSourceProtocol;
25 import org.apache.flume.source.avro.Status;
26 import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
27 import org.apache.logging.log4j.core.appender.ManagerFactory;
28
29 import java.io.IOException;
30 import java.net.InetSocketAddress;
31 import java.nio.ByteBuffer;
32 import java.util.ArrayList;
33 import java.util.HashMap;
34 import java.util.List;
35 import java.util.Map;
36
37
38
39
40 public class FlumeAvroManager extends AbstractFlumeManager {
41
42
43
44
45 public static final int DEFAULT_RECONNECTION_DELAY = 500;
46
47 private static final int DEFAULT_RECONNECTS = 3;
48
49 private static ManagerFactory factory = new AvroManagerFactory();
50
51 private AvroSourceProtocol client;
52
53 private final Agent[] agents;
54
55 private final int batchSize;
56
57 private final EventList events = new EventList();
58
59 private int current = 0;
60
61 private Transceiver transceiver;
62
63
64
65
66
67
68
69 protected FlumeAvroManager(String name, String shortName, Agent[] agents, int batchSize) {
70 super(name);
71 this.agents = agents;
72 this.batchSize = batchSize;
73 this.client = connect(agents);
74 }
75
76
77
78
79
80
81
82 public static FlumeAvroManager getManager(String name, Agent[] agents, int batchSize) {
83 if (agents == null || agents.length == 0) {
84 throw new IllegalArgumentException("At least one agent is required");
85 }
86
87 if (batchSize <= 0) {
88 batchSize = 1;
89 }
90
91 StringBuilder sb = new StringBuilder("FlumeAvro[");
92 boolean first = true;
93 for (Agent agent : agents) {
94 if (!first) {
95 sb.append(",");
96 }
97 sb.append(agent.getHost()).append(":").append(agent.getPort());
98 first = false;
99 }
100 sb.append("]");
101 return (FlumeAvroManager) getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize));
102 }
103
104
105
106
107
108 public Agent[] getAgents() {
109 return agents;
110 }
111
112
113
114
115
116 public int getCurrent() {
117 return current;
118 }
119
120 @Override
121 public synchronized void send(FlumeEvent event, int delay, int retries) {
122 if (delay == 0) {
123 delay = DEFAULT_RECONNECTION_DELAY;
124 }
125 if (retries == 0) {
126 retries = DEFAULT_RECONNECTS;
127 }
128 AvroFlumeEvent avroEvent = new AvroFlumeEvent();
129 avroEvent.body = ByteBuffer.wrap(event.getBody());
130 avroEvent.headers = new HashMap<CharSequence, CharSequence>();
131
132 for (Map.Entry<String, String> entry : event.getHeaders().entrySet()) {
133 avroEvent.headers.put(entry.getKey(), entry.getValue());
134 }
135
136 List<AvroFlumeEvent> batch = batchSize > 1 ? events.addAndGet(avroEvent, batchSize) : null;
137 if (batch == null && batchSize > 1) {
138 return;
139 }
140
141 int i = 0;
142
143 String msg = "Error writing to " + getName();
144
145 do {
146 try {
147 Status status = (batch == null) ? client.append(avroEvent) : client.appendBatch(batch);
148 if (!status.equals(Status.OK)) {
149 throw new AvroRemoteException("RPC communication failed to " + agents[current].getHost() +
150 ":" + agents[current].getPort());
151 }
152 return;
153 } catch (Exception ex) {
154 if (i == retries - 1) {
155 msg = "Error writing to " + getName() + " at " + agents[current].getHost() + ":" +
156 agents[current].getPort();
157 LOGGER.warn(msg, ex);
158 break;
159 }
160 sleep(delay);
161 }
162 } while (++i < retries);
163
164 for (int index = 0; index < agents.length; ++index) {
165 if (index == current) {
166 continue;
167 }
168 Agent agent = agents[index];
169 i = 0;
170 do {
171 try {
172 transceiver = null;
173 AvroSourceProtocol c = connect(agent.getHost(), agent.getPort());
174 Status status = (batch == null) ? c.append(avroEvent) : c.appendBatch(batch);
175 if (!status.equals(Status.OK)) {
176 if (i == retries - 1) {
177 String warnMsg = "RPC communication failed to " + getName() + " at " +
178 agent.getHost() + ":" + agent.getPort();
179 LOGGER.warn(warnMsg);
180 }
181 continue;
182 }
183 client = c;
184 current = i;
185 return;
186 } catch (Exception ex) {
187 if (i == retries - 1) {
188 String warnMsg = "Error writing to " + getName() + " at " + agent.getHost() + ":" +
189 agent.getPort();
190 LOGGER.warn(warnMsg, ex);
191 break;
192 }
193 sleep(delay);
194 }
195 } while (++i < retries);
196 }
197
198 throw new AppenderRuntimeException(msg);
199
200 }
201
202 private void sleep(int delay) {
203 try {
204 Thread.sleep(delay);
205 } catch (InterruptedException ex) {
206 Thread.currentThread().interrupt();
207 }
208 }
209
210
211
212
213
214
215 private AvroSourceProtocol connect(Agent[] agents) {
216 int i = 0;
217 for (Agent agent : agents) {
218 AvroSourceProtocol server = connect(agent.getHost(), agent.getPort());
219 if (server != null) {
220 current = i;
221 return server;
222 }
223 ++i;
224 }
225 throw new AppenderRuntimeException("Unable to connect to any agents");
226 }
227
228 private AvroSourceProtocol connect(String hostname, int port) {
229 try {
230 if (transceiver == null) {
231 transceiver = new NettyTransceiver(new InetSocketAddress(hostname, port));
232 }
233 } catch (IOException ioe) {
234 LOGGER.error("Unable to create transceiver", ioe);
235 return null;
236 }
237 try {
238 return SpecificRequestor.getClient(AvroSourceProtocol.class, transceiver);
239 } catch (IOException ioe) {
240 LOGGER.error("Unable to create Avro client");
241 return null;
242 }
243 }
244
245 @Override
246 protected void releaseSub() {
247 if (transceiver != null) {
248 try {
249 transceiver.close();
250 } catch (IOException ioe) {
251 LOGGER.error("Attempt to clean up Avro transceiver failed", ioe);
252 }
253 }
254 client = null;
255 }
256
257
258
259
260 private static class EventList extends ArrayList<AvroFlumeEvent> {
261
262 public synchronized List<AvroFlumeEvent> addAndGet(AvroFlumeEvent event, int batchSize) {
263 super.add(event);
264 if (this.size() >= batchSize) {
265 List<AvroFlumeEvent> events = new ArrayList<AvroFlumeEvent>();
266 events.addAll(this);
267 clear();
268 return events;
269 } else {
270 return null;
271 }
272 }
273 }
274
275
276
277
278 private static class FactoryData {
279 private String name;
280 private Agent[] agents;
281 private int batchSize;
282
283
284
285
286
287
288
289 public FactoryData(String name, Agent[] agents, int batchSize) {
290 this.name = name;
291 this.agents = agents;
292 this.batchSize = batchSize;
293 }
294 }
295
296
297
298
299 private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
300
301
302
303
304
305
306
307 public FlumeAvroManager createManager(String name, FactoryData data) {
308 try {
309
310 return new FlumeAvroManager(name, data.name, data.agents, data.batchSize);
311 } catch (Exception ex) {
312 LOGGER.error("Could not create FlumeAvroManager", ex);
313 }
314 return null;
315 }
316 }
317
318 }