1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.appender;
18
19 import org.apache.logging.log4j.core.Appender;
20 import org.apache.logging.log4j.core.Filter;
21 import org.apache.logging.log4j.core.Layout;
22 import org.apache.logging.log4j.core.LogEvent;
23 import org.apache.logging.log4j.core.config.AppenderControl;
24 import org.apache.logging.log4j.core.config.AppenderRef;
25 import org.apache.logging.log4j.core.config.Configuration;
26 import org.apache.logging.log4j.core.config.ConfigurationException;
27 import org.apache.logging.log4j.core.config.plugins.Plugin;
28 import org.apache.logging.log4j.core.config.plugins.PluginAttr;
29 import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
30 import org.apache.logging.log4j.core.config.plugins.PluginElement;
31 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
32 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
33
34 import java.io.Serializable;
35 import java.util.ArrayList;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.concurrent.ArrayBlockingQueue;
39 import java.util.concurrent.BlockingQueue;
40
41
42
43
44
45
46
47
48 @Plugin(name = "Asynch", type = "Core", elementType = "appender", printObject = true)
49 public final class AsynchAppender<T extends Serializable> extends AbstractAppender<T> {
50
51 private static final int DEFAULT_QUEUE_SIZE = 128;
52 private static final String SHUTDOWN = "Shutdown";
53
54 private final BlockingQueue<Serializable> queue;
55 private final boolean blocking;
56 private final Configuration config;
57 private final AppenderRef[] appenderRefs;
58 private final String errorRef;
59 private final boolean includeLocation;
60 private AppenderControl errorAppender;
61 private AsynchThread thread;
62
63 private AsynchAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
64 final String errorRef, final int queueSize, final boolean blocking,
65 final boolean handleExceptions, final Configuration config,
66 final boolean includeLocation) {
67 super(name, filter, null, handleExceptions);
68 this.queue = new ArrayBlockingQueue<Serializable>(queueSize);
69 this.blocking = blocking;
70 this.config = config;
71 this.appenderRefs = appenderRefs;
72 this.errorRef = errorRef;
73 this.includeLocation = includeLocation;
74 }
75
76 @Override
77 public void start() {
78 final Map<String, Appender<?>> map = config.getAppenders();
79 final List<AppenderControl> appenders = new ArrayList<AppenderControl>();
80 for (final AppenderRef appenderRef : appenderRefs) {
81 if (map.containsKey(appenderRef.getRef())) {
82 appenders.add(new AppenderControl(map.get(appenderRef.getRef()), appenderRef.getLevel(),
83 appenderRef.getFilter()));
84 } else {
85 LOGGER.error("No appender named {} was configured", appenderRef);
86 }
87 }
88 if (errorRef != null) {
89 if (map.containsKey(errorRef)) {
90 errorAppender = new AppenderControl(map.get(errorRef), null, null);
91 } else {
92 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
93 }
94 }
95 if (appenders.size() > 0) {
96 thread = new AsynchThread(appenders, queue);
97 } else if (errorRef == null) {
98 throw new ConfigurationException("No appenders are available for AsynchAppender " + getName());
99 }
100
101 thread.start();
102 super.start();
103 }
104
105 @Override
106 public void stop() {
107 super.stop();
108 thread.shutdown();
109 try {
110 thread.join();
111 } catch (final InterruptedException ex) {
112 LOGGER.warn("Interrupted while stopping AsynchAppender {}", getName());
113 }
114 }
115
116
117
118
119
120
121 public void append(final LogEvent event) {
122 if (!isStarted()) {
123 throw new IllegalStateException("AsynchAppender " + getName() + " is not active");
124 }
125 if (event instanceof Log4jLogEvent) {
126 boolean appendSuccessful = false;
127 if (blocking){
128 try {
129
130 queue.put(Log4jLogEvent.serialize((Log4jLogEvent) event, includeLocation));
131 appendSuccessful = true;
132 } catch (InterruptedException e) {
133 LOGGER.warn("Interrupted while waiting for a free slots in the LogEvent-queue at the AsynchAppender {}", getName());
134 }
135 } else {
136 appendSuccessful = queue.offer(Log4jLogEvent.serialize((Log4jLogEvent) event, includeLocation));
137 if (!appendSuccessful) {
138 error("Appender " + getName() + " is unable to write primary appenders. queue is full");
139 }
140 }
141 if ((!appendSuccessful) && (errorAppender != null)){
142 errorAppender.callAppender(event);
143 }
144 }
145 }
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162 @PluginFactory
163 public static <S extends Serializable> AsynchAppender<S> createAppender(
164 @PluginElement("appender-ref") final AppenderRef[] appenderRefs,
165 @PluginAttr("error-ref") final String errorRef,
166 @PluginAttr("blocking") final String blocking,
167 @PluginAttr("bufferSize") final String size,
168 @PluginAttr("name") final String name,
169 @PluginAttr("includeLocation") final String includeLocation,
170 @PluginElement("filter") final Filter filter,
171 @PluginConfiguration final Configuration config,
172 @PluginAttr("suppressExceptions") final String suppress) {
173 if (name == null) {
174 LOGGER.error("No name provided for AsynchAppender");
175 return null;
176 }
177 if (appenderRefs == null) {
178 LOGGER.error("No appender references provided to AsynchAppender {}", name);
179 }
180
181 final boolean isBlocking = blocking == null ? true : Boolean.valueOf(blocking);
182 final int queueSize = size == null ? DEFAULT_QUEUE_SIZE : Integer.parseInt(size);
183 final boolean isIncludeLocation = includeLocation == null ? false :
184 Boolean.parseBoolean(includeLocation);
185
186 final boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
187
188 return new AsynchAppender<S>(name, filter, appenderRefs, errorRef,
189 queueSize, isBlocking, handleExceptions, config, isIncludeLocation);
190 }
191
192
193
194
195 private class AsynchThread extends Thread {
196
197 private volatile boolean shutdown = false;
198 private final List<AppenderControl> appenders;
199 private final BlockingQueue<Serializable> queue;
200
201 public AsynchThread(final List<AppenderControl> appenders, final BlockingQueue<Serializable> queue) {
202 this.appenders = appenders;
203 this.queue = queue;
204 }
205
206 @Override
207 public void run() {
208 while (!shutdown) {
209 Serializable s;
210 try {
211 s = queue.take();
212 if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) {
213 shutdown = true;
214 continue;
215 }
216 } catch (final InterruptedException ex) {
217
218 continue;
219 }
220 final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
221 event.setEndOfBatch(queue.isEmpty());
222 boolean success = false;
223 for (final AppenderControl<?> control : appenders) {
224 try {
225 control.callAppender(event);
226 success = true;
227 } catch (final Exception ex) {
228
229 }
230 }
231 if (!success && errorAppender != null) {
232 try {
233 errorAppender.callAppender(event);
234 } catch (final Exception ex) {
235
236 }
237 }
238 }
239
240 while (!queue.isEmpty()) {
241 try {
242 Serializable s = queue.take();
243 if (s instanceof Log4jLogEvent) {
244 final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
245 event.setEndOfBatch(queue.isEmpty());
246 for (final AppenderControl<?> control : appenders) {
247 control.callAppender(event);
248 }
249 }
250 } catch (final InterruptedException ex) {
251
252 }
253 }
254 }
255
256 public void shutdown() {
257 shutdown = true;
258 if (queue.isEmpty()) {
259 queue.offer(SHUTDOWN);
260 }
261 }
262 }
263 }