View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.logging.log4j.core.async;
18  
19  import java.util.concurrent.ExecutorService;
20  import java.util.concurrent.Executors;
21  import java.util.concurrent.ThreadFactory;
22  
23  import org.apache.logging.log4j.Logger;
24  import org.apache.logging.log4j.core.LogEvent;
25  import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
26  import org.apache.logging.log4j.core.util.Integers;
27  import org.apache.logging.log4j.status.StatusLogger;
28  import org.apache.logging.log4j.util.PropertiesUtil;
29  
30  import com.lmax.disruptor.BlockingWaitStrategy;
31  import com.lmax.disruptor.EventFactory;
32  import com.lmax.disruptor.EventHandler;
33  import com.lmax.disruptor.EventTranslatorTwoArg;
34  import com.lmax.disruptor.ExceptionHandler;
35  import com.lmax.disruptor.RingBuffer;
36  import com.lmax.disruptor.Sequence;
37  import com.lmax.disruptor.SequenceReportingEventHandler;
38  import com.lmax.disruptor.SleepingWaitStrategy;
39  import com.lmax.disruptor.WaitStrategy;
40  import com.lmax.disruptor.YieldingWaitStrategy;
41  import com.lmax.disruptor.dsl.Disruptor;
42  import com.lmax.disruptor.dsl.ProducerType;
43  
44  /**
45   * Helper class decoupling the {@code AsyncLoggerConfig} class from the LMAX
46   * Disruptor library.
47   * <p>
48   * {@code AsyncLoggerConfig} is a plugin, and will be loaded even if users do
49   * not configure any {@code <asyncLogger>} or {@code <asyncRoot>} elements in
50   * the configuration. If {@code AsyncLoggerConfig} has inner classes that extend
51   * or implement classes from the Disruptor library, a
52   * {@code NoClassDefFoundError} is thrown if the Disruptor jar is not in the
53   * classpath when the PluginManager loads the {@code AsyncLoggerConfig} plugin
54   * from the pre-defined plugins definition file.
55   * <p>
56   * This class serves to make the dependency on the Disruptor optional, so that
57   * these classes are only loaded when the {@code AsyncLoggerConfig} is actually
58   * used.
59   */
60  class AsyncLoggerConfigHelper {
61  
62      private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
63      private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
64      private static final int RINGBUFFER_MIN_SIZE = 128;
65      private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
66      private static final Logger LOGGER = StatusLogger.getLogger();
67  
68      private static ThreadFactory threadFactory = new DaemonThreadFactory("AsyncLoggerConfig-");
69      private static volatile Disruptor<Log4jEventWrapper> disruptor;
70      private static ExecutorService executor;
71  
72      private static volatile int count = 0;
73      private static ThreadLocal<Boolean> isAppenderThread = new ThreadLocal<>();
74  
75      /**
76       * Factory used to populate the RingBuffer with events. These event objects
77       * are then re-used during the life of the RingBuffer.
78       */
79      private static final EventFactory<Log4jEventWrapper> FACTORY = new EventFactory<Log4jEventWrapper>() {
80          @Override
81          public Log4jEventWrapper newInstance() {
82              return new Log4jEventWrapper();
83          }
84      };
85  
86      /**
87       * Object responsible for passing on data to a specific RingBuffer event.
88       */
89      private final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> translator 
90              = new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
91  
92          @Override
93          public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence, 
94                  final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
95              ringBufferElement.event = logEvent;
96              ringBufferElement.loggerConfig = loggerConfig;
97          }
98      };
99  
100     private final AsyncLoggerConfig asyncLoggerConfig;
101 
102     public AsyncLoggerConfigHelper(final AsyncLoggerConfig asyncLoggerConfig) {
103         this.asyncLoggerConfig = asyncLoggerConfig;
104         claim();
105     }
106 
107     private static synchronized void initDisruptor() {
108         if (disruptor != null) {
109             LOGGER.trace("AsyncLoggerConfigHelper not starting new disruptor, using existing object. Ref count is {}.", count);
110             return;
111         }
112         LOGGER.trace("AsyncLoggerConfigHelper creating new disruptor. Ref count is {}.", count);
113         final int ringBufferSize = calculateRingBufferSize();
114         final WaitStrategy waitStrategy = createWaitStrategy();
115         executor = Executors.newSingleThreadExecutor(threadFactory);
116         initThreadLocalForExecutorThread();
117         disruptor = new Disruptor<>(FACTORY, ringBufferSize, executor, ProducerType.MULTI, waitStrategy);
118         final EventHandler<Log4jEventWrapper>[] handlers = new Log4jEventWrapperHandler[] {//
119         new Log4jEventWrapperHandler() };
120         final ExceptionHandler<Log4jEventWrapper> errorHandler = getExceptionHandler();
121         disruptor.handleExceptionsWith(errorHandler);
122         disruptor.handleEventsWith(handlers);
123 
124         LOGGER.debug(
125                 "Starting AsyncLoggerConfig disruptor with ringbuffer size={}, waitStrategy={}, exceptionHandler={}...",
126                 disruptor.getRingBuffer().getBufferSize(), waitStrategy.getClass().getSimpleName(), errorHandler);
127         disruptor.start();
128     }
129 
130     private static WaitStrategy createWaitStrategy() {
131         final String strategy = System
132                 .getProperty("AsyncLoggerConfig.WaitStrategy");
133         LOGGER.debug("property AsyncLoggerConfig.WaitStrategy={}", strategy);
134         if ("Sleep".equals(strategy)) {
135             return new SleepingWaitStrategy();
136         } else if ("Yield".equals(strategy)) {
137             return new YieldingWaitStrategy();
138         } else if ("Block".equals(strategy)) {
139             return new BlockingWaitStrategy();
140         }
141         LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
142         return new BlockingWaitStrategy();
143     }
144 
145     private static int calculateRingBufferSize() {
146         int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
147         final String userPreferredRBSize = PropertiesUtil.getProperties().getStringProperty(
148                 "AsyncLoggerConfig.RingBufferSize",
149                 String.valueOf(ringBufferSize));
150         try {
151             int size = Integer.parseInt(userPreferredRBSize);
152             if (size < RINGBUFFER_MIN_SIZE) {
153                 size = RINGBUFFER_MIN_SIZE;
154                 LOGGER.warn(
155                         "Invalid RingBufferSize {}, using minimum size {}.",
156                         userPreferredRBSize, RINGBUFFER_MIN_SIZE);
157             }
158             ringBufferSize = size;
159         } catch (final Exception ex) {
160             LOGGER.warn("Invalid RingBufferSize {}, using default size {}.",
161                     userPreferredRBSize, ringBufferSize);
162         }
163         return Integers.ceilingNextPowerOfTwo(ringBufferSize);
164     }
165 
166     private static ExceptionHandler<Log4jEventWrapper> getExceptionHandler() {
167         final String cls = System.getProperty("AsyncLoggerConfig.ExceptionHandler");
168         if (cls == null) {
169             return null;
170         }
171         try {
172             @SuppressWarnings("unchecked")
173             final Class<? extends ExceptionHandler<Log4jEventWrapper>> klass = (Class<? extends ExceptionHandler<Log4jEventWrapper>>) Class
174                     .forName(cls);
175             return klass.newInstance();
176         } catch (final Exception ignored) {
177             LOGGER.debug("AsyncLoggerConfig.ExceptionHandler not set: error creating " + cls + ": ", ignored);
178             return null;
179         }
180     }
181 
182     /**
183      * RingBuffer events contain all information necessary to perform the work
184      * in a separate thread.
185      */
186     private static class Log4jEventWrapper {
187         private AsyncLoggerConfig loggerConfig;
188         private LogEvent event;
189 
190         /**
191          * Release references held by ring buffer to allow objects to be
192          * garbage-collected.
193          */
194         public void clear() {
195             loggerConfig = null;
196             event = null;
197         }
198     }
199 
200     /**
201      * EventHandler performs the work in a separate thread.
202      */
203     private static class Log4jEventWrapperHandler implements
204             SequenceReportingEventHandler<Log4jEventWrapper> {
205         private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
206         private Sequence sequenceCallback;
207         private int counter;
208 
209         @Override
210         public void setSequenceCallback(final Sequence sequenceCallback) {
211             this.sequenceCallback = sequenceCallback;
212         }
213 
214         @Override
215         public void onEvent(final Log4jEventWrapper event, final long sequence,
216                 final boolean endOfBatch) throws Exception {
217             event.event.setEndOfBatch(endOfBatch);
218             event.loggerConfig.asyncCallAppenders(event.event);
219             event.clear();
220 
221             notifyIntermediateProgress(sequence);
222         }
223 
224         /**
225          * Notify the BatchEventProcessor that the sequence has progressed.
226          * Without this callback the sequence would not be progressed
227          * until the batch has completely finished.
228          */
229         private void notifyIntermediateProgress(final long sequence) {
230             if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
231                 sequenceCallback.set(sequence);
232                 counter = 0;
233             }
234         }
235     }
236 
237     /**
238      * Increases the reference count and creates and starts a new Disruptor and
239      * associated thread if none currently exists.
240      * 
241      * @see #release()
242      */
243     synchronized static void claim() {
244         count++;
245         initDisruptor();
246     }
247 
248     /**
249      * Decreases the reference count. If the reference count reached zero, the
250      * Disruptor and its associated thread are shut down and their references
251      * set to {@code null}.
252      */
253     synchronized static void release() {
254         if (--count > 0) {
255             LOGGER.trace("AsyncLoggerConfigHelper: not shutting down disruptor: ref count is {}.", count);
256             return;
257         }
258         final Disruptor<Log4jEventWrapper> temp = disruptor;
259         if (temp == null) {
260             LOGGER.trace("AsyncLoggerConfigHelper: disruptor already shut down: ref count is {}. (Resetting to zero.)",
261                     count);
262             count = 0; // ref count must not be negative or #claim() will not work correctly
263             return; // disruptor was already shut down by another thread
264         }
265         LOGGER.trace("AsyncLoggerConfigHelper: shutting down disruptor: ref count is {}.", count);
266 
267         // Must guarantee that publishing to the RingBuffer has stopped
268         // before we call disruptor.shutdown()
269         disruptor = null; // client code fails with NPE if log after stop = OK
270 
271         // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed,
272         // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU,
273         // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain.
274         for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
275             try {
276                 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while
277             } catch (final InterruptedException e) { // ignored
278             }
279         }
280         temp.shutdown(); // busy-spins until all events currently in the disruptor have been processed
281         executor.shutdown(); // finally, kill the processor thread
282         executor = null; // release reference to allow GC
283     }
284 
285     /**
286      * Returns {@code true} if the specified disruptor still has unprocessed events.
287      */
288     private static boolean hasBacklog(final Disruptor<?> disruptor) {
289         final RingBuffer<?> ringBuffer = disruptor.getRingBuffer();
290         return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
291     }
292 
293     /**
294      * Initialize the threadlocal that allows us to detect Logger.log() calls 
295      * initiated from the appender thread, which may cause deadlock when the 
296      * RingBuffer is full. (LOG4J2-471)
297      */
298     private static void initThreadLocalForExecutorThread() {
299         executor.submit(new Runnable() {
300             @Override
301             public void run() {
302                 isAppenderThread.set(Boolean.TRUE);
303             }
304         });
305     }
306 
307     /**
308      * If possible, delegates the invocation to {@code callAppenders} to another
309      * thread and returns {@code true}. If this is not possible (if it detects
310      * that delegating to another thread would cause deadlock because the
311      * current call to Logger.log() originated from the appender thread and the
312      * ringbuffer is full) then this method does nothing and returns {@code false}.
313      * It is the responsibility of the caller to process the event when this
314      * method returns {@code false}.
315      * 
316      * @param event the event to delegate to another thread
317      * @return {@code true} if delegation was successful, {@code false} if the
318      *          calling thread needs to process the event itself
319      */
320     public boolean callAppendersFromAnotherThread(final LogEvent event) {
321         final Disruptor<Log4jEventWrapper> temp = disruptor;
322         if (!hasLog4jBeenShutDown(temp)) {
323 
324             // LOG4J2-471: prevent deadlock when RingBuffer is full and object
325             // being logged calls Logger.log() from its toString() method
326             if (isCalledFromAppenderThreadAndBufferFull(temp)) {
327                 // bypass RingBuffer and invoke Appender directly
328                 return false;
329             }
330             enqueueEvent(event);
331         }
332         return true;
333     }
334 
335     /**
336      * Returns {@code true} if the specified disruptor is null.
337      */
338     private boolean hasLog4jBeenShutDown(final Disruptor<Log4jEventWrapper> aDisruptor) {
339         if (aDisruptor == null) { // LOG4J2-639
340             LOGGER.fatal("Ignoring log event after log4j was shut down");
341             return true;
342         }
343         return false;
344     }
345 
346     private void enqueueEvent(final LogEvent event) {
347         // LOG4J2-639: catch NPE if disruptor field was set to null after our check above
348         try {
349             final LogEvent logEvent = prepareEvent(event);
350             enqueue(logEvent);
351         } catch (final NullPointerException npe) {
352             LOGGER.fatal("Ignoring log event after log4j was shut down.");
353         }
354     }
355 
356     private LogEvent prepareEvent(final LogEvent event) {
357         final LogEvent logEvent = ensureImmutable(event);
358         logEvent.getMessage().getFormattedMessage(); // LOG4J2-763: ask message to freeze parameters
359         return logEvent;
360     }
361 
362     private void enqueue(LogEvent logEvent) {
363         // Note: do NOT use the temp variable above!
364         // That could result in adding a log event to the disruptor after it was shut down,
365         // which could cause the publishEvent method to hang and never return.
366         disruptor.getRingBuffer().publishEvent(translator, logEvent, asyncLoggerConfig);
367     }
368 
369     private LogEvent ensureImmutable(final LogEvent event) {
370         LogEvent result = event;
371         if (event instanceof RingBufferLogEvent) {
372             // Deal with special case where both types of Async Loggers are used together:
373             // RingBufferLogEvents are created by the all-loggers-async type, but
374             // this event is also consumed by the some-loggers-async type (this class).
375             // The original event will be re-used and modified in an application thread later,
376             // so take a snapshot of it, which can be safely processed in the
377             // some-loggers-async background thread.
378             result = ((RingBufferLogEvent) event).createMemento();
379         }
380         return result;
381     }
382 
383     /**
384      * Returns true if the specified ringbuffer is full and the Logger.log() call was made from the appender thread.
385      */
386     private boolean isCalledFromAppenderThreadAndBufferFull(Disruptor<Log4jEventWrapper> disruptor) {
387         return isAppenderThread.get() == Boolean.TRUE && disruptor.getRingBuffer().remainingCapacity() == 0;
388     }
389 
390     /**
391      * Creates and returns a new {@code RingBufferAdmin} that instruments the
392      * ringbuffer of this {@code AsyncLoggerConfig}.
393      * 
394      * @param contextName name of the {@code LoggerContext}
395      * @param loggerConfigName name of the logger config
396      */
397     public RingBufferAdmin createRingBufferAdmin(final String contextName, final String loggerConfigName) {
398         return RingBufferAdmin.forAsyncLoggerConfig(disruptor.getRingBuffer(), contextName, loggerConfigName);
399     }
400 
401 }