View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.async;
18  
19  import java.util.concurrent.ExecutorService;
20  import java.util.concurrent.Executors;
21  import java.util.concurrent.ThreadFactory;
22  import java.util.concurrent.TimeUnit;
23  
24  import org.apache.logging.log4j.Level;
25  import org.apache.logging.log4j.core.AbstractLifeCycle;
26  import org.apache.logging.log4j.core.LogEvent;
27  import org.apache.logging.log4j.core.impl.Log4jLogEvent;
28  import org.apache.logging.log4j.core.impl.LogEventFactory;
29  import org.apache.logging.log4j.core.impl.MutableLogEvent;
30  import org.apache.logging.log4j.core.impl.ReusableLogEventFactory;
31  import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
32  import org.apache.logging.log4j.core.util.ExecutorServices;
33  import org.apache.logging.log4j.core.util.Log4jThreadFactory;
34  import org.apache.logging.log4j.message.ReusableMessage;
35  
36  import com.lmax.disruptor.EventFactory;
37  import com.lmax.disruptor.EventTranslatorTwoArg;
38  import com.lmax.disruptor.ExceptionHandler;
39  import com.lmax.disruptor.RingBuffer;
40  import com.lmax.disruptor.Sequence;
41  import com.lmax.disruptor.SequenceReportingEventHandler;
42  import com.lmax.disruptor.WaitStrategy;
43  import com.lmax.disruptor.dsl.Disruptor;
44  import com.lmax.disruptor.dsl.ProducerType;
45  
46  /**
47   * Helper class decoupling the {@code AsyncLoggerConfig} class from the LMAX Disruptor library.
48   * <p>
49   * {@code AsyncLoggerConfig} is a plugin, and will be loaded even if users do not configure any {@code <asyncLogger>} or
50   * {@code <asyncRoot>} elements in the configuration. If {@code AsyncLoggerConfig} has inner classes that extend or
51   * implement classes from the Disruptor library, a {@code NoClassDefFoundError} is thrown if the Disruptor jar is not in
52   * the classpath when the PluginManager loads the {@code AsyncLoggerConfig} plugin from the pre-defined plugins
53   * definition file.
54   * <p>
55   * This class serves to make the dependency on the Disruptor optional, so that these classes are only loaded when the
56   * {@code AsyncLoggerConfig} is actually used.
57   */
58  public class AsyncLoggerConfigDisruptor extends AbstractLifeCycle implements AsyncLoggerConfigDelegate {
59  
60      private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
61      private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
62  
63      /**
64       * RingBuffer events contain all information necessary to perform the work in a separate thread.
65       */
66      public static class Log4jEventWrapper {
67          public Log4jEventWrapper() {
68          }
69  
70          public Log4jEventWrapper(final MutableLogEvent mutableLogEvent) {
71              event = mutableLogEvent;
72          }
73  
74          private AsyncLoggerConfig loggerConfig;
75          private LogEvent event;
76  
77          /**
78           * Release references held by ring buffer to allow objects to be garbage-collected.
79           */
80          public void clear() {
81              loggerConfig = null;
82              if (event instanceof MutableLogEvent) {
83                  ((MutableLogEvent) event).clear();
84              } else {
85                  event = null;
86              }
87          }
88  
89          @Override
90          public String toString() {
91              return String.valueOf(event);
92          }
93      }
94  
95      /**
96       * EventHandler performs the work in a separate thread.
97       */
98      private static class Log4jEventWrapperHandler implements SequenceReportingEventHandler<Log4jEventWrapper> {
99          private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
100         private Sequence sequenceCallback;
101         private int counter;
102 
103         @Override
104         public void setSequenceCallback(final Sequence sequenceCallback) {
105             this.sequenceCallback = sequenceCallback;
106         }
107 
108         @Override
109         public void onEvent(final Log4jEventWrapper event, final long sequence, final boolean endOfBatch)
110                 throws Exception {
111             event.event.setEndOfBatch(endOfBatch);
112             event.loggerConfig.asyncCallAppenders(event.event);
113             event.clear();
114 
115             notifyIntermediateProgress(sequence);
116         }
117 
118         /**
119          * Notify the BatchEventProcessor that the sequence has progressed. Without this callback the sequence would not
120          * be progressed until the batch has completely finished.
121          */
122         private void notifyIntermediateProgress(final long sequence) {
123             if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
124                 sequenceCallback.set(sequence);
125                 counter = 0;
126             }
127         }
128     }
129 
130     /**
131      * Factory used to populate the RingBuffer with events. These event objects are then re-used during the life of the
132      * RingBuffer.
133      */
134     private static final EventFactory<Log4jEventWrapper> FACTORY = new EventFactory<Log4jEventWrapper>() {
135         @Override
136         public Log4jEventWrapper newInstance() {
137             return new Log4jEventWrapper();
138         }
139     };
140 
141     /**
142      * Factory used to populate the RingBuffer with events. These event objects are then re-used during the life of the
143      * RingBuffer.
144      */
145     private static final EventFactory<Log4jEventWrapper> MUTABLE_FACTORY = new EventFactory<Log4jEventWrapper>() {
146         @Override
147         public Log4jEventWrapper newInstance() {
148             return new Log4jEventWrapper(new MutableLogEvent());
149         }
150     };
151 
152     /**
153      * Object responsible for passing on data to a specific RingBuffer event.
154      */
155     private static final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> TRANSLATOR =
156             new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
157 
158         @Override
159         public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence,
160                 final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
161             ringBufferElement.event = logEvent;
162             ringBufferElement.loggerConfig = loggerConfig;
163         }
164     };
165 
166     /**
167      * Object responsible for passing on data to a RingBuffer event with a MutableLogEvent.
168      */
169     private static final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> MUTABLE_TRANSLATOR =
170             new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
171 
172         @Override
173         public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence,
174                 final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
175             ((MutableLogEvent) ringBufferElement.event).initFrom(logEvent);
176             ringBufferElement.loggerConfig = loggerConfig;
177         }
178     };
179 
180     private static final ThreadFactory THREAD_FACTORY = Log4jThreadFactory.createDaemonThreadFactory("AsyncLoggerConfig");
181 
182     private int ringBufferSize;
183     private AsyncQueueFullPolicy asyncQueueFullPolicy;
184     private Boolean mutable = Boolean.FALSE;
185 
186     private volatile Disruptor<Log4jEventWrapper> disruptor;
187     private ExecutorService executor;
188     private long backgroundThreadId; // LOG4J2-471
189     private EventFactory<Log4jEventWrapper> factory;
190     private EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> translator;
191 
192     public AsyncLoggerConfigDisruptor() {
193     }
194 
195     // called from AsyncLoggerConfig constructor
196     @Override
197     public void setLogEventFactory(final LogEventFactory logEventFactory) {
198         // if any AsyncLoggerConfig uses a ReusableLogEventFactory
199         // then we need to populate our ringbuffer with MutableLogEvents
200         this.mutable = mutable || (logEventFactory instanceof ReusableLogEventFactory);
201     }
202 
203     /**
204      * Increases the reference count and creates and starts a new Disruptor and associated thread if none currently
205      * exists.
206      *
207      * @see #stop()
208      */
209     @Override
210     public synchronized void start() {
211         if (disruptor != null) {
212             LOGGER.trace("AsyncLoggerConfigDisruptor not starting new disruptor for this configuration, "
213                     + "using existing object.");
214             return;
215         }
216         LOGGER.trace("AsyncLoggerConfigDisruptor creating new disruptor for this configuration.");
217         ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLoggerConfig.RingBufferSize");
218         final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLoggerConfig.WaitStrategy");
219         executor = Executors.newSingleThreadExecutor(THREAD_FACTORY);
220         backgroundThreadId = DisruptorUtil.getExecutorThreadId(executor);
221         asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
222 
223         translator = mutable ? MUTABLE_TRANSLATOR : TRANSLATOR;
224         factory = mutable ? MUTABLE_FACTORY : FACTORY;
225         disruptor = new Disruptor<>(factory, ringBufferSize, executor, ProducerType.MULTI, waitStrategy);
226 
227         final ExceptionHandler<Log4jEventWrapper> errorHandler = DisruptorUtil.getAsyncLoggerConfigExceptionHandler();
228         disruptor.handleExceptionsWith(errorHandler);
229 
230         final Log4jEventWrapperHandler[] handlers = {new Log4jEventWrapperHandler()};
231         disruptor.handleEventsWith(handlers);
232 
233         LOGGER.debug("Starting AsyncLoggerConfig disruptor for this configuration with ringbufferSize={}, "
234                 + "waitStrategy={}, exceptionHandler={}...", disruptor.getRingBuffer().getBufferSize(), waitStrategy
235                 .getClass().getSimpleName(), errorHandler);
236         disruptor.start();
237         super.start();
238     }
239 
240     /**
241      * Decreases the reference count. If the reference count reached zero, the Disruptor and its associated thread are
242      * shut down and their references set to {@code null}.
243      */
244     @Override
245     public boolean stop(final long timeout, final TimeUnit timeUnit) {
246         final Disruptor<Log4jEventWrapper> temp = disruptor;
247         if (temp == null) {
248             LOGGER.trace("AsyncLoggerConfigDisruptor: disruptor for this configuration already shut down.");
249             return true; // disruptor was already shut down by another thread
250         }
251         setStopping();
252         LOGGER.trace("AsyncLoggerConfigDisruptor: shutting down disruptor for this configuration.");
253 
254         // We must guarantee that publishing to the RingBuffer has stopped before we call disruptor.shutdown().
255         disruptor = null; // client code fails with NPE if log after stop = OK
256 
257         // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed,
258         // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU,
259         // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain.
260         for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
261             try {
262                 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while
263             } catch (final InterruptedException e) { // ignored
264             }
265         }
266         temp.shutdown(); // busy-spins until all events currently in the disruptor have been processed
267 
268         LOGGER.trace("AsyncLoggerConfigDisruptor: shutting down disruptor executor for this configuration.");
269         // finally, kill the processor thread
270         ExecutorServices.shutdown(executor, timeout, timeUnit, toString());
271         executor = null; // release reference to allow GC
272 
273         if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
274             LOGGER.trace("AsyncLoggerConfigDisruptor: {} discarded {} events.", asyncQueueFullPolicy,
275                     DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
276         }
277         setStopped();
278         return true;
279     }
280 
281     /**
282      * Returns {@code true} if the specified disruptor still has unprocessed events.
283      */
284     private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
285         final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer();
286         return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
287     }
288 
289     @Override
290     public EventRoute getEventRoute(final Level logLevel) {
291         final int remainingCapacity = remainingDisruptorCapacity();
292         if (remainingCapacity < 0) {
293             return EventRoute.DISCARD;
294         }
295         return asyncQueueFullPolicy.getRoute(backgroundThreadId, logLevel);
296     }
297 
298     private int remainingDisruptorCapacity() {
299         final Disruptor<Log4jEventWrapper> temp = disruptor;
300         if (hasLog4jBeenShutDown(temp)) {
301             return -1;
302         }
303         return (int) temp.getRingBuffer().remainingCapacity();
304     }
305 
306     /**
307      * Returns {@code true} if the specified disruptor is null.
308      */
309     private boolean hasLog4jBeenShutDown(final Disruptor<Log4jEventWrapper> aDisruptor) {
310         if (aDisruptor == null) { // LOG4J2-639
311             LOGGER.warn("Ignoring log event after log4j was shut down");
312             return true;
313         }
314         return false;
315     }
316 
317     @Override
318     public void enqueueEvent(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) {
319         // LOG4J2-639: catch NPE if disruptor field was set to null after our check above
320         try {
321             final LogEvent logEvent = prepareEvent(event);
322             enqueue(logEvent, asyncLoggerConfig);
323         } catch (final NullPointerException npe) {
324             // Note: NPE prevents us from adding a log event to the disruptor after it was shut down,
325             // which could cause the publishEvent method to hang and never return.
326             LOGGER.warn("Ignoring log event after log4j was shut down.");
327         }
328     }
329 
330     private LogEvent prepareEvent(final LogEvent event) {
331         final LogEvent logEvent = ensureImmutable(event);
332         if (logEvent instanceof Log4jLogEvent && logEvent.getMessage() instanceof ReusableMessage) {
333             ((Log4jLogEvent) logEvent).makeMessageImmutable();
334         }
335         return logEvent;
336     }
337 
338     private void enqueue(final LogEvent logEvent, final AsyncLoggerConfig asyncLoggerConfig) {
339         disruptor.getRingBuffer().publishEvent(translator, logEvent, asyncLoggerConfig);
340     }
341 
342     @Override
343     public boolean tryEnqueue(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) {
344         final LogEvent logEvent = prepareEvent(event);
345         return disruptor.getRingBuffer().tryPublishEvent(translator, logEvent, asyncLoggerConfig);
346     }
347 
348     private LogEvent ensureImmutable(final LogEvent event) {
349         LogEvent result = event;
350         if (event instanceof RingBufferLogEvent) {
351             // Deal with special case where both types of Async Loggers are used together:
352             // RingBufferLogEvents are created by the all-loggers-async type, but
353             // this event is also consumed by the some-loggers-async type (this class).
354             // The original event will be re-used and modified in an application thread later,
355             // so take a snapshot of it, which can be safely processed in the
356             // some-loggers-async background thread.
357             result = ((RingBufferLogEvent) event).createMemento();
358         }
359         return result;
360     }
361 
362     /*
363      * (non-Javadoc)
364      *
365      * @see org.apache.logging.log4j.core.async.AsyncLoggerConfigDelegate#createRingBufferAdmin(java.lang.String,
366      * java.lang.String)
367      */
368     @Override
369     public RingBufferAdmin createRingBufferAdmin(final String contextName, final String loggerConfigName) {
370         return RingBufferAdmin.forAsyncLoggerConfig(disruptor.getRingBuffer(), contextName, loggerConfigName);
371     }
372 }