1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.appender;
18
19 import java.util.ArrayList;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.concurrent.ArrayBlockingQueue;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.atomic.AtomicLong;
25
26 import org.apache.logging.log4j.core.AbstractLogEvent;
27 import org.apache.logging.log4j.core.Appender;
28 import org.apache.logging.log4j.core.Filter;
29 import org.apache.logging.log4j.core.LogEvent;
30 import org.apache.logging.log4j.core.async.AsyncQueueFullPolicy;
31 import org.apache.logging.log4j.core.async.AsyncQueueFullPolicyFactory;
32 import org.apache.logging.log4j.core.async.DiscardingAsyncQueueFullPolicy;
33 import org.apache.logging.log4j.core.async.EventRoute;
34 import org.apache.logging.log4j.core.config.AppenderControl;
35 import org.apache.logging.log4j.core.config.AppenderRef;
36 import org.apache.logging.log4j.core.config.Configuration;
37 import org.apache.logging.log4j.core.config.ConfigurationException;
38 import org.apache.logging.log4j.core.config.plugins.Plugin;
39 import org.apache.logging.log4j.core.config.plugins.PluginAliases;
40 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
41 import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
42 import org.apache.logging.log4j.core.config.plugins.PluginElement;
43 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
44 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
45 import org.apache.logging.log4j.core.util.Constants;
46
47
48
49
50
51
52 @Plugin(name = "Async", category = "Core", elementType = "appender", printObject = true)
53 public final class AsyncAppender extends AbstractAppender {
54
55 private static final int DEFAULT_QUEUE_SIZE = 128;
56 private static final LogEvent SHUTDOWN = new AbstractLogEvent() {
57 };
58
59 private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1);
60
61 private final BlockingQueue<LogEvent> queue;
62 private final int queueSize;
63 private final boolean blocking;
64 private final long shutdownTimeout;
65 private final Configuration config;
66 private final AppenderRef[] appenderRefs;
67 private final String errorRef;
68 private final boolean includeLocation;
69 private AppenderControl errorAppender;
70 private AsyncThread thread;
71 private AsyncQueueFullPolicy asyncQueueFullPolicy;
72
73 private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
74 final String errorRef, final int queueSize, final boolean blocking,
75 final boolean ignoreExceptions,
76 final long shutdownTimeout, final Configuration config, final boolean includeLocation) {
77 super(name, filter, null, ignoreExceptions);
78 this.queue = new ArrayBlockingQueue<>(queueSize);
79 this.queueSize = queueSize;
80 this.blocking = blocking;
81 this.shutdownTimeout = shutdownTimeout;
82 this.config = config;
83 this.appenderRefs = appenderRefs;
84 this.errorRef = errorRef;
85 this.includeLocation = includeLocation;
86 }
87
88 @Override
89 public void start() {
90 final Map<String, Appender> map = config.getAppenders();
91 final List<AppenderControl> appenders = new ArrayList<>();
92 for (final AppenderRef appenderRef : appenderRefs) {
93 final Appender appender = map.get(appenderRef.getRef());
94 if (appender != null) {
95 appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter()));
96 } else {
97 LOGGER.error("No appender named {} was configured", appenderRef);
98 }
99 }
100 if (errorRef != null) {
101 final Appender appender = map.get(errorRef);
102 if (appender != null) {
103 errorAppender = new AppenderControl(appender, null, null);
104 } else {
105 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
106 }
107 }
108 if (appenders.size() > 0) {
109 thread = new AsyncThread(appenders, queue);
110 thread.setName("AsyncAppender-" + getName());
111 } else if (errorRef == null) {
112 throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
113 }
114 asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
115
116 thread.start();
117 super.start();
118 }
119
120 @Override
121 public void stop() {
122 super.stop();
123 LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
124 thread.shutdown();
125 try {
126 thread.join(shutdownTimeout);
127 } catch (final InterruptedException ex) {
128 LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
129 }
130 LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
131
132 if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
133 LOGGER.trace("AsyncAppender: {} discarded {} events.", asyncQueueFullPolicy,
134 DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
135 }
136 }
137
138
139
140
141
142
143 @Override
144 public void append(final LogEvent logEvent) {
145 if (!isStarted()) {
146 throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
147 }
148 if (!Constants.FORMAT_MESSAGES_IN_BACKGROUND) {
149 logEvent.getMessage().getFormattedMessage();
150 }
151 final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation);
152 if (!queue.offer(memento)) {
153 if (blocking) {
154
155 final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel());
156 route.logMessage(this, memento);
157 } else {
158 error("Appender " + getName() + " is unable to write primary appenders. queue is full");
159 logToErrorAppenderIfNecessary(false, memento);
160 }
161 }
162 }
163
164
165
166
167
168
169 public void logMessageInCurrentThread(final LogEvent logEvent) {
170 logEvent.setEndOfBatch(queue.isEmpty());
171 final boolean appendSuccessful = thread.callAppenders(logEvent);
172 logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
173 }
174
175
176
177
178
179
180 public void logMessageInBackgroundThread(final LogEvent logEvent) {
181 try {
182
183 queue.put(logEvent);
184 } catch (final InterruptedException e) {
185 final boolean appendSuccessful = handleInterruptedException(logEvent);
186 logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
187 }
188 }
189
190
191
192
193
194
195
196
197
198
199
200
201 private boolean handleInterruptedException(final LogEvent memento) {
202 final boolean appendSuccessful = queue.offer(memento);
203 if (!appendSuccessful) {
204 LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
205 getName());
206 }
207
208 Thread.currentThread().interrupt();
209 return appendSuccessful;
210 }
211
212 private void logToErrorAppenderIfNecessary(final boolean appendSuccessful, final LogEvent logEvent) {
213 if (!appendSuccessful && errorAppender != null) {
214 errorAppender.callAppender(logEvent);
215 }
216 }
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235 @PluginFactory
236 public static AsyncAppender createAppender(
237
238 @PluginElement("AppenderRef") final AppenderRef[] appenderRefs,
239 @PluginAttribute("errorRef") @PluginAliases("error-ref") final String errorRef,
240 @PluginAttribute(value = "blocking", defaultBoolean = true) final boolean blocking,
241 @PluginAttribute(value = "shutdownTimeout", defaultLong = 0L) final long shutdownTimeout,
242 @PluginAttribute(value = "bufferSize", defaultInt = DEFAULT_QUEUE_SIZE) final int size,
243 @PluginAttribute("name") final String name,
244 @PluginAttribute(value = "includeLocation", defaultBoolean = false) final boolean includeLocation,
245 @PluginElement("Filter") final Filter filter,
246 @PluginConfiguration final Configuration config,
247 @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions) {
248
249 if (name == null) {
250 LOGGER.error("No name provided for AsyncAppender");
251 return null;
252 }
253 if (appenderRefs == null) {
254 LOGGER.error("No appender references provided to AsyncAppender {}", name);
255 }
256
257 return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions,
258 shutdownTimeout, config, includeLocation);
259 }
260
261
262
263
264 private class AsyncThread extends Thread {
265
266 private volatile boolean shutdown = false;
267 private final List<AppenderControl> appenders;
268 private final BlockingQueue<LogEvent> queue;
269
270 public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) {
271 this.appenders = appenders;
272 this.queue = queue;
273 setDaemon(true);
274 setName("AsyncAppenderThread" + THREAD_SEQUENCE.getAndIncrement());
275 }
276
277 @Override
278 public void run() {
279 while (!shutdown) {
280 LogEvent event;
281 try {
282 event = queue.take();
283 if (event == SHUTDOWN) {
284 shutdown = true;
285 continue;
286 }
287 } catch (final InterruptedException ex) {
288 break;
289 }
290 event.setEndOfBatch(queue.isEmpty());
291 final boolean success = callAppenders(event);
292 if (!success && errorAppender != null) {
293 try {
294 errorAppender.callAppender(event);
295 } catch (final Exception ex) {
296
297 }
298 }
299 }
300
301 LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
302 queue.size());
303 int count = 0;
304 int ignored = 0;
305 while (!queue.isEmpty()) {
306 try {
307 final LogEvent event = queue.take();
308 if (event instanceof Log4jLogEvent) {
309 final Log4jLogEvent logEvent = (Log4jLogEvent) event;
310 logEvent.setEndOfBatch(queue.isEmpty());
311 callAppenders(logEvent);
312 count++;
313 } else {
314 ignored++;
315 LOGGER.trace("Ignoring event of class {}", event.getClass().getName());
316 }
317 } catch (final InterruptedException ex) {
318
319
320 }
321 }
322 LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
323 + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
324 }
325
326
327
328
329
330
331
332
333
334 boolean callAppenders(final LogEvent event) {
335 boolean success = false;
336 for (final AppenderControl control : appenders) {
337 try {
338 control.callAppender(event);
339 success = true;
340 } catch (final Exception ex) {
341
342 }
343 }
344 return success;
345 }
346
347 public void shutdown() {
348 shutdown = true;
349 if (queue.isEmpty()) {
350 queue.offer(SHUTDOWN);
351 }
352 if (getState() == State.TIMED_WAITING || getState() == State.WAITING) {
353 this.interrupt();
354 }
355 }
356 }
357
358
359
360
361
362
363 public String[] getAppenderRefStrings() {
364 final String[] result = new String[appenderRefs.length];
365 for (int i = 0; i < result.length; i++) {
366 result[i] = appenderRefs[i].getRef();
367 }
368 return result;
369 }
370
371
372
373
374
375
376
377 public boolean isIncludeLocation() {
378 return includeLocation;
379 }
380
381
382
383
384
385
386
387 public boolean isBlocking() {
388 return blocking;
389 }
390
391
392
393
394
395
396 public String getErrorRef() {
397 return errorRef;
398 }
399
400 public int getQueueCapacity() {
401 return queueSize;
402 }
403
404 public int getQueueRemainingCapacity() {
405 return queue.remainingCapacity();
406 }
407 }