1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.async;
18
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ThreadFactory;
22
23 import org.apache.logging.log4j.Logger;
24 import org.apache.logging.log4j.core.LogEvent;
25 import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
26 import org.apache.logging.log4j.core.util.Integers;
27 import org.apache.logging.log4j.status.StatusLogger;
28 import org.apache.logging.log4j.util.PropertiesUtil;
29
30 import com.lmax.disruptor.BlockingWaitStrategy;
31 import com.lmax.disruptor.EventFactory;
32 import com.lmax.disruptor.EventHandler;
33 import com.lmax.disruptor.EventTranslatorTwoArg;
34 import com.lmax.disruptor.ExceptionHandler;
35 import com.lmax.disruptor.RingBuffer;
36 import com.lmax.disruptor.Sequence;
37 import com.lmax.disruptor.SequenceReportingEventHandler;
38 import com.lmax.disruptor.SleepingWaitStrategy;
39 import com.lmax.disruptor.WaitStrategy;
40 import com.lmax.disruptor.YieldingWaitStrategy;
41 import com.lmax.disruptor.dsl.Disruptor;
42 import com.lmax.disruptor.dsl.ProducerType;
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 class AsyncLoggerConfigHelper {
61
62 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
63 private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
64 private static final int RINGBUFFER_MIN_SIZE = 128;
65 private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
66 private static final Logger LOGGER = StatusLogger.getLogger();
67
68 private static ThreadFactory threadFactory = new DaemonThreadFactory("AsyncLoggerConfig-");
69 private static volatile Disruptor<Log4jEventWrapper> disruptor;
70 private static ExecutorService executor;
71
72 private static volatile int count = 0;
73 private static ThreadLocal<Boolean> isAppenderThread = new ThreadLocal<>();
74
75
76
77
78
79 private static final EventFactory<Log4jEventWrapper> FACTORY = new EventFactory<Log4jEventWrapper>() {
80 @Override
81 public Log4jEventWrapper newInstance() {
82 return new Log4jEventWrapper();
83 }
84 };
85
86
87
88
89 private final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> translator
90 = new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
91
92 @Override
93 public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence,
94 final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
95 ringBufferElement.event = logEvent;
96 ringBufferElement.loggerConfig = loggerConfig;
97 }
98 };
99
100 private final AsyncLoggerConfig asyncLoggerConfig;
101
102 public AsyncLoggerConfigHelper(final AsyncLoggerConfig asyncLoggerConfig) {
103 this.asyncLoggerConfig = asyncLoggerConfig;
104 claim();
105 }
106
107 private static synchronized void initDisruptor() {
108 if (disruptor != null) {
109 LOGGER.trace("AsyncLoggerConfigHelper not starting new disruptor, using existing object. Ref count is {}.", count);
110 return;
111 }
112 LOGGER.trace("AsyncLoggerConfigHelper creating new disruptor. Ref count is {}.", count);
113 final int ringBufferSize = calculateRingBufferSize();
114 final WaitStrategy waitStrategy = createWaitStrategy();
115 executor = Executors.newSingleThreadExecutor(threadFactory);
116 initThreadLocalForExecutorThread();
117 disruptor = new Disruptor<>(FACTORY, ringBufferSize, executor, ProducerType.MULTI, waitStrategy);
118 final EventHandler<Log4jEventWrapper>[] handlers = new Log4jEventWrapperHandler[] {
119 new Log4jEventWrapperHandler() };
120 final ExceptionHandler<Log4jEventWrapper> errorHandler = getExceptionHandler();
121 disruptor.handleExceptionsWith(errorHandler);
122 disruptor.handleEventsWith(handlers);
123
124 LOGGER.debug(
125 "Starting AsyncLoggerConfig disruptor with ringbuffer size={}, waitStrategy={}, exceptionHandler={}...",
126 disruptor.getRingBuffer().getBufferSize(), waitStrategy.getClass().getSimpleName(), errorHandler);
127 disruptor.start();
128 }
129
130 private static WaitStrategy createWaitStrategy() {
131 final String strategy = System
132 .getProperty("AsyncLoggerConfig.WaitStrategy");
133 LOGGER.debug("property AsyncLoggerConfig.WaitStrategy={}", strategy);
134 if ("Sleep".equals(strategy)) {
135 return new SleepingWaitStrategy();
136 } else if ("Yield".equals(strategy)) {
137 return new YieldingWaitStrategy();
138 } else if ("Block".equals(strategy)) {
139 return new BlockingWaitStrategy();
140 }
141 LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
142 return new BlockingWaitStrategy();
143 }
144
145 private static int calculateRingBufferSize() {
146 int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
147 final String userPreferredRBSize = PropertiesUtil.getProperties().getStringProperty(
148 "AsyncLoggerConfig.RingBufferSize",
149 String.valueOf(ringBufferSize));
150 try {
151 int size = Integer.parseInt(userPreferredRBSize);
152 if (size < RINGBUFFER_MIN_SIZE) {
153 size = RINGBUFFER_MIN_SIZE;
154 LOGGER.warn(
155 "Invalid RingBufferSize {}, using minimum size {}.",
156 userPreferredRBSize, RINGBUFFER_MIN_SIZE);
157 }
158 ringBufferSize = size;
159 } catch (final Exception ex) {
160 LOGGER.warn("Invalid RingBufferSize {}, using default size {}.",
161 userPreferredRBSize, ringBufferSize);
162 }
163 return Integers.ceilingNextPowerOfTwo(ringBufferSize);
164 }
165
166 private static ExceptionHandler<Log4jEventWrapper> getExceptionHandler() {
167 final String cls = System.getProperty("AsyncLoggerConfig.ExceptionHandler");
168 if (cls == null) {
169 return null;
170 }
171 try {
172 @SuppressWarnings("unchecked")
173 final Class<? extends ExceptionHandler<Log4jEventWrapper>> klass = (Class<? extends ExceptionHandler<Log4jEventWrapper>>) Class
174 .forName(cls);
175 return klass.newInstance();
176 } catch (final Exception ignored) {
177 LOGGER.debug("AsyncLoggerConfig.ExceptionHandler not set: error creating " + cls + ": ", ignored);
178 return null;
179 }
180 }
181
182
183
184
185
186 private static class Log4jEventWrapper {
187 private AsyncLoggerConfig loggerConfig;
188 private LogEvent event;
189
190
191
192
193
194 public void clear() {
195 loggerConfig = null;
196 event = null;
197 }
198 }
199
200
201
202
203 private static class Log4jEventWrapperHandler implements
204 SequenceReportingEventHandler<Log4jEventWrapper> {
205 private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
206 private Sequence sequenceCallback;
207 private int counter;
208
209 @Override
210 public void setSequenceCallback(final Sequence sequenceCallback) {
211 this.sequenceCallback = sequenceCallback;
212 }
213
214 @Override
215 public void onEvent(final Log4jEventWrapper event, final long sequence,
216 final boolean endOfBatch) throws Exception {
217 event.event.setEndOfBatch(endOfBatch);
218 event.loggerConfig.asyncCallAppenders(event.event);
219 event.clear();
220
221 notifyIntermediateProgress(sequence);
222 }
223
224
225
226
227
228
229 private void notifyIntermediateProgress(final long sequence) {
230 if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
231 sequenceCallback.set(sequence);
232 counter = 0;
233 }
234 }
235 }
236
237
238
239
240
241
242
243 synchronized static void claim() {
244 count++;
245 initDisruptor();
246 }
247
248
249
250
251
252
253 synchronized static void release() {
254 if (--count > 0) {
255 LOGGER.trace("AsyncLoggerConfigHelper: not shutting down disruptor: ref count is {}.", count);
256 return;
257 }
258 final Disruptor<Log4jEventWrapper> temp = disruptor;
259 if (temp == null) {
260 LOGGER.trace("AsyncLoggerConfigHelper: disruptor already shut down: ref count is {}. (Resetting to zero.)",
261 count);
262 count = 0;
263 return;
264 }
265 LOGGER.trace("AsyncLoggerConfigHelper: shutting down disruptor: ref count is {}.", count);
266
267
268
269 disruptor = null;
270
271
272
273
274 for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
275 try {
276 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS);
277 } catch (final InterruptedException e) {
278 }
279 }
280 temp.shutdown();
281 executor.shutdown();
282 executor = null;
283 }
284
285
286
287
288 private static boolean hasBacklog(final Disruptor<?> disruptor) {
289 final RingBuffer<?> ringBuffer = disruptor.getRingBuffer();
290 return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
291 }
292
293
294
295
296
297
298 private static void initThreadLocalForExecutorThread() {
299 executor.submit(new Runnable() {
300 @Override
301 public void run() {
302 isAppenderThread.set(Boolean.TRUE);
303 }
304 });
305 }
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320 public boolean callAppendersFromAnotherThread(final LogEvent event) {
321 final Disruptor<Log4jEventWrapper> temp = disruptor;
322 if (!hasLog4jBeenShutDown(temp)) {
323
324
325
326 if (isCalledFromAppenderThreadAndBufferFull(temp)) {
327
328 return false;
329 }
330 enqueueEvent(event);
331 }
332 return true;
333 }
334
335
336
337
338 private boolean hasLog4jBeenShutDown(final Disruptor<Log4jEventWrapper> aDisruptor) {
339 if (aDisruptor == null) {
340 LOGGER.fatal("Ignoring log event after log4j was shut down");
341 return true;
342 }
343 return false;
344 }
345
346 private void enqueueEvent(final LogEvent event) {
347
348 try {
349 final LogEvent logEvent = prepareEvent(event);
350 enqueue(logEvent);
351 } catch (final NullPointerException npe) {
352 LOGGER.fatal("Ignoring log event after log4j was shut down.");
353 }
354 }
355
356 private LogEvent prepareEvent(final LogEvent event) {
357 final LogEvent logEvent = ensureImmutable(event);
358 logEvent.getMessage().getFormattedMessage();
359 return logEvent;
360 }
361
362 private void enqueue(LogEvent logEvent) {
363
364
365
366 disruptor.getRingBuffer().publishEvent(translator, logEvent, asyncLoggerConfig);
367 }
368
369 private LogEvent ensureImmutable(final LogEvent event) {
370 LogEvent result = event;
371 if (event instanceof RingBufferLogEvent) {
372
373
374
375
376
377
378 result = ((RingBufferLogEvent) event).createMemento();
379 }
380 return result;
381 }
382
383
384
385
386 private boolean isCalledFromAppenderThreadAndBufferFull(Disruptor<Log4jEventWrapper> disruptor) {
387 return isAppenderThread.get() == Boolean.TRUE && disruptor.getRingBuffer().remainingCapacity() == 0;
388 }
389
390
391
392
393
394
395
396
397 public RingBufferAdmin createRingBufferAdmin(final String contextName, final String loggerConfigName) {
398 return RingBufferAdmin.forAsyncLoggerConfig(disruptor.getRingBuffer(), contextName, loggerConfigName);
399 }
400
401 }