1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.async;
19
20 import java.util.concurrent.ThreadFactory;
21 import java.util.concurrent.TimeUnit;
22
23 import org.apache.logging.log4j.Level;
24 import org.apache.logging.log4j.core.AbstractLifeCycle;
25 import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
26 import org.apache.logging.log4j.core.util.Log4jThreadFactory;
27 import org.apache.logging.log4j.core.util.Throwables;
28
29 import com.lmax.disruptor.ExceptionHandler;
30 import com.lmax.disruptor.RingBuffer;
31 import com.lmax.disruptor.TimeoutException;
32 import com.lmax.disruptor.WaitStrategy;
33 import com.lmax.disruptor.dsl.Disruptor;
34 import com.lmax.disruptor.dsl.ProducerType;
35
36
37
38
39
40
41
42 class AsyncLoggerDisruptor extends AbstractLifeCycle {
43 private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
44 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
45
46 private volatile Disruptor<RingBufferLogEvent> disruptor;
47 private String contextName;
48
49 private boolean useThreadLocalTranslator = true;
50 private long backgroundThreadId;
51 private AsyncQueueFullPolicy asyncQueueFullPolicy;
52 private int ringBufferSize;
53
54 AsyncLoggerDisruptor(final String contextName) {
55 this.contextName = contextName;
56 }
57
58 public String getContextName() {
59 return contextName;
60 }
61
62 public void setContextName(final String name) {
63 contextName = name;
64 }
65
66 Disruptor<RingBufferLogEvent> getDisruptor() {
67 return disruptor;
68 }
69
70
71
72
73
74
75 @Override
76 public synchronized void start() {
77 if (disruptor != null) {
78 LOGGER.trace(
79 "[{}] AsyncLoggerDisruptor not starting new disruptor for this context, using existing object.",
80 contextName);
81 return;
82 }
83 LOGGER.trace("[{}] AsyncLoggerDisruptor creating new disruptor for this context.", contextName);
84 ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLogger.RingBufferSize");
85 final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLogger.WaitStrategy");
86
87 final ThreadFactory threadFactory = new Log4jThreadFactory("AsyncLogger[" + contextName + "]", true, Thread.NORM_PRIORITY) {
88 @Override
89 public Thread newThread(final Runnable r) {
90 final Thread result = super.newThread(r);
91 backgroundThreadId = result.getId();
92 return result;
93 }
94 };
95 asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
96
97 disruptor = new Disruptor<>(RingBufferLogEvent.FACTORY, ringBufferSize, threadFactory, ProducerType.MULTI,
98 waitStrategy);
99
100 final ExceptionHandler<RingBufferLogEvent> errorHandler = DisruptorUtil.getAsyncLoggerExceptionHandler();
101 disruptor.setDefaultExceptionHandler(errorHandler);
102
103 final RingBufferLogEventHandler[] handlers = {new RingBufferLogEventHandler()};
104 disruptor.handleEventsWith(handlers);
105
106 LOGGER.debug("[{}] Starting AsyncLogger disruptor for this context with ringbufferSize={}, waitStrategy={}, "
107 + "exceptionHandler={}...", contextName, disruptor.getRingBuffer().getBufferSize(), waitStrategy
108 .getClass().getSimpleName(), errorHandler);
109 disruptor.start();
110
111 LOGGER.trace("[{}] AsyncLoggers use a {} translator", contextName, useThreadLocalTranslator ? "threadlocal"
112 : "vararg");
113 super.start();
114 }
115
116
117
118
119
120 @Override
121 public boolean stop(final long timeout, final TimeUnit timeUnit) {
122 final Disruptor<RingBufferLogEvent> temp = getDisruptor();
123 if (temp == null) {
124 LOGGER.trace("[{}] AsyncLoggerDisruptor: disruptor for this context already shut down.", contextName);
125 return true;
126 }
127 setStopping();
128 LOGGER.debug("[{}] AsyncLoggerDisruptor: shutting down disruptor for this context.", contextName);
129
130
131 disruptor = null;
132
133
134
135
136 for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
137 try {
138 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS);
139 } catch (final InterruptedException e) {
140 }
141 }
142 try {
143
144 temp.shutdown(timeout, timeUnit);
145 } catch (final TimeoutException e) {
146 LOGGER.warn("[{}] AsyncLoggerDisruptor: shutdown timed out after {} {}", contextName, timeout, timeUnit);
147 temp.halt();
148 }
149
150 LOGGER.trace("[{}] AsyncLoggerDisruptor: disruptor has been shut down.", contextName);
151
152 if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
153 LOGGER.trace("AsyncLoggerDisruptor: {} discarded {} events.", asyncQueueFullPolicy,
154 DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
155 }
156 setStopped();
157 return true;
158 }
159
160
161
162
163 private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
164 final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer();
165 return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
166 }
167
168
169
170
171
172
173
174 public RingBufferAdmin createRingBufferAdmin(final String jmxContextName) {
175 final RingBuffer<RingBufferLogEvent> ring = disruptor == null ? null : disruptor.getRingBuffer();
176 return RingBufferAdmin.forAsyncLogger(ring, jmxContextName);
177 }
178
179 EventRoute getEventRoute(final Level logLevel) {
180 final int remainingCapacity = remainingDisruptorCapacity();
181 if (remainingCapacity < 0) {
182 return EventRoute.DISCARD;
183 }
184 return asyncQueueFullPolicy.getRoute(backgroundThreadId, logLevel);
185 }
186
187 private int remainingDisruptorCapacity() {
188 final Disruptor<RingBufferLogEvent> temp = disruptor;
189 if (hasLog4jBeenShutDown(temp)) {
190 return -1;
191 }
192 return (int) temp.getRingBuffer().remainingCapacity();
193 }
194
195
196
197 private boolean hasLog4jBeenShutDown(final Disruptor<RingBufferLogEvent> aDisruptor) {
198 if (aDisruptor == null) {
199 LOGGER.warn("Ignoring log event after log4j was shut down");
200 return true;
201 }
202 return false;
203 }
204
205 public boolean tryPublish(final RingBufferLogEventTranslator translator) {
206 try {
207 return disruptor.getRingBuffer().tryPublishEvent(translator);
208 } catch (final NullPointerException npe) {
209
210 LOGGER.warn("[{}] Ignoring log event after log4j was shut down: {} [{}] {}", contextName,
211 translator.level, translator.loggerName, translator.message.getFormattedMessage()
212 + (translator.thrown == null ? "" : Throwables.toStringList(translator.thrown)));
213 return false;
214 }
215 }
216
217 void enqueueLogMessageInfo(final RingBufferLogEventTranslator translator) {
218 try {
219
220
221
222 disruptor.publishEvent(translator);
223 } catch (final NullPointerException npe) {
224
225 LOGGER.warn("[{}] Ignoring log event after log4j was shut down: {} [{}] {}", contextName,
226 translator.level, translator.loggerName, translator.message.getFormattedMessage()
227 + (translator.thrown == null ? "" : Throwables.toStringList(translator.thrown)));
228 }
229 }
230
231
232
233
234
235
236
237
238 public boolean isUseThreadLocals() {
239 return useThreadLocalTranslator;
240 }
241
242
243
244
245
246
247
248
249
250
251
252
253 public void setUseThreadLocals(final boolean allow) {
254 useThreadLocalTranslator = allow;
255 LOGGER.trace("[{}] AsyncLoggers have been modified to use a {} translator", contextName,
256 useThreadLocalTranslator ? "threadlocal" : "vararg");
257 }
258 }