1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.appender;
18
19 import java.util.ArrayList;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TransferQueue;
25 import java.util.concurrent.atomic.AtomicLong;
26
27 import org.apache.logging.log4j.core.AbstractLogEvent;
28 import org.apache.logging.log4j.core.Appender;
29 import org.apache.logging.log4j.core.Core;
30 import org.apache.logging.log4j.core.Filter;
31 import org.apache.logging.log4j.core.LogEvent;
32 import org.apache.logging.log4j.core.async.ArrayBlockingQueueFactory;
33 import org.apache.logging.log4j.core.async.AsyncQueueFullPolicy;
34 import org.apache.logging.log4j.core.async.AsyncQueueFullPolicyFactory;
35 import org.apache.logging.log4j.core.async.BlockingQueueFactory;
36 import org.apache.logging.log4j.core.async.DiscardingAsyncQueueFullPolicy;
37 import org.apache.logging.log4j.core.async.EventRoute;
38 import org.apache.logging.log4j.core.config.AppenderControl;
39 import org.apache.logging.log4j.core.config.AppenderRef;
40 import org.apache.logging.log4j.core.config.Configuration;
41 import org.apache.logging.log4j.core.config.ConfigurationException;
42 import org.apache.logging.log4j.core.config.plugins.Plugin;
43 import org.apache.logging.log4j.core.config.plugins.PluginAliases;
44 import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute;
45 import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
46 import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
47 import org.apache.logging.log4j.core.config.plugins.PluginElement;
48 import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
49 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
50 import org.apache.logging.log4j.core.util.Constants;
51 import org.apache.logging.log4j.core.util.Log4jThread;
52 import org.apache.logging.log4j.message.AsynchronouslyFormattable;
53 import org.apache.logging.log4j.message.Message;
54
55
56
57
58
59
60 @Plugin(name = "Async", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE, printObject = true)
61 public final class AsyncAppender extends AbstractAppender {
62
63 private static final int DEFAULT_QUEUE_SIZE = 128;
64 private static final LogEvent SHUTDOWN_LOG_EVENT = new AbstractLogEvent() {
65 };
66
67 private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1);
68
69 private final BlockingQueue<LogEvent> queue;
70 private final int queueSize;
71 private final boolean blocking;
72 private final long shutdownTimeout;
73 private final Configuration config;
74 private final AppenderRef[] appenderRefs;
75 private final String errorRef;
76 private final boolean includeLocation;
77 private AppenderControl errorAppender;
78 private AsyncThread thread;
79 private AsyncQueueFullPolicy asyncQueueFullPolicy;
80
/**
 * Stores the supplied settings and allocates the event queue. Nothing is resolved or started here;
 * appender lookup and thread creation happen in {@link #start()}.
 *
 * @param name appender name, passed to the superclass
 * @param filter filter passed to the superclass
 * @param appenderRefs references to the appenders that receive the events
 * @param errorRef name of the error appender, or {@code null}
 * @param queueSize size requested from {@code blockingQueueFactory}
 * @param blocking whether a failed hand-off is resolved through the queue-full policy
 * @param ignoreExceptions passed to the superclass
 * @param shutdownTimeout milliseconds to wait for the background thread in {@code stop}
 * @param config configuration used to look up the referenced appenders
 * @param includeLocation forwarded to {@code Log4jLogEvent.createMemento}
 * @param blockingQueueFactory factory that creates the event queue
 */
private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
        final String errorRef, final int queueSize, final boolean blocking,
        final boolean ignoreExceptions, final long shutdownTimeout, final Configuration config,
        final boolean includeLocation, final BlockingQueueFactory<LogEvent> blockingQueueFactory) {
    super(name, filter, null, ignoreExceptions);

    // Queue and its nominal capacity.
    this.queueSize = queueSize;
    this.queue = blockingQueueFactory.create(queueSize);

    // Targets, resolved later against the configuration.
    this.config = config;
    this.appenderRefs = appenderRefs;
    this.errorRef = errorRef;

    // Behavioural switches.
    this.blocking = blocking;
    this.includeLocation = includeLocation;
    this.shutdownTimeout = shutdownTimeout;
}
95
96 @Override
97 public void start() {
98 final Map<String, Appender> map = config.getAppenders();
99 final List<AppenderControl> appenders = new ArrayList<>();
100 for (final AppenderRef appenderRef : appenderRefs) {
101 final Appender appender = map.get(appenderRef.getRef());
102 if (appender != null) {
103 appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter()));
104 } else {
105 LOGGER.error("No appender named {} was configured", appenderRef);
106 }
107 }
108 if (errorRef != null) {
109 final Appender appender = map.get(errorRef);
110 if (appender != null) {
111 errorAppender = new AppenderControl(appender, null, null);
112 } else {
113 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
114 }
115 }
116 if (appenders.size() > 0) {
117 thread = new AsyncThread(appenders, queue);
118 thread.setName("AsyncAppender-" + getName());
119 } else if (errorRef == null) {
120 throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
121 }
122 asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
123
124 thread.start();
125 super.start();
126 }
127
128 @Override
129 public boolean stop(final long timeout, final TimeUnit timeUnit) {
130 setStopping();
131 super.stop(timeout, timeUnit, false);
132 LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
133 thread.shutdown();
134 try {
135 thread.join(shutdownTimeout);
136 } catch (final InterruptedException ex) {
137 LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
138 }
139 LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
140
141 if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
142 LOGGER.trace("AsyncAppender: {} discarded {} events.", asyncQueueFullPolicy,
143 DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
144 }
145 setStopped();
146 return true;
147 }
148
149
150
151
152
153
154 @Override
155 public void append(final LogEvent logEvent) {
156 if (!isStarted()) {
157 throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
158 }
159 if (!canFormatMessageInBackground(logEvent.getMessage())) {
160 logEvent.getMessage().getFormattedMessage();
161 }
162 final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation);
163 if (!transfer(memento)) {
164 if (blocking) {
165
166 final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel());
167 route.logMessage(this, memento);
168 } else {
169 error("Appender " + getName() + " is unable to write primary appenders. queue is full");
170 logToErrorAppenderIfNecessary(false, memento);
171 }
172 }
173 }
174
175 private boolean canFormatMessageInBackground(final Message message) {
176 return Constants.FORMAT_MESSAGES_IN_BACKGROUND
177 || message.getClass().isAnnotationPresent(AsynchronouslyFormattable.class);
178 }
179
180 private boolean transfer(final LogEvent memento) {
181 return queue instanceof TransferQueue
182 ? ((TransferQueue<LogEvent>) queue).tryTransfer(memento)
183 : queue.offer(memento);
184 }
185
186
187
188
189
190
191 public void logMessageInCurrentThread(final LogEvent logEvent) {
192 logEvent.setEndOfBatch(queue.isEmpty());
193 final boolean appendSuccessful = thread.callAppenders(logEvent);
194 logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
195 }
196
197
198
199
200
201
202 public void logMessageInBackgroundThread(final LogEvent logEvent) {
203 try {
204
205 queue.put(logEvent);
206 } catch (final InterruptedException e) {
207 final boolean appendSuccessful = handleInterruptedException(logEvent);
208 logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
209 }
210 }
211
212
213
214
215
216
217
218
219
220
221
222
223 private boolean handleInterruptedException(final LogEvent memento) {
224 final boolean appendSuccessful = queue.offer(memento);
225 if (!appendSuccessful) {
226 LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
227 getName());
228 }
229
230 Thread.currentThread().interrupt();
231 return appendSuccessful;
232 }
233
234 private void logToErrorAppenderIfNecessary(final boolean appendSuccessful, final LogEvent logEvent) {
235 if (!appendSuccessful && errorAppender != null) {
236 errorAppender.callAppender(logEvent);
237 }
238 }
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260 @Deprecated
261 public static AsyncAppender createAppender(final AppenderRef[] appenderRefs, final String errorRef,
262 final boolean blocking, final long shutdownTimeout, final int size,
263 final String name, final boolean includeLocation, final Filter filter,
264 final Configuration config, final boolean ignoreExceptions) {
265 if (name == null) {
266 LOGGER.error("No name provided for AsyncAppender");
267 return null;
268 }
269 if (appenderRefs == null) {
270 LOGGER.error("No appender references provided to AsyncAppender {}", name);
271 }
272
273 return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions,
274 shutdownTimeout, config, includeLocation, new ArrayBlockingQueueFactory<LogEvent>());
275 }
276
277 @PluginBuilderFactory
278 public static Builder newBuilder() {
279 return new Builder();
280 }
281
282 public static class Builder implements org.apache.logging.log4j.core.util.Builder<AsyncAppender> {
283
284 @PluginElement("AppenderRef")
285 @Required(message = "No appender references provided to AsyncAppender")
286 private AppenderRef[] appenderRefs;
287
288 @PluginBuilderAttribute
289 @PluginAliases("error-ref")
290 private String errorRef;
291
292 @PluginBuilderAttribute
293 private boolean blocking = true;
294
295 @PluginBuilderAttribute
296 private long shutdownTimeout = 0L;
297
298 @PluginBuilderAttribute
299 private int bufferSize = DEFAULT_QUEUE_SIZE;
300
301 @PluginBuilderAttribute
302 @Required(message = "No name provided for AsyncAppender")
303 private String name;
304
305 @PluginBuilderAttribute
306 private boolean includeLocation = false;
307
308 @PluginElement("Filter")
309 private Filter filter;
310
311 @PluginConfiguration
312 private Configuration configuration;
313
314 @PluginBuilderAttribute
315 private boolean ignoreExceptions = true;
316
317 @PluginElement(BlockingQueueFactory.ELEMENT_TYPE)
318 private BlockingQueueFactory<LogEvent> blockingQueueFactory = new ArrayBlockingQueueFactory<>();
319
320 public Builder setAppenderRefs(final AppenderRef[] appenderRefs) {
321 this.appenderRefs = appenderRefs;
322 return this;
323 }
324
325 public Builder setErrorRef(final String errorRef) {
326 this.errorRef = errorRef;
327 return this;
328 }
329
330 public Builder setBlocking(final boolean blocking) {
331 this.blocking = blocking;
332 return this;
333 }
334
335 public Builder setShutdownTimeout(final long shutdownTimeout) {
336 this.shutdownTimeout = shutdownTimeout;
337 return this;
338 }
339
340 public Builder setBufferSize(final int bufferSize) {
341 this.bufferSize = bufferSize;
342 return this;
343 }
344
345 public Builder setName(final String name) {
346 this.name = name;
347 return this;
348 }
349
350 public Builder setIncludeLocation(final boolean includeLocation) {
351 this.includeLocation = includeLocation;
352 return this;
353 }
354
355 public Builder setFilter(final Filter filter) {
356 this.filter = filter;
357 return this;
358 }
359
360 public Builder setConfiguration(final Configuration configuration) {
361 this.configuration = configuration;
362 return this;
363 }
364
365 public Builder setIgnoreExceptions(final boolean ignoreExceptions) {
366 this.ignoreExceptions = ignoreExceptions;
367 return this;
368 }
369
370 public Builder setBlockingQueueFactory(final BlockingQueueFactory<LogEvent> blockingQueueFactory) {
371 this.blockingQueueFactory = blockingQueueFactory;
372 return this;
373 }
374
375 @Override
376 public AsyncAppender build() {
377 return new AsyncAppender(name, filter, appenderRefs, errorRef, bufferSize, blocking, ignoreExceptions,
378 shutdownTimeout, configuration, includeLocation, blockingQueueFactory);
379 }
380 }
381
382
383
384
385 private class AsyncThread extends Log4jThread {
386
387 private volatile boolean shutdown = false;
388 private final List<AppenderControl> appenders;
389 private final BlockingQueue<LogEvent> queue;
390
391 public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) {
392 super("AsyncAppender-" + THREAD_SEQUENCE.getAndIncrement());
393 this.appenders = appenders;
394 this.queue = queue;
395 setDaemon(true);
396 }
397
398 @Override
399 public void run() {
400 while (!shutdown) {
401 LogEvent event;
402 try {
403 event = queue.take();
404 if (event == SHUTDOWN_LOG_EVENT) {
405 shutdown = true;
406 continue;
407 }
408 } catch (final InterruptedException ex) {
409 break;
410 }
411 event.setEndOfBatch(queue.isEmpty());
412 final boolean success = callAppenders(event);
413 if (!success && errorAppender != null) {
414 try {
415 errorAppender.callAppender(event);
416 } catch (final Exception ex) {
417
418 }
419 }
420 }
421
422 LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
423 queue.size());
424 int count = 0;
425 int ignored = 0;
426 while (!queue.isEmpty()) {
427 try {
428 final LogEvent event = queue.take();
429 if (event instanceof Log4jLogEvent) {
430 final Log4jLogEvent logEvent = (Log4jLogEvent) event;
431 logEvent.setEndOfBatch(queue.isEmpty());
432 callAppenders(logEvent);
433 count++;
434 } else {
435 ignored++;
436 LOGGER.trace("Ignoring event of class {}", event.getClass().getName());
437 }
438 } catch (final InterruptedException ex) {
439
440
441 }
442 }
443 LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
444 + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
445 }
446
447
448
449
450
451
452
453
454
455 boolean callAppenders(final LogEvent event) {
456 boolean success = false;
457 for (final AppenderControl control : appenders) {
458 try {
459 control.callAppender(event);
460 success = true;
461 } catch (final Exception ex) {
462
463 }
464 }
465 return success;
466 }
467
468 public void shutdown() {
469 shutdown = true;
470 if (queue.isEmpty()) {
471 queue.offer(SHUTDOWN_LOG_EVENT);
472 }
473 if (getState() == State.TIMED_WAITING || getState() == State.WAITING) {
474 this.interrupt();
475 }
476 }
477 }
478
479
480
481
482
483
484 public String[] getAppenderRefStrings() {
485 final String[] result = new String[appenderRefs.length];
486 for (int i = 0; i < result.length; i++) {
487 result[i] = appenderRefs[i].getRef();
488 }
489 return result;
490 }
491
492
493
494
495
496
497
498 public boolean isIncludeLocation() {
499 return includeLocation;
500 }
501
502
503
504
505
506
507
508 public boolean isBlocking() {
509 return blocking;
510 }
511
512
513
514
515
516
517 public String getErrorRef() {
518 return errorRef;
519 }
520
521 public int getQueueCapacity() {
522 return queueSize;
523 }
524
525 public int getQueueRemainingCapacity() {
526 return queue.remainingCapacity();
527 }
528 }