View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.logging.log4j.core.appender;
18  
19  import java.util.ArrayList;
20  import java.util.List;
21  import java.util.Map;
22  import java.util.concurrent.ArrayBlockingQueue;
23  import java.util.concurrent.BlockingQueue;
24  import java.util.concurrent.atomic.AtomicLong;
25  
26  import org.apache.logging.log4j.core.AbstractLogEvent;
27  import org.apache.logging.log4j.core.Appender;
28  import org.apache.logging.log4j.core.Filter;
29  import org.apache.logging.log4j.core.LogEvent;
30  import org.apache.logging.log4j.core.async.AsyncQueueFullPolicy;
31  import org.apache.logging.log4j.core.async.AsyncQueueFullPolicyFactory;
32  import org.apache.logging.log4j.core.async.DiscardingAsyncQueueFullPolicy;
33  import org.apache.logging.log4j.core.async.EventRoute;
34  import org.apache.logging.log4j.core.config.AppenderControl;
35  import org.apache.logging.log4j.core.config.AppenderRef;
36  import org.apache.logging.log4j.core.config.Configuration;
37  import org.apache.logging.log4j.core.config.ConfigurationException;
38  import org.apache.logging.log4j.core.config.plugins.Plugin;
39  import org.apache.logging.log4j.core.config.plugins.PluginAliases;
40  import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
41  import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
42  import org.apache.logging.log4j.core.config.plugins.PluginElement;
43  import org.apache.logging.log4j.core.config.plugins.PluginFactory;
44  import org.apache.logging.log4j.core.impl.Log4jLogEvent;
45  import org.apache.logging.log4j.core.util.Constants;
46  
47  /**
48   * Appends to one or more Appenders asynchronously. You can configure an AsyncAppender with one or more Appenders and an
49   * Appender to append to if the queue is full. The AsyncAppender does not allow a filter to be specified on the Appender
50   * references.
51   */
52  @Plugin(name = "Async", category = "Core", elementType = "appender", printObject = true)
53  public final class AsyncAppender extends AbstractAppender {
54  
55      private static final int DEFAULT_QUEUE_SIZE = 128;
56      private static final LogEvent SHUTDOWN = new AbstractLogEvent() {
57      };
58  
59      private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1);
60  
61      private final BlockingQueue<LogEvent> queue;
62      private final int queueSize;
63      private final boolean blocking;
64      private final long shutdownTimeout;
65      private final Configuration config;
66      private final AppenderRef[] appenderRefs;
67      private final String errorRef;
68      private final boolean includeLocation;
69      private AppenderControl errorAppender;
70      private AsyncThread thread;
71      private AsyncQueueFullPolicy asyncQueueFullPolicy;
72  
73      private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
74                            final String errorRef, final int queueSize, final boolean blocking,
75                            final boolean ignoreExceptions,
76                            final long shutdownTimeout, final Configuration config, final boolean includeLocation) {
77          super(name, filter, null, ignoreExceptions);
78          this.queue = new ArrayBlockingQueue<>(queueSize);
79          this.queueSize = queueSize;
80          this.blocking = blocking;
81          this.shutdownTimeout = shutdownTimeout;
82          this.config = config;
83          this.appenderRefs = appenderRefs;
84          this.errorRef = errorRef;
85          this.includeLocation = includeLocation;
86      }
87  
88      @Override
89      public void start() {
90          final Map<String, Appender> map = config.getAppenders();
91          final List<AppenderControl> appenders = new ArrayList<>();
92          for (final AppenderRef appenderRef : appenderRefs) {
93              final Appender appender = map.get(appenderRef.getRef());
94              if (appender != null) {
95                  appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter()));
96              } else {
97                  LOGGER.error("No appender named {} was configured", appenderRef);
98              }
99          }
100         if (errorRef != null) {
101             final Appender appender = map.get(errorRef);
102             if (appender != null) {
103                 errorAppender = new AppenderControl(appender, null, null);
104             } else {
105                 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
106             }
107         }
108         if (appenders.size() > 0) {
109             thread = new AsyncThread(appenders, queue);
110             thread.setName("AsyncAppender-" + getName());
111         } else if (errorRef == null) {
112             throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
113         }
114         asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
115 
116         thread.start();
117         super.start();
118     }
119 
120     @Override
121     public void stop() {
122         super.stop();
123         LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
124         thread.shutdown();
125         try {
126             thread.join(shutdownTimeout);
127         } catch (final InterruptedException ex) {
128             LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
129         }
130         LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
131 
132         if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
133             LOGGER.trace("AsyncAppender: {} discarded {} events.", asyncQueueFullPolicy,
134                 DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
135         }
136     }
137 
138     /**
139      * Actual writing occurs here.
140      *
141      * @param logEvent The LogEvent.
142      */
143     @Override
144     public void append(final LogEvent logEvent) {
145         if (!isStarted()) {
146             throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
147         }
148         if (!Constants.FORMAT_MESSAGES_IN_BACKGROUND) { // LOG4J2-898: user may choose
149             logEvent.getMessage().getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters
150         }
151         final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation);
152         if (!queue.offer(memento)) {
153             if (blocking) {
154                 // delegate to the event router (which may discard, enqueue and block, or log in current thread)
155                 final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel());
156                 route.logMessage(this, memento);
157             } else {
158                 error("Appender " + getName() + " is unable to write primary appenders. queue is full");
159                 logToErrorAppenderIfNecessary(false, memento);
160             }
161         }
162     }
163 
164     /**
165      * FOR INTERNAL USE ONLY.
166      *
167      * @param logEvent the event to log
168      */
169     public void logMessageInCurrentThread(final LogEvent logEvent) {
170         logEvent.setEndOfBatch(queue.isEmpty());
171         final boolean appendSuccessful = thread.callAppenders(logEvent);
172         logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
173     }
174 
175     /**
176      * FOR INTERNAL USE ONLY.
177      *
178      * @param logEvent the event to log
179      */
180     public void logMessageInBackgroundThread(final LogEvent logEvent) {
181         try {
182             // wait for free slots in the queue
183             queue.put(logEvent);
184         } catch (final InterruptedException e) {
185             final boolean appendSuccessful = handleInterruptedException(logEvent);
186             logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
187         }
188     }
189 
190     // LOG4J2-1049: Some applications use Thread.interrupt() to send
191     // messages between application threads. This does not necessarily
192     // mean that the queue is full. To prevent dropping a log message,
193     // quickly try to offer the event to the queue again.
194     // (Yes, this means there is a possibility the same event is logged twice.)
195     //
196     // Finally, catching the InterruptedException means the
197     // interrupted flag has been cleared on the current thread.
198     // This may interfere with the application's expectation of
199     // being interrupted, so when we are done, we set the interrupted
200     // flag again.
201     private boolean handleInterruptedException(final LogEvent memento) {
202         final boolean appendSuccessful = queue.offer(memento);
203         if (!appendSuccessful) {
204             LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
205                 getName());
206         }
207         // set the interrupted flag again.
208         Thread.currentThread().interrupt();
209         return appendSuccessful;
210     }
211 
212     private void logToErrorAppenderIfNecessary(final boolean appendSuccessful, final LogEvent logEvent) {
213         if (!appendSuccessful && errorAppender != null) {
214             errorAppender.callAppender(logEvent);
215         }
216     }
217 
218     /**
219      * Create an AsyncAppender.
220      *
221      * @param appenderRefs     The Appenders to reference.
222      * @param errorRef         An optional Appender to write to if the queue is full or other errors occur.
223      * @param blocking         True if the Appender should wait when the queue is full. The default is true.
224      * @param shutdownTimeout  How many milliseconds the Appender should wait to flush outstanding log events
225      *                         in the queue on shutdown. The default is zero which means to wait forever.
226      * @param size             The size of the event queue. The default is 128.
227      * @param name             The name of the Appender.
228      * @param includeLocation  whether to include location information. The default is false.
229      * @param filter           The Filter or null.
230      * @param config           The Configuration.
231      * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged;
232      *                         otherwise they are propagated to the caller.
233      * @return The AsyncAppender.
234      */
235     @PluginFactory
236     public static AsyncAppender createAppender(
237         // @formatter:off
238         @PluginElement("AppenderRef") final AppenderRef[] appenderRefs,
239         @PluginAttribute("errorRef") @PluginAliases("error-ref") final String errorRef,
240         @PluginAttribute(value = "blocking", defaultBoolean = true) final boolean blocking,
241         @PluginAttribute(value = "shutdownTimeout", defaultLong = 0L) final long shutdownTimeout,
242         @PluginAttribute(value = "bufferSize", defaultInt = DEFAULT_QUEUE_SIZE) final int size,
243         @PluginAttribute("name") final String name,
244         @PluginAttribute(value = "includeLocation", defaultBoolean = false) final boolean includeLocation,
245         @PluginElement("Filter") final Filter filter,
246         @PluginConfiguration final Configuration config,
247         @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions) {
248         // @formatter:on
249         if (name == null) {
250             LOGGER.error("No name provided for AsyncAppender");
251             return null;
252         }
253         if (appenderRefs == null) {
254             LOGGER.error("No appender references provided to AsyncAppender {}", name);
255         }
256 
257         return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions,
258             shutdownTimeout, config, includeLocation);
259     }
260 
261     /**
262      * Thread that calls the Appenders.
263      */
264     private class AsyncThread extends Thread {
265 
266         private volatile boolean shutdown = false;
267         private final List<AppenderControl> appenders;
268         private final BlockingQueue<LogEvent> queue;
269 
270         public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) {
271             this.appenders = appenders;
272             this.queue = queue;
273             setDaemon(true);
274             setName("AsyncAppenderThread" + THREAD_SEQUENCE.getAndIncrement());
275         }
276 
277         @Override
278         public void run() {
279             while (!shutdown) {
280                 LogEvent event;
281                 try {
282                     event = queue.take();
283                     if (event == SHUTDOWN) {
284                         shutdown = true;
285                         continue;
286                     }
287                 } catch (final InterruptedException ex) {
288                     break; // LOG4J2-830
289                 }
290                 event.setEndOfBatch(queue.isEmpty());
291                 final boolean success = callAppenders(event);
292                 if (!success && errorAppender != null) {
293                     try {
294                         errorAppender.callAppender(event);
295                     } catch (final Exception ex) {
296                         // Silently accept the error.
297                     }
298                 }
299             }
300             // Process any remaining items in the queue.
301             LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
302                 queue.size());
303             int count = 0;
304             int ignored = 0;
305             while (!queue.isEmpty()) {
306                 try {
307                     final LogEvent event = queue.take();
308                     if (event instanceof Log4jLogEvent) {
309                         final Log4jLogEvent logEvent = (Log4jLogEvent) event;
310                         logEvent.setEndOfBatch(queue.isEmpty());
311                         callAppenders(logEvent);
312                         count++;
313                     } else {
314                         ignored++;
315                         LOGGER.trace("Ignoring event of class {}", event.getClass().getName());
316                     }
317                 } catch (final InterruptedException ex) {
318                     // May have been interrupted to shut down.
319                     // Here we ignore interrupts and try to process all remaining events.
320                 }
321             }
322             LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
323                 + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
324         }
325 
326         /**
327          * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on all registered {@code AppenderControl}
328          * objects, and returns {@code true} if at least one appender call was successful, {@code false} otherwise. Any
329          * exceptions are silently ignored.
330          *
331          * @param event the event to forward to the registered appenders
332          * @return {@code true} if at least one appender call succeeded, {@code false} otherwise
333          */
334         boolean callAppenders(final LogEvent event) {
335             boolean success = false;
336             for (final AppenderControl control : appenders) {
337                 try {
338                     control.callAppender(event);
339                     success = true;
340                 } catch (final Exception ex) {
341                     // If no appender is successful the error appender will get it.
342                 }
343             }
344             return success;
345         }
346 
347         public void shutdown() {
348             shutdown = true;
349             if (queue.isEmpty()) {
350                 queue.offer(SHUTDOWN);
351             }
352             if (getState() == State.TIMED_WAITING || getState() == State.WAITING) {
353                 this.interrupt(); // LOG4J2-1422: if underlying appender is stuck in wait/sleep/join/park call
354             }
355         }
356     }
357 
358     /**
359      * Returns the names of the appenders that this asyncAppender delegates to as an array of Strings.
360      *
361      * @return the names of the sink appenders
362      */
363     public String[] getAppenderRefStrings() {
364         final String[] result = new String[appenderRefs.length];
365         for (int i = 0; i < result.length; i++) {
366             result[i] = appenderRefs[i].getRef();
367         }
368         return result;
369     }
370 
371     /**
372      * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with every log event to determine
373      * the class and method where the logging call was made.
374      *
375      * @return {@code true} if location is included with every event, {@code false} otherwise
376      */
377     public boolean isIncludeLocation() {
378         return includeLocation;
379     }
380 
381     /**
382      * Returns {@code true} if this AsyncAppender will block when the queue is full, or {@code false} if events are
383      * dropped when the queue is full.
384      *
385      * @return whether this AsyncAppender will block or drop events when the queue is full.
386      */
387     public boolean isBlocking() {
388         return blocking;
389     }
390 
391     /**
392      * Returns the name of the appender that any errors are logged to or {@code null}.
393      *
394      * @return the name of the appender that any errors are logged to or {@code null}
395      */
396     public String getErrorRef() {
397         return errorRef;
398     }
399 
400     public int getQueueCapacity() {
401         return queueSize;
402     }
403 
404     public int getQueueRemainingCapacity() {
405         return queue.remainingCapacity();
406     }
407 }