1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.flume.appender;
18
19 import java.util.Properties;
20
21 import org.apache.flume.Event;
22 import org.apache.flume.api.RpcClient;
23 import org.apache.flume.api.RpcClientFactory;
24 import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
25 import org.apache.logging.log4j.core.appender.ManagerFactory;
26
27
28
29
30 public class FlumeAvroManager extends AbstractFlumeManager {
31
32 private static final int MAX_RECONNECTS = 3;
33 private static final int MINIMUM_TIMEOUT = 1000;
34
35 private static AvroManagerFactory factory = new AvroManagerFactory();
36
37 private final Agent[] agents;
38
39 private final int batchSize;
40
41 private final int retries;
42
43 private final int connectTimeout;
44
45 private final int requestTimeout;
46
47 private final int current = 0;
48
49 private RpcClient rpcClient = null;
50
51
52
53
54
55
56
57
58
59
60
61 protected FlumeAvroManager(final String name, final String shortName, final Agent[] agents, final int batchSize,
62 final int retries, final int connectTimeout, final int requestTimeout) {
63 super(name);
64 this.agents = agents;
65 this.batchSize = batchSize;
66 this.retries = retries;
67 this.connectTimeout = connectTimeout;
68 this.requestTimeout = requestTimeout;
69 this.rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
70 }
71
72
73
74
75
76
77
78
79
80
81
82 public static FlumeAvroManager getManager(final String name, final Agent[] agents, int batchSize,
83 final int retries, final int connectTimeout, final int requestTimeout) {
84 if (agents == null || agents.length == 0) {
85 throw new IllegalArgumentException("At least one agent is required");
86 }
87
88 if (batchSize <= 0) {
89 batchSize = 1;
90 }
91
92 final StringBuilder sb = new StringBuilder("FlumeAvro[");
93 boolean first = true;
94 for (final Agent agent : agents) {
95 if (!first) {
96 sb.append(",");
97 }
98 sb.append(agent.getHost()).append(":").append(agent.getPort());
99 first = false;
100 }
101 sb.append("]");
102 return getManager(sb.toString(), factory,
103 new FactoryData(name, agents, batchSize, retries, connectTimeout, requestTimeout));
104 }
105
106
107
108
109
110 public Agent[] getAgents() {
111 return agents;
112 }
113
114
115
116
117
118 public int getCurrent() {
119 return current;
120 }
121
122 public int getRetries() {
123 return retries;
124 }
125
126 public int getConnectTimeout() {
127 return connectTimeout;
128 }
129
130 public int getRequestTimeout() {
131 return requestTimeout;
132 }
133
134 public synchronized void send(final BatchEvent events) {
135 if (rpcClient == null) {
136 rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
137 }
138
139 if (rpcClient != null) {
140 try {
141 LOGGER.trace("Sending batch of {} events", events.getEvents().size());
142 rpcClient.appendBatch(events.getEvents());
143 } catch (final Exception ex) {
144 rpcClient.close();
145 rpcClient = null;
146 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" +
147 agents[current].getPort();
148 LOGGER.warn(msg, ex);
149 throw new AppenderRuntimeException("No Flume agents are available");
150 }
151 } else {
152 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" +
153 agents[current].getPort();
154 LOGGER.warn(msg);
155 throw new AppenderRuntimeException("No Flume agents are available");
156 }
157 }
158
159 @Override
160 public synchronized void send(final Event event) {
161 if (rpcClient == null) {
162 rpcClient = connect(agents, retries, connectTimeout, requestTimeout);
163 }
164
165 if (rpcClient != null) {
166 try {
167 rpcClient.append(event);
168 } catch (final Exception ex) {
169 rpcClient.close();
170 rpcClient = null;
171 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" +
172 agents[current].getPort();
173 LOGGER.warn(msg, ex);
174 throw new AppenderRuntimeException("No Flume agents are available");
175 }
176 } else {
177 final String msg = "Unable to write to " + getName() + " at " + agents[current].getHost() + ":" +
178 agents[current].getPort();
179 LOGGER.warn(msg);
180 throw new AppenderRuntimeException("No Flume agents are available");
181 }
182 }
183
184
185
186
187
188
189
190 private RpcClient connect(final Agent[] agents, int retries, final int connectTimeout, final int requestTimeout) {
191 try {
192 final Properties props = new Properties();
193
194 props.put("client.type", agents.length > 1 ? "default_failover" : "default");
195
196 int count = 1;
197 final StringBuilder sb = new StringBuilder();
198 for (final Agent agent : agents) {
199 if (sb.length() > 0) {
200 sb.append(" ");
201 }
202 final String hostName = "host" + count++;
203 props.put("hosts." + hostName, agent.getHost() + ":" + agent.getPort());
204 sb.append(hostName);
205 }
206 props.put("hosts", sb.toString());
207 if (batchSize > 0) {
208 props.put("batch-size", Integer.toString(batchSize));
209 }
210 if (retries > 1) {
211 if (retries > MAX_RECONNECTS) {
212 retries = MAX_RECONNECTS;
213 }
214 props.put("max-attempts", Integer.toString(retries * agents.length));
215 }
216 if (requestTimeout >= MINIMUM_TIMEOUT) {
217 props.put("request-timeout", Integer.toString(requestTimeout));
218 }
219 if (connectTimeout >= MINIMUM_TIMEOUT) {
220 props.put("connect-timeout", Integer.toString(connectTimeout));
221 }
222 return RpcClientFactory.getInstance(props);
223 } catch (final Exception ex) {
224 LOGGER.error("Unable to create Flume RPCClient: {}", ex.getMessage());
225 return null;
226 }
227 }
228
229 @Override
230 protected void releaseSub() {
231 if (rpcClient != null) {
232 try {
233 rpcClient.close();
234 } catch (final Exception ex) {
235 LOGGER.error("Attempt to close RPC client failed", ex);
236 }
237 }
238 rpcClient = null;
239 }
240
241
242
243
244 private static class FactoryData {
245 private final String name;
246 private final Agent[] agents;
247 private final int batchSize;
248 private final int retries;
249 private final int conntectTimeout;
250 private final int requestTimeout;
251
252
253
254
255
256
257
258 public FactoryData(final String name, final Agent[] agents, final int batchSize, final int retries,
259 final int connectTimeout, final int requestTimeout) {
260 this.name = name;
261 this.agents = agents;
262 this.batchSize = batchSize;
263 this.retries = retries;
264 this.conntectTimeout = connectTimeout;
265 this.requestTimeout = requestTimeout;
266 }
267 }
268
269
270
271
272 private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
273
274
275
276
277
278
279
280 @Override
281 public FlumeAvroManager createManager(final String name, final FactoryData data) {
282 try {
283
284 return new FlumeAvroManager(name, data.name, data.agents, data.batchSize, data.retries,
285 data.conntectTimeout, data.requestTimeout);
286 } catch (final Exception ex) {
287 LOGGER.error("Could not create FlumeAvroManager", ex);
288 }
289 return null;
290 }
291 }
292
293 }