View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.appender;
18  
19  import java.util.ArrayList;
20  import java.util.List;
21  import java.util.Map;
22  import java.util.concurrent.BlockingQueue;
23  import java.util.concurrent.TimeUnit;
24  import java.util.concurrent.TransferQueue;
25  import java.util.concurrent.atomic.AtomicLong;
26  
27  import org.apache.logging.log4j.core.AbstractLogEvent;
28  import org.apache.logging.log4j.core.Appender;
29  import org.apache.logging.log4j.core.Filter;
30  import org.apache.logging.log4j.core.LogEvent;
31  import org.apache.logging.log4j.core.async.ArrayBlockingQueueFactory;
32  import org.apache.logging.log4j.core.async.AsyncQueueFullPolicy;
33  import org.apache.logging.log4j.core.async.AsyncQueueFullPolicyFactory;
34  import org.apache.logging.log4j.core.async.BlockingQueueFactory;
35  import org.apache.logging.log4j.core.async.DiscardingAsyncQueueFullPolicy;
36  import org.apache.logging.log4j.core.async.EventRoute;
37  import org.apache.logging.log4j.core.config.AppenderControl;
38  import org.apache.logging.log4j.core.config.AppenderRef;
39  import org.apache.logging.log4j.core.config.Configuration;
40  import org.apache.logging.log4j.core.config.ConfigurationException;
41  import org.apache.logging.log4j.core.config.plugins.Plugin;
42  import org.apache.logging.log4j.core.config.plugins.PluginAliases;
43  import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute;
44  import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
45  import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
46  import org.apache.logging.log4j.core.config.plugins.PluginElement;
47  import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
48  import org.apache.logging.log4j.core.impl.Log4jLogEvent;
49  import org.apache.logging.log4j.core.util.Constants;
50  import org.apache.logging.log4j.core.util.Log4jThread;
51  
52  /**
53   * Appends to one or more Appenders asynchronously. You can configure an AsyncAppender with one or more Appenders and an
54   * Appender to append to if the queue is full. The AsyncAppender does not allow a filter to be specified on the Appender
55   * references.
56   */
57  @Plugin(name = "Async", category = "Core", elementType = Appender.ELEMENT_TYPE, printObject = true)
58  public final class AsyncAppender extends AbstractAppender {
59  
60      private static final int DEFAULT_QUEUE_SIZE = 128;
61      private static final LogEvent SHUTDOWN = new AbstractLogEvent() {
62      };
63  
64      private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1);
65  
66      private final BlockingQueue<LogEvent> queue;
67      private final int queueSize;
68      private final boolean blocking;
69      private final long shutdownTimeout;
70      private final Configuration config;
71      private final AppenderRef[] appenderRefs;
72      private final String errorRef;
73      private final boolean includeLocation;
74      private AppenderControl errorAppender;
75      private AsyncThread thread;
76      private AsyncQueueFullPolicy asyncQueueFullPolicy;
77  
78      private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
79                            final String errorRef, final int queueSize, final boolean blocking,
80                            final boolean ignoreExceptions, final long shutdownTimeout, final Configuration config,
81                            final boolean includeLocation, final BlockingQueueFactory<LogEvent> blockingQueueFactory) {
82          super(name, filter, null, ignoreExceptions);
83          this.queue = blockingQueueFactory.create(queueSize);
84          this.queueSize = queueSize;
85          this.blocking = blocking;
86          this.shutdownTimeout = shutdownTimeout;
87          this.config = config;
88          this.appenderRefs = appenderRefs;
89          this.errorRef = errorRef;
90          this.includeLocation = includeLocation;
91      }
92  
93      @Override
94      public void start() {
95          final Map<String, Appender> map = config.getAppenders();
96          final List<AppenderControl> appenders = new ArrayList<>();
97          for (final AppenderRef appenderRef : appenderRefs) {
98              final Appender appender = map.get(appenderRef.getRef());
99              if (appender != null) {
100                 appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter()));
101             } else {
102                 LOGGER.error("No appender named {} was configured", appenderRef);
103             }
104         }
105         if (errorRef != null) {
106             final Appender appender = map.get(errorRef);
107             if (appender != null) {
108                 errorAppender = new AppenderControl(appender, null, null);
109             } else {
110                 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
111             }
112         }
113         if (appenders.size() > 0) {
114             thread = new AsyncThread(appenders, queue);
115             thread.setName("AsyncAppender-" + getName());
116         } else if (errorRef == null) {
117             throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
118         }
119         asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
120 
121         thread.start();
122         super.start();
123     }
124 
125     @Override
126     public boolean stop(final long timeout, final TimeUnit timeUnit) {
127         setStopping();
128         super.stop(timeout, timeUnit, false);
129         LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
130         thread.shutdown();
131         try {
132             thread.join(shutdownTimeout);
133         } catch (final InterruptedException ex) {
134             LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
135         }
136         LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
137 
138         if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
139             LOGGER.trace("AsyncAppender: {} discarded {} events.", asyncQueueFullPolicy,
140                 DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
141         }
142         setStopped();
143         return true;
144     }
145 
146     /**
147      * Actual writing occurs here.
148      *
149      * @param logEvent The LogEvent.
150      */
151     @Override
152     public void append(final LogEvent logEvent) {
153         if (!isStarted()) {
154             throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
155         }
156         if (!Constants.FORMAT_MESSAGES_IN_BACKGROUND) { // LOG4J2-898: user may choose
157             logEvent.getMessage().getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters
158         }
159         final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation);
160         if (!transfer(memento)) {
161             if (blocking) {
162                 // delegate to the event router (which may discard, enqueue and block, or log in current thread)
163                 final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel());
164                 route.logMessage(this, memento);
165             } else {
166                 error("Appender " + getName() + " is unable to write primary appenders. queue is full");
167                 logToErrorAppenderIfNecessary(false, memento);
168             }
169         }
170     }
171 
172     private boolean transfer(final LogEvent memento) {
173         return queue instanceof TransferQueue
174             ? ((TransferQueue<LogEvent>) queue).tryTransfer(memento)
175             : queue.offer(memento);
176     }
177 
178     /**
179      * FOR INTERNAL USE ONLY.
180      *
181      * @param logEvent the event to log
182      */
183     public void logMessageInCurrentThread(final LogEvent logEvent) {
184         logEvent.setEndOfBatch(queue.isEmpty());
185         final boolean appendSuccessful = thread.callAppenders(logEvent);
186         logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
187     }
188 
189     /**
190      * FOR INTERNAL USE ONLY.
191      *
192      * @param logEvent the event to log
193      */
194     public void logMessageInBackgroundThread(final LogEvent logEvent) {
195         try {
196             // wait for free slots in the queue
197             queue.put(logEvent);
198         } catch (final InterruptedException e) {
199             final boolean appendSuccessful = handleInterruptedException(logEvent);
200             logToErrorAppenderIfNecessary(appendSuccessful, logEvent);
201         }
202     }
203 
204     // LOG4J2-1049: Some applications use Thread.interrupt() to send
205     // messages between application threads. This does not necessarily
206     // mean that the queue is full. To prevent dropping a log message,
207     // quickly try to offer the event to the queue again.
208     // (Yes, this means there is a possibility the same event is logged twice.)
209     //
210     // Finally, catching the InterruptedException means the
211     // interrupted flag has been cleared on the current thread.
212     // This may interfere with the application's expectation of
213     // being interrupted, so when we are done, we set the interrupted
214     // flag again.
215     private boolean handleInterruptedException(final LogEvent memento) {
216         final boolean appendSuccessful = queue.offer(memento);
217         if (!appendSuccessful) {
218             LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
219                 getName());
220         }
221         // set the interrupted flag again.
222         Thread.currentThread().interrupt();
223         return appendSuccessful;
224     }
225 
226     private void logToErrorAppenderIfNecessary(final boolean appendSuccessful, final LogEvent logEvent) {
227         if (!appendSuccessful && errorAppender != null) {
228             errorAppender.callAppender(logEvent);
229         }
230     }
231 
232     /**
233      * Create an AsyncAppender. This method is retained for backwards compatibility. New code should use the
234      * {@link Builder} instead. This factory will use {@link ArrayBlockingQueueFactory} by default as was the behavior
235      * pre-2.7.
236      *
237      * @param appenderRefs     The Appenders to reference.
238      * @param errorRef         An optional Appender to write to if the queue is full or other errors occur.
239      * @param blocking         True if the Appender should wait when the queue is full. The default is true.
240      * @param shutdownTimeout  How many milliseconds the Appender should wait to flush outstanding log events
241      *                         in the queue on shutdown. The default is zero which means to wait forever.
242      * @param size             The size of the event queue. The default is 128.
243      * @param name             The name of the Appender.
244      * @param includeLocation  whether to include location information. The default is false.
245      * @param filter           The Filter or null.
246      * @param config           The Configuration.
247      * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged;
248      *                         otherwise they are propagated to the caller.
249      * @return The AsyncAppender.
250      * @deprecated use {@link Builder} instead
251      */
252     @Deprecated
253     public static AsyncAppender createAppender(final AppenderRef[] appenderRefs, final String errorRef,
254                                                final boolean blocking, final long shutdownTimeout, final int size,
255                                                final String name, final boolean includeLocation, final Filter filter,
256                                                final Configuration config, final boolean ignoreExceptions) {
257         if (name == null) {
258             LOGGER.error("No name provided for AsyncAppender");
259             return null;
260         }
261         if (appenderRefs == null) {
262             LOGGER.error("No appender references provided to AsyncAppender {}", name);
263         }
264 
265         return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions,
266             shutdownTimeout, config, includeLocation, new ArrayBlockingQueueFactory<LogEvent>());
267     }
268 
269     @PluginBuilderFactory
270     public static Builder newBuilder() {
271         return new Builder();
272     }
273 
274     public static class Builder implements org.apache.logging.log4j.core.util.Builder<AsyncAppender> {
275 
276         @PluginElement("AppenderRef")
277         @Required(message = "No appender references provided to AsyncAppender")
278         private AppenderRef[] appenderRefs;
279 
280         @PluginBuilderAttribute
281         @PluginAliases("error-ref")
282         private String errorRef;
283 
284         @PluginBuilderAttribute
285         private boolean blocking = true;
286 
287         @PluginBuilderAttribute
288         private long shutdownTimeout = 0L;
289 
290         @PluginBuilderAttribute
291         private int bufferSize = DEFAULT_QUEUE_SIZE;
292 
293         @PluginBuilderAttribute
294         @Required(message = "No name provided for AsyncAppender")
295         private String name;
296 
297         @PluginBuilderAttribute
298         private boolean includeLocation = false;
299 
300         @PluginElement("Filter")
301         private Filter filter;
302 
303         @PluginConfiguration
304         private Configuration configuration;
305 
306         @PluginBuilderAttribute
307         private boolean ignoreExceptions = true;
308 
309         @PluginElement(BlockingQueueFactory.ELEMENT_TYPE)
310         private BlockingQueueFactory<LogEvent> blockingQueueFactory = new ArrayBlockingQueueFactory<>();
311 
312         public Builder setAppenderRefs(final AppenderRef[] appenderRefs) {
313             this.appenderRefs = appenderRefs;
314             return this;
315         }
316 
317         public Builder setErrorRef(final String errorRef) {
318             this.errorRef = errorRef;
319             return this;
320         }
321 
322         public Builder setBlocking(final boolean blocking) {
323             this.blocking = blocking;
324             return this;
325         }
326 
327         public Builder setShutdownTimeout(final long shutdownTimeout) {
328             this.shutdownTimeout = shutdownTimeout;
329             return this;
330         }
331 
332         public Builder setBufferSize(final int bufferSize) {
333             this.bufferSize = bufferSize;
334             return this;
335         }
336 
337         public Builder setName(final String name) {
338             this.name = name;
339             return this;
340         }
341 
342         public Builder setIncludeLocation(final boolean includeLocation) {
343             this.includeLocation = includeLocation;
344             return this;
345         }
346 
347         public Builder setFilter(final Filter filter) {
348             this.filter = filter;
349             return this;
350         }
351 
352         public Builder setConfiguration(final Configuration configuration) {
353             this.configuration = configuration;
354             return this;
355         }
356 
357         public Builder setIgnoreExceptions(final boolean ignoreExceptions) {
358             this.ignoreExceptions = ignoreExceptions;
359             return this;
360         }
361 
362         public Builder setBlockingQueueFactory(final BlockingQueueFactory<LogEvent> blockingQueueFactory) {
363             this.blockingQueueFactory = blockingQueueFactory;
364             return this;
365         }
366 
367         @Override
368         public AsyncAppender build() {
369             return new AsyncAppender(name, filter, appenderRefs, errorRef, bufferSize, blocking, ignoreExceptions,
370                 shutdownTimeout, configuration, includeLocation, blockingQueueFactory);
371         }
372     }
373 
374     /**
375      * Thread that calls the Appenders.
376      */
377     private class AsyncThread extends Log4jThread {
378 
379         private volatile boolean shutdown = false;
380         private final List<AppenderControl> appenders;
381         private final BlockingQueue<LogEvent> queue;
382 
383         public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) {
384             super("AsyncAppender-" + THREAD_SEQUENCE.getAndIncrement());
385             this.appenders = appenders;
386             this.queue = queue;
387             setDaemon(true);
388         }
389 
390         @Override
391         public void run() {
392             while (!shutdown) {
393                 LogEvent event;
394                 try {
395                     event = queue.take();
396                     if (event == SHUTDOWN) {
397                         shutdown = true;
398                         continue;
399                     }
400                 } catch (final InterruptedException ex) {
401                     break; // LOG4J2-830
402                 }
403                 event.setEndOfBatch(queue.isEmpty());
404                 final boolean success = callAppenders(event);
405                 if (!success && errorAppender != null) {
406                     try {
407                         errorAppender.callAppender(event);
408                     } catch (final Exception ex) {
409                         // Silently accept the error.
410                     }
411                 }
412             }
413             // Process any remaining items in the queue.
414             LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
415                 queue.size());
416             int count = 0;
417             int ignored = 0;
418             while (!queue.isEmpty()) {
419                 try {
420                     final LogEvent event = queue.take();
421                     if (event instanceof Log4jLogEvent) {
422                         final Log4jLogEvent logEvent = (Log4jLogEvent) event;
423                         logEvent.setEndOfBatch(queue.isEmpty());
424                         callAppenders(logEvent);
425                         count++;
426                     } else {
427                         ignored++;
428                         LOGGER.trace("Ignoring event of class {}", event.getClass().getName());
429                     }
430                 } catch (final InterruptedException ex) {
431                     // May have been interrupted to shut down.
432                     // Here we ignore interrupts and try to process all remaining events.
433                 }
434             }
435             LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. "
436                 + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored);
437         }
438 
439         /**
440          * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on all registered {@code AppenderControl}
441          * objects, and returns {@code true} if at least one appender call was successful, {@code false} otherwise. Any
442          * exceptions are silently ignored.
443          *
444          * @param event the event to forward to the registered appenders
445          * @return {@code true} if at least one appender call succeeded, {@code false} otherwise
446          */
447         boolean callAppenders(final LogEvent event) {
448             boolean success = false;
449             for (final AppenderControl control : appenders) {
450                 try {
451                     control.callAppender(event);
452                     success = true;
453                 } catch (final Exception ex) {
454                     // If no appender is successful the error appender will get it.
455                 }
456             }
457             return success;
458         }
459 
460         public void shutdown() {
461             shutdown = true;
462             if (queue.isEmpty()) {
463                 queue.offer(SHUTDOWN);
464             }
465             if (getState() == State.TIMED_WAITING || getState() == State.WAITING) {
466                 this.interrupt(); // LOG4J2-1422: if underlying appender is stuck in wait/sleep/join/park call
467             }
468         }
469     }
470 
471     /**
472      * Returns the names of the appenders that this asyncAppender delegates to as an array of Strings.
473      *
474      * @return the names of the sink appenders
475      */
476     public String[] getAppenderRefStrings() {
477         final String[] result = new String[appenderRefs.length];
478         for (int i = 0; i < result.length; i++) {
479             result[i] = appenderRefs[i].getRef();
480         }
481         return result;
482     }
483 
484     /**
485      * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with every log event to determine
486      * the class and method where the logging call was made.
487      *
488      * @return {@code true} if location is included with every event, {@code false} otherwise
489      */
490     public boolean isIncludeLocation() {
491         return includeLocation;
492     }
493 
494     /**
495      * Returns {@code true} if this AsyncAppender will block when the queue is full, or {@code false} if events are
496      * dropped when the queue is full.
497      *
498      * @return whether this AsyncAppender will block or drop events when the queue is full.
499      */
500     public boolean isBlocking() {
501         return blocking;
502     }
503 
504     /**
505      * Returns the name of the appender that any errors are logged to or {@code null}.
506      *
507      * @return the name of the appender that any errors are logged to or {@code null}
508      */
509     public String getErrorRef() {
510         return errorRef;
511     }
512 
513     public int getQueueCapacity() {
514         return queueSize;
515     }
516 
517     public int getQueueRemainingCapacity() {
518         return queue.remainingCapacity();
519     }
520 }