1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.async;
18
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ThreadFactory;
22 import java.util.concurrent.TimeUnit;
23
24 import org.apache.logging.log4j.Level;
25 import org.apache.logging.log4j.core.AbstractLifeCycle;
26 import org.apache.logging.log4j.core.LogEvent;
27 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
28 import org.apache.logging.log4j.core.impl.LogEventFactory;
29 import org.apache.logging.log4j.core.impl.MutableLogEvent;
30 import org.apache.logging.log4j.core.impl.ReusableLogEventFactory;
31 import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
32 import org.apache.logging.log4j.core.util.ExecutorServices;
33 import org.apache.logging.log4j.core.util.Log4jThreadFactory;
34 import org.apache.logging.log4j.message.ReusableMessage;
35
36 import com.lmax.disruptor.EventFactory;
37 import com.lmax.disruptor.EventTranslatorTwoArg;
38 import com.lmax.disruptor.ExceptionHandler;
39 import com.lmax.disruptor.RingBuffer;
40 import com.lmax.disruptor.Sequence;
41 import com.lmax.disruptor.SequenceReportingEventHandler;
42 import com.lmax.disruptor.WaitStrategy;
43 import com.lmax.disruptor.dsl.Disruptor;
44 import com.lmax.disruptor.dsl.ProducerType;
45
46
47
48
49
50
51
52
53
54
55
56
57
58 public class AsyncLoggerConfigDisruptor extends AbstractLifeCycle implements AsyncLoggerConfigDelegate {
59
60 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
61 private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
62
63
64
65
66 public static class Log4jEventWrapper {
67 public Log4jEventWrapper() {
68 }
69
70 public Log4jEventWrapper(final MutableLogEvent mutableLogEvent) {
71 event = mutableLogEvent;
72 }
73
74 private AsyncLoggerConfig loggerConfig;
75 private LogEvent event;
76
77
78
79
80 public void clear() {
81 loggerConfig = null;
82 if (event instanceof MutableLogEvent) {
83 ((MutableLogEvent) event).clear();
84 } else {
85 event = null;
86 }
87 }
88
89 @Override
90 public String toString() {
91 return String.valueOf(event);
92 }
93 }
94
95
96
97
98 private static class Log4jEventWrapperHandler implements SequenceReportingEventHandler<Log4jEventWrapper> {
99 private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
100 private Sequence sequenceCallback;
101 private int counter;
102
103 @Override
104 public void setSequenceCallback(final Sequence sequenceCallback) {
105 this.sequenceCallback = sequenceCallback;
106 }
107
108 @Override
109 public void onEvent(final Log4jEventWrapper event, final long sequence, final boolean endOfBatch)
110 throws Exception {
111 event.event.setEndOfBatch(endOfBatch);
112 event.loggerConfig.asyncCallAppenders(event.event);
113 event.clear();
114
115 notifyIntermediateProgress(sequence);
116 }
117
118
119
120
121
122 private void notifyIntermediateProgress(final long sequence) {
123 if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
124 sequenceCallback.set(sequence);
125 counter = 0;
126 }
127 }
128 }
129
130
131
132
133
134 private static final EventFactory<Log4jEventWrapper> FACTORY = new EventFactory<Log4jEventWrapper>() {
135 @Override
136 public Log4jEventWrapper newInstance() {
137 return new Log4jEventWrapper();
138 }
139 };
140
141
142
143
144
145 private static final EventFactory<Log4jEventWrapper> MUTABLE_FACTORY = new EventFactory<Log4jEventWrapper>() {
146 @Override
147 public Log4jEventWrapper newInstance() {
148 return new Log4jEventWrapper(new MutableLogEvent());
149 }
150 };
151
152
153
154
155 private static final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> TRANSLATOR =
156 new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
157
158 @Override
159 public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence,
160 final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
161 ringBufferElement.event = logEvent;
162 ringBufferElement.loggerConfig = loggerConfig;
163 }
164 };
165
166
167
168
169 private static final EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> MUTABLE_TRANSLATOR =
170 new EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig>() {
171
172 @Override
173 public void translateTo(final Log4jEventWrapper ringBufferElement, final long sequence,
174 final LogEvent logEvent, final AsyncLoggerConfig loggerConfig) {
175 ((MutableLogEvent) ringBufferElement.event).initFrom(logEvent);
176 ringBufferElement.loggerConfig = loggerConfig;
177 }
178 };
179
180 private static final ThreadFactory THREAD_FACTORY = Log4jThreadFactory.createDaemonThreadFactory("AsyncLoggerConfig");
181
182 private int ringBufferSize;
183 private AsyncQueueFullPolicy asyncQueueFullPolicy;
184 private Boolean mutable = Boolean.FALSE;
185
186 private volatile Disruptor<Log4jEventWrapper> disruptor;
187 private ExecutorService executor;
188 private long backgroundThreadId;
189 private EventFactory<Log4jEventWrapper> factory;
190 private EventTranslatorTwoArg<Log4jEventWrapper, LogEvent, AsyncLoggerConfig> translator;
191
/** Creates an instance that holds no disruptor until {@code start()} is called. */
public AsyncLoggerConfigDisruptor() {
    super();
}
194
195
196 @Override
197 public void setLogEventFactory(final LogEventFactory logEventFactory) {
198
199
200 this.mutable = mutable || (logEventFactory instanceof ReusableLogEventFactory);
201 }
202
203
204
205
206
207
208
209 @Override
210 public synchronized void start() {
211 if (disruptor != null) {
212 LOGGER.trace("AsyncLoggerConfigDisruptor not starting new disruptor for this configuration, "
213 + "using existing object.");
214 return;
215 }
216 LOGGER.trace("AsyncLoggerConfigDisruptor creating new disruptor for this configuration.");
217 ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLoggerConfig.RingBufferSize");
218 final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLoggerConfig.WaitStrategy");
219 executor = Executors.newSingleThreadExecutor(THREAD_FACTORY);
220 backgroundThreadId = DisruptorUtil.getExecutorThreadId(executor);
221 asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
222
223 translator = mutable ? MUTABLE_TRANSLATOR : TRANSLATOR;
224 factory = mutable ? MUTABLE_FACTORY : FACTORY;
225 disruptor = new Disruptor<>(factory, ringBufferSize, executor, ProducerType.MULTI, waitStrategy);
226
227 final ExceptionHandler<Log4jEventWrapper> errorHandler = DisruptorUtil.getAsyncLoggerConfigExceptionHandler();
228 disruptor.handleExceptionsWith(errorHandler);
229
230 final Log4jEventWrapperHandler[] handlers = {new Log4jEventWrapperHandler()};
231 disruptor.handleEventsWith(handlers);
232
233 LOGGER.debug("Starting AsyncLoggerConfig disruptor for this configuration with ringbufferSize={}, "
234 + "waitStrategy={}, exceptionHandler={}...", disruptor.getRingBuffer().getBufferSize(), waitStrategy
235 .getClass().getSimpleName(), errorHandler);
236 disruptor.start();
237 super.start();
238 }
239
240
241
242
243
244 @Override
245 public boolean stop(final long timeout, final TimeUnit timeUnit) {
246 final Disruptor<Log4jEventWrapper> temp = disruptor;
247 if (temp == null) {
248 LOGGER.trace("AsyncLoggerConfigDisruptor: disruptor for this configuration already shut down.");
249 return true;
250 }
251 setStopping();
252 LOGGER.trace("AsyncLoggerConfigDisruptor: shutting down disruptor for this configuration.");
253
254
255 disruptor = null;
256
257
258
259
260 for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
261 try {
262 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS);
263 } catch (final InterruptedException e) {
264 }
265 }
266 temp.shutdown();
267
268 LOGGER.trace("AsyncLoggerConfigDisruptor: shutting down disruptor executor for this configuration.");
269
270 ExecutorServices.shutdown(executor, timeout, timeUnit, toString());
271 executor = null;
272
273 if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
274 LOGGER.trace("AsyncLoggerConfigDisruptor: {} discarded {} events.", asyncQueueFullPolicy,
275 DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
276 }
277 setStopped();
278 return true;
279 }
280
281
282
283
284 private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
285 final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer();
286 return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
287 }
288
289 @Override
290 public EventRoute getEventRoute(final Level logLevel) {
291 final int remainingCapacity = remainingDisruptorCapacity();
292 if (remainingCapacity < 0) {
293 return EventRoute.DISCARD;
294 }
295 return asyncQueueFullPolicy.getRoute(backgroundThreadId, logLevel);
296 }
297
298 private int remainingDisruptorCapacity() {
299 final Disruptor<Log4jEventWrapper> temp = disruptor;
300 if (hasLog4jBeenShutDown(temp)) {
301 return -1;
302 }
303 return (int) temp.getRingBuffer().remainingCapacity();
304 }
305
306
307
308
309 private boolean hasLog4jBeenShutDown(final Disruptor<Log4jEventWrapper> aDisruptor) {
310 if (aDisruptor == null) {
311 LOGGER.warn("Ignoring log event after log4j was shut down");
312 return true;
313 }
314 return false;
315 }
316
317 @Override
318 public void enqueueEvent(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) {
319
320 try {
321 final LogEvent logEvent = prepareEvent(event);
322 enqueue(logEvent, asyncLoggerConfig);
323 } catch (final NullPointerException npe) {
324
325
326 LOGGER.warn("Ignoring log event after log4j was shut down.");
327 }
328 }
329
330 private LogEvent prepareEvent(final LogEvent event) {
331 final LogEvent logEvent = ensureImmutable(event);
332 if (logEvent instanceof Log4jLogEvent && logEvent.getMessage() instanceof ReusableMessage) {
333 ((Log4jLogEvent) logEvent).makeMessageImmutable();
334 }
335 return logEvent;
336 }
337
338 private void enqueue(final LogEvent logEvent, final AsyncLoggerConfig asyncLoggerConfig) {
339 disruptor.getRingBuffer().publishEvent(translator, logEvent, asyncLoggerConfig);
340 }
341
342 @Override
343 public boolean tryEnqueue(final LogEvent event, final AsyncLoggerConfig asyncLoggerConfig) {
344 final LogEvent logEvent = prepareEvent(event);
345 return disruptor.getRingBuffer().tryPublishEvent(translator, logEvent, asyncLoggerConfig);
346 }
347
348 private LogEvent ensureImmutable(final LogEvent event) {
349 LogEvent result = event;
350 if (event instanceof RingBufferLogEvent) {
351
352
353
354
355
356
357 result = ((RingBufferLogEvent) event).createMemento();
358 }
359 return result;
360 }
361
362
363
364
365
366
367
368 @Override
369 public RingBufferAdmin createRingBufferAdmin(final String contextName, final String loggerConfigName) {
370 return RingBufferAdmin.forAsyncLoggerConfig(disruptor.getRingBuffer(), contextName, loggerConfigName);
371 }
372 }