1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.appender;
18
19 import java.io.Serializable;
20 import java.util.ArrayList;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.concurrent.ArrayBlockingQueue;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.atomic.AtomicLong;
26
27 import org.apache.logging.log4j.core.Appender;
28 import org.apache.logging.log4j.core.Filter;
29 import org.apache.logging.log4j.core.LogEvent;
30 import org.apache.logging.log4j.core.async.RingBufferLogEvent;
31 import org.apache.logging.log4j.core.config.AppenderControl;
32 import org.apache.logging.log4j.core.config.AppenderRef;
33 import org.apache.logging.log4j.core.config.Configuration;
34 import org.apache.logging.log4j.core.config.ConfigurationException;
35 import org.apache.logging.log4j.core.config.plugins.Plugin;
36 import org.apache.logging.log4j.core.config.plugins.PluginAliases;
37 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
38 import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
39 import org.apache.logging.log4j.core.config.plugins.PluginElement;
40 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
41 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
42 import org.apache.logging.log4j.core.util.Constants;
43
44
45
46
47
48
49 @Plugin(name = "Async", category = "Core", elementType = "appender", printObject = true)
50 public final class AsyncAppender extends AbstractAppender {
51
52 private static final long serialVersionUID = 1L;
53 private static final int DEFAULT_QUEUE_SIZE = 128;
54 private static final String SHUTDOWN = "Shutdown";
55
56 private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1);
57 private static ThreadLocal<Boolean> isAppenderThread = new ThreadLocal<>();
58
59 private final BlockingQueue<Serializable> queue;
60 private final int queueSize;
61 private final boolean blocking;
62 private final long shutdownTimeout;
63 private final Configuration config;
64 private final AppenderRef[] appenderRefs;
65 private final String errorRef;
66 private final boolean includeLocation;
67 private AppenderControl errorAppender;
68 private AsyncThread thread;
69
70 private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
71 final String errorRef, final int queueSize, final boolean blocking, final boolean ignoreExceptions,
72 final long shutdownTimeout, final Configuration config, final boolean includeLocation) {
73 super(name, filter, null, ignoreExceptions);
74 this.queue = new ArrayBlockingQueue<>(queueSize);
75 this.queueSize = queueSize;
76 this.blocking = blocking;
77 this.shutdownTimeout = shutdownTimeout;
78 this.config = config;
79 this.appenderRefs = appenderRefs;
80 this.errorRef = errorRef;
81 this.includeLocation = includeLocation;
82 }
83
84 @Override
85 public void start() {
86 final Map<String, Appender> map = config.getAppenders();
87 final List<AppenderControl> appenders = new ArrayList<>();
88 for (final AppenderRef appenderRef : appenderRefs) {
89 final Appender appender = map.get(appenderRef.getRef());
90 if (appender != null) {
91 appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter()));
92 } else {
93 LOGGER.error("No appender named {} was configured", appenderRef);
94 }
95 }
96 if (errorRef != null) {
97 final Appender appender = map.get(errorRef);
98 if (appender != null) {
99 errorAppender = new AppenderControl(appender, null, null);
100 } else {
101 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
102 }
103 }
104 if (appenders.size() > 0) {
105 thread = new AsyncThread(appenders, queue);
106 thread.setName("AsyncAppender-" + getName());
107 } else if (errorRef == null) {
108 throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
109 }
110
111 thread.start();
112 super.start();
113 }
114
115 @Override
116 public void stop() {
117 super.stop();
118 LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
119 thread.shutdown();
120 try {
121 thread.join(shutdownTimeout);
122 } catch (final InterruptedException ex) {
123 LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
124 }
125 LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
126 }
127
128
129
130
131
132
133 @Override
134 public void append(LogEvent logEvent) {
135 if (!isStarted()) {
136 throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
137 }
138 if (!(logEvent instanceof Log4jLogEvent)) {
139 if (!(logEvent instanceof RingBufferLogEvent)) {
140 return;
141 }
142 logEvent = ((RingBufferLogEvent) logEvent).createMemento();
143 }
144 if (!Constants.FORMAT_MESSAGES_IN_BACKGROUND) {
145 logEvent.getMessage().getFormattedMessage();
146 }
147 final Log4jLogEvent coreEvent = (Log4jLogEvent) logEvent;
148 boolean appendSuccessful = false;
149 if (blocking) {
150 if (isAppenderThread.get() == Boolean.TRUE && queue.remainingCapacity() == 0) {
151
152
153 coreEvent.setEndOfBatch(false);
154 appendSuccessful = thread.callAppenders(coreEvent);
155 } else {
156 final Serializable serialized = Log4jLogEvent.serialize(coreEvent, includeLocation);
157 try {
158
159 queue.put(serialized);
160 appendSuccessful = true;
161 } catch (final InterruptedException e) {
162
163
164
165
166
167
168
169
170
171
172
173 appendSuccessful = queue.offer(serialized);
174 if (!appendSuccessful) {
175 LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
176 getName());
177 }
178
179 Thread.currentThread().interrupt();
180 }
181 }
182 } else {
183 appendSuccessful = queue.offer(Log4jLogEvent.serialize(coreEvent, includeLocation));
184 if (!appendSuccessful) {
185 error("Appender " + getName() + " is unable to write primary appenders. queue is full");
186 }
187 }
188 if (!appendSuccessful && errorAppender != null) {
189 errorAppender.callAppender(coreEvent);
190 }
191 }
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210 @PluginFactory
211 public static AsyncAppender createAppender(@PluginElement("AppenderRef") final AppenderRef[] appenderRefs,
212 @PluginAttribute("errorRef") @PluginAliases("error-ref") final String errorRef,
213 @PluginAttribute(value = "blocking", defaultBoolean = true) final boolean blocking,
214 @PluginAttribute(value = "shutdownTimeout", defaultLong = 0L) final long shutdownTimeout,
215 @PluginAttribute(value = "bufferSize", defaultInt = DEFAULT_QUEUE_SIZE) final int size,
216 @PluginAttribute("name") final String name,
217 @PluginAttribute(value = "includeLocation", defaultBoolean = false) final boolean includeLocation,
218 @PluginElement("Filter") final Filter filter, @PluginConfiguration final Configuration config,
219 @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions) {
220 if (name == null) {
221 LOGGER.error("No name provided for AsyncAppender");
222 return null;
223 }
224 if (appenderRefs == null) {
225 LOGGER.error("No appender references provided to AsyncAppender {}", name);
226 }
227
228 return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions,
229 shutdownTimeout, config, includeLocation);
230 }
231
232
233
234
235 private class AsyncThread extends Thread {
236
237 private volatile boolean shutdown = false;
238 private final List<AppenderControl> appenders;
239 private final BlockingQueue<Serializable> queue;
240
241 public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<Serializable> queue) {
242 this.appenders = appenders;
243 this.queue = queue;
244 setDaemon(true);
245 setName("AsyncAppenderThread" + THREAD_SEQUENCE.getAndIncrement());
246 }
247
248 @Override
249 public void run() {
250 isAppenderThread.set(Boolean.TRUE);
251 while (!shutdown) {
252 Serializable s;
253 try {
254 s = queue.take();
255 if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) {
256 shutdown = true;
257 continue;
258 }
259 } catch (final InterruptedException ex) {
260 break;
261 }
262 final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
263 event.setEndOfBatch(queue.isEmpty());
264 final boolean success = callAppenders(event);
265 if (!success && errorAppender != null) {
266 try {
267 errorAppender.callAppender(event);
268 } catch (final Exception ex) {
269
270 }
271 }
272 }
273
274 LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
275 queue.size());
276 int count = 0;
277 int ignored = 0;
278 while (!queue.isEmpty()) {
279 try {
280 final Serializable s = queue.take();
281 if (Log4jLogEvent.canDeserialize(s)) {
282 final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
283 event.setEndOfBatch(queue.isEmpty());
284 callAppenders(event);
285 count++;
286 } else {
287 ignored++;
288 LOGGER.trace("Ignoring event of class {}", s.getClass().getName());
289 }
290 } catch (final InterruptedException ex) {
291
292
293 }
294 }
295 LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
296 + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
297 }
298
299
300
301
302
303
304
305
306
307 boolean callAppenders(final Log4jLogEvent event) {
308 boolean success = false;
309 for (final AppenderControl control : appenders) {
310 try {
311 control.callAppender(event);
312 success = true;
313 } catch (final Exception ex) {
314
315 }
316 }
317 return success;
318 }
319
320 public void shutdown() {
321 shutdown = true;
322 if (queue.isEmpty()) {
323 queue.offer(SHUTDOWN);
324 }
325 }
326 }
327
328
329
330
331
332
333 public String[] getAppenderRefStrings() {
334 final String[] result = new String[appenderRefs.length];
335 for (int i = 0; i < result.length; i++) {
336 result[i] = appenderRefs[i].getRef();
337 }
338 return result;
339 }
340
341
342
343
344
345
346
347 public boolean isIncludeLocation() {
348 return includeLocation;
349 }
350
351
352
353
354
355
356
357 public boolean isBlocking() {
358 return blocking;
359 }
360
361
362
363
364
365
366 public String getErrorRef() {
367 return errorRef;
368 }
369
370 public int getQueueCapacity() {
371 return queueSize;
372 }
373
374 public int getQueueRemainingCapacity() {
375 return queue.remainingCapacity();
376 }
377 }