1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.appender;
18
19 import java.util.ArrayList;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TransferQueue;
25 import java.util.concurrent.atomic.AtomicLong;
26
27 import org.apache.logging.log4j.core.AbstractLogEvent;
28 import org.apache.logging.log4j.core.Appender;
29 import org.apache.logging.log4j.core.Filter;
30 import org.apache.logging.log4j.core.LogEvent;
31 import org.apache.logging.log4j.core.async.ArrayBlockingQueueFactory;
32 import org.apache.logging.log4j.core.async.AsyncQueueFullPolicy;
33 import org.apache.logging.log4j.core.async.AsyncQueueFullPolicyFactory;
34 import org.apache.logging.log4j.core.async.BlockingQueueFactory;
35 import org.apache.logging.log4j.core.async.DiscardingAsyncQueueFullPolicy;
36 import org.apache.logging.log4j.core.async.EventRoute;
37 import org.apache.logging.log4j.core.config.AppenderControl;
38 import org.apache.logging.log4j.core.config.AppenderRef;
39 import org.apache.logging.log4j.core.config.Configuration;
40 import org.apache.logging.log4j.core.config.ConfigurationException;
41 import org.apache.logging.log4j.core.config.plugins.Plugin;
42 import org.apache.logging.log4j.core.config.plugins.PluginAliases;
43 import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute;
44 import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
45 import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
46 import org.apache.logging.log4j.core.config.plugins.PluginElement;
47 import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
48 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
49 import org.apache.logging.log4j.core.util.Constants;
50 import org.apache.logging.log4j.core.util.Log4jThread;
51
52
53
54
55
56
57 @Plugin(name = "Async", category = "Core", elementType = Appender.ELEMENT_TYPE, printObject = true)
58 public final class AsyncAppender extends AbstractAppender {
59
60 private static final int DEFAULT_QUEUE_SIZE = 128;
61 private static final LogEvent SHUTDOWN = new AbstractLogEvent() {
62 };
63
64 private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1);
65
66 private final BlockingQueue<LogEvent> queue;
67 private final int queueSize;
68 private final boolean blocking;
69 private final long shutdownTimeout;
70 private final Configuration config;
71 private final AppenderRef[] appenderRefs;
72 private final String errorRef;
73 private final boolean includeLocation;
74 private AppenderControl errorAppender;
75 private AsyncThread thread;
76 private AsyncQueueFullPolicy asyncQueueFullPolicy;
77
78 private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
79 final String errorRef, final int queueSize, final boolean blocking,
80 final boolean ignoreExceptions, final long shutdownTimeout, final Configuration config,
81 final boolean includeLocation, final BlockingQueueFactory<LogEvent> blockingQueueFactory) {
82 super(name, filter, null, ignoreExceptions);
83 this.queue = blockingQueueFactory.create(queueSize);
84 this.queueSize = queueSize;
85 this.blocking = blocking;
86 this.shutdownTimeout = shutdownTimeout;
87 this.config = config;
88 this.appenderRefs = appenderRefs;
89 this.errorRef = errorRef;
90 this.includeLocation = includeLocation;
91 }
92
93 @Override
94 public void start() {
95 final Map<String, Appender> map = config.getAppenders();
96 final List<AppenderControl> appenders = new ArrayList<>();
97 for (final AppenderRef appenderRef : appenderRefs) {
98 final Appender appender = map.get(appenderRef.getRef());
99 if (appender != null) {
100 appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter()));
101 } else {
102 LOGGER.error("No appender named {} was configured", appenderRef);
103 }
104 }
105 if (errorRef != null) {
106 final Appender appender = map.get(errorRef);
107 if (appender != null) {
108 errorAppender = new AppenderControl(appender, null, null);
109 } else {
110 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
111 }
112 }
113 if (appenders.size() > 0) {
114 thread = new AsyncThread(appenders, queue);
115 thread.setName("AsyncAppender-" + getName());
116 } else if (errorRef == null) {
117 throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
118 }
119 asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
120
121 thread.start();
122 super.start();
123 }
124
125 @Override
126 public boolean stop(final long timeout, final TimeUnit timeUnit) {
127 setStopping();
128 super.stop(timeout, timeUnit, false);
129 LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
130 thread.shutdown();
131 try {
132 thread.join(shutdownTimeout);
133 } catch (final InterruptedException ex) {
134 LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
135 }
136 LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
137
138 if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
139 LOGGER.trace("AsyncAppender: {} discarded {} events.", asyncQueueFullPolicy,
140 DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
141 }
142 setStopped();
143 return true;
144 }
145
146
147
148
149
150
151 @Override
152 public void append(final LogEvent logEvent) {
153 if (!isStarted()) {
154 throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
155 }
156 if (!Constants.FORMAT_MESSAGES_IN_BACKGROUND) {
157 logEvent.getMessage().getFormattedMessage();
158 }
159 final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation);
160 if (!transfer(memento)) {
161 if (blocking) {
162
163 final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel());
164 route.logMessage(this, memento);
165 } else {
166 error("Appender " + getName() + " is unable to write primary appenders. queue is full");
167 logToErrorAppenderIfNecessary(false, memento);
168 }
169 }
170 }
171
172 private boolean transfer(final LogEvent memento) {
173 return queue instanceof TransferQueue
174 ? ((TransferQueue<LogEvent>) queue).tryTransfer(memento)
175 : queue.offer(memento);
176 }
177
178
179
180
181
182
183 public void logMessageInCurrentThread(final LogEvent logEvent) {
184 logEvent.setEndOfBatch(queue.isEmpty());
185 final boolean appendSuccessful = thread.callAppenders(logEvent);
186 logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
187 }
188
189
190
191
192
193
194 public void logMessageInBackgroundThread(final LogEvent logEvent) {
195 try {
196
197 queue.put(logEvent);
198 } catch (final InterruptedException e) {
199 final boolean appendSuccessful = handleInterruptedException(logEvent);
200 logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
201 }
202 }
203
204
205
206
207
208
209
210
211
212
213
214
215 private boolean handleInterruptedException(final LogEvent memento) {
216 final boolean appendSuccessful = queue.offer(memento);
217 if (!appendSuccessful) {
218 LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
219 getName());
220 }
221
222 Thread.currentThread().interrupt();
223 return appendSuccessful;
224 }
225
226 private void logToErrorAppenderIfNecessary(final boolean appendSuccessful, final LogEvent logEvent) {
227 if (!appendSuccessful && errorAppender != null) {
228 errorAppender.callAppender(logEvent);
229 }
230 }
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252 @Deprecated
253 public static AsyncAppender createAppender(final AppenderRef[] appenderRefs, final String errorRef,
254 final boolean blocking, final long shutdownTimeout, final int size,
255 final String name, final boolean includeLocation, final Filter filter,
256 final Configuration config, final boolean ignoreExceptions) {
257 if (name == null) {
258 LOGGER.error("No name provided for AsyncAppender");
259 return null;
260 }
261 if (appenderRefs == null) {
262 LOGGER.error("No appender references provided to AsyncAppender {}", name);
263 }
264
265 return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions,
266 shutdownTimeout, config, includeLocation, new ArrayBlockingQueueFactory<LogEvent>());
267 }
268
269 @PluginBuilderFactory
270 public static Builder newBuilder() {
271 return new Builder();
272 }
273
274 public static class Builder implements org.apache.logging.log4j.core.util.Builder<AsyncAppender> {
275
276 @PluginElement("AppenderRef")
277 @Required(message = "No appender references provided to AsyncAppender")
278 private AppenderRef[] appenderRefs;
279
280 @PluginBuilderAttribute
281 @PluginAliases("error-ref")
282 private String errorRef;
283
284 @PluginBuilderAttribute
285 private boolean blocking = true;
286
287 @PluginBuilderAttribute
288 private long shutdownTimeout = 0L;
289
290 @PluginBuilderAttribute
291 private int bufferSize = DEFAULT_QUEUE_SIZE;
292
293 @PluginBuilderAttribute
294 @Required(message = "No name provided for AsyncAppender")
295 private String name;
296
297 @PluginBuilderAttribute
298 private boolean includeLocation = false;
299
300 @PluginElement("Filter")
301 private Filter filter;
302
303 @PluginConfiguration
304 private Configuration configuration;
305
306 @PluginBuilderAttribute
307 private boolean ignoreExceptions = true;
308
309 @PluginElement(BlockingQueueFactory.ELEMENT_TYPE)
310 private BlockingQueueFactory<LogEvent> blockingQueueFactory = new ArrayBlockingQueueFactory<>();
311
312 public Builder setAppenderRefs(final AppenderRef[] appenderRefs) {
313 this.appenderRefs = appenderRefs;
314 return this;
315 }
316
317 public Builder setErrorRef(final String errorRef) {
318 this.errorRef = errorRef;
319 return this;
320 }
321
322 public Builder setBlocking(final boolean blocking) {
323 this.blocking = blocking;
324 return this;
325 }
326
327 public Builder setShutdownTimeout(final long shutdownTimeout) {
328 this.shutdownTimeout = shutdownTimeout;
329 return this;
330 }
331
332 public Builder setBufferSize(final int bufferSize) {
333 this.bufferSize = bufferSize;
334 return this;
335 }
336
337 public Builder setName(final String name) {
338 this.name = name;
339 return this;
340 }
341
342 public Builder setIncludeLocation(final boolean includeLocation) {
343 this.includeLocation = includeLocation;
344 return this;
345 }
346
347 public Builder setFilter(final Filter filter) {
348 this.filter = filter;
349 return this;
350 }
351
352 public Builder setConfiguration(final Configuration configuration) {
353 this.configuration = configuration;
354 return this;
355 }
356
357 public Builder setIgnoreExceptions(final boolean ignoreExceptions) {
358 this.ignoreExceptions = ignoreExceptions;
359 return this;
360 }
361
362 public Builder setBlockingQueueFactory(final BlockingQueueFactory<LogEvent> blockingQueueFactory) {
363 this.blockingQueueFactory = blockingQueueFactory;
364 return this;
365 }
366
367 @Override
368 public AsyncAppender build() {
369 return new AsyncAppender(name, filter, appenderRefs, errorRef, bufferSize, blocking, ignoreExceptions,
370 shutdownTimeout, configuration, includeLocation, blockingQueueFactory);
371 }
372 }
373
374
375
376
377 private class AsyncThread extends Log4jThread {
378
379 private volatile boolean shutdown = false;
380 private final List<AppenderControl> appenders;
381 private final BlockingQueue<LogEvent> queue;
382
383 public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) {
384 super("AsyncAppender-" + THREAD_SEQUENCE.getAndIncrement());
385 this.appenders = appenders;
386 this.queue = queue;
387 setDaemon(true);
388 }
389
390 @Override
391 public void run() {
392 while (!shutdown) {
393 LogEvent event;
394 try {
395 event = queue.take();
396 if (event == SHUTDOWN) {
397 shutdown = true;
398 continue;
399 }
400 } catch (final InterruptedException ex) {
401 break;
402 }
403 event.setEndOfBatch(queue.isEmpty());
404 final boolean success = callAppenders(event);
405 if (!success && errorAppender != null) {
406 try {
407 errorAppender.callAppender(event);
408 } catch (final Exception ex) {
409
410 }
411 }
412 }
413
414 LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
415 queue.size());
416 int count = 0;
417 int ignored = 0;
418 while (!queue.isEmpty()) {
419 try {
420 final LogEvent event = queue.take();
421 if (event instanceof Log4jLogEvent) {
422 final Log4jLogEvent logEvent = (Log4jLogEvent) event;
423 logEvent.setEndOfBatch(queue.isEmpty());
424 callAppenders(logEvent);
425 count++;
426 } else {
427 ignored++;
428 LOGGER.trace("Ignoring event of class {}", event.getClass().getName());
429 }
430 } catch (final InterruptedException ex) {
431
432
433 }
434 }
435 LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
436 + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
437 }
438
439
440
441
442
443
444
445
446
447 boolean callAppenders(final LogEvent event) {
448 boolean success = false;
449 for (final AppenderControl control : appenders) {
450 try {
451 control.callAppender(event);
452 success = true;
453 } catch (final Exception ex) {
454
455 }
456 }
457 return success;
458 }
459
460 public void shutdown() {
461 shutdown = true;
462 if (queue.isEmpty()) {
463 queue.offer(SHUTDOWN);
464 }
465 if (getState() == State.TIMED_WAITING || getState() == State.WAITING) {
466 this.interrupt();
467 }
468 }
469 }
470
471
472
473
474
475
476 public String[] getAppenderRefStrings() {
477 final String[] result = new String[appenderRefs.length];
478 for (int i = 0; i < result.length; i++) {
479 result[i] = appenderRefs[i].getRef();
480 }
481 return result;
482 }
483
484
485
486
487
488
489
490 public boolean isIncludeLocation() {
491 return includeLocation;
492 }
493
494
495
496
497
498
499
500 public boolean isBlocking() {
501 return blocking;
502 }
503
504
505
506
507
508
509 public String getErrorRef() {
510 return errorRef;
511 }
512
513 public int getQueueCapacity() {
514 return queueSize;
515 }
516
517 public int getQueueRemainingCapacity() {
518 return queue.remainingCapacity();
519 }
520 }