1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.async;
18
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ThreadFactory;
22
23 import org.apache.logging.log4j.Logger;
24 import org.apache.logging.log4j.core.LogEvent;
25 import org.apache.logging.log4j.status.StatusLogger;
26
27 import com.lmax.disruptor.BlockingWaitStrategy;
28 import com.lmax.disruptor.EventFactory;
29 import com.lmax.disruptor.EventHandler;
30 import com.lmax.disruptor.EventTranslator;
31 import com.lmax.disruptor.ExceptionHandler;
32 import com.lmax.disruptor.RingBuffer;
33 import com.lmax.disruptor.Sequence;
34 import com.lmax.disruptor.SequenceReportingEventHandler;
35 import com.lmax.disruptor.SleepingWaitStrategy;
36 import com.lmax.disruptor.WaitStrategy;
37 import com.lmax.disruptor.YieldingWaitStrategy;
38 import com.lmax.disruptor.dsl.Disruptor;
39 import com.lmax.disruptor.dsl.ProducerType;
40 import com.lmax.disruptor.util.Util;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 class AsyncLoggerConfigHelper {
59
60 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 20;
61 private static final int HALF_A_SECOND = 500;
62 private static final int RINGBUFFER_MIN_SIZE = 128;
63 private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
64 private static final Logger LOGGER = StatusLogger.getLogger();
65
66 private static ThreadFactory threadFactory = new DaemonThreadFactory(
67 "AsyncLoggerConfig-");
68 private static volatile Disruptor<Log4jEventWrapper> disruptor;
69 private static ExecutorService executor;
70
71 private static volatile int count = 0;
72
73
74
75
76
77 private static final EventFactory<Log4jEventWrapper> FACTORY = new EventFactory<Log4jEventWrapper>() {
78 @Override
79 public Log4jEventWrapper newInstance() {
80 return new Log4jEventWrapper();
81 }
82 };
83
84
85
86
87 private final EventTranslator<Log4jEventWrapper> translator = new EventTranslator<Log4jEventWrapper>() {
88 @Override
89 public void translateTo(final Log4jEventWrapper event,
90 final long sequence) {
91 event.event = currentLogEvent.get();
92 event.loggerConfig = asyncLoggerConfig;
93 }
94 };
95
96 private final ThreadLocal<LogEvent> currentLogEvent = new ThreadLocal<LogEvent>();
97 private final AsyncLoggerConfig asyncLoggerConfig;
98
99 public AsyncLoggerConfigHelper(final AsyncLoggerConfig asyncLoggerConfig) {
100 this.asyncLoggerConfig = asyncLoggerConfig;
101 claim();
102 }
103
104 private static synchronized void initDisruptor() {
105 if (disruptor != null) {
106 LOGGER.trace("AsyncLoggerConfigHelper not starting new disruptor, using existing object. Ref count is {}.", count);
107 return;
108 }
109 LOGGER.trace("AsyncLoggerConfigHelper creating new disruptor. Ref count is {}.", count);
110 final int ringBufferSize = calculateRingBufferSize();
111 final WaitStrategy waitStrategy = createWaitStrategy();
112 executor = Executors.newSingleThreadExecutor(threadFactory);
113 disruptor = new Disruptor<Log4jEventWrapper>(FACTORY, ringBufferSize,
114 executor, ProducerType.MULTI, waitStrategy);
115 final EventHandler<Log4jEventWrapper>[] handlers = new Log4jEventWrapperHandler[] {
116 new Log4jEventWrapperHandler() };
117 final ExceptionHandler errorHandler = getExceptionHandler();
118 disruptor.handleExceptionsWith(errorHandler);
119 disruptor.handleEventsWith(handlers);
120
121 LOGGER.debug(
122 "Starting AsyncLoggerConfig disruptor with ringbuffer size={}, waitStrategy={}, exceptionHandler={}...",
123 disruptor.getRingBuffer().getBufferSize(), waitStrategy.getClass().getSimpleName(), errorHandler);
124 disruptor.start();
125 }
126
127 private static WaitStrategy createWaitStrategy() {
128 final String strategy = System
129 .getProperty("AsyncLoggerConfig.WaitStrategy");
130 LOGGER.debug("property AsyncLoggerConfig.WaitStrategy={}", strategy);
131 if ("Sleep".equals(strategy)) {
132 return new SleepingWaitStrategy();
133 } else if ("Yield".equals(strategy)) {
134 return new YieldingWaitStrategy();
135 } else if ("Block".equals(strategy)) {
136 return new BlockingWaitStrategy();
137 }
138 return new SleepingWaitStrategy();
139 }
140
141 private static int calculateRingBufferSize() {
142 int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
143 final String userPreferredRBSize = System.getProperty(
144 "AsyncLoggerConfig.RingBufferSize",
145 String.valueOf(ringBufferSize));
146 try {
147 int size = Integer.parseInt(userPreferredRBSize);
148 if (size < RINGBUFFER_MIN_SIZE) {
149 size = RINGBUFFER_MIN_SIZE;
150 LOGGER.warn(
151 "Invalid RingBufferSize {}, using minimum size {}.",
152 userPreferredRBSize, RINGBUFFER_MIN_SIZE);
153 }
154 ringBufferSize = size;
155 } catch (final Exception ex) {
156 LOGGER.warn("Invalid RingBufferSize {}, using default size {}.",
157 userPreferredRBSize, ringBufferSize);
158 }
159 return Util.ceilingNextPowerOfTwo(ringBufferSize);
160 }
161
162 private static ExceptionHandler getExceptionHandler() {
163 final String cls = System
164 .getProperty("AsyncLoggerConfig.ExceptionHandler");
165 if (cls == null) {
166 return null;
167 }
168 try {
169 @SuppressWarnings("unchecked")
170 final Class<? extends ExceptionHandler> klass = (Class<? extends ExceptionHandler>) Class
171 .forName(cls);
172 final ExceptionHandler result = klass.newInstance();
173 return result;
174 } catch (final Exception ignored) {
175 LOGGER.debug(
176 "AsyncLoggerConfig.ExceptionHandler not set: error creating "
177 + cls + ": ", ignored);
178 return null;
179 }
180 }
181
182
183
184
185
186 private static class Log4jEventWrapper {
187 private AsyncLoggerConfig loggerConfig;
188 private LogEvent event;
189
190
191
192
193
194 public void clear() {
195 loggerConfig = null;
196 event = null;
197 }
198 }
199
200
201
202
203 private static class Log4jEventWrapperHandler implements
204 SequenceReportingEventHandler<Log4jEventWrapper> {
205 private static final int NOTIFY_PROGRESS_THRESHOLD = 50;
206 private Sequence sequenceCallback;
207 private int counter;
208
209 @Override
210 public void setSequenceCallback(final Sequence sequenceCallback) {
211 this.sequenceCallback = sequenceCallback;
212 }
213
214 @Override
215 public void onEvent(final Log4jEventWrapper event, final long sequence,
216 final boolean endOfBatch) throws Exception {
217 event.event.setEndOfBatch(endOfBatch);
218 event.loggerConfig.asyncCallAppenders(event.event);
219 event.clear();
220
221
222
223
224 if (++counter > NOTIFY_PROGRESS_THRESHOLD) {
225 sequenceCallback.set(sequence);
226 counter = 0;
227 }
228 }
229 }
230
231
232
233
234
235
236
237 synchronized static void claim() {
238 count++;
239 initDisruptor();
240 }
241
242
243
244
245
246
247 synchronized static void release() {
248 if (--count > 0) {
249 LOGGER.trace("AsyncLoggerConfigHelper: not shutting down disruptor: ref count is {}.", count);
250 return;
251 }
252 final Disruptor<Log4jEventWrapper> temp = disruptor;
253 if (temp == null) {
254 LOGGER.trace("AsyncLoggerConfigHelper: disruptor already shut down: ref count is {}.", count);
255 return;
256 }
257 LOGGER.trace("AsyncLoggerConfigHelper: shutting down disruptor: ref count is {}.", count);
258
259
260
261 disruptor = null;
262 temp.shutdown();
263
264
265 final RingBuffer<Log4jEventWrapper> ringBuffer = temp.getRingBuffer();
266 for (int i = 0; i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
267 if (ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize())) {
268 break;
269 }
270 try {
271
272 Thread.sleep(HALF_A_SECOND);
273 } catch (final InterruptedException e) {
274
275 }
276 }
277 executor.shutdown();
278 executor = null;
279 }
280
281 public void callAppendersFromAnotherThread(final LogEvent event) {
282 currentLogEvent.set(event);
283 disruptor.publishEvent(translator);
284 }
285
286 }