View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  
18  package org.apache.logging.log4j.core.async;
19  
20  import java.util.concurrent.ThreadFactory;
21  import java.util.concurrent.TimeUnit;
22  
23  import org.apache.logging.log4j.Level;
24  import org.apache.logging.log4j.core.AbstractLifeCycle;
25  import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
26  import org.apache.logging.log4j.core.util.Log4jThreadFactory;
27  import org.apache.logging.log4j.core.util.Throwables;
28  
29  import com.lmax.disruptor.ExceptionHandler;
30  import com.lmax.disruptor.RingBuffer;
31  import com.lmax.disruptor.TimeoutException;
32  import com.lmax.disruptor.WaitStrategy;
33  import com.lmax.disruptor.dsl.Disruptor;
34  import com.lmax.disruptor.dsl.ProducerType;
35  
36  /**
37   * Helper class for async loggers: AsyncLoggerDisruptor handles the mechanics of working with the LMAX Disruptor, and
38   * works with its associated AsyncLoggerContext to synchronize the life cycle of the Disruptor and its thread with the
39   * life cycle of the context. The AsyncLoggerDisruptor of the context is shared by all AsyncLogger objects created by
40   * that AsyncLoggerContext.
41   */
42  class AsyncLoggerDisruptor extends AbstractLifeCycle {
43      private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
44      private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
45  
46      private volatile Disruptor<RingBufferLogEvent> disruptor;
47      private String contextName;
48  
49      private boolean useThreadLocalTranslator = true;
50      private long backgroundThreadId;
51      private AsyncQueueFullPolicy asyncQueueFullPolicy;
52      private int ringBufferSize;
53  
54      AsyncLoggerDisruptor(final String contextName) {
55          this.contextName = contextName;
56      }
57  
58      public String getContextName() {
59          return contextName;
60      }
61  
62      public void setContextName(final String name) {
63          contextName = name;
64      }
65  
66      Disruptor<RingBufferLogEvent> getDisruptor() {
67          return disruptor;
68      }
69  
70      /**
71       * Creates and starts a new Disruptor and associated thread if none currently exists.
72       *
73       * @see #stop()
74       */
75      @Override
76      public synchronized void start() {
77          if (disruptor != null) {
78              LOGGER.trace(
79                      "[{}] AsyncLoggerDisruptor not starting new disruptor for this context, using existing object.",
80                      contextName);
81              return;
82          }
83          LOGGER.trace("[{}] AsyncLoggerDisruptor creating new disruptor for this context.", contextName);
84          ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLogger.RingBufferSize");
85          final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLogger.WaitStrategy");
86  
87          final ThreadFactory threadFactory = new Log4jThreadFactory("AsyncLogger[" + contextName + "]", true, Thread.NORM_PRIORITY) {
88              @Override
89              public Thread newThread(final Runnable r) {
90                  final Thread result = super.newThread(r);
91                  backgroundThreadId = result.getId();
92                  return result;
93              }
94          };
95          asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
96  
97          disruptor = new Disruptor<>(RingBufferLogEvent.FACTORY, ringBufferSize, threadFactory, ProducerType.MULTI,
98                  waitStrategy);
99  
100         final ExceptionHandler<RingBufferLogEvent> errorHandler = DisruptorUtil.getAsyncLoggerExceptionHandler();
101         disruptor.setDefaultExceptionHandler(errorHandler);
102 
103         final RingBufferLogEventHandler[] handlers = {new RingBufferLogEventHandler()};
104         disruptor.handleEventsWith(handlers);
105 
106         LOGGER.debug("[{}] Starting AsyncLogger disruptor for this context with ringbufferSize={}, waitStrategy={}, "
107                 + "exceptionHandler={}...", contextName, disruptor.getRingBuffer().getBufferSize(), waitStrategy
108                 .getClass().getSimpleName(), errorHandler);
109         disruptor.start();
110 
111         LOGGER.trace("[{}] AsyncLoggers use a {} translator", contextName, useThreadLocalTranslator ? "threadlocal"
112                 : "vararg");
113         super.start();
114     }
115 
116     /**
117      * Decreases the reference count. If the reference count reached zero, the Disruptor and its associated thread are
118      * shut down and their references set to {@code null}.
119      */
120     @Override
121     public boolean stop(final long timeout, final TimeUnit timeUnit) {
122         final Disruptor<RingBufferLogEvent> temp = getDisruptor();
123         if (temp == null) {
124             LOGGER.trace("[{}] AsyncLoggerDisruptor: disruptor for this context already shut down.", contextName);
125             return true; // disruptor was already shut down by another thread
126         }
127         setStopping();
128         LOGGER.debug("[{}] AsyncLoggerDisruptor: shutting down disruptor for this context.", contextName);
129 
130         // We must guarantee that publishing to the RingBuffer has stopped before we call disruptor.shutdown().
131         disruptor = null; // client code fails with NPE if log after stop. This is by design.
132 
133         // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed,
134         // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU,
135         // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain.
136         for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
137             try {
138                 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while
139             } catch (final InterruptedException e) { // ignored
140             }
141         }
142         try {
143             // busy-spins until all events currently in the disruptor have been processed, or timeout
144             temp.shutdown(timeout, timeUnit);
145         } catch (final TimeoutException e) {
146             LOGGER.warn("[{}] AsyncLoggerDisruptor: shutdown timed out after {} {}", contextName, timeout, timeUnit);
147             temp.halt(); // give up on remaining log events, if any
148         }
149 
150         LOGGER.trace("[{}] AsyncLoggerDisruptor: disruptor has been shut down.", contextName);
151 
152         if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
153             LOGGER.trace("AsyncLoggerDisruptor: {} discarded {} events.", asyncQueueFullPolicy,
154                     DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
155         }
156         setStopped();
157         return true;
158     }
159 
160     /**
161      * Returns {@code true} if the specified disruptor still has unprocessed events.
162      */
163     private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
164         final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer();
165         return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
166     }
167 
168     /**
169      * Creates and returns a new {@code RingBufferAdmin} that instruments the ringbuffer of the {@code AsyncLogger}.
170      *
171      * @param jmxContextName name of the {@code AsyncLoggerContext}
172      * @return a new {@code RingBufferAdmin} that instruments the ringbuffer
173      */
174     public RingBufferAdmin createRingBufferAdmin(final String jmxContextName) {
175         final RingBuffer<RingBufferLogEvent> ring = disruptor == null ? null : disruptor.getRingBuffer();
176         return RingBufferAdmin.forAsyncLogger(ring, jmxContextName);
177     }
178 
179     EventRoute getEventRoute(final Level logLevel) {
180         final int remainingCapacity = remainingDisruptorCapacity();
181         if (remainingCapacity < 0) {
182             return EventRoute.DISCARD;
183         }
184         return asyncQueueFullPolicy.getRoute(backgroundThreadId, logLevel);
185     }
186 
187     private int remainingDisruptorCapacity() {
188         final Disruptor<RingBufferLogEvent> temp = disruptor;
189         if (hasLog4jBeenShutDown(temp)) {
190             return -1;
191         }
192         return (int) temp.getRingBuffer().remainingCapacity();
193     }
194         /**
195          * Returns {@code true} if the specified disruptor is null.
196          */
197     private boolean hasLog4jBeenShutDown(final Disruptor<RingBufferLogEvent> aDisruptor) {
198         if (aDisruptor == null) { // LOG4J2-639
199             LOGGER.warn("Ignoring log event after log4j was shut down");
200             return true;
201         }
202         return false;
203     }
204 
205     public boolean tryPublish(final RingBufferLogEventTranslator translator) {
206         try {
207             return disruptor.getRingBuffer().tryPublishEvent(translator);
208         } catch (final NullPointerException npe) {
209             // LOG4J2-639: catch NPE if disruptor field was set to null in stop()
210             LOGGER.warn("[{}] Ignoring log event after log4j was shut down: {} [{}] {}", contextName,
211                     translator.level, translator.loggerName, translator.message.getFormattedMessage()
212                             + (translator.thrown == null ? "" : Throwables.toStringList(translator.thrown)));
213             return false;
214         }
215     }
216 
217     void enqueueLogMessageInfo(final RingBufferLogEventTranslator translator) {
218         try {
219             // Note: we deliberately access the volatile disruptor field afresh here.
220             // Avoiding this and using an older reference could result in adding a log event to the disruptor after it
221             // was shut down, which could cause the publishEvent method to hang and never return.
222             disruptor.publishEvent(translator);
223         } catch (final NullPointerException npe) {
224             // LOG4J2-639: catch NPE if disruptor field was set to null in stop()
225             LOGGER.warn("[{}] Ignoring log event after log4j was shut down: {} [{}] {}", contextName,
226                     translator.level, translator.loggerName, translator.message.getFormattedMessage()
227                             + (translator.thrown == null ? "" : Throwables.toStringList(translator.thrown)));
228         }
229     }
230 
231     /**
232      * Returns whether it is allowed to store non-JDK classes in ThreadLocal objects for efficiency.
233      *
234      * @return whether AsyncLoggers are allowed to use ThreadLocal objects
235      * @since 2.5
236      * @see <a href="https://issues.apache.org/jira/browse/LOG4J2-1172">LOG4J2-1172</a>
237      */
238     public boolean isUseThreadLocals() {
239         return useThreadLocalTranslator;
240     }
241 
242     /**
243      * Signals this AsyncLoggerDisruptor whether it is allowed to store non-JDK classes in ThreadLocal objects for
244      * efficiency.
245      * <p>
246      * This property may be modified after the {@link #start()} method has been called.
247      * </p>
248      *
249      * @param allow whether AsyncLoggers are allowed to use ThreadLocal objects
250      * @since 2.5
251      * @see <a href="https://issues.apache.org/jira/browse/LOG4J2-1172">LOG4J2-1172</a>
252      */
253     public void setUseThreadLocals(final boolean allow) {
254         useThreadLocalTranslator = allow;
255         LOGGER.trace("[{}] AsyncLoggers have been modified to use a {} translator", contextName,
256                 useThreadLocalTranslator ? "threadlocal" : "vararg");
257     }
258 }