1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.logging.log4j.core.appender;
18
19 import org.apache.logging.log4j.core.Appender;
20 import org.apache.logging.log4j.core.Filter;
21 import org.apache.logging.log4j.core.Layout;
22 import org.apache.logging.log4j.core.LogEvent;
23 import org.apache.logging.log4j.core.config.AppenderControl;
24 import org.apache.logging.log4j.core.config.AppenderRef;
25 import org.apache.logging.log4j.core.config.Configuration;
26 import org.apache.logging.log4j.core.config.ConfigurationException;
27 import org.apache.logging.log4j.core.config.plugins.Plugin;
28 import org.apache.logging.log4j.core.config.plugins.PluginAttr;
29 import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
30 import org.apache.logging.log4j.core.config.plugins.PluginElement;
31 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
32 import org.apache.logging.log4j.core.impl.Log4jLogEvent;
33
34 import java.io.Serializable;
35 import java.util.ArrayList;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.concurrent.ArrayBlockingQueue;
39 import java.util.concurrent.BlockingQueue;
40
41
42
43
44
45
46
47
48 @Plugin(name = "Asynch", type = "Core", elementType = "appender", printObject = true)
49 public final class AsynchAppender<T extends Serializable> extends AbstractAppender<T> {
50
51 private static final int DEFAULT_QUEUE_SIZE = 128;
52 private static final String SHUTDOWN = "Shutdown";
53
54 private final BlockingQueue<Serializable> queue;
55 private final boolean blocking;
56 private final Configuration config;
57 private final AppenderRef[] appenderRefs;
58 private final String errorRef;
59 private AppenderControl errorAppender;
60 private AsynchThread thread;
61
62 private AsynchAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
63 final String errorRef, final int queueSize, final boolean blocking,
64 final boolean handleExceptions, final Configuration config) {
65 super(name, filter, null, handleExceptions);
66 this.queue = new ArrayBlockingQueue<Serializable>(queueSize);
67 this.blocking = blocking;
68 this.config = config;
69 this.appenderRefs = appenderRefs;
70 this.errorRef = errorRef;
71 }
72
73 @Override
74 public void start() {
75 final Map<String, Appender<?>> map = config.getAppenders();
76 final List<AppenderControl> appenders = new ArrayList<AppenderControl>();
77 for (final AppenderRef appenderRef : appenderRefs) {
78 if (map.containsKey(appenderRef.getRef())) {
79 appenders.add(new AppenderControl(map.get(appenderRef.getRef()), null, null));
80 } else {
81 LOGGER.error("No appender named {} was configured", appenderRef);
82 }
83 }
84 if (errorRef != null) {
85 if (map.containsKey(errorRef)) {
86 errorAppender = new AppenderControl(map.get(errorRef), null, null);
87 } else {
88 LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
89 }
90 }
91 if (appenders.size() > 0) {
92 thread = new AsynchThread(appenders, queue);
93 } else if (errorRef == null) {
94 throw new ConfigurationException("No appenders are available for AsynchAppender " + getName());
95 }
96
97 thread.start();
98 super.start();
99 }
100
101 @Override
102 public void stop() {
103 super.stop();
104 thread.shutdown();
105 try {
106 thread.join();
107 } catch (final InterruptedException ex) {
108 LOGGER.warn("Interrupted while stopping AsynchAppender {}", getName());
109 }
110 }
111
112
113
114
115
116
117 public void append(final LogEvent event) {
118 if (!isStarted()) {
119 throw new IllegalStateException("AsynchAppender " + getName() + " is not active");
120 }
121 if (event instanceof Log4jLogEvent) {
122 if (blocking && queue.remainingCapacity() > 0) {
123 try {
124 queue.add(Log4jLogEvent.serialize((Log4jLogEvent) event));
125 return;
126 } catch (final IllegalStateException ex) {
127 error("Appender " + getName() + " is unable to write primary appenders. queue is full");
128 }
129 }
130 if (errorAppender != null) {
131 if (!blocking) {
132 error("Appender " + getName() + " is unable to write primary appenders. queue is full");
133 }
134 errorAppender.callAppender(event);
135 }
136 }
137 }
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153 @PluginFactory
154 public static <S extends Serializable> AsynchAppender<S> createAppender(
155 @PluginElement("appender-ref") final AppenderRef[] appenderRefs,
156 @PluginAttr("error-ref") final String errorRef,
157 @PluginAttr("blocking") final String blocking,
158 @PluginAttr("bufferSize") final String size,
159 @PluginAttr("name") final String name,
160 @PluginElement("filter") final Filter filter,
161 @PluginConfiguration final Configuration config,
162 @PluginAttr("suppressExceptions") final String suppress) {
163 if (name == null) {
164 LOGGER.error("No name provided for AsynchAppender");
165 return null;
166 }
167 if (appenderRefs == null) {
168 LOGGER.error("No appender references provided to AsynchAppender {}", name);
169 }
170
171 final boolean isBlocking = blocking == null ? true : Boolean.valueOf(blocking);
172 final int queueSize = size == null ? DEFAULT_QUEUE_SIZE : Integer.parseInt(size);
173
174 final boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
175
176 return new AsynchAppender<S>(name, filter, appenderRefs, errorRef, queueSize, isBlocking, handleExceptions,
177 config);
178 }
179
180
181
182
183 private class AsynchThread extends Thread {
184
185 private volatile boolean shutdown = false;
186 private final List<AppenderControl> appenders;
187 private final BlockingQueue<Serializable> queue;
188
189 public AsynchThread(final List<AppenderControl> appenders, final BlockingQueue<Serializable> queue) {
190 this.appenders = appenders;
191 this.queue = queue;
192 }
193
194 @Override
195 public void run() {
196 while (!shutdown) {
197 Serializable s;
198 try {
199 s = queue.take();
200 if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) {
201 shutdown = true;
202 continue;
203 }
204 } catch (final InterruptedException ex) {
205
206 continue;
207 }
208 final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
209 boolean success = false;
210 for (final AppenderControl control : appenders) {
211 try {
212 control.callAppender(event);
213 success = true;
214 } catch (final Exception ex) {
215
216 }
217 }
218 if (!success && errorAppender != null) {
219 try {
220 errorAppender.callAppender(event);
221 } catch (final Exception ex) {
222
223 }
224 }
225 }
226
227 while (!queue.isEmpty()) {
228 try {
229 final Log4jLogEvent event = Log4jLogEvent.deserialize(queue.take());
230 for (final AppenderControl control : appenders) {
231 control.callAppender(event);
232 }
233 } catch (final InterruptedException ex) {
234
235 }
236 }
237 }
238
239 public void shutdown() {
240 shutdown = true;
241 if (queue.isEmpty()) {
242 queue.offer(SHUTDOWN);
243 }
244 }
245 }
246 }