View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.async;
18  
19  import java.util.concurrent.ExecutorService;
20  import java.util.concurrent.Executors;
21  import java.util.concurrent.ThreadFactory;
22  
23  import org.apache.logging.log4j.Logger;
24  import org.apache.logging.log4j.core.LogEvent;
25  import org.apache.logging.log4j.status.StatusLogger;
26  
27  import com.lmax.disruptor.BlockingWaitStrategy;
28  import com.lmax.disruptor.EventFactory;
29  import com.lmax.disruptor.EventHandler;
30  import com.lmax.disruptor.EventTranslator;
31  import com.lmax.disruptor.ExceptionHandler;
32  import com.lmax.disruptor.RingBuffer;
33  import com.lmax.disruptor.Sequence;
34  import com.lmax.disruptor.SequenceReportingEventHandler;
35  import com.lmax.disruptor.SleepingWaitStrategy;
36  import com.lmax.disruptor.WaitStrategy;
37  import com.lmax.disruptor.YieldingWaitStrategy;
38  import com.lmax.disruptor.dsl.Disruptor;
39  import com.lmax.disruptor.dsl.ProducerType;
40  import com.lmax.disruptor.util.Util;
41  
42  /**
43   * Helper class decoupling the {@code AsyncLoggerConfig} class from the LMAX
44   * Disruptor library.
45   * <p>
46   * {@code AsyncLoggerConfig} is a plugin, and will be loaded even if users do
47   * not configure any {@code <asyncLogger>} or {@code <asyncRoot>} elements in
48   * the configuration. If {@code AsyncLoggerConfig} has inner classes that extend
49   * or implement classes from the Disruptor library, a
50   * {@code NoClassDefFoundError} is thrown if the Disruptor jar is not in the
51   * classpath when the PluginManager loads the {@code AsyncLoggerConfig} plugin
52   * from the pre-defined plugins definition file.
53   * <p>
54   * This class serves to make the dependency on the Disruptor optional, so that
55   * these classes are only loaded when the {@code AsyncLoggerConfig} is actually
56   * used.
57   */
58  class AsyncLoggerConfigHelper {
59  
60      private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 20;
61      private static final int HALF_A_SECOND = 500;
62      private static final int RINGBUFFER_MIN_SIZE = 128;
63      private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
64      private static final Logger LOGGER = StatusLogger.getLogger();
65  
66      private static ThreadFactory threadFactory = new DaemonThreadFactory(
67              "AsyncLoggerConfig-");
68      private static volatile Disruptor<Log4jEventWrapper> disruptor;
69      private static ExecutorService executor;
70  
71      private static volatile int count = 0;
72  
73      /**
74       * Factory used to populate the RingBuffer with events. These event objects
75       * are then re-used during the life of the RingBuffer.
76       */
77      private static final EventFactory<Log4jEventWrapper> FACTORY = new EventFactory<Log4jEventWrapper>() {
78          @Override
79          public Log4jEventWrapper newInstance() {
80              return new Log4jEventWrapper();
81          }
82      };
83  
84      /**
85       * Object responsible for passing on data to a specific RingBuffer event.
86       */
87      private final EventTranslator<Log4jEventWrapper> translator = new EventTranslator<Log4jEventWrapper>() {
88          @Override
89          public void translateTo(final Log4jEventWrapper event,
90                  final long sequence) {
91              event.event = currentLogEvent.get();
92              event.loggerConfig = asyncLoggerConfig;
93          }
94      };
95  
96      private final ThreadLocal<LogEvent> currentLogEvent = new ThreadLocal<LogEvent>();
97      private final AsyncLoggerConfig asyncLoggerConfig;
98  
99      public AsyncLoggerConfigHelper(final AsyncLoggerConfig asyncLoggerConfig) {
100         this.asyncLoggerConfig = asyncLoggerConfig;
101         claim();
102     }
103 
104     private static synchronized void initDisruptor() {
105         if (disruptor != null) {
106             LOGGER.trace("AsyncLoggerConfigHelper not starting new disruptor, using existing object. Ref count is {}.", count);
107             return;
108         }
109         LOGGER.trace("AsyncLoggerConfigHelper creating new disruptor. Ref count is {}.", count);
110         final int ringBufferSize = calculateRingBufferSize();
111         final WaitStrategy waitStrategy = createWaitStrategy();
112         executor = Executors.newSingleThreadExecutor(threadFactory);
113         disruptor = new Disruptor<Log4jEventWrapper>(FACTORY, ringBufferSize,
114                 executor, ProducerType.MULTI, waitStrategy);
115         final EventHandler<Log4jEventWrapper>[] handlers = new Log4jEventWrapperHandler[] {//
116         new Log4jEventWrapperHandler() };
117         final ExceptionHandler errorHandler = getExceptionHandler();
118         disruptor.handleExceptionsWith(errorHandler);
119         disruptor.handleEventsWith(handlers);
120 
121         LOGGER.debug(
122                 "Starting AsyncLoggerConfig disruptor with ringbuffer size={}, waitStrategy={}, exceptionHandler={}...",
123                 disruptor.getRingBuffer().getBufferSize(), waitStrategy.getClass().getSimpleName(), errorHandler);
124         disruptor.start();
125     }
126 
127     private static WaitStrategy createWaitStrategy() {
128         final String strategy = System
129                 .getProperty("AsyncLoggerConfig.WaitStrategy");
130         LOGGER.debug("property AsyncLoggerConfig.WaitStrategy={}", strategy);
131         if ("Sleep".equals(strategy)) {
132             return new SleepingWaitStrategy();
133         } else if ("Yield".equals(strategy)) {
134             return new YieldingWaitStrategy();
135         } else if ("Block".equals(strategy)) {
136             return new BlockingWaitStrategy();
137         }
138         return new SleepingWaitStrategy();
139     }
140 
141     private static int calculateRingBufferSize() {
142         int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
143         final String userPreferredRBSize = System.getProperty(
144                 "AsyncLoggerConfig.RingBufferSize",
145                 String.valueOf(ringBufferSize));
146         try {
147             int size = Integer.parseInt(userPreferredRBSize);
148             if (size < RINGBUFFER_MIN_SIZE) {
149                 size = RINGBUFFER_MIN_SIZE;
150                 LOGGER.warn(
151                         "Invalid RingBufferSize {}, using minimum size {}.",
152                         userPreferredRBSize, RINGBUFFER_MIN_SIZE);
153             }
154             ringBufferSize = size;
155         } catch (final Exception ex) {
156             LOGGER.warn("Invalid RingBufferSize {}, using default size {}.",
157                     userPreferredRBSize, ringBufferSize);
158         }
159         return Util.ceilingNextPowerOfTwo(ringBufferSize);
160     }
161 
162     private static ExceptionHandler getExceptionHandler() {
163         final String cls = System
164                 .getProperty("AsyncLoggerConfig.ExceptionHandler");
165         if (cls == null) {
166             return null;
167         }
168         try {
169             @SuppressWarnings("unchecked")
170             final Class<? extends ExceptionHandler> klass = (Class<? extends ExceptionHandler>) Class
171                     .forName(cls);
172             final ExceptionHandler result = klass.newInstance();
173             return result;
174         } catch (final Exception ignored) {
175             LOGGER.debug(
176                     "AsyncLoggerConfig.ExceptionHandler not set: error creating "
177                             + cls + ": ", ignored);
178             return null;
179         }
180     }
181 
182     /**
183      * RingBuffer events contain all information necessary to perform the work
184      * in a separate thread.
185      */
186     private static class Log4jEventWrapper {
187         private AsyncLoggerConfig loggerConfig;
188         private LogEvent event;
189 
190         /**
191          * Release references held by ring buffer to allow objects to be
192          * garbage-collected.
193          */
194         public void clear() {
195             loggerConfig = null;
196             event = null;
197         }
198     }
199 
200     /**
201      * EventHandler performs the work in a separate thread.
202      */
203     private static class Log4jEventWrapperHandler implements
204             SequenceReportingEventHandler<Log4jEventWrapper> {
205         private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
206         private Sequence sequenceCallback;
207         private int counter;
208 
209         @Override
210         public void setSequenceCallback(final Sequence sequenceCallback) {
211             this.sequenceCallback = sequenceCallback;
212         }
213 
214         @Override
215         public void onEvent(final Log4jEventWrapper event, final long sequence,
216                 final boolean endOfBatch) throws Exception {
217             event.event.setEndOfBatch(endOfBatch);
218             event.loggerConfig.asyncCallAppenders(event.event);
219             event.clear();
220 
221             // notify the BatchEventProcessor that the sequence has progressed.
222             // Without this callback the sequence would not be progressed
223             // until the batch has completely finished.
224             if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
225                 sequenceCallback.set(sequence);
226                 counter = 0;
227             }
228         }
229     }
230 
231     /**
232      * Increases the reference count and creates and starts a new Disruptor and
233      * associated thread if none currently exists.
234      * 
235      * @see #release()
236      */
237     synchronized static void claim() {
238         count++;
239         initDisruptor();
240     }
241 
242     /**
243      * Decreases the reference count. If the reference count reached zero, the
244      * Disruptor and its associated thread are shut down and their references
245      * set to {@code null}.
246      */
247     synchronized static void release() {
248         if (--count > 0) {
249             LOGGER.trace("AsyncLoggerConfigHelper: not shutting down disruptor: ref count is {}.", count);
250             return;
251         }
252         final Disruptor<Log4jEventWrapper> temp = disruptor;
253         if (temp == null) {
254             LOGGER.trace("AsyncLoggerConfigHelper: disruptor already shut down: ref count is {}.", count);
255             return; // disruptor was already shut down by another thread
256         }
257         LOGGER.trace("AsyncLoggerConfigHelper: shutting down disruptor: ref count is {}.", count);
258 
259         // Must guarantee that publishing to the RingBuffer has stopped
260         // before we call disruptor.shutdown()
261         disruptor = null; // client code fails with NPE if log after stop = OK
262         temp.shutdown();
263 
264         // wait up to 10 seconds for the ringbuffer to drain
265         final RingBuffer<Log4jEventWrapper> ringBuffer = temp.getRingBuffer();
266         for (int i = 0; i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
267             if (ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize())) {
268                 break;
269             }
270             try {
271                 // give ringbuffer some time to drain...
272                 Thread.sleep(HALF_A_SECOND);
273             } catch (final InterruptedException e) {
274                 // ignored
275             }
276         }
277         executor.shutdown(); // finally, kill the processor thread
278         executor = null; // release reference to allow GC
279     }
280 
281     public void callAppendersFromAnotherThread(final LogEvent event) {
282         currentLogEvent.set(event);
283         disruptor.publishEvent(translator);
284     }
285 
286 }