View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.logging.log4j.core.appender;
18  
19  import java.io.Serializable;
20  import java.util.ArrayList;
21  import java.util.List;
22  import java.util.Map;
23  import java.util.concurrent.ArrayBlockingQueue;
24  import java.util.concurrent.BlockingQueue;
25  import java.util.concurrent.atomic.AtomicLong;
26  
27  import org.apache.logging.log4j.core.Appender;
28  import org.apache.logging.log4j.core.Filter;
29  import org.apache.logging.log4j.core.LogEvent;
30  import org.apache.logging.log4j.core.async.RingBufferLogEvent;
31  import org.apache.logging.log4j.core.config.AppenderControl;
32  import org.apache.logging.log4j.core.config.AppenderRef;
33  import org.apache.logging.log4j.core.config.Configuration;
34  import org.apache.logging.log4j.core.config.ConfigurationException;
35  import org.apache.logging.log4j.core.config.plugins.Plugin;
36  import org.apache.logging.log4j.core.config.plugins.PluginAliases;
37  import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
38  import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
39  import org.apache.logging.log4j.core.config.plugins.PluginElement;
40  import org.apache.logging.log4j.core.config.plugins.PluginFactory;
41  import org.apache.logging.log4j.core.impl.Log4jLogEvent;
42  import org.apache.logging.log4j.core.util.Constants;
43  
44  /**
45   * Appends to one or more Appenders asynchronously. You can configure an AsyncAppender with one or more Appenders and an
46   * Appender to append to if the queue is full. The AsyncAppender does not allow a filter to be specified on the Appender
47   * references.
48   */
49  @Plugin(name = "Async", category = "Core", elementType = "appender", printObject = true)
50  public final class AsyncAppender extends AbstractAppender {
51  
52      private static final long serialVersionUID = 1L;
53      private static final int DEFAULT_QUEUE_SIZE = 128;
54      private static final String SHUTDOWN = "Shutdown";
55  
56      private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1);
57      private static ThreadLocal<Boolean> isAppenderThread = new ThreadLocal<>();
58  
59      private final BlockingQueue<Serializable> queue;
60      private final int queueSize;
61      private final boolean blocking;
62      private final long shutdownTimeout;
63      private final Configuration config;
64      private final AppenderRef[] appenderRefs;
65      private final String errorRef;
66      private final boolean includeLocation;
67      private AppenderControl errorAppender;
68      private AsyncThread thread;
69  
70      private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
71              final String errorRef, final int queueSize, final boolean blocking, final boolean ignoreExceptions,
72              final long shutdownTimeout, final Configuration config, final boolean includeLocation) {
73          super(name, filter, null, ignoreExceptions);
74          this.queue = new ArrayBlockingQueue<>(queueSize);
75          this.queueSize = queueSize;
76          this.blocking = blocking;
77          this.shutdownTimeout = shutdownTimeout;
78          this.config = config;
79          this.appenderRefs = appenderRefs;
80          this.errorRef = errorRef;
81          this.includeLocation = includeLocation;
82      }
83  
84      @Override
85      public void start() {
86          final Map<String, Appender> map = config.getAppenders();
87          final List<AppenderControl> appenders = new ArrayList<>();
88          for (final AppenderRef appenderRef : appenderRefs) {
89              final Appender appender = map.get(appenderRef.getRef());
90              if (appender != null) {
91                  appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter()));
92              } else {
93                  LOGGER.error("No appender named {} was configured", appenderRef);
94              }
95          }
96          if (errorRef != null) {
97              final Appender appender = map.get(errorRef);
98              if (appender != null) {
99                  errorAppender = new AppenderControl(appender, null, null);
100             } else {
101                 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
102             }
103         }
104         if (appenders.size() > 0) {
105             thread = new AsyncThread(appenders, queue);
106             thread.setName("AsyncAppender-" + getName());
107         } else if (errorRef == null) {
108             throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
109         }
110 
111         thread.start();
112         super.start();
113     }
114 
115     @Override
116     public void stop() {
117         super.stop();
118         LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
119         thread.shutdown();
120         try {
121             thread.join(shutdownTimeout);
122         } catch (final InterruptedException ex) {
123             LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
124         }
125         LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
126     }
127 
128     /**
129      * Actual writing occurs here.
130      * 
131      * @param logEvent The LogEvent.
132      */
133     @Override
134     public void append(LogEvent logEvent) {
135         if (!isStarted()) {
136             throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
137         }
138         if (!(logEvent instanceof Log4jLogEvent)) {
139             if (!(logEvent instanceof RingBufferLogEvent)) {
140                 return; // only know how to Serialize Log4jLogEvents and RingBufferLogEvents
141             }
142             logEvent = ((RingBufferLogEvent) logEvent).createMemento();
143         }
144         if (!Constants.FORMAT_MESSAGES_IN_BACKGROUND) { // LOG4J2-898: user may choose
145             logEvent.getMessage().getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters
146         }
147         final Log4jLogEvent coreEvent = (Log4jLogEvent) logEvent;
148         boolean appendSuccessful = false;
149         if (blocking) {
150             if (isAppenderThread.get() == Boolean.TRUE && queue.remainingCapacity() == 0) {
151                 // LOG4J2-485: avoid deadlock that would result from trying
152                 // to add to a full queue from appender thread
153                 coreEvent.setEndOfBatch(false); // queue is definitely not empty!
154                 appendSuccessful = thread.callAppenders(coreEvent);
155             } else {
156                 final Serializable serialized = Log4jLogEvent.serialize(coreEvent, includeLocation);
157                 try {
158                     // wait for free slots in the queue
159                     queue.put(serialized);
160                     appendSuccessful = true;
161                 } catch (final InterruptedException e) {
162                     // LOG4J2-1049: Some applications use Thread.interrupt() to send
163                     // messages between application threads. This does not necessarily
164                     // mean that the queue is full. To prevent dropping a log message,
165                     // quickly try to offer the event to the queue again.
166                     // (Yes, this means there is a possibility the same event is logged twice.)
167                     //
168                     // Finally, catching the InterruptedException means the
169                     // interrupted flag has been cleared on the current thread.
170                     // This may interfere with the application's expectation of
171                     // being interrupted, so when we are done, we set the interrupted
172                     // flag again.
173                     appendSuccessful = queue.offer(serialized);
174                     if (!appendSuccessful) {
175                         LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
176                                 getName());
177                     }
178                     // set the interrupted flag again.
179                     Thread.currentThread().interrupt();
180                 }
181             }
182         } else {
183             appendSuccessful = queue.offer(Log4jLogEvent.serialize(coreEvent, includeLocation));
184             if (!appendSuccessful) {
185                 error("Appender " + getName() + " is unable to write primary appenders. queue is full");
186             }
187         }
188         if (!appendSuccessful && errorAppender != null) {
189             errorAppender.callAppender(coreEvent);
190         }
191     }
192 
193     /**
194      * Create an AsyncAppender.
195      * 
196      * @param appenderRefs The Appenders to reference.
197      * @param errorRef An optional Appender to write to if the queue is full or other errors occur.
198      * @param blocking True if the Appender should wait when the queue is full. The default is true.
199      * @param shutdownTimeout How many milliseconds the Appender should wait to flush outstanding log events
200      *                        in the queue on shutdown. The default is zero which means to wait forever.
201      * @param size The size of the event queue. The default is 128.
202      * @param name The name of the Appender.
203      * @param includeLocation whether to include location information. The default is false.
204      * @param filter The Filter or null.
205      * @param config The Configuration.
206      * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged;
207      *            otherwise they are propagated to the caller.
208      * @return The AsyncAppender.
209      */
210     @PluginFactory
211     public static AsyncAppender createAppender(@PluginElement("AppenderRef") final AppenderRef[] appenderRefs,
212             @PluginAttribute("errorRef") @PluginAliases("error-ref") final String errorRef,
213             @PluginAttribute(value = "blocking", defaultBoolean = true) final boolean blocking,
214             @PluginAttribute(value = "shutdownTimeout", defaultLong = 0L) final long shutdownTimeout,
215             @PluginAttribute(value = "bufferSize", defaultInt = DEFAULT_QUEUE_SIZE) final int size,
216             @PluginAttribute("name") final String name,
217             @PluginAttribute(value = "includeLocation", defaultBoolean = false) final boolean includeLocation,
218             @PluginElement("Filter") final Filter filter, @PluginConfiguration final Configuration config,
219             @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions) {
220         if (name == null) {
221             LOGGER.error("No name provided for AsyncAppender");
222             return null;
223         }
224         if (appenderRefs == null) {
225             LOGGER.error("No appender references provided to AsyncAppender {}", name);
226         }
227 
228         return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions,
229                 shutdownTimeout, config, includeLocation);
230     }
231 
232     /**
233      * Thread that calls the Appenders.
234      */
235     private class AsyncThread extends Thread {
236 
237         private volatile boolean shutdown = false;
238         private final List<AppenderControl> appenders;
239         private final BlockingQueue<Serializable> queue;
240 
241         public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<Serializable> queue) {
242             this.appenders = appenders;
243             this.queue = queue;
244             setDaemon(true);
245             setName("AsyncAppenderThread" + THREAD_SEQUENCE.getAndIncrement());
246         }
247 
248         @Override
249         public void run() {
250             isAppenderThread.set(Boolean.TRUE); // LOG4J2-485
251             while (!shutdown) {
252                 Serializable s;
253                 try {
254                     s = queue.take();
255                     if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) {
256                         shutdown = true;
257                         continue;
258                     }
259                 } catch (final InterruptedException ex) {
260                     break; // LOG4J2-830
261                 }
262                 final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
263                 event.setEndOfBatch(queue.isEmpty());
264                 final boolean success = callAppenders(event);
265                 if (!success && errorAppender != null) {
266                     try {
267                         errorAppender.callAppender(event);
268                     } catch (final Exception ex) {
269                         // Silently accept the error.
270                     }
271                 }
272             }
273             // Process any remaining items in the queue.
274             LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
275                     queue.size());
276             int count = 0;
277             int ignored = 0;
278             while (!queue.isEmpty()) {
279                 try {
280                     final Serializable s = queue.take();
281                     if (Log4jLogEvent.canDeserialize(s)) {
282                         final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
283                         event.setEndOfBatch(queue.isEmpty());
284                         callAppenders(event);
285                         count++;
286                     } else {
287                         ignored++;
288                         LOGGER.trace("Ignoring event of class {}", s.getClass().getName());
289                     }
290                 } catch (final InterruptedException ex) {
291                     // May have been interrupted to shut down.
292                     // Here we ignore interrupts and try to process all remaining events.
293                 }
294             }
295             LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
296                     + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
297         }
298 
299         /**
300          * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on all registered {@code AppenderControl}
301          * objects, and returns {@code true} if at least one appender call was successful, {@code false} otherwise. Any
302          * exceptions are silently ignored.
303          *
304          * @param event the event to forward to the registered appenders
305          * @return {@code true} if at least one appender call succeeded, {@code false} otherwise
306          */
307         boolean callAppenders(final Log4jLogEvent event) {
308             boolean success = false;
309             for (final AppenderControl control : appenders) {
310                 try {
311                     control.callAppender(event);
312                     success = true;
313                 } catch (final Exception ex) {
314                     // If no appender is successful the error appender will get it.
315                 }
316             }
317             return success;
318         }
319 
320         public void shutdown() {
321             shutdown = true;
322             if (queue.isEmpty()) {
323                 queue.offer(SHUTDOWN);
324             }
325         }
326     }
327 
328     /**
329      * Returns the names of the appenders that this asyncAppender delegates to as an array of Strings.
330      * 
331      * @return the names of the sink appenders
332      */
333     public String[] getAppenderRefStrings() {
334         final String[] result = new String[appenderRefs.length];
335         for (int i = 0; i < result.length; i++) {
336             result[i] = appenderRefs[i].getRef();
337         }
338         return result;
339     }
340 
341     /**
342      * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with every log event to determine
343      * the class and method where the logging call was made.
344      * 
345      * @return {@code true} if location is included with every event, {@code false} otherwise
346      */
347     public boolean isIncludeLocation() {
348         return includeLocation;
349     }
350 
351     /**
352      * Returns {@code true} if this AsyncAppender will block when the queue is full, or {@code false} if events are
353      * dropped when the queue is full.
354      * 
355      * @return whether this AsyncAppender will block or drop events when the queue is full.
356      */
357     public boolean isBlocking() {
358         return blocking;
359     }
360 
361     /**
362      * Returns the name of the appender that any errors are logged to or {@code null}.
363      * 
364      * @return the name of the appender that any errors are logged to or {@code null}
365      */
366     public String getErrorRef() {
367         return errorRef;
368     }
369 
370     public int getQueueCapacity() {
371         return queueSize;
372     }
373 
374     public int getQueueRemainingCapacity() {
375         return queue.remainingCapacity();
376     }
377 }