1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.async;
18
19 import java.util.Map;
20 import java.util.Objects;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Executors;
23
24 import org.apache.logging.log4j.Level;
25 import org.apache.logging.log4j.Marker;
26 import org.apache.logging.log4j.ThreadContext;
27 import org.apache.logging.log4j.core.Logger;
28 import org.apache.logging.log4j.core.LoggerContext;
29 import org.apache.logging.log4j.core.config.Property;
30 import org.apache.logging.log4j.core.config.ReliabilityStrategy;
31 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
32 import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
33 import org.apache.logging.log4j.core.util.Clock;
34 import org.apache.logging.log4j.core.util.ClockFactory;
35 import org.apache.logging.log4j.core.util.DummyNanoClock;
36 import org.apache.logging.log4j.core.util.Integers;
37 import org.apache.logging.log4j.core.util.Loader;
38 import org.apache.logging.log4j.core.util.NanoClock;
39 import org.apache.logging.log4j.message.Message;
40 import org.apache.logging.log4j.message.MessageFactory;
41 import org.apache.logging.log4j.message.TimestampMessage;
42 import org.apache.logging.log4j.status.StatusLogger;
43 import org.apache.logging.log4j.util.PropertiesUtil;
44
45 import com.lmax.disruptor.BlockingWaitStrategy;
46 import com.lmax.disruptor.ExceptionHandler;
47 import com.lmax.disruptor.RingBuffer;
48 import com.lmax.disruptor.SleepingWaitStrategy;
49 import com.lmax.disruptor.WaitStrategy;
50 import com.lmax.disruptor.YieldingWaitStrategy;
51 import com.lmax.disruptor.dsl.Disruptor;
52 import com.lmax.disruptor.dsl.ProducerType;
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82 public class AsyncLogger extends Logger {
83 private static final long serialVersionUID = 1L;
84 private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
85 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
86 private static final int RINGBUFFER_MIN_SIZE = 128;
87 private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
88 private static final StatusLogger LOGGER = StatusLogger.getLogger();
89 private static final ThreadNameStrategy THREAD_NAME_STRATEGY = ThreadNameStrategy.create();
90
91 static enum ThreadNameStrategy {
92 CACHED {
93 @Override
94 public String getThreadName(final Info info) {
95 return info.cachedThreadName;
96 }
97 },
98 UNCACHED {
99 @Override
100 public String getThreadName(final Info info) {
101 return Thread.currentThread().getName();
102 }
103 };
104 abstract String getThreadName(Info info);
105
106 static ThreadNameStrategy create() {
107 final String name = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.ThreadNameStrategy", CACHED.name());
108 try {
109 return ThreadNameStrategy.valueOf(name);
110 } catch (final Exception ex) {
111 LOGGER.debug("Using AsyncLogger.ThreadNameStrategy.CACHED: '{}' not valid: {}", name, ex.toString());
112 return CACHED;
113 }
114 }
115 }
116 private static volatile Disruptor<RingBufferLogEvent> disruptor;
117 private static final Clock CLOCK = ClockFactory.getClock();
118 private static volatile NanoClock nanoClock = new DummyNanoClock();
119
120 private static final ExecutorService executor = Executors
121 .newSingleThreadExecutor(new DaemonThreadFactory("AsyncLogger-"));
122
123 static {
124 initInfoForExecutorThread();
125 LOGGER.debug("AsyncLogger.ThreadNameStrategy={}", THREAD_NAME_STRATEGY);
126 final int ringBufferSize = calculateRingBufferSize();
127
128 final WaitStrategy waitStrategy = createWaitStrategy();
129 disruptor = new Disruptor<>(RingBufferLogEvent.FACTORY, ringBufferSize, executor, ProducerType.MULTI,
130 waitStrategy);
131 disruptor.handleExceptionsWith(getExceptionHandler());
132 disruptor.handleEventsWith(new RingBufferLogEventHandler());
133
134 LOGGER.debug("Starting AsyncLogger disruptor with ringbuffer size {}...", disruptor.getRingBuffer()
135 .getBufferSize());
136 disruptor.start();
137 }
138
139 private static int calculateRingBufferSize() {
140 int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
141 final String userPreferredRBSize = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.RingBufferSize",
142 String.valueOf(ringBufferSize));
143 try {
144 int size = Integer.parseInt(userPreferredRBSize);
145 if (size < RINGBUFFER_MIN_SIZE) {
146 size = RINGBUFFER_MIN_SIZE;
147 LOGGER.warn("Invalid RingBufferSize {}, using minimum size {}.", userPreferredRBSize,
148 RINGBUFFER_MIN_SIZE);
149 }
150 ringBufferSize = size;
151 } catch (final Exception ex) {
152 LOGGER.warn("Invalid RingBufferSize {}, using default size {}.", userPreferredRBSize, ringBufferSize);
153 }
154 return Integers.ceilingNextPowerOfTwo(ringBufferSize);
155 }
156
157
158
159
160
161
162
163
164 private static void initInfoForExecutorThread() {
165 executor.submit(new Runnable(){
166 @Override
167 public void run() {
168 final boolean isAppenderThread = true;
169 final Info info = new Info(new RingBufferLogEventTranslator(),
170 Thread.currentThread().getName(), isAppenderThread);
171 Info.threadlocalInfo.set(info);
172 }
173 });
174 }
175
176 private static WaitStrategy createWaitStrategy() {
177 final String strategy = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.WaitStrategy");
178 LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy);
179 if ("Sleep".equals(strategy)) {
180 return new SleepingWaitStrategy();
181 } else if ("Yield".equals(strategy)) {
182 return new YieldingWaitStrategy();
183 } else if ("Block".equals(strategy)) {
184 return new BlockingWaitStrategy();
185 }
186 LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
187 return new BlockingWaitStrategy();
188 }
189
190 private static ExceptionHandler<RingBufferLogEvent> getExceptionHandler() {
191 final String cls = PropertiesUtil.getProperties().getStringProperty("AsyncLogger.ExceptionHandler");
192 if (cls == null) {
193 LOGGER.debug("No AsyncLogger.ExceptionHandler specified");
194 return null;
195 }
196 try {
197 @SuppressWarnings("unchecked")
198 final ExceptionHandler<RingBufferLogEvent> result = Loader.newCheckedInstanceOf(cls, ExceptionHandler.class);
199 LOGGER.debug("AsyncLogger.ExceptionHandler={}", result);
200 return result;
201 } catch (final Exception ignored) {
202 LOGGER.debug("AsyncLogger.ExceptionHandler not set: error creating " + cls + ": ", ignored);
203 return null;
204 }
205 }
206
207
208
209
210
211
212
213
214
215 public AsyncLogger(final LoggerContext context, final String name, final MessageFactory messageFactory) {
216 super(context, name, messageFactory);
217 }
218
219
220
221
222 static class Info {
223 private static final ThreadLocal<Info> threadlocalInfo = new ThreadLocal<Info>() {
224 @Override
225 protected Info initialValue() {
226
227 return new Info(new RingBufferLogEventTranslator(), Thread.currentThread().getName(), false);
228 }
229 };
230 private final RingBufferLogEventTranslator translator;
231 private final String cachedThreadName;
232 private final boolean isAppenderThread;
233
234 public Info(final RingBufferLogEventTranslator translator, final String threadName, final boolean appenderThread) {
235 this.translator = translator;
236 this.cachedThreadName = threadName;
237 this.isAppenderThread = appenderThread;
238 }
239
240
241 private String threadName() {
242 return THREAD_NAME_STRATEGY.getThreadName(this);
243 }
244 }
245
246 @Override
247 public void logMessage(final String fqcn, final Level level, final Marker marker, final Message message,
248 final Throwable thrown) {
249
250 final Disruptor<RingBufferLogEvent> temp = disruptor;
251 if (temp == null) {
252 LOGGER.fatal("Ignoring log event after log4j was shut down");
253 } else {
254 logMessage0(temp, fqcn, level, marker, message, thrown);
255 }
256 }
257
258 private void logMessage0(final Disruptor<RingBufferLogEvent> theDisruptor, final String fqcn, final Level level,
259 final Marker marker, final Message message, final Throwable thrown) {
260 final Info info = Info.threadlocalInfo.get();
261 logMessageInAppropriateThread(info, theDisruptor, fqcn, level, marker, message, thrown);
262 }
263
264 private void logMessageInAppropriateThread(final Info info, final Disruptor<RingBufferLogEvent> theDisruptor,
265 final String fqcn, final Level level, final Marker marker, final Message message, final Throwable thrown) {
266 if (!logMessageInCurrentThread(info, theDisruptor, fqcn, level, marker, message, thrown)) {
267 logMessageInBackgroundThread(info, fqcn, level, marker, message, thrown);
268 }
269 }
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285 private boolean logMessageInCurrentThread(Info info, final Disruptor<RingBufferLogEvent> theDisruptor,
286 final String fqcn, final Level level, final Marker marker, final Message message, final Throwable thrown) {
287 if (info.isAppenderThread && theDisruptor.getRingBuffer().remainingCapacity() == 0) {
288
289 final ReliabilityStrategy strategy = privateConfig.loggerConfig.getReliabilityStrategy();
290 strategy.log(this, getName(), fqcn, marker, level, message, thrown);
291 return true;
292 }
293 return false;
294 }
295
296
297
298
299
300
301
302
303
304
305
306 private void logMessageInBackgroundThread(Info info, final String fqcn, final Level level, final Marker marker,
307 final Message message, final Throwable thrown) {
308
309 message.getFormattedMessage();
310
311 initLogMessageInfo(info, fqcn, level, marker, message, thrown);
312 enqueueLogMessageInfo(info);
313 }
314
315 private void initLogMessageInfo(Info info, final String fqcn, final Level level, final Marker marker,
316 final Message message, final Throwable thrown) {
317 info.translator.setValues(this, getName(), marker, fqcn, level, message,
318
319 thrown,
320
321
322
323
324
325 ThreadContext.getImmutableContext(),
326
327
328 ThreadContext.getImmutableStack(),
329
330
331
332 info.threadName(),
333
334
335
336
337 calcLocationIfRequested(fqcn),
338
339
340
341
342
343 eventTimeMillis(message),
344 nanoClock.nanoTime()
345 );
346 }
347
348 private long eventTimeMillis(final Message message) {
349 return message instanceof TimestampMessage ? ((TimestampMessage) message).getTimestamp() :
350 CLOCK.currentTimeMillis();
351 }
352
353
354
355
356
357
358 private StackTraceElement calcLocationIfRequested(String fqcn) {
359 final boolean includeLocation = privateConfig.loggerConfig.isIncludeLocation();
360 return includeLocation ? location(fqcn) : null;
361 }
362
363 private void enqueueLogMessageInfo(Info info) {
364
365 try {
366
367
368
369 disruptor.publishEvent(info.translator);
370 } catch (final NullPointerException npe) {
371 LOGGER.fatal("Ignoring log event after log4j was shut down.");
372 }
373 }
374
375 private static StackTraceElement location(final String fqcnOfLogger) {
376 return Log4jLogEvent.calcLocation(fqcnOfLogger);
377 }
378
379
380
381
382
383
384
385 public void actualAsyncLog(final RingBufferLogEvent event) {
386 final Map<Property, Boolean> properties = privateConfig.loggerConfig.getProperties();
387 event.mergePropertiesIntoContextMap(properties, privateConfig.config.getStrSubstitutor());
388 final ReliabilityStrategy strategy = privateConfig.loggerConfig.getReliabilityStrategy();
389 strategy.log(this, event);
390 }
391
392 public static void stop() {
393 final Disruptor<RingBufferLogEvent> temp = disruptor;
394
395
396
397 disruptor = null;
398 if (temp == null) {
399 return;
400 }
401
402
403
404
405 for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
406 try {
407 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS);
408 } catch (final InterruptedException e) {
409 }
410 }
411 temp.shutdown();
412 executor.shutdown();
413 Info.threadlocalInfo.remove();
414 }
415
416
417
418
419 private static boolean hasBacklog(final Disruptor<?> disruptor) {
420 final RingBuffer<?> ringBuffer = disruptor.getRingBuffer();
421 return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
422 }
423
424
425
426
427
428
429
430 public static RingBufferAdmin createRingBufferAdmin(final String contextName) {
431 return RingBufferAdmin.forAsyncLogger(disruptor.getRingBuffer(), contextName);
432 }
433
434
435
436
437
438 public static NanoClock getNanoClock() {
439 return nanoClock;
440 }
441
442
443
444
445
446
447
448
449
450 public static void setNanoClock(NanoClock nanoClock) {
451 AsyncLogger.nanoClock = Objects.requireNonNull(nanoClock, "NanoClock must be non-null");
452 }
453 }