1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.async;
18
19 import java.util.Map;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22
23 import org.apache.logging.log4j.Level;
24 import org.apache.logging.log4j.Marker;
25 import org.apache.logging.log4j.ThreadContext;
26 import org.apache.logging.log4j.core.Logger;
27 import org.apache.logging.log4j.core.LoggerContext;
28 import org.apache.logging.log4j.core.config.Property;
29 import org.apache.logging.log4j.core.helpers.Clock;
30 import org.apache.logging.log4j.core.helpers.ClockFactory;
31 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
32 import org.apache.logging.log4j.message.Message;
33 import org.apache.logging.log4j.message.MessageFactory;
34 import org.apache.logging.log4j.status.StatusLogger;
35
36 import com.lmax.disruptor.BlockingWaitStrategy;
37 import com.lmax.disruptor.EventHandler;
38 import com.lmax.disruptor.ExceptionHandler;
39 import com.lmax.disruptor.RingBuffer;
40 import com.lmax.disruptor.SleepingWaitStrategy;
41 import com.lmax.disruptor.WaitStrategy;
42 import com.lmax.disruptor.YieldingWaitStrategy;
43 import com.lmax.disruptor.dsl.Disruptor;
44 import com.lmax.disruptor.dsl.ProducerType;
45 import com.lmax.disruptor.util.Util;
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74 public class AsyncLogger extends Logger {
75 private static final int HALF_A_SECOND = 500;
76 private static final int MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN = 20;
77 private static final int RINGBUFFER_MIN_SIZE = 128;
78 private static final int RINGBUFFER_DEFAULT_SIZE = 256 * 1024;
79 private static final StatusLogger LOGGER = StatusLogger.getLogger();
80
81 private static volatile Disruptor<RingBufferLogEvent> disruptor;
82 private static Clock clock = ClockFactory.getClock();
83
84 private static ExecutorService executor = Executors
85 .newSingleThreadExecutor(new DaemonThreadFactory("AsyncLogger-"));
86 private final ThreadLocal<Info> threadlocalInfo = new ThreadLocal<Info>();
87
88 static {
89 final int ringBufferSize = calculateRingBufferSize();
90
91 final WaitStrategy waitStrategy = createWaitStrategy();
92 disruptor = new Disruptor<RingBufferLogEvent>(
93 RingBufferLogEvent.FACTORY, ringBufferSize, executor,
94 ProducerType.MULTI, waitStrategy);
95 final EventHandler<RingBufferLogEvent>[] handlers = new RingBufferLogEventHandler[] {
96 new RingBufferLogEventHandler() };
97 disruptor.handleExceptionsWith(getExceptionHandler());
98 disruptor.handleEventsWith(handlers);
99
100 LOGGER.debug(
101 "Starting AsyncLogger disruptor with ringbuffer size {}...",
102 disruptor.getRingBuffer().getBufferSize());
103 disruptor.start();
104 }
105
106 private static int calculateRingBufferSize() {
107 int ringBufferSize = RINGBUFFER_DEFAULT_SIZE;
108 final String userPreferredRBSize = System.getProperty(
109 "AsyncLogger.RingBufferSize", String.valueOf(ringBufferSize));
110 try {
111 int size = Integer.parseInt(userPreferredRBSize);
112 if (size < RINGBUFFER_MIN_SIZE) {
113 size = RINGBUFFER_MIN_SIZE;
114 LOGGER.warn(
115 "Invalid RingBufferSize {}, using minimum size {}.",
116 userPreferredRBSize, RINGBUFFER_MIN_SIZE);
117 }
118 ringBufferSize = size;
119 } catch (final Exception ex) {
120 LOGGER.warn("Invalid RingBufferSize {}, using default size {}.",
121 userPreferredRBSize, ringBufferSize);
122 }
123 return Util.ceilingNextPowerOfTwo(ringBufferSize);
124 }
125
126 private static WaitStrategy createWaitStrategy() {
127 final String strategy = System.getProperty("AsyncLogger.WaitStrategy");
128 LOGGER.debug("property AsyncLogger.WaitStrategy={}", strategy);
129 if ("Sleep".equals(strategy)) {
130 LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
131 return new SleepingWaitStrategy();
132 } else if ("Yield".equals(strategy)) {
133 LOGGER.debug("disruptor event handler uses YieldingWaitStrategy");
134 return new YieldingWaitStrategy();
135 } else if ("Block".equals(strategy)) {
136 LOGGER.debug("disruptor event handler uses BlockingWaitStrategy");
137 return new BlockingWaitStrategy();
138 }
139 LOGGER.debug("disruptor event handler uses SleepingWaitStrategy");
140 return new SleepingWaitStrategy();
141 }
142
143 private static ExceptionHandler getExceptionHandler() {
144 final String cls = System.getProperty("AsyncLogger.ExceptionHandler");
145 if (cls == null) {
146 LOGGER.debug("No AsyncLogger.ExceptionHandler specified");
147 return null;
148 }
149 try {
150 @SuppressWarnings("unchecked")
151 final
152 Class<? extends ExceptionHandler> klass = (Class<? extends ExceptionHandler>) Class
153 .forName(cls);
154 final ExceptionHandler result = klass.newInstance();
155 LOGGER.debug("AsyncLogger.ExceptionHandler=" + result);
156 return result;
157 } catch (final Exception ignored) {
158 LOGGER.debug(
159 "AsyncLogger.ExceptionHandler not set: error creating "
160 + cls + ": ", ignored);
161 return null;
162 }
163 }
164
165
166
167
168
169
170
171
172
173 public AsyncLogger(final LoggerContext context, final String name,
174 final MessageFactory messageFactory) {
175 super(context, name, messageFactory);
176 }
177
178
179
180
181 private static class Info {
182 private RingBufferLogEventTranslator translator;
183 private String cachedThreadName;
184 }
185
186 @Override
187 public void log(final Marker marker, final String fqcn, final Level level, final Message data,
188 final Throwable t) {
189 Info info = threadlocalInfo.get();
190 if (info == null) {
191 info = new Info();
192 info.translator = new RingBufferLogEventTranslator();
193 info.cachedThreadName = Thread.currentThread().getName();
194 threadlocalInfo.set(info);
195 }
196
197 final boolean includeLocation = config.loggerConfig.isIncludeLocation();
198 info.translator.setValues(this, getName(), marker, fqcn, level, data,
199 t,
200
201
202
203
204
205 ThreadContext.getImmutableContext(),
206
207
208 ThreadContext.getImmutableStack(),
209
210
211 info.cachedThreadName,
212
213
214
215
216 includeLocation ? location(fqcn) : null,
217
218
219
220
221 clock.currentTimeMillis());
222
223 disruptor.publishEvent(info.translator);
224 }
225
226 private StackTraceElement location(final String fqcnOfLogger) {
227 return Log4jLogEvent.calcLocation(fqcnOfLogger);
228 }
229
230
231
232
233
234
235
236 public void actualAsyncLog(final RingBufferLogEvent event) {
237 final Map<Property, Boolean> properties = config.loggerConfig.getProperties();
238 event.mergePropertiesIntoContextMap(properties,
239 config.config.getStrSubstitutor());
240 config.logEvent(event);
241 }
242
243 public static void stop() {
244 final Disruptor<RingBufferLogEvent> temp = disruptor;
245
246
247
248 disruptor = null;
249 temp.shutdown();
250
251
252 final RingBuffer<RingBufferLogEvent> ringBuffer = temp.getRingBuffer();
253 for (int i = 0; i < MAX_DRAIN_ATTEMPTS_BEFORE_SHUTDOWN; i++) {
254 if (ringBuffer.hasAvailableCapacity(ringBuffer.getBufferSize())) {
255 break;
256 }
257 try {
258
259 Thread.sleep(HALF_A_SECOND);
260 } catch (final InterruptedException e) {
261
262 }
263 }
264 executor.shutdown();
265 }
266 }