1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.appender;
18
19 import java.io.Serializable;
20 import java.util.ArrayList;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.concurrent.ArrayBlockingQueue;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.atomic.AtomicLong;
26
27 import org.apache.logging.log4j.core.Appender;
28 import org.apache.logging.log4j.core.Filter;
29 import org.apache.logging.log4j.core.LogEvent;
30 import org.apache.logging.log4j.core.async.RingBufferLogEvent;
31 import org.apache.logging.log4j.core.config.AppenderControl;
32 import org.apache.logging.log4j.core.config.AppenderRef;
33 import org.apache.logging.log4j.core.config.Configuration;
34 import org.apache.logging.log4j.core.config.ConfigurationException;
35 import org.apache.logging.log4j.core.config.plugins.Plugin;
36 import org.apache.logging.log4j.core.config.plugins.PluginAliases;
37 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
38 import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
39 import org.apache.logging.log4j.core.config.plugins.PluginElement;
40 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
41 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
42
43
44
45
46
47
48
49 @Plugin(name = "Async", category = "Core", elementType = "appender", printObject = true)
50 public final class AsyncAppender extends AbstractAppender {
51
52 private static final int DEFAULT_QUEUE_SIZE = 128;
53 private static final String SHUTDOWN = "Shutdown";
54
55 private final BlockingQueue<Serializable> queue;
56 private final int queueSize;
57 private final boolean blocking;
58 private final Configuration config;
59 private final AppenderRef[] appenderRefs;
60 private final String errorRef;
61 private final boolean includeLocation;
62 private AppenderControl errorAppender;
63 private AsyncThread thread;
64 private static final AtomicLong threadSequence = new AtomicLong(1);
65 private static ThreadLocal<Boolean> isAppenderThread = new ThreadLocal<Boolean>();
66
67
/**
 * Stores the settings and allocates the bounded event queue. No thread is created and no
 * appender reference is resolved here; that work is done by {@code start()}.
 *
 * @param name             name of this appender
 * @param filter           filter handed to the superclass
 * @param appenderRefs     references to the appenders that receive the queued events
 * @param errorRef         name of the appender used for undeliverable events, may be {@code null}
 * @param queueSize        capacity of the event queue
 * @param blocking         whether {@code append} waits for space when the queue is full
 * @param ignoreExceptions handed to the superclass
 * @param config           configuration used later to look up the referenced appenders
 * @param includeLocation  passed to {@code Log4jLogEvent.serialize} when an event is queued
 */
private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
        final String errorRef, final int queueSize, final boolean blocking,
        final boolean ignoreExceptions, final Configuration config,
        final boolean includeLocation) {
    // The third superclass argument is deliberately left null.
    super(name, filter, null, ignoreExceptions);

    // Where the events go.
    this.config = config;
    this.appenderRefs = appenderRefs;
    this.errorRef = errorRef;

    // How the events are queued.
    this.blocking = blocking;
    this.includeLocation = includeLocation;
    this.queueSize = queueSize;
    this.queue = new ArrayBlockingQueue<Serializable>(queueSize);
}
81
82 @Override
83 public void start() {
84 final Map<String, Appender> map = config.getAppenders();
85 final List<AppenderControl> appenders = new ArrayList<AppenderControl>();
86 for (final AppenderRef appenderRef : appenderRefs) {
87 if (map.containsKey(appenderRef.getRef())) {
88 appenders.add(new AppenderControl(map.get(appenderRef.getRef()), appenderRef.getLevel(),
89 appenderRef.getFilter()));
90 } else {
91 LOGGER.error("No appender named {} was configured", appenderRef);
92 }
93 }
94 if (errorRef != null) {
95 if (map.containsKey(errorRef)) {
96 errorAppender = new AppenderControl(map.get(errorRef), null, null);
97 } else {
98 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
99 }
100 }
101 if (appenders.size() > 0) {
102 thread = new AsyncThread(appenders, queue);
103 thread.setName("AsyncAppender-" + getName());
104 } else if (errorRef == null) {
105 throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
106 }
107
108 thread.start();
109 super.start();
110 }
111
112 @Override
113 public void stop() {
114 super.stop();
115 LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size());
116 thread.shutdown();
117 try {
118 thread.join();
119 } catch (final InterruptedException ex) {
120 LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
121 }
122 LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size());
123 }
124
125
126
127
128
129
130 @Override
131 public void append(LogEvent logEvent) {
132 if (!isStarted()) {
133 throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
134 }
135 if (!(logEvent instanceof Log4jLogEvent)) {
136 if (!(logEvent instanceof RingBufferLogEvent)) {
137 return;
138 }
139 logEvent = ((RingBufferLogEvent) logEvent).createMemento();
140 }
141 logEvent.getMessage().getFormattedMessage();
142 final Log4jLogEvent coreEvent = (Log4jLogEvent) logEvent;
143 boolean appendSuccessful = false;
144 if (blocking) {
145 if (isAppenderThread.get() == Boolean.TRUE && queue.remainingCapacity() == 0) {
146
147
148 coreEvent.setEndOfBatch(false);
149 appendSuccessful = thread.callAppenders(coreEvent);
150 } else {
151 try {
152
153 queue.put(Log4jLogEvent.serialize(coreEvent, includeLocation));
154 appendSuccessful = true;
155 } catch (final InterruptedException e) {
156 LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
157 getName());
158 }
159 }
160 } else {
161 appendSuccessful = queue.offer(Log4jLogEvent.serialize(coreEvent, includeLocation));
162 if (!appendSuccessful) {
163 error("Appender " + getName() + " is unable to write primary appenders. queue is full");
164 }
165 }
166 if (!appendSuccessful && errorAppender != null) {
167 errorAppender.callAppender(coreEvent);
168 }
169 }
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185 @PluginFactory
186 public static AsyncAppender createAppender(@PluginElement("AppenderRef") final AppenderRef[] appenderRefs,
187 @PluginAttribute("errorRef") @PluginAliases("error-ref") final String errorRef,
188 @PluginAttribute(value = "blocking", defaultBoolean = true) final boolean blocking,
189 @PluginAttribute(value = "bufferSize", defaultInt = DEFAULT_QUEUE_SIZE) final int size,
190 @PluginAttribute("name") final String name,
191 @PluginAttribute(value = "includeLocation", defaultBoolean = false) final boolean includeLocation,
192 @PluginElement("Filter") final Filter filter,
193 @PluginConfiguration final Configuration config,
194 @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions) {
195 if (name == null) {
196 LOGGER.error("No name provided for AsyncAppender");
197 return null;
198 }
199 if (appenderRefs == null) {
200 LOGGER.error("No appender references provided to AsyncAppender {}", name);
201 }
202
203 return new AsyncAppender(name, filter, appenderRefs, errorRef,
204 size, blocking, ignoreExceptions, config, includeLocation);
205 }
206
207
208
209
210 private class AsyncThread extends Thread {
211
212 private volatile boolean shutdown = false;
213 private final List<AppenderControl> appenders;
214 private final BlockingQueue<Serializable> queue;
215
216 public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<Serializable> queue) {
217 this.appenders = appenders;
218 this.queue = queue;
219 setDaemon(true);
220 setName("AsyncAppenderThread" + threadSequence.getAndIncrement());
221 }
222
223 @Override
224 public void run() {
225 isAppenderThread.set(Boolean.TRUE);
226 while (!shutdown) {
227 Serializable s;
228 try {
229 s = queue.take();
230 if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) {
231 shutdown = true;
232 continue;
233 }
234 } catch (final InterruptedException ex) {
235
236 continue;
237 }
238 final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
239 event.setEndOfBatch(queue.isEmpty());
240 final boolean success = callAppenders(event);
241 if (!success && errorAppender != null) {
242 try {
243 errorAppender.callAppender(event);
244 } catch (final Exception ex) {
245
246 }
247 }
248 }
249
250 LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.",
251 queue.size());
252 int count= 0;
253 int ignored = 0;
254 while (!queue.isEmpty()) {
255 try {
256 final Serializable s = queue.take();
257 if (Log4jLogEvent.canDeserialize(s)) {
258 final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
259 event.setEndOfBatch(queue.isEmpty());
260 callAppenders(event);
261 count++;
262 } else {
263 ignored++;
264 LOGGER.trace("Ignoring event of class {}", s.getClass().getName());
265 }
266 } catch (final InterruptedException ex) {
267
268 }
269 }
270 LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. " +
271 "Processed {} and ignored {} events since shutdown started.",
272 queue.size(), count, ignored);
273 }
274
275
276
277
278
279
280
281
282
283
284 boolean callAppenders(final Log4jLogEvent event) {
285 boolean success = false;
286 for (final AppenderControl control : appenders) {
287 try {
288 control.callAppender(event);
289 success = true;
290 } catch (final Exception ex) {
291
292 }
293 }
294 return success;
295 }
296
297 public void shutdown() {
298 shutdown = true;
299 if (queue.isEmpty()) {
300 queue.offer(SHUTDOWN);
301 }
302 }
303 }
304
305
306
307
308
309
310 public String[] getAppenderRefStrings() {
311 final String[] result = new String[appenderRefs.length];
312 for (int i = 0; i < result.length; i++) {
313 result[i] = appenderRefs[i].getRef();
314 }
315 return result;
316 }
317
318
319
320
321
322
323
324 public boolean isIncludeLocation() {
325 return includeLocation;
326 }
327
328
329
330
331
332
333 public boolean isBlocking() {
334 return blocking;
335 }
336
337
338
339
340
341 public String getErrorRef() {
342 return errorRef;
343 }
344
345 public int getQueueCapacity() {
346 return queueSize;
347 }
348
349 public int getQueueRemainingCapacity() {
350 return queue.remainingCapacity();
351 }
352 }