1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.async;
18
19 import java.util.concurrent.ThreadFactory;
20 import java.util.concurrent.TimeUnit;
21
22 import org.apache.logging.log4j.Level;
23 import org.apache.logging.log4j.core.AbstractLifeCycle;
24 import org.apache.logging.log4j.core.LogEvent;
25 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
26 import org.apache.logging.log4j.core.impl.LogEventFactory;
27 import org.apache.logging.log4j.core.impl.MutableLogEvent;
28 import org.apache.logging.log4j.core.impl.ReusableLogEventFactory;
29 import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
30 import org.apache.logging.log4j.core.util.Log4jThreadFactory;
31 import org.apache.logging.log4j.core.util.Throwables;
32 import org.apache.logging.log4j.message.ReusableMessage;
33
34 import com.lmax.disruptor.EventFactory;
35 import com.lmax.disruptor.EventTranslatorTwoArg;
36 import com.lmax.disruptor.ExceptionHandler;
37 import com.lmax.disruptor.RingBuffer;
38 import com.lmax.disruptor.Sequence;
39 import com.lmax.disruptor.SequenceReportingEventHandler;
40 import com.lmax.disruptor.TimeoutException;
41 import com.lmax.disruptor.WaitStrategy;
42 import com.lmax.disruptor.dsl.Disruptor;
43 import com.lmax.disruptor.dsl.ProducerType;
44
45
46
47
48
49
50
51
52
53
54
55
56
57 public class AsyncLoggerConfigDisruptor extends AbstractLifeCycle implements AsyncLoggerConfigDelegate {
58
59 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
60 private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
61
62
63
64
65 public static class Log4jEventWrapper {
66 public Log4jEventWrapper() {
67 }
68
69 public Log4jEventWrapper(final MutableLogEvent mutableLogEvent) {
70 event = mutableLogEvent;
71 }
72
73 private AsyncLoggerConfig loggerConfig;
74 private LogEvent event;
75
76
77
78
79 public void clear() {
80 loggerConfig = null;
81 if (event instanceof MutableLogEvent) {
82 ((MutableLogEvent) event).clear();
83 } else {
84 event = null;
85 }
86 }
87
88 @Override
89 public String toString() {
90 return String.valueOf(event);
91 }
92 }
93
94
95
96
97 private static class Log4jEventWrapperHandler implements SequenceReportingEventHandler<Log4jEventWrapper> {
98 private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
99 private Sequence sequenceCallback;
100 private int counter;
101
102 @Override
103 public void setSequenceCallback(final Sequence sequenceCallback) {
104 this.sequenceCallback = sequenceCallback;
105 }
106
107 @Override
108 public void onEvent(final Log4jEventWrapper event, final long sequence, final boolean endOfBatch)
109 throws Exception {
110 event.event.setEndOfBatch(endOfBatch);
111 event.loggerConfig.logToAsyncLoggerConfigsOnCurrentThread(event.event);
112 event.clear();
113
114 notifyIntermediateProgress(sequence);
115 }
116
117
118
119
120
121 private void notifyIntermediateProgress(final long sequence) {
122 if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
123 sequenceCallback.set(sequence);
124 counter = 0;
125 }
126 }
127 }
128
129
130
131
132
133 private static final EventFactory<Log4jEventWrapper> FACTORY = new EventFactory<Log4jEventWrapper>() {
134 @Override
135 public Log4jEventWrapper newInstance() {
136 return new Log4jEventWrapper();
137 }
138 };
139
140
141
142
143
144 private static final EventFactory<Log4jEventWrapper> MUTABLE_FACTORY = new EventFactory<Log4jEventWrapper>() {
145 @Override
146 public Log4jEventWrapper newInstance() {
147 return new Log4jEventWrapper(new MutableLogEvent());
148 }
149 };
150
151
152
153
154 private static final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> TRANSLATOR =
155 new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
156
157 @Override
158 public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence,
159 final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
160 ringBufferElement.event = logEvent;
161 ringBufferElement.loggerConfig = loggerConfig;
162 }
163 };
164
165
166
167
168 private static final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> MUTABLE_TRANSLATOR =
169 new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
170
171 @Override
172 public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence,
173 final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
174 ((MutableLogEvent) ringBufferElement.event).initFrom(logEvent);
175 ringBufferElement.loggerConfig = loggerConfig;
176 }
177 };
178
179 private int ringBufferSize;
180 private AsyncQueueFullPolicy asyncQueueFullPolicy;
181 private Boolean mutable = Boolean.FALSE;
182
183 private volatile Disruptor<Log4jEventWrapper> disruptor;
184 private long backgroundThreadId;
185 private EventFactory<Log4jEventWrapper> factory;
186 private EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> translator;
187 private volatile boolean alreadyLoggedWarning = false;
188
189 private final Object queueFullEnqueueLock = new Object();
190
191 public AsyncLoggerConfigDisruptor() {
192 }
193
194
195 @Override
196 public void setLogEventFactory(final LogEventFactory logEventFactory) {
197
198
199 this.mutable = mutable || (logEventFactory instanceof ReusableLogEventFactory);
200 }
201
202
203
204
205
206
207
208 @Override
209 public synchronized void start() {
210 if (disruptor != null) {
211 LOGGER.trace("AsyncLoggerConfigDisruptor not starting new disruptor for this configuration, "
212 + "using existing object.");
213 return;
214 }
215 LOGGER.trace("AsyncLoggerConfigDisruptor creating new disruptor for this configuration.");
216 ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLoggerConfig.RingBufferSize");
217 final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLoggerConfig.WaitStrategy");
218
219 final ThreadFactory threadFactory = new Log4jThreadFactory("AsyncLoggerConfig", true, Thread.NORM_PRIORITY) {
220 @Override
221 public Thread newThread(final Runnable r) {
222 final Thread result = super.newThread(r);
223 backgroundThreadId = result.getId();
224 return result;
225 }
226 };
227 asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
228
229 translator = mutable ? MUTABLE_TRANSLATOR : TRANSLATOR;
230 factory = mutable ? MUTABLE_FACTORY : FACTORY;
231 disruptor = new Disruptor<>(factory, ringBufferSize, threadFactory, ProducerType.MULTI, waitStrategy);
232
233 final ExceptionHandler<Log4jEventWrapper> errorHandler = DisruptorUtil.getAsyncLoggerConfigExceptionHandler();
234 disruptor.setDefaultExceptionHandler(errorHandler);
235
236 final Log4jEventWrapperHandler[] handlers = {new Log4jEventWrapperHandler()};
237 disruptor.handleEventsWith(handlers);
238
239 LOGGER.debug("Starting AsyncLoggerConfig disruptor for this configuration with ringbufferSize={}, "
240 + "waitStrategy={}, exceptionHandler={}...", disruptor.getRingBuffer().getBufferSize(), waitStrategy
241 .getClass().getSimpleName(), errorHandler);
242 disruptor.start();
243 super.start();
244 }
245
246
247
248
249
250 @Override
251 public boolean stop(final long timeout, final TimeUnit timeUnit) {
252 final Disruptor<Log4jEventWrapper> temp = disruptor;
253 if (temp == null) {
254 LOGGER.trace("AsyncLoggerConfigDisruptor: disruptor for this configuration already shut down.");
255 return true;
256 }
257 setStopping();
258 LOGGER.trace("AsyncLoggerConfigDisruptor: shutting down disruptor for this configuration.");
259
260
261 disruptor = null;
262
263
264
265
266 for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
267 try {
268 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS);
269 } catch (final InterruptedException e) {
270 }
271 }
272 try {
273
274 temp.shutdown(timeout, timeUnit);
275 } catch (final TimeoutException e) {
276 LOGGER.warn("AsyncLoggerConfigDisruptor: shutdown timed out after {} {}", timeout, timeUnit);
277 temp.halt();
278 }
279 LOGGER.trace("AsyncLoggerConfigDisruptor: disruptor has been shut down.");
280
281 if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
282 LOGGER.trace("AsyncLoggerConfigDisruptor: {} discarded {} events.", asyncQueueFullPolicy,
283 DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
284 }
285 setStopped();
286 return true;
287 }
288
289
290
291
292 private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
293 final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer();
294 return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
295 }
296
297 @Override
298 public EventRoute getEventRoute(final Level logLevel) {
299 final int remainingCapacity = remainingDisruptorCapacity();
300 if (remainingCapacity < 0) {
301 return EventRoute.DISCARD;
302 }
303 return asyncQueueFullPolicy.getRoute(backgroundThreadId, logLevel);
304 }
305
306 private int remainingDisruptorCapacity() {
307 final Disruptor<Log4jEventWrapper> temp = disruptor;
308 if (hasLog4jBeenShutDown(temp)) {
309 return -1;
310 }
311 return (int) temp.getRingBuffer().remainingCapacity();
312 }
313
314
315
316
317 private boolean hasLog4jBeenShutDown(final Disruptor<Log4jEventWrapper> aDisruptor) {
318 if (aDisruptor == null) {
319 LOGGER.warn("Ignoring log event after log4j was shut down");
320 return true;
321 }
322 return false;
323 }
324
325 @Override
326 public void enqueueEvent(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) {
327
328 try {
329 final LogEvent logEvent = prepareEvent(event);
330 enqueue(logEvent, asyncLoggerConfig);
331 } catch (final NullPointerException npe) {
332
333
334 LOGGER.warn("Ignoring log event after log4j was shut down: {} [{}] {}", event.getLevel(),
335 event.getLoggerName(), event.getMessage().getFormattedMessage()
336 + (event.getThrown() == null ? "" : Throwables.toStringList(event.getThrown())));
337 }
338 }
339
340 private LogEvent prepareEvent(final LogEvent event) {
341 LogEvent logEvent = ensureImmutable(event);
342 if (logEvent.getMessage() instanceof ReusableMessage) {
343 if (logEvent instanceof Log4jLogEvent) {
344 ((Log4jLogEvent) logEvent).makeMessageImmutable();
345 } else if (logEvent instanceof MutableLogEvent) {
346
347
348 if (translator != MUTABLE_TRANSLATOR) {
349
350 logEvent = ((MutableLogEvent) logEvent).createMemento();
351 }
352 } else {
353 showWarningAboutCustomLogEventWithReusableMessage(logEvent);
354 }
355 } else {
356 InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage());
357 }
358 return logEvent;
359 }
360
361 private void showWarningAboutCustomLogEventWithReusableMessage(final LogEvent logEvent) {
362 if (!alreadyLoggedWarning) {
363 LOGGER.warn("Custom log event of type {} contains a mutable message of type {}." +
364 " AsyncLoggerConfig does not know how to make an immutable copy of this message." +
365 " This may result in ConcurrentModificationExceptions or incorrect log messages" +
366 " if the application modifies objects in the message while" +
367 " the background thread is writing it to the appenders.",
368 logEvent.getClass().getName(), logEvent.getMessage().getClass().getName());
369 alreadyLoggedWarning = true;
370 }
371 }
372
373 private void enqueue(final LogEvent logEvent, final AsyncLoggerConfig asyncLoggerConfig) {
374 if (synchronizeEnqueueWhenQueueFull()) {
375 synchronized (queueFullEnqueueLock) {
376 disruptor.getRingBuffer().publishEvent(translator, logEvent, asyncLoggerConfig);
377 }
378 } else {
379 disruptor.getRingBuffer().publishEvent(translator, logEvent, asyncLoggerConfig);
380 }
381 }
382
383 private boolean synchronizeEnqueueWhenQueueFull() {
384 return DisruptorUtil.ASYNC_CONFIG_SYNCHRONIZE_ENQUEUE_WHEN_QUEUE_FULL
385
386 && backgroundThreadId != Thread.currentThread().getId();
387 }
388
389 @Override
390 public boolean tryEnqueue(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) {
391 final LogEvent logEvent = prepareEvent(event);
392 return disruptor.getRingBuffer().tryPublishEvent(translator, logEvent, asyncLoggerConfig);
393 }
394
395 private LogEvent ensureImmutable(final LogEvent event) {
396 LogEvent result = event;
397 if (event instanceof RingBufferLogEvent) {
398
399
400
401
402
403
404 result = ((RingBufferLogEvent) event).createMemento();
405 }
406 return result;
407 }
408
409
410
411
412
413
414
415 @Override
416 public RingBufferAdmin createRingBufferAdmin(final String contextName, final String loggerConfigName) {
417 return RingBufferAdmin.forAsyncLoggerConfig(disruptor.getRingBuffer(), contextName, loggerConfigName);
418 }
419 }