View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.async;
18  
19  import java.util.Map;
20  import java.util.Objects;
21  import java.util.concurrent.ExecutorService;
22  import java.util.concurrent.Executors;
23  
24  import org.apache.logging.log4j.Level;
25  import org.apache.logging.log4j.Marker;
26  import org.apache.logging.log4j.ThreadContext;
27  import org.apache.logging.log4j.core.Logger;
28  import org.apache.logging.log4j.core.LoggerContext;
29  import org.apache.logging.log4j.core.config.Property;
30  import org.apache.logging.log4j.core.config.ReliabilityStrategy;
31  import org.apache.logging.log4j.core.impl.Log4jLogEvent;
32  import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
33  import org.apache.logging.log4j.core.util.Clock;
34  import org.apache.logging.log4j.core.util.ClockFactory;
35  import org.apache.logging.log4j.core.util.DummyNanoClock;
36  import org.apache.logging.log4j.core.util.Integers;
37  import org.apache.logging.log4j.core.util.Loader;
38  import org.apache.logging.log4j.core.util.NanoClock;
39  import org.apache.logging.log4j.message.Message;
40  import org.apache.logging.log4j.message.MessageFactory;
41  import org.apache.logging.log4j.message.TimestampMessage;
42  import org.apache.logging.log4j.status.StatusLogger;
43  import org.apache.logging.log4j.util.PropertiesUtil;
44  
45  import com.lmax.disruptor.BlockingWaitStrategy;
46  import com.lmax.disruptor.ExceptionHandler;
47  import com.lmax.disruptor.RingBuffer;
48  import com.lmax.disruptor.SleepingWaitStrategy;
49  import com.lmax.disruptor.WaitStrategy;
50  import com.lmax.disruptor.YieldingWaitStrategy;
51  import com.lmax.disruptor.dsl.Disruptor;
52  import com.lmax.disruptor.dsl.ProducerType;
53  
54  /**
55   * AsyncLogger is a logger designed for high throughput and low latency logging.
56   * It does not perform any I/O in the calling (application) thread, but instead
57   * hands off the work to another thread as soon as possible. The actual logging
58   * is performed in the background thread. It uses the LMAX Disruptor library for
59   * inter-thread communication. (<a
60   * href="http://lmax-exchange.github.com/disruptor/"
61   * >http://lmax-exchange.github.com/disruptor/</a>)
62   * <p>
63   * To use AsyncLogger, specify the System property
64   * {@code -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector}
65   * before you obtain a Logger, and all Loggers returned by LogManager.getLogger
66   * will be AsyncLoggers.
67   * <p>
68   * Note that for performance reasons, this logger does not include source
69   * location by default. You need to specify {@code includeLocation="true"} in
70   * the configuration or any %class, %location or %line conversion patterns in
71   * your log4j.xml configuration will produce either a "?" character or no output
72   * at all.
73   * <p>
74   * For best performance, use AsyncLogger with the RandomAccessFileAppender or
75   * RollingRandomAccessFileAppender, with immediateFlush=false. These appenders
76   * have built-in support for the batching mechanism used by the Disruptor
77   * library, and they will flush to disk at the end of each batch. This means
78   * that even with immediateFlush=false, there will never be any items left in
79   * the buffer; all log events will all be written to disk in a very efficient
80   * manner.
81   */
82  public class AsyncLogger extends Logger {
83      private static final long serialVersionUID = 1L;
84      private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
85      private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
86      private static final int RINGBUFFER_MIN_SIZE = 128;
87      private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
88      private static final StatusLogger LOGGER = StatusLogger.getLogger();
89      private static final ThreadNameStrategy THREAD_NAME_STRATEGY = ThreadNameStrategy.create();
90  
91      static enum ThreadNameStrategy { // LOG4J2-467
92          CACHED {
93              @Override
94              public String getThreadName(final Info info) {
95                  return info.cachedThreadName;
96              }
97          },
98          UNCACHED {
99              @Override
100             public String getThreadName(final Info info) {
101                 return Thread.currentThread().getName();
102             }
103         };
104         abstract String getThreadName(Info info);
105 
106         static ThreadNameStrategy create() {
107             final String name = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.ThreadNameStrategy", CACHED.name());
108             try {
109                 return ThreadNameStrategy.valueOf(name);
110             } catch (final Exception ex) {
111                 LOGGER.debug("Using AsyncLogger.ThreadNameStrategy.CACHED: '{}' not valid: {}", name, ex.toString());
112                 return CACHED;
113             }
114         }
115     }
116     private static volatile Disruptor<RingBufferLogEvent> disruptor;
117     private static final Clock CLOCK = ClockFactory.getClock();
118     private static volatile NanoClock nanoClock = new DummyNanoClock();
119 
120     private static final ExecutorService executor = Executors
121             .newSingleThreadExecutor(new DaemonThreadFactory("AsyncLogger-"));
122 
123     static {
124         initInfoForExecutorThread();
125         LOGGER.debug("AsyncLogger.ThreadNameStrategy={}", THREAD_NAME_STRATEGY);
126         final int ringBufferSize = calculateRingBufferSize();
127 
128         final WaitStrategy waitStrategy = createWaitStrategy();
129         disruptor = new Disruptor<>(RingBufferLogEvent.FACTORY, ringBufferSize, executor, ProducerType.MULTI,
130                 waitStrategy);
131         disruptor.handleExceptionsWith(getExceptionHandler());
132         disruptor.handleEventsWith(new RingBufferLogEventHandler());
133 
134         LOGGER.debug("Starting AsyncLogger disruptor with ringbuffer size {}...", disruptor.getRingBuffer()
135                 .getBufferSize());
136         disruptor.start();
137     }
138 
139     private static int calculateRingBufferSize() {
140         int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
141         final String userPreferredRBSize = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.RingBufferSize",
142                 String.valueOf(ringBufferSize));
143         try {
144             int size = Integer.parseInt(userPreferredRBSize);
145             if (size < RINGBUFFER_MIN_SIZE) {
146                 size = RINGBUFFER_MIN_SIZE;
147                 LOGGER.warn("Invalid RingBufferSize {}, using minimum size {}.", userPreferredRBSize,
148                         RINGBUFFER_MIN_SIZE);
149             }
150             ringBufferSize = size;
151         } catch (final Exception ex) {
152             LOGGER.warn("Invalid RingBufferSize {}, using default size {}.", userPreferredRBSize, ringBufferSize);
153         }
154         return Integers.ceilingNextPowerOfTwo(ringBufferSize);
155     }
156 
157     /**
158      * Initialize an {@code Info} object that is threadlocal to the consumer/appender thread.
159      * This Info object uniquely has attribute {@code isAppenderThread} set to {@code true}.
160      * All other Info objects will have this attribute set to {@code false}.
161      * This allows us to detect Logger.log() calls initiated from the appender thread,
162      * which may cause deadlock when the RingBuffer is full. (LOG4J2-471)
163      */
164     private static void initInfoForExecutorThread() {
165         executor.submit(new Runnable(){
166             @Override
167             public void run() {
168                 final boolean isAppenderThread = true;
169                 final Info info = new Info(new RingBufferLogEventTranslator(), //
170                         Thread.currentThread().getName(), isAppenderThread);
171                 Info.threadlocalInfo.set(info);
172             }
173         });
174     }
175 
176     private static WaitStrategy createWaitStrategy() {
177         final String strategy = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.WaitStrategy");
178         LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy);
179         if ("Sleep".equals(strategy)) {
180             return new SleepingWaitStrategy();
181         } else if ("Yield".equals(strategy)) {
182             return new YieldingWaitStrategy();
183         } else if ("Block".equals(strategy)) {
184             return new BlockingWaitStrategy();
185         }
186         LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
187         return new BlockingWaitStrategy();
188     }
189 
190     private static ExceptionHandler<RingBufferLogEvent> getExceptionHandler() {
191         final String cls = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.ExceptionHandler");
192         if (cls == null) {
193             LOGGER.debug("No AsyncLogger.ExceptionHandler specified");
194             return null;
195         }
196         try {
197             @SuppressWarnings("unchecked")
198             final ExceptionHandler<RingBufferLogEvent> result = Loader.newCheckedInstanceOf(cls, ExceptionHandler.class);
199             LOGGER.debug("AsyncLogger.ExceptionHandler={}", result);
200             return result;
201         } catch (final Exception ignored) {
202             LOGGER.debug("AsyncLogger.ExceptionHandler not set: error creating " + cls + ": ", ignored);
203             return null;
204         }
205     }
206 
207     /**
208      * Constructs an {@code AsyncLogger} with the specified context, name and
209      * message factory.
210      *
211      * @param context context of this logger
212      * @param name name of this logger
213      * @param messageFactory message factory of this logger
214      */
215     public AsyncLogger(final LoggerContext context, final String name, final MessageFactory messageFactory) {
216         super(context, name, messageFactory);
217     }
218 
219     /**
220      * Tuple with the event translator and thread name for a thread.
221      */
222     static class Info {
223         private static final ThreadLocal<Info> threadlocalInfo = new ThreadLocal<Info>() {
224             @Override
225             protected Info initialValue() {
226                 // by default, set isAppenderThread to false
227                 return new Info(new RingBufferLogEventTranslator(), Thread.currentThread().getName(), false);
228             }
229         };
230         private final RingBufferLogEventTranslator translator;
231         private final String cachedThreadName;
232         private final boolean isAppenderThread;
233         
234         public Info(final RingBufferLogEventTranslator translator, final String threadName, final boolean appenderThread) {
235             this.translator = translator;
236             this.cachedThreadName = threadName;
237             this.isAppenderThread = appenderThread;
238         }
239 
240         // LOG4J2-467
241         private String threadName() {
242             return THREAD_NAME_STRATEGY.getThreadName(this);
243         }
244     }
245 
246     @Override
247     public void logMessage(final String fqcn, final Level level, final Marker marker, final Message message,
248             final Throwable thrown) {
249         
250         final Disruptor<RingBufferLogEvent> temp = disruptor;
251         if (temp == null) { // LOG4J2-639
252             LOGGER.fatal("Ignoring log event after log4j was shut down");
253         } else {
254             logMessage0(temp, fqcn, level, marker, message, thrown);
255         }
256     }
257 
258     private void logMessage0(final Disruptor<RingBufferLogEvent> theDisruptor, final String fqcn, final Level level,
259             final Marker marker, final Message message, final Throwable thrown) {
260         final Info info = Info.threadlocalInfo.get();
261         logMessageInAppropriateThread(info, theDisruptor, fqcn, level, marker, message, thrown);
262     }
263 
264     private void logMessageInAppropriateThread(final Info info, final Disruptor<RingBufferLogEvent> theDisruptor,
265             final String fqcn, final Level level, final Marker marker, final Message message, final Throwable thrown) {
266         if (!logMessageInCurrentThread(info, theDisruptor, fqcn, level, marker, message, thrown)) {
267             logMessageInBackgroundThread(info, fqcn, level, marker, message, thrown);
268         }
269     }
270 
271     /**
272      * LOG4J2-471: prevent deadlock when RingBuffer is full and object
273      * being logged calls Logger.log() from its toString() method
274      *
275      * @param info threadlocal information - used to determine if the current thread is the background appender thread
276      * @param theDisruptor used to check if the buffer is full
277      * @param fqcn fully qualified caller name
278      * @param level log level
279      * @param marker optional marker
280      * @param message log message
281      * @param thrown optional exception
282      * @return {@code true} if the event has been logged in the current thread, {@code false} if it should be passed to
283      *          the background thread
284      */
285     private boolean logMessageInCurrentThread(Info info, final Disruptor<RingBufferLogEvent> theDisruptor,
286             final String fqcn, final Level level, final Marker marker, final Message message, final Throwable thrown) {
287         if (info.isAppenderThread && theDisruptor.getRingBuffer().remainingCapacity() == 0) {
288             // bypass RingBuffer and invoke Appender directly
289             final ReliabilityStrategy strategy = privateConfig.loggerConfig.getReliabilityStrategy();
290             strategy.log(this, getName(), fqcn, marker, level, message, thrown);
291             return true;
292         }
293         return false;
294     }
295 
296     /**
297      * Enqueues the specified message to be logged in the background thread.
298      * 
299      * @param info holds some cached information
300      * @param fqcn fully qualified caller name
301      * @param level log level
302      * @param marker optional marker
303      * @param message log message
304      * @param thrown optional exception
305      */
306     private void logMessageInBackgroundThread(Info info, final String fqcn, final Level level, final Marker marker,
307             final Message message, final Throwable thrown) {
308         
309         message.getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters
310 
311         initLogMessageInfo(info, fqcn, level, marker, message, thrown);
312         enqueueLogMessageInfo(info);
313     }
314 
315     private void initLogMessageInfo(Info info, final String fqcn, final Level level, final Marker marker,
316             final Message message, final Throwable thrown) {
317         info.translator.setValues(this, getName(), marker, fqcn, level, message, //
318                 // don't construct ThrowableProxy until required
319                 thrown, //
320 
321                 // config properties are taken care of in the EventHandler
322                 // thread in the #actualAsyncLog method
323 
324                 // needs shallow copy to be fast (LOG4J2-154)
325                 ThreadContext.getImmutableContext(), //
326 
327                 // needs shallow copy to be fast (LOG4J2-154)
328                 ThreadContext.getImmutableStack(), //
329 
330                 // Thread.currentThread().getName(), //
331                 // info.cachedThreadName, //
332                 info.threadName(), //
333 
334                 // location: very expensive operation. LOG4J2-153:
335                 // Only include if "includeLocation=true" is specified,
336                 // exclude if not specified or if "false" was specified.
337                 calcLocationIfRequested(fqcn),
338 
339                 // System.currentTimeMillis());
340                 // CoarseCachedClock: 20% faster than system clock, 16ms gaps
341                 // CachedClock: 10% faster than system clock, smaller gaps
342                 // LOG4J2-744 avoid calling clock altogether if message has the timestamp
343                 eventTimeMillis(message), //
344                 nanoClock.nanoTime() //
345         );
346     }
347 
348     private long eventTimeMillis(final Message message) {
349         return message instanceof TimestampMessage ? ((TimestampMessage) message).getTimestamp() :
350                 CLOCK.currentTimeMillis();
351     }
352 
353     /**
354      * Returns the caller location if requested, {@code null} otherwise.
355      * @param fqcn fully qualified caller name.
356      * @return the caller location if requested, {@code null} otherwise.
357      */
358     private StackTraceElement calcLocationIfRequested(String fqcn) {
359         final boolean includeLocation = privateConfig.loggerConfig.isIncludeLocation();
360         return includeLocation ? location(fqcn) : null;
361     }
362 
363     private void enqueueLogMessageInfo(Info info) {
364         // LOG4J2-639: catch NPE if disruptor field was set to null after our check above
365         try {
366             // Note: do NOT use the temp variable above!
367             // That could result in adding a log event to the disruptor after it was shut down,
368             // which could cause the publishEvent method to hang and never return.
369             disruptor.publishEvent(info.translator);
370         } catch (final NullPointerException npe) {
371             LOGGER.fatal("Ignoring log event after log4j was shut down.");
372         }
373     }
374 
375     private static StackTraceElement location(final String fqcnOfLogger) {
376         return Log4jLogEvent.calcLocation(fqcnOfLogger);
377     }
378 
379     /**
380      * This method is called by the EventHandler that processes the
381      * RingBufferLogEvent in a separate thread.
382      *
383      * @param event the event to log
384      */
385     public void actualAsyncLog(final RingBufferLogEvent event) {
386         final Map<Property, Boolean> properties = privateConfig.loggerConfig.getProperties();
387         event.mergePropertiesIntoContextMap(properties, privateConfig.config.getStrSubstitutor());
388         final ReliabilityStrategy strategy = privateConfig.loggerConfig.getReliabilityStrategy();
389         strategy.log(this, event);
390     }
391 
392     public static void stop() {
393         final Disruptor<RingBufferLogEvent> temp = disruptor;
394 
395         // Must guarantee that publishing to the RingBuffer has stopped
396         // before we call disruptor.shutdown()
397         disruptor = null; // client code fails with NPE if log after stop = OK
398         if (temp == null) {
399             return; // stop() has already been called
400         }
401 
402         // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed,
403         // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU,
404         // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain.
405         for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
406             try {
407                 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while
408             } catch (final InterruptedException e) { // ignored
409             }
410         }
411         temp.shutdown(); // busy-spins until all events currently in the disruptor have been processed
412         executor.shutdown(); // finally, kill the processor thread
413         Info.threadlocalInfo.remove(); // LOG4J2-323
414     }
415 
416     /**
417      * Returns {@code true} if the specified disruptor still has unprocessed events.
418      */
419     private static boolean hasBacklog(final Disruptor<?> disruptor) {
420         final RingBuffer<?> ringBuffer = disruptor.getRingBuffer();
421         return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
422     }
423 
424     /**
425      * Creates and returns a new {@code RingBufferAdmin} that instruments the
426      * ringbuffer of the {@code AsyncLogger}.
427      *
428      * @param contextName name of the global {@code AsyncLoggerContext}
429      */
430     public static RingBufferAdmin createRingBufferAdmin(final String contextName) {
431         return RingBufferAdmin.forAsyncLogger(disruptor.getRingBuffer(), contextName);
432     }
433     
434     /**
435      * Returns the {@code NanoClock} to use for creating the nanoTime timestamp of log events.
436      * @return the {@code NanoClock} to use for creating the nanoTime timestamp of log events
437      */
438     public static NanoClock getNanoClock() {
439         return nanoClock;
440     }
441     
442     /**
443      * Sets the {@code NanoClock} to use for creating the nanoTime timestamp of log events.
444      * <p>
445      * FOR INTERNAL USE. This method may be called with a different {@code NanoClock} implementation when the
446      * configuration changes.
447      * 
448      * @param nanoClock the {@code NanoClock} to use for creating the nanoTime timestamp of log events
449      */
450     public static void setNanoClock(NanoClock nanoClock) {
451         AsyncLogger.nanoClock = Objects.requireNonNull(nanoClock, "NanoClock must be non-null");
452     }
453 }