1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.appender;
18
19 import java.util.ArrayList;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TransferQueue;
25 import java.util.concurrent.atomic.AtomicLong;
26
27 import org.apache.logging.log4j.core.AbstractLogEvent;
28 import org.apache.logging.log4j.core.Appender;
29 import org.apache.logging.log4j.core.Core;
30 import org.apache.logging.log4j.core.Filter;
31 import org.apache.logging.log4j.core.LogEvent;
32 import org.apache.logging.log4j.core.async.ArrayBlockingQueueFactory;
33 import org.apache.logging.log4j.core.async.AsyncQueueFullMessageUtil;
34 import org.apache.logging.log4j.core.async.AsyncQueueFullPolicy;
35 import org.apache.logging.log4j.core.async.AsyncQueueFullPolicyFactory;
36 import org.apache.logging.log4j.core.async.BlockingQueueFactory;
37 import org.apache.logging.log4j.core.async.DiscardingAsyncQueueFullPolicy;
38 import org.apache.logging.log4j.core.async.EventRoute;
39 import org.apache.logging.log4j.core.async.InternalAsyncUtil;
40 import org.apache.logging.log4j.core.config.AppenderControl;
41 import org.apache.logging.log4j.core.config.AppenderRef;
42 import org.apache.logging.log4j.core.config.Configuration;
43 import org.apache.logging.log4j.core.config.ConfigurationException;
44 import org.apache.logging.log4j.core.config.plugins.Plugin;
45 import org.apache.logging.log4j.core.config.plugins.PluginAliases;
46 import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute;
47 import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
48 import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
49 import org.apache.logging.log4j.core.config.plugins.PluginElement;
50 import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
51 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
52 import org.apache.logging.log4j.core.util.Log4jThread;
53 import org.apache.logging.log4j.spi.AbstractLogger;
54
55
56
57
58
59
60 @Plugin(name = "Async", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE, printObject = true)
61 public final class AsyncAppender extends AbstractAppender {
62
63 private static final int DEFAULT_QUEUE_SIZE = 1024;
64 private static final LogEvent SHUTDOWN_LOG_EVENT = new AbstractLogEvent() {
65 };
66
67 private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1);
68
69 private final BlockingQueue<LogEvent> queue;
70 private final int queueSize;
71 private final boolean blocking;
72 private final long shutdownTimeout;
73 private final Configuration config;
74 private final AppenderRef[] appenderRefs;
75 private final String errorRef;
76 private final boolean includeLocation;
77 private AppenderControl errorAppender;
78 private AsyncThread thread;
79 private AsyncQueueFullPolicy asyncQueueFullPolicy;
80
81 private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
82 final String errorRef, final int queueSize, final boolean blocking,
83 final boolean ignoreExceptions, final long shutdownTimeout, final Configuration config,
84 final boolean includeLocation, final BlockingQueueFactory<LogEvent> blockingQueueFactory) {
85 super(name, filter, null, ignoreExceptions);
86 this.queue = blockingQueueFactory.create(queueSize);
87 this.queueSize = queueSize;
88 this.blocking = blocking;
89 this.shutdownTimeout = shutdownTimeout;
90 this.config = config;
91 this.appenderRefs = appenderRefs;
92 this.errorRef = errorRef;
93 this.includeLocation = includeLocation;
94 }
95
96 @Override
97 public void start() {
98 final Map<String, Appender> map = config.getAppenders();
99 final List<AppenderControl> appenders = new ArrayList<>();
100 for (final AppenderRef appenderRef : appenderRefs) {
101 final Appender appender = map.get(appenderRef.getRef());
102 if (appender != null) {
103 appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter()));
104 } else {
105 LOGGER.error("No appender named {} was configured", appenderRef);
106 }
107 }
108 if (errorRef != null) {
109 final Appender appender = map.get(errorRef);
110 if (appender != null) {
111 errorAppender = new AppenderControl(appender, null, null);
112 } else {
113 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
114 }
115 }
116 if (appenders.size() > 0) {
117 thread = new AsyncThread(appenders, queue);
118 thread.setName("AsyncAppender-" + getName());
119 } else if (errorRef == null) {
120 throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
121 }
122 asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
123
124 thread.start();
125 super.start();
126 }
127
128 @Override
129 public boolean stop(final long timeout, final TimeUnit timeUnit) {
130 setStopping();
131 super.stop(timeout, timeUnit, false);
132 LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
133 thread.shutdown();
134 try {
135 thread.join(shutdownTimeout);
136 } catch (final InterruptedException ex) {
137 LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
138 }
139 LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
140
141 if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
142 LOGGER.trace("AsyncAppender: {} discarded {} events.", asyncQueueFullPolicy,
143 DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
144 }
145 setStopped();
146 return true;
147 }
148
149
150
151
152
153
154 @Override
155 public void append(final LogEvent logEvent) {
156 if (!isStarted()) {
157 throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
158 }
159 final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation);
160 InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage());
161 if (!transfer(memento)) {
162 if (blocking) {
163 if (AbstractLogger.getRecursionDepth() > 1) {
164
165 AsyncQueueFullMessageUtil.logWarningToStatusLogger();
166 logMessageInCurrentThread(logEvent);
167 } else {
168
169 final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel());
170 route.logMessage(this, memento);
171 }
172 } else {
173 error("Appender " + getName() + " is unable to write primary appenders. queue is full");
174 logToErrorAppenderIfNecessary(false, memento);
175 }
176 }
177 }
178
179 private boolean transfer(final LogEvent memento) {
180 return queue instanceof TransferQueue
181 ? ((TransferQueue<LogEvent>) queue).tryTransfer(memento)
182 : queue.offer(memento);
183 }
184
185
186
187
188
189
190 public void logMessageInCurrentThread(final LogEvent logEvent) {
191 logEvent.setEndOfBatch(queue.isEmpty());
192 final boolean appendSuccessful = thread.callAppenders(logEvent);
193 logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
194 }
195
196
197
198
199
200
201 public void logMessageInBackgroundThread(final LogEvent logEvent) {
202 try {
203
204 queue.put(logEvent);
205 } catch (final InterruptedException e) {
206 final boolean appendSuccessful = handleInterruptedException(logEvent);
207 logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
208 }
209 }
210
211
212
213
214
215
216
217
218
219
220
221
222 private boolean handleInterruptedException(final LogEvent memento) {
223 final boolean appendSuccessful = queue.offer(memento);
224 if (!appendSuccessful) {
225 LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
226 getName());
227 }
228
229 Thread.currentThread().interrupt();
230 return appendSuccessful;
231 }
232
233 private void logToErrorAppenderIfNecessary(final boolean appendSuccessful, final LogEvent logEvent) {
234 if (!appendSuccessful && errorAppender != null) {
235 errorAppender.callAppender(logEvent);
236 }
237 }
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259 @Deprecated
260 public static AsyncAppender createAppender(final AppenderRef[] appenderRefs, final String errorRef,
261 final boolean blocking, final long shutdownTimeout, final int size,
262 final String name, final boolean includeLocation, final Filter filter,
263 final Configuration config, final boolean ignoreExceptions) {
264 if (name == null) {
265 LOGGER.error("No name provided for AsyncAppender");
266 return null;
267 }
268 if (appenderRefs == null) {
269 LOGGER.error("No appender references provided to AsyncAppender {}", name);
270 }
271
272 return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions,
273 shutdownTimeout, config, includeLocation, new ArrayBlockingQueueFactory<LogEvent>());
274 }
275
276 @PluginBuilderFactory
277 public static Builder newBuilder() {
278 return new Builder();
279 }
280
281 public static class Builder implements org.apache.logging.log4j.core.util.Builder<AsyncAppender> {
282
283 @PluginElement("AppenderRef")
284 @Required(message = "No appender references provided to AsyncAppender")
285 private AppenderRef[] appenderRefs;
286
287 @PluginBuilderAttribute
288 @PluginAliases("error-ref")
289 private String errorRef;
290
291 @PluginBuilderAttribute
292 private boolean blocking = true;
293
294 @PluginBuilderAttribute
295 private long shutdownTimeout = 0L;
296
297 @PluginBuilderAttribute
298 private int bufferSize = DEFAULT_QUEUE_SIZE;
299
300 @PluginBuilderAttribute
301 @Required(message = "No name provided for AsyncAppender")
302 private String name;
303
304 @PluginBuilderAttribute
305 private boolean includeLocation = false;
306
307 @PluginElement("Filter")
308 private Filter filter;
309
310 @PluginConfiguration
311 private Configuration configuration;
312
313 @PluginBuilderAttribute
314 private boolean ignoreExceptions = true;
315
316 @PluginElement(BlockingQueueFactory.ELEMENT_TYPE)
317 private BlockingQueueFactory<LogEvent> blockingQueueFactory = new ArrayBlockingQueueFactory<>();
318
319 public Builder setAppenderRefs(final AppenderRef[] appenderRefs) {
320 this.appenderRefs = appenderRefs;
321 return this;
322 }
323
324 public Builder setErrorRef(final String errorRef) {
325 this.errorRef = errorRef;
326 return this;
327 }
328
329 public Builder setBlocking(final boolean blocking) {
330 this.blocking = blocking;
331 return this;
332 }
333
334 public Builder setShutdownTimeout(final long shutdownTimeout) {
335 this.shutdownTimeout = shutdownTimeout;
336 return this;
337 }
338
339 public Builder setBufferSize(final int bufferSize) {
340 this.bufferSize = bufferSize;
341 return this;
342 }
343
344 public Builder setName(final String name) {
345 this.name = name;
346 return this;
347 }
348
349 public Builder setIncludeLocation(final boolean includeLocation) {
350 this.includeLocation = includeLocation;
351 return this;
352 }
353
354 public Builder setFilter(final Filter filter) {
355 this.filter = filter;
356 return this;
357 }
358
359 public Builder setConfiguration(final Configuration configuration) {
360 this.configuration = configuration;
361 return this;
362 }
363
364 public Builder setIgnoreExceptions(final boolean ignoreExceptions) {
365 this.ignoreExceptions = ignoreExceptions;
366 return this;
367 }
368
369 public Builder setBlockingQueueFactory(final BlockingQueueFactory<LogEvent> blockingQueueFactory) {
370 this.blockingQueueFactory = blockingQueueFactory;
371 return this;
372 }
373
374 @Override
375 public AsyncAppender build() {
376 return new AsyncAppender(name, filter, appenderRefs, errorRef, bufferSize, blocking, ignoreExceptions,
377 shutdownTimeout, configuration, includeLocation, blockingQueueFactory);
378 }
379 }
380
381
382
383
384 private class AsyncThread extends Log4jThread {
385
386 private volatile boolean shutdown = false;
387 private final List<AppenderControl> appenders;
388 private final BlockingQueue<LogEvent> queue;
389
390 public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) {
391 super("AsyncAppender-" + THREAD_SEQUENCE.getAndIncrement());
392 this.appenders = appenders;
393 this.queue = queue;
394 setDaemon(true);
395 }
396
397 @Override
398 public void run() {
399 while (!shutdown) {
400 LogEvent event;
401 try {
402 event = queue.take();
403 if (event == SHUTDOWN_LOG_EVENT) {
404 shutdown = true;
405 continue;
406 }
407 } catch (final InterruptedException ex) {
408 break;
409 }
410 event.setEndOfBatch(queue.isEmpty());
411 final boolean success = callAppenders(event);
412 if (!success && errorAppender != null) {
413 try {
414 errorAppender.callAppender(event);
415 } catch (final Exception ex) {
416
417 }
418 }
419 }
420
421 LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
422 queue.size());
423 int count = 0;
424 int ignored = 0;
425 while (!queue.isEmpty()) {
426 try {
427 final LogEvent event = queue.take();
428 if (event instanceof Log4jLogEvent) {
429 final Log4jLogEvent logEvent = (Log4jLogEvent) event;
430 logEvent.setEndOfBatch(queue.isEmpty());
431 callAppenders(logEvent);
432 count++;
433 } else {
434 ignored++;
435 LOGGER.trace("Ignoring event of class {}", event.getClass().getName());
436 }
437 } catch (final InterruptedException ex) {
438
439
440 }
441 }
442 LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
443 + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
444 }
445
446
447
448
449
450
451
452
453
454 boolean callAppenders(final LogEvent event) {
455 boolean success = false;
456 for (final AppenderControl control : appenders) {
457 try {
458 control.callAppender(event);
459 success = true;
460 } catch (final Exception ex) {
461
462 }
463 }
464 return success;
465 }
466
467 public void shutdown() {
468 shutdown = true;
469 if (queue.isEmpty()) {
470 queue.offer(SHUTDOWN_LOG_EVENT);
471 }
472 if (getState() == State.TIMED_WAITING || getState() == State.WAITING) {
473 this.interrupt();
474 }
475 }
476 }
477
478
479
480
481
482
483 public String[] getAppenderRefStrings() {
484 final String[] result = new String[appenderRefs.length];
485 for (int i = 0; i < result.length; i++) {
486 result[i] = appenderRefs[i].getRef();
487 }
488 return result;
489 }
490
491
492
493
494
495
496
497 public boolean isIncludeLocation() {
498 return includeLocation;
499 }
500
501
502
503
504
505
506
507 public boolean isBlocking() {
508 return blocking;
509 }
510
511
512
513
514
515
516 public String getErrorRef() {
517 return errorRef;
518 }
519
520 public int getQueueCapacity() {
521 return queueSize;
522 }
523
524 public int getQueueRemainingCapacity() {
525 return queue.remainingCapacity();
526 }
527
528
529
530
531
532
533
534 public int getQueueSize() {
535 return queue.size();
536 }
537 }