1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.appender;
18
19 import org.apache.logging.log4j.core.Appender;
20 import org.apache.logging.log4j.core.Filter;
21 import org.apache.logging.log4j.core.Layout;
22 import org.apache.logging.log4j.core.LogEvent;
23 import org.apache.logging.log4j.core.config.AppenderControl;
24 import org.apache.logging.log4j.core.config.AppenderRef;
25 import org.apache.logging.log4j.core.config.Configuration;
26 import org.apache.logging.log4j.core.config.ConfigurationException;
27 import org.apache.logging.log4j.core.config.plugins.Plugin;
28 import org.apache.logging.log4j.core.config.plugins.PluginAttr;
29 import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
30 import org.apache.logging.log4j.core.config.plugins.PluginElement;
31 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
32 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
33
34 import java.io.Serializable;
35 import java.util.ArrayList;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.concurrent.ArrayBlockingQueue;
39 import java.util.concurrent.BlockingQueue;
40
41
42
43
44
45
46
47
48
49 @Plugin(name = "Async", category = "Core", elementType = "appender", printObject = true)
50 public final class AsyncAppender<T extends Serializable> extends AbstractAppender<T> {
51
52 private static final int DEFAULT_QUEUE_SIZE = 128;
53 private static final String SHUTDOWN = "Shutdown";
54
55 private final BlockingQueue<Serializable> queue;
56 private final boolean blocking;
57 private final Configuration config;
58 private final AppenderRef[] appenderRefs;
59 private final String errorRef;
60 private final boolean includeLocation;
61 private AppenderControl<?> errorAppender;
62 private AsyncThread thread;
63
64 private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
65 final String errorRef, final int queueSize, final boolean blocking,
66 final boolean handleExceptions, final Configuration config,
67 final boolean includeLocation) {
68 super(name, filter, null, handleExceptions);
69 this.queue = new ArrayBlockingQueue<Serializable>(queueSize);
70 this.blocking = blocking;
71 this.config = config;
72 this.appenderRefs = appenderRefs;
73 this.errorRef = errorRef;
74 this.includeLocation = includeLocation;
75 }
76
77 @Override
78 @SuppressWarnings("unchecked")
79 public void start() {
80 final Map<String, Appender<?>> map = config.getAppenders();
81 final List<AppenderControl<?>> appenders = new ArrayList<AppenderControl<?>>();
82 for (final AppenderRef appenderRef : appenderRefs) {
83 if (map.containsKey(appenderRef.getRef())) {
84 appenders.add(new AppenderControl(map.get(appenderRef.getRef()), appenderRef.getLevel(),
85 appenderRef.getFilter()));
86 } else {
87 LOGGER.error("No appender named {} was configured", appenderRef);
88 }
89 }
90 if (errorRef != null) {
91 if (map.containsKey(errorRef)) {
92 errorAppender = new AppenderControl(map.get(errorRef), null, null);
93 } else {
94 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
95 }
96 }
97 if (appenders.size() > 0) {
98 thread = new AsyncThread(appenders, queue);
99 } else if (errorRef == null) {
100 throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
101 }
102
103 thread.start();
104 super.start();
105 }
106
107 @Override
108 public void stop() {
109 super.stop();
110 thread.shutdown();
111 try {
112 thread.join();
113 } catch (final InterruptedException ex) {
114 LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
115 }
116 }
117
118
119
120
121
122
123 @Override
124 public void append(final LogEvent event) {
125 if (!isStarted()) {
126 throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
127 }
128 if (event instanceof Log4jLogEvent) {
129 boolean appendSuccessful = false;
130 if (blocking) {
131 try {
132
133 queue.put(Log4jLogEvent.serialize((Log4jLogEvent) event, includeLocation));
134 appendSuccessful = true;
135 } catch (InterruptedException e) {
136 LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
137 getName());
138 }
139 } else {
140 appendSuccessful = queue.offer(Log4jLogEvent.serialize((Log4jLogEvent) event, includeLocation));
141 if (!appendSuccessful) {
142 error("Appender " + getName() + " is unable to write primary appenders. queue is full");
143 }
144 }
145 if ((!appendSuccessful) && (errorAppender != null)) {
146 errorAppender.callAppender(event);
147 }
148 }
149 }
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166 @PluginFactory
167 public static <S extends Serializable> AsyncAppender<S> createAppender(
168 @PluginElement("appender-ref") final AppenderRef[] appenderRefs,
169 @PluginAttr("error-ref") final String errorRef,
170 @PluginAttr("blocking") final String blocking,
171 @PluginAttr("bufferSize") final String size,
172 @PluginAttr("name") final String name,
173 @PluginAttr("includeLocation") final String includeLocation,
174 @PluginElement("filter") final Filter filter,
175 @PluginConfiguration final Configuration config,
176 @PluginAttr("suppressExceptions") final String suppress) {
177 if (name == null) {
178 LOGGER.error("No name provided for AsyncAppender");
179 return null;
180 }
181 if (appenderRefs == null) {
182 LOGGER.error("No appender references provided to AsyncAppender {}", name);
183 }
184
185 final boolean isBlocking = blocking == null ? true : Boolean.valueOf(blocking);
186 final int queueSize = size == null ? DEFAULT_QUEUE_SIZE : Integer.parseInt(size);
187 final boolean isIncludeLocation = includeLocation != null && Boolean.parseBoolean(includeLocation);
188
189 final boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
190
191 return new AsyncAppender<S>(name, filter, appenderRefs, errorRef,
192 queueSize, isBlocking, handleExceptions, config, isIncludeLocation);
193 }
194
195
196
197
198 private class AsyncThread extends Thread {
199
200 private volatile boolean shutdown = false;
201 private final List<AppenderControl<?>> appenders;
202 private final BlockingQueue<Serializable> queue;
203
204 public AsyncThread(final List<AppenderControl<?>> appenders, final BlockingQueue<Serializable> queue) {
205 this.appenders = appenders;
206 this.queue = queue;
207 }
208
209 @Override
210 public void run() {
211 while (!shutdown) {
212 Serializable s;
213 try {
214 s = queue.take();
215 if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) {
216 shutdown = true;
217 continue;
218 }
219 } catch (final InterruptedException ex) {
220
221 continue;
222 }
223 final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
224 event.setEndOfBatch(queue.isEmpty());
225 boolean success = false;
226 for (final AppenderControl<?> control : appenders) {
227 try {
228 control.callAppender(event);
229 success = true;
230 } catch (final Exception ex) {
231
232 }
233 }
234 if (!success && errorAppender != null) {
235 try {
236 errorAppender.callAppender(event);
237 } catch (final Exception ex) {
238
239 }
240 }
241 }
242
243 while (!queue.isEmpty()) {
244 try {
245 Serializable s = queue.take();
246 if (s instanceof Log4jLogEvent) {
247 final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
248 event.setEndOfBatch(queue.isEmpty());
249 for (final AppenderControl<?> control : appenders) {
250 control.callAppender(event);
251 }
252 }
253 } catch (final InterruptedException ex) {
254
255 }
256 }
257 }
258
259 public void shutdown() {
260 shutdown = true;
261 if (queue.isEmpty()) {
262 queue.offer(SHUTDOWN);
263 }
264 }
265 }
266 }