1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import java.util.Properties;
20
21 import org.apache.flume.Event;
22 import org.apache.flume.api.RpcClient;
23 import org.apache.flume.api.RpcClientFactory;
24 import org.apache.logging.log4j.core.appender.AppenderLoggingException;
25 import org.apache.logging.log4j.core.appender.ManagerFactory;
26
27
28
29
30 public class FlumeAvroManager extends AbstractFlumeManager {
31
32 private static final int MAX_RECONNECTS = 3;
33 private static final int MINIMUM_TIMEOUT = 1000;
34
35 private static AvroManagerFactory factory = new AvroManagerFactory();
36
37 private final Agent[] agents;
38
39 private final int batchSize;
40
41 private final long delayNanos;
42 private final int delayMillis;
43
44 private final int retries;
45
46 private final int connectTimeoutMillis;
47
48 private final int requestTimeoutMillis;
49
50 private final int current = 0;
51
52 private RpcClient rpcClient = null;
53
54 private BatchEvent batchEvent = new BatchEvent();
55 private long nextSend = 0;
56
57
58
59
60
61
62
63
64
65
66
67 protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
68 final int delayMillis, final int retries, final int connectTimeout, final int requestTimeout) {
69 super(name);
70 this.agents = agents;
71 this.batchSize = batchSize;
72 this.delayMillis = delayMillis;
73 this.delayNanos = delayMillis * 1000000;
74 this.retries = retries;
75 this.connectTimeoutMillis = connectTimeout;
76 this.requestTimeoutMillis = requestTimeout;
77 this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
78 }
79
80
81
82
83
84
85
86
87
88
89
90
91 public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize, final int delayMillis,
92 final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
93 if (agents == null || agents.length == 0) {
94 throw new IllegalArgumentException("At least one agent is required");
95 }
96
97 if (batchSize <= 0) {
98 batchSize = 1;
99 }
100
101 final StringBuilder sb = new StringBuilder("FlumeAvro[");
102 boolean first = true;
103 for (final Agent agent : agents) {
104 if (!first) {
105 sb.append(',');
106 }
107 sb.append(agent.getHost()).append(':').append(agent.getPort());
108 first = false;
109 }
110 sb.append(']');
111 return getManager(sb.toString(), factory,
112 new FactoryData(name, agents, batchSize, delayMillis, retries, connectTimeoutMillis, requestTimeoutMillis));
113 }
114
115
116
117
118
119 public Agent[] getAgents() {
120 return agents;
121 }
122
123
124
125
126
127 public int getCurrent() {
128 return current;
129 }
130
131 public int getRetries() {
132 return retries;
133 }
134
135 public int getConnectTimeoutMillis() {
136 return connectTimeoutMillis;
137 }
138
139 public int getRequestTimeoutMillis() {
140 return requestTimeoutMillis;
141 }
142
143 public int getBatchSize() {
144 return batchSize;
145 }
146
147 public int getDelayMillis() {
148 return delayMillis;
149 }
150
151 public synchronized void send(final BatchEvent events) {
152 if (rpcClient == null) {
153 rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
154 }
155
156 if (rpcClient != null) {
157 try {
158 LOGGER.trace("Sending batch of {} events", events.getEvents().size());
159 rpcClient.appendBatch(events.getEvents());
160 } catch (final Exception ex) {
161 rpcClient.close();
162 rpcClient = null;
163 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
164 agents[current].getPort();
165 LOGGER.warn(msg, ex);
166 throw new AppenderLoggingException("No Flume agents are available");
167 }
168 } else {
169 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
170 agents[current].getPort();
171 LOGGER.warn(msg);
172 throw new AppenderLoggingException("No Flume agents are available");
173 }
174 }
175
176 @Override
177 public synchronized void send(final Event event) {
178 if (batchSize == 1) {
179 if (rpcClient == null) {
180 rpcClient = connect(agents, retries, connectTimeoutMillis, requestTimeoutMillis);
181 }
182
183 if (rpcClient != null) {
184 try {
185 rpcClient.append(event);
186 } catch (final Exception ex) {
187 rpcClient.close();
188 rpcClient = null;
189 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
190 agents[current].getPort();
191 LOGGER.warn(msg, ex);
192 throw new AppenderLoggingException("No Flume agents are available");
193 }
194 } else {
195 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ':' +
196 agents[current].getPort();
197 LOGGER.warn(msg);
198 throw new AppenderLoggingException("No Flume agents are available");
199 }
200 } else {
201 batchEvent.addEvent(event);
202 final int eventCount = batchEvent.getEvents().size();
203 if (eventCount == 1) {
204 nextSend = System.nanoTime() + delayNanos;
205 }
206 if (eventCount >= batchSize || System.nanoTime() >= nextSend) {
207 send(batchEvent);
208 batchEvent = new BatchEvent();
209 }
210 }
211 }
212
213
214
215
216
217
218 private RpcClient connect(final Agent[] agents, int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
219 try {
220 final Properties props = new Properties();
221
222 props.put("client.type", "default_failover");
223
224 int agentCount = 1;
225 final StringBuilder sb = new StringBuilder();
226 for (final Agent agent : agents) {
227 if (sb.length() > 0) {
228 sb.append(' ');
229 }
230 final String hostName = "host" + agentCount++;
231 props.put("hosts." + hostName, agent.getHost() + ':' + agent.getPort());
232 sb.append(hostName);
233 }
234 props.put("hosts", sb.toString());
235 if (batchSize > 0) {
236 props.put("batch-size", Integer.toString(batchSize));
237 }
238 if (retries > 1) {
239 if (retries > MAX_RECONNECTS) {
240 retries = MAX_RECONNECTS;
241 }
242 props.put("max-attempts", Integer.toString(retries * agents.length));
243 }
244 if (requestTimeoutMillis >= MINIMUM_TIMEOUT) {
245 props.put("request-timeout", Integer.toString(requestTimeoutMillis));
246 }
247 if (connectTimeoutMillis >= MINIMUM_TIMEOUT) {
248 props.put("connect-timeout", Integer.toString(connectTimeoutMillis));
249 }
250 return RpcClientFactory.getInstance(props);
251 } catch (final Exception ex) {
252 LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage());
253 return null;
254 }
255 }
256
257 @Override
258 protected void releaseSub() {
259 if (rpcClient != null) {
260 try {
261 synchronized(this) {
262 try {
263 if (batchSize > 1 && batchEvent.getEvents().size() > 0) {
264 send(batchEvent);
265 }
266 } catch (final Exception ex) {
267 LOGGER.error("Error sending final batch: {}", ex.getMessage());
268 }
269 }
270 rpcClient.close();
271 } catch (final Exception ex) {
272 LOGGER.error("Attempt to close RPC client failed", ex);
273 }
274 }
275 rpcClient = null;
276 }
277
278
279
280
281 private static class FactoryData {
282 private final String name;
283 private final Agent[] agents;
284 private final int batchSize;
285 private final int delayMillis;
286 private final int retries;
287 private final int conntectTimeoutMillis;
288 private final int requestTimeoutMillis;
289
290
291
292
293
294
295
296 public FactoryData(final String name, final Agent[] agents, final int batchSize, final int delayMillis,
297 final int retries, final int connectTimeoutMillis, final int requestTimeoutMillis) {
298 this.name = name;
299 this.agents = agents;
300 this.batchSize = batchSize;
301 this.delayMillis = delayMillis;
302 this.retries = retries;
303 this.conntectTimeoutMillis = connectTimeoutMillis;
304 this.requestTimeoutMillis = requestTimeoutMillis;
305 }
306 }
307
308
309
310
311 private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
312
313
314
315
316
317
318
319 @Override
320 public FlumeAvroManager createManager(final String name, final FactoryData data) {
321 try {
322
323 return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.delayMillis,
324 data.retries, data.conntectTimeoutMillis, data.requestTimeoutMillis);
325 } catch (final Exception ex) {
326 LOGGER.error("Could not create FlumeAvroManager", ex);
327 }
328 return null;
329 }
330 }
331
332 }