View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  
18  package org.apache.logging.log4j.core.async;
19  
20  import java.util.concurrent.ExecutorService;
21  import java.util.concurrent.Executors;
22  import java.util.concurrent.TimeUnit;
23  
24  import org.apache.logging.log4j.Level;
25  import org.apache.logging.log4j.core.AbstractLifeCycle;
26  import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
27  import org.apache.logging.log4j.core.util.ExecutorServices;
28  import org.apache.logging.log4j.core.util.Log4jThreadFactory;
29  
30  import com.lmax.disruptor.ExceptionHandler;
31  import com.lmax.disruptor.RingBuffer;
32  import com.lmax.disruptor.TimeoutException;
33  import com.lmax.disruptor.WaitStrategy;
34  import com.lmax.disruptor.dsl.Disruptor;
35  import com.lmax.disruptor.dsl.ProducerType;
36  
37  /**
38   * Helper class for async loggers: AsyncLoggerDisruptor handles the mechanics of working with the LMAX Disruptor, and
39   * works with its associated AsyncLoggerContext to synchronize the life cycle of the Disruptor and its thread with the
40   * life cycle of the context. The AsyncLoggerDisruptor of the context is shared by all AsyncLogger objects created by
41   * that AsyncLoggerContext.
42   */
43  class AsyncLoggerDisruptor extends AbstractLifeCycle {
44      private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
45      private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
46  
47      private volatile Disruptor<RingBufferLogEvent> disruptor;
48      private ExecutorService executor;
49      private String contextName;
50  
51      private boolean useThreadLocalTranslator = true;
52      private long backgroundThreadId;
53      private AsyncQueueFullPolicy asyncQueueFullPolicy;
54      private int ringBufferSize;
55  
56      AsyncLoggerDisruptor(final String contextName) {
57          this.contextName = contextName;
58      }
59  
60      public String getContextName() {
61          return contextName;
62      }
63  
64      public void setContextName(final String name) {
65          contextName = name;
66      }
67  
68      Disruptor<RingBufferLogEvent> getDisruptor() {
69          return disruptor;
70      }
71  
72      /**
73       * Creates and starts a new Disruptor and associated thread if none currently exists.
74       *
75       * @see #stop()
76       */
77      @Override
78      public synchronized void start() {
79          if (disruptor != null) {
80              LOGGER.trace(
81                      "[{}] AsyncLoggerDisruptor not starting new disruptor for this context, using existing object.",
82                      contextName);
83              return;
84          }
85          LOGGER.trace("[{}] AsyncLoggerDisruptor creating new disruptor for this context.", contextName);
86          ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLogger.RingBufferSize");
87          final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLogger.WaitStrategy");
88          executor = Executors.newSingleThreadExecutor(Log4jThreadFactory.createDaemonThreadFactory("AsyncLogger[" + contextName + "]"));
89          backgroundThreadId = DisruptorUtil.getExecutorThreadId(executor);
90          asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
91  
92          disruptor = new Disruptor<>(RingBufferLogEvent.FACTORY, ringBufferSize, executor, ProducerType.MULTI,
93                  waitStrategy);
94  
95          final ExceptionHandler<RingBufferLogEvent> errorHandler = DisruptorUtil.getAsyncLoggerExceptionHandler();
96          disruptor.handleExceptionsWith(errorHandler);
97  
98          final RingBufferLogEventHandler[] handlers = {new RingBufferLogEventHandler()};
99          disruptor.handleEventsWith(handlers);
100 
101         LOGGER.debug("[{}] Starting AsyncLogger disruptor for this context with ringbufferSize={}, waitStrategy={}, "
102                 + "exceptionHandler={}...", contextName, disruptor.getRingBuffer().getBufferSize(), waitStrategy
103                 .getClass().getSimpleName(), errorHandler);
104         disruptor.start();
105 
106         LOGGER.trace("[{}] AsyncLoggers use a {} translator", contextName, useThreadLocalTranslator ? "threadlocal"
107                 : "vararg");
108         super.start();
109     }
110 
111     /**
112      * Decreases the reference count. If the reference count reached zero, the Disruptor and its associated thread are
113      * shut down and their references set to {@code null}.
114      */
115     @Override
116     public boolean stop(final long timeout, final TimeUnit timeUnit) {
117         final Disruptor<RingBufferLogEvent> temp = getDisruptor();
118         if (temp == null) {
119             LOGGER.trace("[{}] AsyncLoggerDisruptor: disruptor for this context already shut down.", contextName);
120             return true; // disruptor was already shut down by another thread
121         }
122         setStopping();
123         LOGGER.debug("[{}] AsyncLoggerDisruptor: shutting down disruptor for this context.", contextName);
124 
125         // We must guarantee that publishing to the RingBuffer has stopped before we call disruptor.shutdown().
126         disruptor = null; // client code fails with NPE if log after stop. This is by design.
127 
128         // Calling Disruptor.shutdown() will wait until all enqueued events are fully processed,
129         // but this waiting happens in a busy-spin. To avoid (postpone) wasting CPU,
130         // we sleep in short chunks, up to 10 seconds, waiting for the ringbuffer to drain.
131         for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
132             try {
133                 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS); // give up the CPU for a while
134             } catch (final InterruptedException e) { // ignored
135             }
136         }
137         try {
138             // busy-spins until all events currently in the disruptor have been processed
139             temp.shutdown(timeout, timeUnit);
140         } catch (final TimeoutException e) {
141             temp.shutdown();
142         } 
143 
144         LOGGER.trace("[{}] AsyncLoggerDisruptor: shutting down disruptor executor.", contextName);
145         // finally, kill the processor thread
146         ExecutorServices.shutdown(executor, timeout, timeUnit, toString());
147         executor = null;
148 
149         if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
150             LOGGER.trace("AsyncLoggerDisruptor: {} discarded {} events.", asyncQueueFullPolicy,
151                     DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
152         }
153         setStopped();
154         return true;
155     }
156 
157     /**
158      * Returns {@code true} if the specified disruptor still has unprocessed events.
159      */
160     private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
161         final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer();
162         return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
163     }
164 
165     /**
166      * Creates and returns a new {@code RingBufferAdmin} that instruments the ringbuffer of the {@code AsyncLogger}.
167      *
168      * @param jmxContextName name of the {@code AsyncLoggerContext}
169      * @return a new {@code RingBufferAdmin} that instruments the ringbuffer
170      */
171     public RingBufferAdmin createRingBufferAdmin(final String jmxContextName) {
172         final RingBuffer<RingBufferLogEvent> ring = disruptor == null ? null : disruptor.getRingBuffer();
173         return RingBufferAdmin.forAsyncLogger(ring, jmxContextName);
174     }
175 
176     EventRoute getEventRoute(final Level logLevel) {
177         final int remainingCapacity = remainingDisruptorCapacity();
178         if (remainingCapacity < 0) {
179             return EventRoute.DISCARD;
180         }
181         return asyncQueueFullPolicy.getRoute(backgroundThreadId, logLevel);
182     }
183 
184     private int remainingDisruptorCapacity() {
185         final Disruptor<RingBufferLogEvent> temp = disruptor;
186         if (hasLog4jBeenShutDown(temp)) {
187             return -1;
188         }
189         return (int) temp.getRingBuffer().remainingCapacity();
190     }
191         /**
192          * Returns {@code true} if the specified disruptor is null.
193          */
194     private boolean hasLog4jBeenShutDown(final Disruptor<RingBufferLogEvent> aDisruptor) {
195         if (aDisruptor == null) { // LOG4J2-639
196             LOGGER.warn("Ignoring log event after log4j was shut down");
197             return true;
198         }
199         return false;
200     }
201 
202     public boolean tryPublish(final RingBufferLogEventTranslator translator) {
203         // LOG4J2-639: catch NPE if disruptor field was set to null in stop()
204         try {
205             return disruptor.getRingBuffer().tryPublishEvent(translator);
206         } catch (final NullPointerException npe) {
207             LOGGER.warn("[{}] Ignoring log event after log4j was shut down.", contextName);
208             return false;
209         }
210     }
211 
212     void enqueueLogMessageInfo(final RingBufferLogEventTranslator translator) {
213         // LOG4J2-639: catch NPE if disruptor field was set to null in stop()
214         try {
215             // Note: we deliberately access the volatile disruptor field afresh here.
216             // Avoiding this and using an older reference could result in adding a log event to the disruptor after it
217             // was shut down, which could cause the publishEvent method to hang and never return.
218             disruptor.publishEvent(translator);
219         } catch (final NullPointerException npe) {
220             LOGGER.warn("[{}] Ignoring log event after log4j was shut down.", contextName);
221         }
222     }
223 
224     /**
225      * Returns whether it is allowed to store non-JDK classes in ThreadLocal objects for efficiency.
226      *
227      * @return whether AsyncLoggers are allowed to use ThreadLocal objects
228      * @since 2.5
229      * @see <a href="https://issues.apache.org/jira/browse/LOG4J2-1172">LOG4J2-1172</a>
230      */
231     public boolean isUseThreadLocals() {
232         return useThreadLocalTranslator;
233     }
234 
235     /**
236      * Signals this AsyncLoggerDisruptor whether it is allowed to store non-JDK classes in ThreadLocal objects for
237      * efficiency.
238      * <p>
239      * This property may be modified after the {@link #start()} method has been called.
240      * </p>
241      *
242      * @param allow whether AsyncLoggers are allowed to use ThreadLocal objects
243      * @since 2.5
244      * @see <a href="https://issues.apache.org/jira/browse/LOG4J2-1172">LOG4J2-1172</a>
245      */
246     public void setUseThreadLocals(final boolean allow) {
247         useThreadLocalTranslator = allow;
248         LOGGER.trace("[{}] AsyncLoggers have been modified to use a {} translator", contextName,
249                 useThreadLocalTranslator ? "threadlocal" : "vararg");
250     }
251 }