View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.appender;
18  
19  import java.util.ArrayList;
20  import java.util.List;
21  import java.util.Map;
22  import java.util.concurrent.BlockingQueue;
23  import java.util.concurrent.TimeUnit;
24  import java.util.concurrent.TransferQueue;
25  import java.util.concurrent.atomic.AtomicLong;
26  
27  import org.apache.logging.log4j.core.AbstractLogEvent;
28  import org.apache.logging.log4j.core.Appender;
29  import org.apache.logging.log4j.core.Core;
30  import org.apache.logging.log4j.core.Filter;
31  import org.apache.logging.log4j.core.LogEvent;
32  import org.apache.logging.log4j.core.Logger;
33  import org.apache.logging.log4j.core.async.ArrayBlockingQueueFactory;
34  import org.apache.logging.log4j.core.async.AsyncQueueFullMessageUtil;
35  import org.apache.logging.log4j.core.async.AsyncQueueFullPolicy;
36  import org.apache.logging.log4j.core.async.AsyncQueueFullPolicyFactory;
37  import org.apache.logging.log4j.core.async.BlockingQueueFactory;
38  import org.apache.logging.log4j.core.async.DiscardingAsyncQueueFullPolicy;
39  import org.apache.logging.log4j.core.async.EventRoute;
40  import org.apache.logging.log4j.core.async.InternalAsyncUtil;
41  import org.apache.logging.log4j.core.config.AppenderControl;
42  import org.apache.logging.log4j.core.config.AppenderRef;
43  import org.apache.logging.log4j.core.config.Configuration;
44  import org.apache.logging.log4j.core.config.ConfigurationException;
45  import org.apache.logging.log4j.core.config.plugins.Plugin;
46  import org.apache.logging.log4j.core.config.plugins.PluginAliases;
47  import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute;
48  import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
49  import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
50  import org.apache.logging.log4j.core.config.plugins.PluginElement;
51  import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
52  import org.apache.logging.log4j.core.impl.Log4jLogEvent;
53  import org.apache.logging.log4j.core.util.Log4jThread;
54  import org.apache.logging.log4j.message.Message;
55  
56  /**
57   * Appends to one or more Appenders asynchronously. You can configure an AsyncAppender with one or more Appenders and an
58   * Appender to append to if the queue is full. The AsyncAppender does not allow a filter to be specified on the Appender
59   * references.
60   */
61  @Plugin(name = "Async", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE, printObject = true)
62  public final class AsyncAppender extends AbstractAppender {
63  
64      private static final int DEFAULT_QUEUE_SIZE = 1024;
65      private static final LogEvent SHUTDOWN_LOG_EVENT = new AbstractLogEvent() {
66      };
67  
68      private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1);
69  
70      private final BlockingQueue<LogEvent> queue;
71      private final int queueSize;
72      private final boolean blocking;
73      private final long shutdownTimeout;
74      private final Configuration config;
75      private final AppenderRef[] appenderRefs;
76      private final String errorRef;
77      private final boolean includeLocation;
78      private AppenderControl errorAppender;
79      private AsyncThread thread;
80      private AsyncQueueFullPolicy asyncQueueFullPolicy;
81  
82      private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
83                            final String errorRef, final int queueSize, final boolean blocking,
84                            final boolean ignoreExceptions, final long shutdownTimeout, final Configuration config,
85                            final boolean includeLocation, final BlockingQueueFactory<LogEvent> blockingQueueFactory) {
86          super(name, filter, null, ignoreExceptions);
87          this.queue = blockingQueueFactory.create(queueSize);
88          this.queueSize = queueSize;
89          this.blocking = blocking;
90          this.shutdownTimeout = shutdownTimeout;
91          this.config = config;
92          this.appenderRefs = appenderRefs;
93          this.errorRef = errorRef;
94          this.includeLocation = includeLocation;
95      }
96  
97      @Override
98      public void start() {
99          final Map<String, Appender> map = config.getAppenders();
100         final List<AppenderControl> appenders = new ArrayList<>();
101         for (final AppenderRef appenderRef : appenderRefs) {
102             final Appender appender = map.get(appenderRef.getRef());
103             if (appender != null) {
104                 appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter()));
105             } else {
106                 LOGGER.error("No appender named {} was configured", appenderRef);
107             }
108         }
109         if (errorRef != null) {
110             final Appender appender = map.get(errorRef);
111             if (appender != null) {
112                 errorAppender = new AppenderControl(appender, null, null);
113             } else {
114                 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
115             }
116         }
117         if (appenders.size() > 0) {
118             thread = new AsyncThread(appenders, queue);
119             thread.setName("AsyncAppender-" + getName());
120         } else if (errorRef == null) {
121             throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
122         }
123         asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
124 
125         thread.start();
126         super.start();
127     }
128 
129     @Override
130     public boolean stop(final long timeout, final TimeUnit timeUnit) {
131         setStopping();
132         super.stop(timeout, timeUnit, false);
133         LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
134         thread.shutdown();
135         try {
136             thread.join(shutdownTimeout);
137         } catch (final InterruptedException ex) {
138             LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
139         }
140         LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
141 
142         if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
143             LOGGER.trace("AsyncAppender: {} discarded {} events.", asyncQueueFullPolicy,
144                 DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
145         }
146         setStopped();
147         return true;
148     }
149 
150     /**
151      * Actual writing occurs here.
152      *
153      * @param logEvent The LogEvent.
154      */
155     @Override
156     public void append(final LogEvent logEvent) {
157         if (!isStarted()) {
158             throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
159         }
160         final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation);
161         InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage());
162         if (!transfer(memento)) {
163             if (blocking) {
164                 if (Logger.getRecursionDepth() > 1) { // LOG4J2-1518, LOG4J2-2031
165                     // If queue is full AND we are in a recursive call, call appender directly to prevent deadlock
166                     final Message message = AsyncQueueFullMessageUtil.transform(logEvent.getMessage());
167                     logMessageInCurrentThread(new Log4jLogEvent.Builder(logEvent).setMessage(message).build());
168                 } else {
169                     // delegate to the event router (which may discard, enqueue and block, or log in current thread)
170                     final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel());
171                     route.logMessage(this, memento);
172                 }
173             } else {
174                 error("Appender " + getName() + " is unable to write primary appenders. queue is full");
175                 logToErrorAppenderIfNecessary(false, memento);
176             }
177         }
178     }
179 
180     private boolean transfer(final LogEvent memento) {
181         return queue instanceof TransferQueue
182             ? ((TransferQueue<LogEvent>) queue).tryTransfer(memento)
183             : queue.offer(memento);
184     }
185 
186     /**
187      * FOR INTERNAL USE ONLY.
188      *
189      * @param logEvent the event to log
190      */
191     public void logMessageInCurrentThread(final LogEvent logEvent) {
192         logEvent.setEndOfBatch(queue.isEmpty());
193         final boolean appendSuccessful = thread.callAppenders(logEvent);
194         logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
195     }
196 
197     /**
198      * FOR INTERNAL USE ONLY.
199      *
200      * @param logEvent the event to log
201      */
202     public void logMessageInBackgroundThread(final LogEvent logEvent) {
203         try {
204             // wait for free slots in the queue
205             queue.put(logEvent);
206         } catch (final InterruptedException e) {
207             final boolean appendSuccessful = handleInterruptedException(logEvent);
208             logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
209         }
210     }
211 
212     // LOG4J2-1049: Some applications use Thread.interrupt() to send
213     // messages between application threads. This does not necessarily
214     // mean that the queue is full. To prevent dropping a log message,
215     // quickly try to offer the event to the queue again.
216     // (Yes, this means there is a possibility the same event is logged twice.)
217     //
218     // Finally, catching the InterruptedException means the
219     // interrupted flag has been cleared on the current thread.
220     // This may interfere with the application's expectation of
221     // being interrupted, so when we are done, we set the interrupted
222     // flag again.
223     private boolean handleInterruptedException(final LogEvent memento) {
224         final boolean appendSuccessful = queue.offer(memento);
225         if (!appendSuccessful) {
226             LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
227                 getName());
228         }
229         // set the interrupted flag again.
230         Thread.currentThread().interrupt();
231         return appendSuccessful;
232     }
233 
234     private void logToErrorAppenderIfNecessary(final boolean appendSuccessful, final LogEvent logEvent) {
235         if (!appendSuccessful && errorAppender != null) {
236             errorAppender.callAppender(logEvent);
237         }
238     }
239 
240     /**
241      * Create an AsyncAppender. This method is retained for backwards compatibility. New code should use the
242      * {@link Builder} instead. This factory will use {@link ArrayBlockingQueueFactory} by default as was the behavior
243      * pre-2.7.
244      *
245      * @param appenderRefs     The Appenders to reference.
246      * @param errorRef         An optional Appender to write to if the queue is full or other errors occur.
247      * @param blocking         True if the Appender should wait when the queue is full. The default is true.
248      * @param shutdownTimeout  How many milliseconds the Appender should wait to flush outstanding log events
249      *                         in the queue on shutdown. The default is zero which means to wait forever.
250      * @param size             The size of the event queue. The default is 128.
251      * @param name             The name of the Appender.
252      * @param includeLocation  whether to include location information. The default is false.
253      * @param filter           The Filter or null.
254      * @param config           The Configuration.
255      * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged;
256      *                         otherwise they are propagated to the caller.
257      * @return The AsyncAppender.
258      * @deprecated use {@link Builder} instead
259      */
260     @Deprecated
261     public static AsyncAppender createAppender(final AppenderRef[] appenderRefs, final String errorRef,
262                                                final boolean blocking, final long shutdownTimeout, final int size,
263                                                final String name, final boolean includeLocation, final Filter filter,
264                                                final Configuration config, final boolean ignoreExceptions) {
265         if (name == null) {
266             LOGGER.error("No name provided for AsyncAppender");
267             return null;
268         }
269         if (appenderRefs == null) {
270             LOGGER.error("No appender references provided to AsyncAppender {}", name);
271         }
272 
273         return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions,
274             shutdownTimeout, config, includeLocation, new ArrayBlockingQueueFactory<LogEvent>());
275     }
276 
277     @PluginBuilderFactory
278     public static Builder newBuilder() {
279         return new Builder();
280     }
281 
282     public static class Builder implements org.apache.logging.log4j.core.util.Builder<AsyncAppender> {
283 
284         @PluginElement("AppenderRef")
285         @Required(message = "No appender references provided to AsyncAppender")
286         private AppenderRef[] appenderRefs;
287 
288         @PluginBuilderAttribute
289         @PluginAliases("error-ref")
290         private String errorRef;
291 
292         @PluginBuilderAttribute
293         private boolean blocking = true;
294 
295         @PluginBuilderAttribute
296         private long shutdownTimeout = 0L;
297 
298         @PluginBuilderAttribute
299         private int bufferSize = DEFAULT_QUEUE_SIZE;
300 
301         @PluginBuilderAttribute
302         @Required(message = "No name provided for AsyncAppender")
303         private String name;
304 
305         @PluginBuilderAttribute
306         private boolean includeLocation = false;
307 
308         @PluginElement("Filter")
309         private Filter filter;
310 
311         @PluginConfiguration
312         private Configuration configuration;
313 
314         @PluginBuilderAttribute
315         private boolean ignoreExceptions = true;
316 
317         @PluginElement(BlockingQueueFactory.ELEMENT_TYPE)
318         private BlockingQueueFactory<LogEvent> blockingQueueFactory = new ArrayBlockingQueueFactory<>();
319 
320         public Builder setAppenderRefs(final AppenderRef[] appenderRefs) {
321             this.appenderRefs = appenderRefs;
322             return this;
323         }
324 
325         public Builder setErrorRef(final String errorRef) {
326             this.errorRef = errorRef;
327             return this;
328         }
329 
330         public Builder setBlocking(final boolean blocking) {
331             this.blocking = blocking;
332             return this;
333         }
334 
335         public Builder setShutdownTimeout(final long shutdownTimeout) {
336             this.shutdownTimeout = shutdownTimeout;
337             return this;
338         }
339 
340         public Builder setBufferSize(final int bufferSize) {
341             this.bufferSize = bufferSize;
342             return this;
343         }
344 
345         public Builder setName(final String name) {
346             this.name = name;
347             return this;
348         }
349 
350         public Builder setIncludeLocation(final boolean includeLocation) {
351             this.includeLocation = includeLocation;
352             return this;
353         }
354 
355         public Builder setFilter(final Filter filter) {
356             this.filter = filter;
357             return this;
358         }
359 
360         public Builder setConfiguration(final Configuration configuration) {
361             this.configuration = configuration;
362             return this;
363         }
364 
365         public Builder setIgnoreExceptions(final boolean ignoreExceptions) {
366             this.ignoreExceptions = ignoreExceptions;
367             return this;
368         }
369 
370         public Builder setBlockingQueueFactory(final BlockingQueueFactory<LogEvent> blockingQueueFactory) {
371             this.blockingQueueFactory = blockingQueueFactory;
372             return this;
373         }
374 
375         @Override
376         public AsyncAppender build() {
377             return new AsyncAppender(name, filter, appenderRefs, errorRef, bufferSize, blocking, ignoreExceptions,
378                 shutdownTimeout, configuration, includeLocation, blockingQueueFactory);
379         }
380     }
381 
382     /**
383      * Thread that calls the Appenders.
384      */
385     private class AsyncThread extends Log4jThread {
386 
387         private volatile boolean shutdown = false;
388         private final List<AppenderControl> appenders;
389         private final BlockingQueue<LogEvent> queue;
390 
391         public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) {
392             super("AsyncAppender-" + THREAD_SEQUENCE.getAndIncrement());
393             this.appenders = appenders;
394             this.queue = queue;
395             setDaemon(true);
396         }
397 
398         @Override
399         public void run() {
400             while (!shutdown) {
401                 LogEvent event;
402                 try {
403                     event = queue.take();
404                     if (event == SHUTDOWN_LOG_EVENT) {
405                         shutdown = true;
406                         continue;
407                     }
408                 } catch (final InterruptedException ex) {
409                     break; // LOG4J2-830
410                 }
411                 event.setEndOfBatch(queue.isEmpty());
412                 final boolean success = callAppenders(event);
413                 if (!success && errorAppender != null) {
414                     try {
415                         errorAppender.callAppender(event);
416                     } catch (final Exception ex) {
417                         // Silently accept the error.
418                     }
419                 }
420             }
421             // Process any remaining items in the queue.
422             LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
423                 queue.size());
424             int count = 0;
425             int ignored = 0;
426             while (!queue.isEmpty()) {
427                 try {
428                     final LogEvent event = queue.take();
429                     if (event instanceof Log4jLogEvent) {
430                         final Log4jLogEvent logEvent = (Log4jLogEvent) event;
431                         logEvent.setEndOfBatch(queue.isEmpty());
432                         callAppenders(logEvent);
433                         count++;
434                     } else {
435                         ignored++;
436                         LOGGER.trace("Ignoring event of class {}", event.getClass().getName());
437                     }
438                 } catch (final InterruptedException ex) {
439                     // May have been interrupted to shut down.
440                     // Here we ignore interrupts and try to process all remaining events.
441                 }
442             }
443             LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
444                 + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
445         }
446 
447         /**
448          * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on all registered {@code AppenderControl}
449          * objects, and returns {@code true} if at least one appender call was successful, {@code false} otherwise. Any
450          * exceptions are silently ignored.
451          *
452          * @param event the event to forward to the registered appenders
453          * @return {@code true} if at least one appender call succeeded, {@code false} otherwise
454          */
455         boolean callAppenders(final LogEvent event) {
456             boolean success = false;
457             for (final AppenderControl control : appenders) {
458                 try {
459                     control.callAppender(event);
460                     success = true;
461                 } catch (final Exception ex) {
462                     // If no appender is successful the error appender will get it.
463                 }
464             }
465             return success;
466         }
467 
468         public void shutdown() {
469             shutdown = true;
470             if (queue.isEmpty()) {
471                 queue.offer(SHUTDOWN_LOG_EVENT);
472             }
473             if (getState() == State.TIMED_WAITING || getState() == State.WAITING) {
474                 this.interrupt(); // LOG4J2-1422: if underlying appender is stuck in wait/sleep/join/park call
475             }
476         }
477     }
478 
479     /**
480      * Returns the names of the appenders that this asyncAppender delegates to as an array of Strings.
481      *
482      * @return the names of the sink appenders
483      */
484     public String[] getAppenderRefStrings() {
485         final String[] result = new String[appenderRefs.length];
486         for (int i = 0; i < result.length; i++) {
487             result[i] = appenderRefs[i].getRef();
488         }
489         return result;
490     }
491 
492     /**
493      * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with every log event to determine
494      * the class and method where the logging call was made.
495      *
496      * @return {@code true} if location is included with every event, {@code false} otherwise
497      */
498     public boolean isIncludeLocation() {
499         return includeLocation;
500     }
501 
502     /**
503      * Returns {@code true} if this AsyncAppender will block when the queue is full, or {@code false} if events are
504      * dropped when the queue is full.
505      *
506      * @return whether this AsyncAppender will block or drop events when the queue is full.
507      */
508     public boolean isBlocking() {
509         return blocking;
510     }
511 
512     /**
513      * Returns the name of the appender that any errors are logged to or {@code null}.
514      *
515      * @return the name of the appender that any errors are logged to or {@code null}
516      */
517     public String getErrorRef() {
518         return errorRef;
519     }
520 
521     public int getQueueCapacity() {
522         return queueSize;
523     }
524 
525     public int getQueueRemainingCapacity() {
526         return queue.remainingCapacity();
527     }
528 }