View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.appender;
18  
19  import java.util.ArrayList;
20  import java.util.List;
21  import java.util.Map;
22  import java.util.concurrent.BlockingQueue;
23  import java.util.concurrent.TimeUnit;
24  import java.util.concurrent.TransferQueue;
25  import java.util.concurrent.atomic.AtomicLong;
26  
27  import org.apache.logging.log4j.core.AbstractLogEvent;
28  import org.apache.logging.log4j.core.Appender;
29  import org.apache.logging.log4j.core.Core;
30  import org.apache.logging.log4j.core.Filter;
31  import org.apache.logging.log4j.core.LogEvent;
32  import org.apache.logging.log4j.core.async.ArrayBlockingQueueFactory;
33  import org.apache.logging.log4j.core.async.AsyncQueueFullMessageUtil;
34  import org.apache.logging.log4j.core.async.AsyncQueueFullPolicy;
35  import org.apache.logging.log4j.core.async.AsyncQueueFullPolicyFactory;
36  import org.apache.logging.log4j.core.async.BlockingQueueFactory;
37  import org.apache.logging.log4j.core.async.DiscardingAsyncQueueFullPolicy;
38  import org.apache.logging.log4j.core.async.EventRoute;
39  import org.apache.logging.log4j.core.async.InternalAsyncUtil;
40  import org.apache.logging.log4j.core.config.AppenderControl;
41  import org.apache.logging.log4j.core.config.AppenderRef;
42  import org.apache.logging.log4j.core.config.Configuration;
43  import org.apache.logging.log4j.core.config.ConfigurationException;
44  import org.apache.logging.log4j.core.config.plugins.Plugin;
45  import org.apache.logging.log4j.core.config.plugins.PluginAliases;
46  import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute;
47  import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
48  import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
49  import org.apache.logging.log4j.core.config.plugins.PluginElement;
50  import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
51  import org.apache.logging.log4j.core.impl.Log4jLogEvent;
52  import org.apache.logging.log4j.core.util.Log4jThread;
53  import org.apache.logging.log4j.spi.AbstractLogger;
54  
55  /**
56   * Appends to one or more Appenders asynchronously. You can configure an AsyncAppender with one or more Appenders and an
57   * Appender to append to if the queue is full. The AsyncAppender does not allow a filter to be specified on the Appender
58   * references.
59   */
60  @Plugin(name = "Async", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE, printObject = true)
61  public final class AsyncAppender extends AbstractAppender {
62  
63      private static final int DEFAULT_QUEUE_SIZE = 1024;
64      private static final LogEvent SHUTDOWN_LOG_EVENT = new AbstractLogEvent() {
65      };
66  
67      private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1);
68  
69      private final BlockingQueue<LogEvent> queue;
70      private final int queueSize;
71      private final boolean blocking;
72      private final long shutdownTimeout;
73      private final Configuration config;
74      private final AppenderRef[] appenderRefs;
75      private final String errorRef;
76      private final boolean includeLocation;
77      private AppenderControl errorAppender;
78      private AsyncThread thread;
79      private AsyncQueueFullPolicy asyncQueueFullPolicy;
80  
81      private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
82                            final String errorRef, final int queueSize, final boolean blocking,
83                            final boolean ignoreExceptions, final long shutdownTimeout, final Configuration config,
84                            final boolean includeLocation, final BlockingQueueFactory<LogEvent> blockingQueueFactory) {
85          super(name, filter, null, ignoreExceptions);
86          this.queue = blockingQueueFactory.create(queueSize);
87          this.queueSize = queueSize;
88          this.blocking = blocking;
89          this.shutdownTimeout = shutdownTimeout;
90          this.config = config;
91          this.appenderRefs = appenderRefs;
92          this.errorRef = errorRef;
93          this.includeLocation = includeLocation;
94      }
95  
96      @Override
97      public void start() {
98          final Map<String, Appender> map = config.getAppenders();
99          final List<AppenderControl> appenders = new ArrayList<>();
100         for (final AppenderRef appenderRef : appenderRefs) {
101             final Appender appender = map.get(appenderRef.getRef());
102             if (appender != null) {
103                 appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter()));
104             } else {
105                 LOGGER.error("No appender named {} was configured", appenderRef);
106             }
107         }
108         if (errorRef != null) {
109             final Appender appender = map.get(errorRef);
110             if (appender != null) {
111                 errorAppender = new AppenderControl(appender, null, null);
112             } else {
113                 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
114             }
115         }
116         if (appenders.size() > 0) {
117             thread = new AsyncThread(appenders, queue);
118             thread.setName("AsyncAppender-" + getName());
119         } else if (errorRef == null) {
120             throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
121         }
122         asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
123 
124         thread.start();
125         super.start();
126     }
127 
128     @Override
129     public boolean stop(final long timeout, final TimeUnit timeUnit) {
130         setStopping();
131         super.stop(timeout, timeUnit, false);
132         LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
133         thread.shutdown();
134         try {
135             thread.join(shutdownTimeout);
136         } catch (final InterruptedException ex) {
137             LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
138         }
139         LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
140 
141         if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
142             LOGGER.trace("AsyncAppender: {} discarded {} events.", asyncQueueFullPolicy,
143                 DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
144         }
145         setStopped();
146         return true;
147     }
148 
149     /**
150      * Actual writing occurs here.
151      *
152      * @param logEvent The LogEvent.
153      */
154     @Override
155     public void append(final LogEvent logEvent) {
156         if (!isStarted()) {
157             throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
158         }
159         final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation);
160         InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage());
161         if (!transfer(memento)) {
162             if (blocking) {
163                 if (AbstractLogger.getRecursionDepth() > 1) { // LOG4J2-1518, LOG4J2-2031
164                     // If queue is full AND we are in a recursive call, call appender directly to prevent deadlock
165                     AsyncQueueFullMessageUtil.logWarningToStatusLogger();
166                     logMessageInCurrentThread(logEvent);
167                 } else {
168                     // delegate to the event router (which may discard, enqueue and block, or log in current thread)
169                     final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel());
170                     route.logMessage(this, memento);
171                 }
172             } else {
173                 error("Appender " + getName() + " is unable to write primary appenders. queue is full");
174                 logToErrorAppenderIfNecessary(false, memento);
175             }
176         }
177     }
178 
179     private boolean transfer(final LogEvent memento) {
180         return queue instanceof TransferQueue
181             ? ((TransferQueue<LogEvent>) queue).tryTransfer(memento)
182             : queue.offer(memento);
183     }
184 
185     /**
186      * FOR INTERNAL USE ONLY.
187      *
188      * @param logEvent the event to log
189      */
190     public void logMessageInCurrentThread(final LogEvent logEvent) {
191         logEvent.setEndOfBatch(queue.isEmpty());
192         final boolean appendSuccessful = thread.callAppenders(logEvent);
193         logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
194     }
195 
196     /**
197      * FOR INTERNAL USE ONLY.
198      *
199      * @param logEvent the event to log
200      */
201     public void logMessageInBackgroundThread(final LogEvent logEvent) {
202         try {
203             // wait for free slots in the queue
204             queue.put(logEvent);
205         } catch (final InterruptedException e) {
206             final boolean appendSuccessful = handleInterruptedException(logEvent);
207             logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
208         }
209     }
210 
211     // LOG4J2-1049: Some applications use Thread.interrupt() to send
212     // messages between application threads. This does not necessarily
213     // mean that the queue is full. To prevent dropping a log message,
214     // quickly try to offer the event to the queue again.
215     // (Yes, this means there is a possibility the same event is logged twice.)
216     //
217     // Finally, catching the InterruptedException means the
218     // interrupted flag has been cleared on the current thread.
219     // This may interfere with the application's expectation of
220     // being interrupted, so when we are done, we set the interrupted
221     // flag again.
222     private boolean handleInterruptedException(final LogEvent memento) {
223         final boolean appendSuccessful = queue.offer(memento);
224         if (!appendSuccessful) {
225             LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
226                 getName());
227         }
228         // set the interrupted flag again.
229         Thread.currentThread().interrupt();
230         return appendSuccessful;
231     }
232 
233     private void logToErrorAppenderIfNecessary(final boolean appendSuccessful, final LogEvent logEvent) {
234         if (!appendSuccessful && errorAppender != null) {
235             errorAppender.callAppender(logEvent);
236         }
237     }
238 
239     /**
240      * Create an AsyncAppender. This method is retained for backwards compatibility. New code should use the
241      * {@link Builder} instead. This factory will use {@link ArrayBlockingQueueFactory} by default as was the behavior
242      * pre-2.7.
243      *
244      * @param appenderRefs     The Appenders to reference.
245      * @param errorRef         An optional Appender to write to if the queue is full or other errors occur.
246      * @param blocking         True if the Appender should wait when the queue is full. The default is true.
247      * @param shutdownTimeout  How many milliseconds the Appender should wait to flush outstanding log events
248      *                         in the queue on shutdown. The default is zero which means to wait forever.
249      * @param size             The size of the event queue. The default is 128.
250      * @param name             The name of the Appender.
251      * @param includeLocation  whether to include location information. The default is false.
252      * @param filter           The Filter or null.
253      * @param config           The Configuration.
254      * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged;
255      *                         otherwise they are propagated to the caller.
256      * @return The AsyncAppender.
257      * @deprecated use {@link Builder} instead
258      */
259     @Deprecated
260     public static AsyncAppender createAppender(final AppenderRef[] appenderRefs, final String errorRef,
261                                                final boolean blocking, final long shutdownTimeout, final int size,
262                                                final String name, final boolean includeLocation, final Filter filter,
263                                                final Configuration config, final boolean ignoreExceptions) {
264         if (name == null) {
265             LOGGER.error("No name provided for AsyncAppender");
266             return null;
267         }
268         if (appenderRefs == null) {
269             LOGGER.error("No appender references provided to AsyncAppender {}", name);
270         }
271 
272         return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions,
273             shutdownTimeout, config, includeLocation, new ArrayBlockingQueueFactory<LogEvent>());
274     }
275 
276     @PluginBuilderFactory
277     public static Builder newBuilder() {
278         return new Builder();
279     }
280 
281     public static class Builder implements org.apache.logging.log4j.core.util.Builder<AsyncAppender> {
282 
283         @PluginElement("AppenderRef")
284         @Required(message = "No appender references provided to AsyncAppender")
285         private AppenderRef[] appenderRefs;
286 
287         @PluginBuilderAttribute
288         @PluginAliases("error-ref")
289         private String errorRef;
290 
291         @PluginBuilderAttribute
292         private boolean blocking = true;
293 
294         @PluginBuilderAttribute
295         private long shutdownTimeout = 0L;
296 
297         @PluginBuilderAttribute
298         private int bufferSize = DEFAULT_QUEUE_SIZE;
299 
300         @PluginBuilderAttribute
301         @Required(message = "No name provided for AsyncAppender")
302         private String name;
303 
304         @PluginBuilderAttribute
305         private boolean includeLocation = false;
306 
307         @PluginElement("Filter")
308         private Filter filter;
309 
310         @PluginConfiguration
311         private Configuration configuration;
312 
313         @PluginBuilderAttribute
314         private boolean ignoreExceptions = true;
315 
316         @PluginElement(BlockingQueueFactory.ELEMENT_TYPE)
317         private BlockingQueueFactory<LogEvent> blockingQueueFactory = new ArrayBlockingQueueFactory<>();
318 
319         public Builder setAppenderRefs(final AppenderRef[] appenderRefs) {
320             this.appenderRefs = appenderRefs;
321             return this;
322         }
323 
324         public Builder setErrorRef(final String errorRef) {
325             this.errorRef = errorRef;
326             return this;
327         }
328 
329         public Builder setBlocking(final boolean blocking) {
330             this.blocking = blocking;
331             return this;
332         }
333 
334         public Builder setShutdownTimeout(final long shutdownTimeout) {
335             this.shutdownTimeout = shutdownTimeout;
336             return this;
337         }
338 
339         public Builder setBufferSize(final int bufferSize) {
340             this.bufferSize = bufferSize;
341             return this;
342         }
343 
344         public Builder setName(final String name) {
345             this.name = name;
346             return this;
347         }
348 
349         public Builder setIncludeLocation(final boolean includeLocation) {
350             this.includeLocation = includeLocation;
351             return this;
352         }
353 
354         public Builder setFilter(final Filter filter) {
355             this.filter = filter;
356             return this;
357         }
358 
359         public Builder setConfiguration(final Configuration configuration) {
360             this.configuration = configuration;
361             return this;
362         }
363 
364         public Builder setIgnoreExceptions(final boolean ignoreExceptions) {
365             this.ignoreExceptions = ignoreExceptions;
366             return this;
367         }
368 
369         public Builder setBlockingQueueFactory(final BlockingQueueFactory<LogEvent> blockingQueueFactory) {
370             this.blockingQueueFactory = blockingQueueFactory;
371             return this;
372         }
373 
374         @Override
375         public AsyncAppender build() {
376             return new AsyncAppender(name, filter, appenderRefs, errorRef, bufferSize, blocking, ignoreExceptions,
377                 shutdownTimeout, configuration, includeLocation, blockingQueueFactory);
378         }
379     }
380 
381     /**
382      * Thread that calls the Appenders.
383      */
384     private class AsyncThread extends Log4jThread {
385 
386         private volatile boolean shutdown = false;
387         private final List<AppenderControl> appenders;
388         private final BlockingQueue<LogEvent> queue;
389 
390         public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) {
391             super("AsyncAppender-" + THREAD_SEQUENCE.getAndIncrement());
392             this.appenders = appenders;
393             this.queue = queue;
394             setDaemon(true);
395         }
396 
397         @Override
398         public void run() {
399             while (!shutdown) {
400                 LogEvent event;
401                 try {
402                     event = queue.take();
403                     if (event == SHUTDOWN_LOG_EVENT) {
404                         shutdown = true;
405                         continue;
406                     }
407                 } catch (final InterruptedException ex) {
408                     break; // LOG4J2-830
409                 }
410                 event.setEndOfBatch(queue.isEmpty());
411                 final boolean success = callAppenders(event);
412                 if (!success && errorAppender != null) {
413                     try {
414                         errorAppender.callAppender(event);
415                     } catch (final Exception ex) {
416                         // Silently accept the error.
417                     }
418                 }
419             }
420             // Process any remaining items in the queue.
421             LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
422                 queue.size());
423             int count = 0;
424             int ignored = 0;
425             while (!queue.isEmpty()) {
426                 try {
427                     final LogEvent event = queue.take();
428                     if (event instanceof Log4jLogEvent) {
429                         final Log4jLogEvent logEvent = (Log4jLogEvent) event;
430                         logEvent.setEndOfBatch(queue.isEmpty());
431                         callAppenders(logEvent);
432                         count++;
433                     } else {
434                         ignored++;
435                         LOGGER.trace("Ignoring event of class {}", event.getClass().getName());
436                     }
437                 } catch (final InterruptedException ex) {
438                     // May have been interrupted to shut down.
439                     // Here we ignore interrupts and try to process all remaining events.
440                 }
441             }
442             LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
443                 + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
444         }
445 
446         /**
447          * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on all registered {@code AppenderControl}
448          * objects, and returns {@code true} if at least one appender call was successful, {@code false} otherwise. Any
449          * exceptions are silently ignored.
450          *
451          * @param event the event to forward to the registered appenders
452          * @return {@code true} if at least one appender call succeeded, {@code false} otherwise
453          */
454         boolean callAppenders(final LogEvent event) {
455             boolean success = false;
456             for (final AppenderControl control : appenders) {
457                 try {
458                     control.callAppender(event);
459                     success = true;
460                 } catch (final Exception ex) {
461                     // If no appender is successful the error appender will get it.
462                 }
463             }
464             return success;
465         }
466 
467         public void shutdown() {
468             shutdown = true;
469             if (queue.isEmpty()) {
470                 queue.offer(SHUTDOWN_LOG_EVENT);
471             }
472             if (getState() == State.TIMED_WAITING || getState() == State.WAITING) {
473                 this.interrupt(); // LOG4J2-1422: if underlying appender is stuck in wait/sleep/join/park call
474             }
475         }
476     }
477 
478     /**
479      * Returns the names of the appenders that this asyncAppender delegates to as an array of Strings.
480      *
481      * @return the names of the sink appenders
482      */
483     public String[] getAppenderRefStrings() {
484         final String[] result = new String[appenderRefs.length];
485         for (int i = 0; i < result.length; i++) {
486             result[i] = appenderRefs[i].getRef();
487         }
488         return result;
489     }
490 
491     /**
492      * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with every log event to determine
493      * the class and method where the logging call was made.
494      *
495      * @return {@code true} if location is included with every event, {@code false} otherwise
496      */
497     public boolean isIncludeLocation() {
498         return includeLocation;
499     }
500 
501     /**
502      * Returns {@code true} if this AsyncAppender will block when the queue is full, or {@code false} if events are
503      * dropped when the queue is full.
504      *
505      * @return whether this AsyncAppender will block or drop events when the queue is full.
506      */
507     public boolean isBlocking() {
508         return blocking;
509     }
510 
511     /**
512      * Returns the name of the appender that any errors are logged to or {@code null}.
513      *
514      * @return the name of the appender that any errors are logged to or {@code null}
515      */
516     public String getErrorRef() {
517         return errorRef;
518     }
519 
520     public int getQueueCapacity() {
521         return queueSize;
522     }
523 
524     public int getQueueRemainingCapacity() {
525         return queue.remainingCapacity();
526     }
527 
528     /**
529      * Returns the number of elements in the queue.
530      * 
531      * @return the number of elements in the queue. 
532      * @since 2.11.1
533      */
534     public int getQueueSize() {
535         return queue.size();
536     }
537 }