1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.logging.log4j.core.async;
19
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.TimeUnit;
23
24 import org.apache.logging.log4j.Level;
25 import org.apache.logging.log4j.core.AbstractLifeCycle;
26 import org.apache.logging.log4j.core.jmx.RingBufferAdmin;
27 import org.apache.logging.log4j.core.util.ExecutorServices;
28 import org.apache.logging.log4j.core.util.Log4jThreadFactory;
29
30 import com.lmax.disruptor.ExceptionHandler;
31 import com.lmax.disruptor.RingBuffer;
32 import com.lmax.disruptor.TimeoutException;
33 import com.lmax.disruptor.WaitStrategy;
34 import com.lmax.disruptor.dsl.Disruptor;
35 import com.lmax.disruptor.dsl.ProducerType;
36
37
38
39
40
41
42
43 class AsyncLoggerDisruptor extends AbstractLifeCycle {
44 private static final int SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS = 50;
45 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 200;
46
47 private volatile Disruptor<RingBufferLogEvent> disruptor;
48 private ExecutorService executor;
49 private String contextName;
50
51 private boolean useThreadLocalTranslator = true;
52 private long backgroundThreadId;
53 private AsyncQueueFullPolicy asyncQueueFullPolicy;
54 private int ringBufferSize;
55
56 AsyncLoggerDisruptor(final String contextName) {
57 this.contextName = contextName;
58 }
59
60 public String getContextName() {
61 return contextName;
62 }
63
64 public void setContextName(final String name) {
65 contextName = name;
66 }
67
68 Disruptor<RingBufferLogEvent> getDisruptor() {
69 return disruptor;
70 }
71
72
73
74
75
76
77 @Override
78 public synchronized void start() {
79 if (disruptor != null) {
80 LOGGER.trace(
81 "[{}] AsyncLoggerDisruptor not starting new disruptor for this context, using existing object.",
82 contextName);
83 return;
84 }
85 LOGGER.trace("[{}] AsyncLoggerDisruptor creating new disruptor for this context.", contextName);
86 ringBufferSize = DisruptorUtil.calculateRingBufferSize("AsyncLogger.RingBufferSize");
87 final WaitStrategy waitStrategy = DisruptorUtil.createWaitStrategy("AsyncLogger.WaitStrategy");
88 executor = Executors.newSingleThreadExecutor(Log4jThreadFactory.createDaemonThreadFactory("AsyncLogger[" + contextName + "]"));
89 backgroundThreadId = DisruptorUtil.getExecutorThreadId(executor);
90 asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create();
91
92 disruptor = new Disruptor<>(RingBufferLogEvent.FACTORY, ringBufferSize, executor, ProducerType.MULTI,
93 waitStrategy);
94
95 final ExceptionHandler<RingBufferLogEvent> errorHandler = DisruptorUtil.getAsyncLoggerExceptionHandler();
96 disruptor.handleExceptionsWith(errorHandler);
97
98 final RingBufferLogEventHandler[] handlers = {new RingBufferLogEventHandler()};
99 disruptor.handleEventsWith(handlers);
100
101 LOGGER.debug("[{}] Starting AsyncLogger disruptor for this context with ringbufferSize={}, waitStrategy={}, "
102 + "exceptionHandler={}...", contextName, disruptor.getRingBuffer().getBufferSize(), waitStrategy
103 .getClass().getSimpleName(), errorHandler);
104 disruptor.start();
105
106 LOGGER.trace("[{}] AsyncLoggers use a {} translator", contextName, useThreadLocalTranslator ? "threadlocal"
107 : "vararg");
108 super.start();
109 }
110
111
112
113
114
115 @Override
116 public boolean stop(final long timeout, final TimeUnit timeUnit) {
117 final Disruptor<RingBufferLogEvent> temp = getDisruptor();
118 if (temp == null) {
119 LOGGER.trace("[{}] AsyncLoggerDisruptor: disruptor for this context already shut down.", contextName);
120 return true;
121 }
122 setStopping();
123 LOGGER.debug("[{}] AsyncLoggerDisruptor: shutting down disruptor for this context.", contextName);
124
125
126 disruptor = null;
127
128
129
130
131 for (int i = 0; hasBacklog(temp) && i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
132 try {
133 Thread.sleep(SLEEP_MILLIS_BETWEEN_DRAIN_ATTEMPTS);
134 } catch (final InterruptedException e) {
135 }
136 }
137 try {
138
139 temp.shutdown(timeout, timeUnit);
140 } catch (final TimeoutException e) {
141 temp.shutdown();
142 }
143
144 LOGGER.trace("[{}] AsyncLoggerDisruptor: shutting down disruptor executor.", contextName);
145
146 ExecutorServices.shutdown(executor, timeout, timeUnit, toString());
147 executor = null;
148
149 if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) {
150 LOGGER.trace("AsyncLoggerDisruptor: {} discarded {} events.", asyncQueueFullPolicy,
151 DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy));
152 }
153 setStopped();
154 return true;
155 }
156
157
158
159
160 private static boolean hasBacklog(final Disruptor<?> theDisruptor) {
161 final RingBuffer<?> ringBuffer = theDisruptor.getRingBuffer();
162 return !ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize());
163 }
164
165
166
167
168
169
170
171 public RingBufferAdmin createRingBufferAdmin(final String jmxContextName) {
172 final RingBuffer<RingBufferLogEvent> ring = disruptor == null ? null : disruptor.getRingBuffer();
173 return RingBufferAdmin.forAsyncLogger(ring, jmxContextName);
174 }
175
176 EventRoute getEventRoute(final Level logLevel) {
177 final int remainingCapacity = remainingDisruptorCapacity();
178 if (remainingCapacity < 0) {
179 return EventRoute.DISCARD;
180 }
181 return asyncQueueFullPolicy.getRoute(backgroundThreadId, logLevel);
182 }
183
184 private int remainingDisruptorCapacity() {
185 final Disruptor<RingBufferLogEvent> temp = disruptor;
186 if (hasLog4jBeenShutDown(temp)) {
187 return -1;
188 }
189 return (int) temp.getRingBuffer().remainingCapacity();
190 }
191
192
193
194 private boolean hasLog4jBeenShutDown(final Disruptor<RingBufferLogEvent> aDisruptor) {
195 if (aDisruptor == null) {
196 LOGGER.warn("Ignoring log event after log4j was shut down");
197 return true;
198 }
199 return false;
200 }
201
202 public boolean tryPublish(final RingBufferLogEventTranslator translator) {
203
204 try {
205 return disruptor.getRingBuffer().tryPublishEvent(translator);
206 } catch (final NullPointerException npe) {
207 LOGGER.warn("[{}] Ignoring log event after log4j was shut down.", contextName);
208 return false;
209 }
210 }
211
212 void enqueueLogMessageInfo(final RingBufferLogEventTranslator translator) {
213
214 try {
215
216
217
218 disruptor.publishEvent(translator);
219 } catch (final NullPointerException npe) {
220 LOGGER.warn("[{}] Ignoring log event after log4j was shut down.", contextName);
221 }
222 }
223
224
225
226
227
228
229
230
231 public boolean isUseThreadLocals() {
232 return useThreadLocalTranslator;
233 }
234
235
236
237
238
239
240
241
242
243
244
245
246 public void setUseThreadLocals(final boolean allow) {
247 useThreadLocalTranslator = allow;
248 LOGGER.trace("[{}] AsyncLoggers have been modified to use a {} translator", contextName,
249 useThreadLocalTranslator ? "threadlocal" : "vararg");
250 }
251 }