View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.async;
18  
19  import java.util.Map;
20  import java.util.concurrent.ExecutorService;
21  import java.util.concurrent.Executors;
22  
23  import org.apache.logging.log4j.Level;
24  import org.apache.logging.log4j.Marker;
25  import org.apache.logging.log4j.ThreadContext;
26  import org.apache.logging.log4j.core.Logger;
27  import org.apache.logging.log4j.core.LoggerContext;
28  import org.apache.logging.log4j.core.config.Property;
29  import org.apache.logging.log4j.core.helpers.Clock;
30  import org.apache.logging.log4j.core.helpers.ClockFactory;
31  import org.apache.logging.log4j.core.impl.Log4jLogEvent;
32  import org.apache.logging.log4j.message.Message;
33  import org.apache.logging.log4j.message.MessageFactory;
34  import org.apache.logging.log4j.status.StatusLogger;
35  
36  import com.lmax.disruptor.BlockingWaitStrategy;
37  import com.lmax.disruptor.EventHandler;
38  import com.lmax.disruptor.ExceptionHandler;
39  import com.lmax.disruptor.RingBuffer;
40  import com.lmax.disruptor.SleepingWaitStrategy;
41  import com.lmax.disruptor.WaitStrategy;
42  import com.lmax.disruptor.YieldingWaitStrategy;
43  import com.lmax.disruptor.dsl.Disruptor;
44  import com.lmax.disruptor.dsl.ProducerType;
45  import com.lmax.disruptor.util.Util;
46  
47  /**
48   * AsyncLogger is a logger designed for high throughput and low latency logging.
49   * It does not perform any I/O in the calling (application) thread, but instead
50   * hands off the work to another thread as soon as possible. The actual logging
51   * is performed in the background thread. It uses the LMAX Disruptor library for
52   * inter-thread communication. (<a
53   * href="http://lmax-exchange.github.com/disruptor/"
54   * >http://lmax-exchange.github.com/disruptor/</a>)
55   * <p>
56   * To use AsyncLogger, specify the System property
57   * {@code -DLog4jContextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector}
58   * before you obtain a Logger, and all Loggers returned by LogManager.getLogger
59   * will be AsyncLoggers.
60   * <p>
61   * Note that for performance reasons, this logger does not include source
62   * location by default. You need to specify {@code includeLocation="true"} in
63   * the configuration or any %class, %location or %line conversion patterns in
64   * your log4j.xml configuration will produce either a "?" character or no output
65   * at all.
66   * <p>
67   * For best performance, use AsyncLogger with the RandomAccessFileAppender or
68   * RollingRandomAccessFileAppender, with immediateFlush=false. These appenders have
69   * built-in support for the batching mechanism used by the Disruptor library,
70   * and they will flush to disk at the end of each batch. This means that even
71   * with immediateFlush=false, there will never be any items left in the buffer;
72   * all log events will be written to disk in a very efficient manner.
73   */
74  public class AsyncLogger extends Logger {
75      private static final int HALF_A_SECOND = 500;
76      private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 20;
77      private static final int RINGBUFFER_MIN_SIZE = 128;
78      private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
79      private static final StatusLogger LOGGER = StatusLogger.getLogger();
80  
81      private static volatile Disruptor<RingBufferLogEvent> disruptor;
82      private static Clock clock = ClockFactory.getClock();
83  
84      private static ExecutorService executor = Executors
85              .newSingleThreadExecutor(new DaemonThreadFactory("AsyncLogger-"));
86      private final ThreadLocal<Info> threadlocalInfo = new ThreadLocal<Info>();
87  
88      static {
89          final int ringBufferSize = calculateRingBufferSize();
90  
91          final WaitStrategy waitStrategy = createWaitStrategy();
92          disruptor = new Disruptor<RingBufferLogEvent>(
93                  RingBufferLogEvent.FACTORY, ringBufferSize, executor,
94                  ProducerType.MULTI, waitStrategy);
95          final EventHandler<RingBufferLogEvent>[] handlers = new RingBufferLogEventHandler[] {//
96          new RingBufferLogEventHandler() };
97          disruptor.handleExceptionsWith(getExceptionHandler());
98          disruptor.handleEventsWith(handlers);
99  
100         LOGGER.debug(
101                 "Starting AsyncLogger disruptor with ringbuffer size {}...",
102                 disruptor.getRingBuffer().getBufferSize());
103         disruptor.start();
104     }
105 
106     private static int calculateRingBufferSize() {
107         int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
108         final String userPreferredRBSize = System.getProperty(
109                 "AsyncLogger.RingBufferSize", String.valueOf(ringBufferSize));
110         try {
111             int size = Integer.parseInt(userPreferredRBSize);
112             if (size < RINGBUFFER_MIN_SIZE) {
113                 size = RINGBUFFER_MIN_SIZE;
114                 LOGGER.warn(
115                         "Invalid RingBufferSize {}, using minimum size {}.",
116                         userPreferredRBSize, RINGBUFFER_MIN_SIZE);
117             }
118             ringBufferSize = size;
119         } catch (final Exception ex) {
120             LOGGER.warn("Invalid RingBufferSize {}, using default size {}.",
121                     userPreferredRBSize, ringBufferSize);
122         }
123         return Util.ceilingNextPowerOfTwo(ringBufferSize);
124     }
125 
126     private static WaitStrategy createWaitStrategy() {
127         final String strategy = System.getProperty("AsyncLogger.WaitStrategy");
128         LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy);
129         if ("Sleep".equals(strategy)) {
130             LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
131             return new SleepingWaitStrategy();
132         } else if ("Yield".equals(strategy)) {
133             LOGGER.debug("disruptor event handler uses YieldingWaitStrategy");
134             return new YieldingWaitStrategy();
135         } else if ("Block".equals(strategy)) {
136             LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
137             return new BlockingWaitStrategy();
138         }
139         LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
140         return new SleepingWaitStrategy();
141     }
142 
143     private static ExceptionHandler getExceptionHandler() {
144         final String cls = System.getProperty("AsyncLogger.ExceptionHandler");
145         if (cls == null) {
146             LOGGER.debug("No AsyncLogger.ExceptionHandler specified");
147             return null;
148         }
149         try {
150             @SuppressWarnings("unchecked")
151             final
152             Class<? extends ExceptionHandler> klass = (Class<? extends ExceptionHandler>) Class
153                     .forName(cls);
154             final ExceptionHandler result = klass.newInstance();
155             LOGGER.debug("AsyncLogger.ExceptionHandler=" + result);
156             return result;
157         } catch (final Exception ignored) {
158             LOGGER.debug(
159                     "AsyncLogger.ExceptionHandler not set: error creating "
160                             + cls + ": ", ignored);
161             return null;
162         }
163     }
164 
165     /**
166      * Constructs an {@code AsyncLogger} with the specified context, name and
167      * message factory.
168      *
169      * @param context context of this logger
170      * @param name name of this logger
171      * @param messageFactory message factory of this logger
172      */
173     public AsyncLogger(final LoggerContext context, final String name,
174             final MessageFactory messageFactory) {
175         super(context, name, messageFactory);
176     }
177 
178     /**
179      * Tuple with the event translator and thread name for a thread.
180      */
181     private static class Info {
182         private RingBufferLogEventTranslator translator;
183         private String cachedThreadName;
184     }
185 
186     @Override
187     public void log(final Marker marker, final String fqcn, final Level level, final Message data,
188             final Throwable t) {
189         Info info = threadlocalInfo.get();
190         if (info == null) {
191             info = new Info();
192             info.translator = new RingBufferLogEventTranslator();
193             info.cachedThreadName = Thread.currentThread().getName();
194             threadlocalInfo.set(info);
195         }
196 
197         final boolean includeLocation = config.loggerConfig.isIncludeLocation();
198         info.translator.setValues(this, getName(), marker, fqcn, level, data,
199                 t, //
200 
201                 // config properties are taken care of in the EventHandler
202                 // thread in the #actualAsyncLog method
203 
204                 // needs shallow copy to be fast (LOG4J2-154)
205                 ThreadContext.getImmutableContext(), //
206 
207                 // needs shallow copy to be fast (LOG4J2-154)
208                 ThreadContext.getImmutableStack(), //
209 
210                 // Thread.currentThread().getName(), //
211                 info.cachedThreadName, //
212 
213                 // location: very expensive operation. LOG4J2-153:
214                 // Only include if "includeLocation=true" is specified,
215                 // exclude if not specified or if "false" was specified.
216                 includeLocation ? location(fqcn) : null,
217 
218                 // System.currentTimeMillis());
219                 // CoarseCachedClock: 20% faster than system clock, 16ms gaps
220                 // CachedClock: 10% faster than system clock, smaller gaps
221                 clock.currentTimeMillis());
222 
223         disruptor.publishEvent(info.translator);
224     }
225 
226     private StackTraceElement location(final String fqcnOfLogger) {
227         return Log4jLogEvent.calcLocation(fqcnOfLogger);
228     }
229 
230     /**
231      * This method is called by the EventHandler that processes the
232      * RingBufferLogEvent in a separate thread.
233      *
234      * @param event the event to log
235      */
236     public void actualAsyncLog(final RingBufferLogEvent event) {
237         final Map<Property, Boolean> properties = config.loggerConfig.getProperties();
238         event.mergePropertiesIntoContextMap(properties,
239                 config.config.getStrSubstitutor());
240         config.logEvent(event);
241     }
242 
243     public static void stop() {
244         final Disruptor<RingBufferLogEvent> temp = disruptor;
245 
246         // Must guarantee that publishing to the RingBuffer has stopped
247         // before we call disruptor.shutdown()
248         disruptor = null; // client code fails with NPE if log after stop = OK
249         temp.shutdown();
250 
251         // wait up to 10 seconds for the ringbuffer to drain
252         final RingBuffer<RingBufferLogEvent> ringBuffer = temp.getRingBuffer();
253         for (int i = 0; i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
254             if (ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize())) {
255                 break;
256             }
257             try {
258                 // give ringbuffer some time to drain...
259                 Thread.sleep(HALF_A_SECOND);
260             } catch (final InterruptedException e) {
261                 // ignored
262             }
263         }
264         executor.shutdown(); // finally, kill the processor thread
265     }
266 }