View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.appender;
18  
19  import org.apache.logging.log4j.core.Appender;
20  import org.apache.logging.log4j.core.Filter;
21  import org.apache.logging.log4j.core.Layout;
22  import org.apache.logging.log4j.core.LogEvent;
23  import org.apache.logging.log4j.core.config.AppenderControl;
24  import org.apache.logging.log4j.core.config.AppenderRef;
25  import org.apache.logging.log4j.core.config.Configuration;
26  import org.apache.logging.log4j.core.config.ConfigurationException;
27  import org.apache.logging.log4j.core.config.plugins.Plugin;
28  import org.apache.logging.log4j.core.config.plugins.PluginAttr;
29  import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
30  import org.apache.logging.log4j.core.config.plugins.PluginElement;
31  import org.apache.logging.log4j.core.config.plugins.PluginFactory;
32  import org.apache.logging.log4j.core.impl.Log4jLogEvent;
33  
34  import java.io.Serializable;
35  import java.util.ArrayList;
36  import java.util.List;
37  import java.util.Map;
38  import java.util.concurrent.ArrayBlockingQueue;
39  import java.util.concurrent.BlockingQueue;
40  
41  /**
42   * Appends to one or more Appenders asynchronously.  You can configure an
43   * AsyncAppender with one or more Appenders and an Appender to append to if the
44   * queue is full. The AsyncAppender does not allow a filter to be specified on
45   * the Appender references.
46   *
47   * @param <T> The {@link Layout}'s {@link Serializable} type.
48   */
49  @Plugin(name = "Async", category = "Core", elementType = "appender", printObject = true)
50  public final class AsyncAppender<T extends Serializable> extends AbstractAppender<T> {
51  
52      private static final int DEFAULT_QUEUE_SIZE = 128;
53      private static final String SHUTDOWN = "Shutdown";
54  
55      private final BlockingQueue<Serializable> queue;
56      private final boolean blocking;
57      private final Configuration config;
58      private final AppenderRef[] appenderRefs;
59      private final String errorRef;
60      private final boolean includeLocation;
61      private AppenderControl<?> errorAppender;
62      private AsyncThread thread;
63  
64      private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
65                             final String errorRef, final int queueSize, final boolean blocking,
66                             final boolean handleExceptions, final Configuration config,
67                             final boolean includeLocation) {
68          super(name, filter, null, handleExceptions);
69          this.queue = new ArrayBlockingQueue<Serializable>(queueSize);
70          this.blocking = blocking;
71          this.config = config;
72          this.appenderRefs = appenderRefs;
73          this.errorRef = errorRef;
74          this.includeLocation = includeLocation;
75      }
76  
77      @Override
78      @SuppressWarnings("unchecked")
79      public void start() {
80          final Map<String, Appender<?>> map = config.getAppenders();
81          final List<AppenderControl<?>> appenders = new ArrayList<AppenderControl<?>>();
82          for (final AppenderRef appenderRef : appenderRefs) {
83              if (map.containsKey(appenderRef.getRef())) {
84                  appenders.add(new AppenderControl(map.get(appenderRef.getRef()), appenderRef.getLevel(),
85                      appenderRef.getFilter()));
86              } else {
87                  LOGGER.error("No appender named {} was configured", appenderRef);
88              }
89          }
90          if (errorRef != null) {
91              if (map.containsKey(errorRef)) {
92                  errorAppender = new AppenderControl(map.get(errorRef), null, null);
93              } else {
94                  LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
95              }
96          }
97          if (appenders.size() > 0) {
98              thread = new AsyncThread(appenders, queue);
99          } else if (errorRef == null) {
100             throw new ConfigurationException("No appenders are available for AsyncAppender " + getName());
101         }
102 
103         thread.start();
104         super.start();
105     }
106 
107     @Override
108     public void stop() {
109         super.stop();
110         thread.shutdown();
111         try {
112             thread.join();
113         } catch (final InterruptedException ex) {
114             LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName());
115         }
116     }
117 
118     /**
119      * Actual writing occurs here.
120      * <p/>
121      * @param event The LogEvent.
122      */
123     @Override
124     public void append(final LogEvent event) {
125         if (!isStarted()) {
126             throw new IllegalStateException("AsyncAppender " + getName() + " is not active");
127         }
128         if (event instanceof Log4jLogEvent) {
129             boolean appendSuccessful = false;
130             if (blocking) {
131                 try {
132                     // wait for free slots in the queue
133                     queue.put(Log4jLogEvent.serialize((Log4jLogEvent) event, includeLocation));
134                     appendSuccessful = true;
135                 } catch (InterruptedException e) {
136                     LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}",
137                             getName());
138                 }
139             } else {
140                 appendSuccessful = queue.offer(Log4jLogEvent.serialize((Log4jLogEvent) event, includeLocation));
141                 if (!appendSuccessful) {
142                     error("Appender " + getName() + " is unable to write primary appenders. queue is full");
143                 }
144             }
145             if ((!appendSuccessful) && (errorAppender != null)) {
146                 errorAppender.callAppender(event);
147             }
148         }
149     }
150 
151     /**
152      * Create an AsyncAppender.
153      * @param appenderRefs The Appenders to reference.
154      * @param errorRef An optional Appender to write to if the queue is full or other errors occur.
155      * @param blocking True if the Appender should wait when the queue is full. The default is true.
156      * @param size The size of the event queue. The default is 128.
157      * @param name The name of the Appender.
158      * @param includeLocation whether to include location information. The default is false.
159      * @param filter The Filter or null.
160      * @param config The Configuration.
161      * @param suppress "true" if exceptions should be hidden from the application, "false" otherwise.
162      * The default is "true".
163      * @param <S> The actual type of the Serializable.
164      * @return The AsyncAppender.
165      */
166     @PluginFactory
167     public static <S extends Serializable> AsyncAppender<S> createAppender(
168                 @PluginElement("appender-ref") final AppenderRef[] appenderRefs,
169                 @PluginAttr("error-ref") final String errorRef,
170                 @PluginAttr("blocking") final String blocking,
171                 @PluginAttr("bufferSize") final String size,
172                 @PluginAttr("name") final String name,
173                 @PluginAttr("includeLocation") final String includeLocation,
174                 @PluginElement("filter") final Filter filter,
175                 @PluginConfiguration final Configuration config,
176                 @PluginAttr("suppressExceptions") final String suppress) {
177         if (name == null) {
178             LOGGER.error("No name provided for AsyncAppender");
179             return null;
180         }
181         if (appenderRefs == null) {
182             LOGGER.error("No appender references provided to AsyncAppender {}", name);
183         }
184 
185         final boolean isBlocking = blocking == null ? true : Boolean.valueOf(blocking);
186         final int queueSize = size == null ? DEFAULT_QUEUE_SIZE : Integer.parseInt(size);
187         final boolean isIncludeLocation = includeLocation != null && Boolean.parseBoolean(includeLocation);
188 
189         final boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
190 
191         return new AsyncAppender<S>(name, filter, appenderRefs, errorRef,
192                 queueSize, isBlocking, handleExceptions, config, isIncludeLocation);
193     }
194 
195     /**
196      * Thread that calls the Appenders.
197      */
198     private class AsyncThread extends Thread {
199 
200         private volatile boolean shutdown = false;
201         private final List<AppenderControl<?>> appenders;
202         private final BlockingQueue<Serializable> queue;
203 
204         public AsyncThread(final List<AppenderControl<?>> appenders, final BlockingQueue<Serializable> queue) {
205             this.appenders = appenders;
206             this.queue = queue;
207         }
208 
209         @Override
210         public void run() {
211             while (!shutdown) {
212                 Serializable s;
213                 try {
214                     s = queue.take();
215                     if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) {
216                         shutdown = true;
217                         continue;
218                     }
219                 } catch (final InterruptedException ex) {
220                     // No good reason for this.
221                     continue;
222                 }
223                 final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
224                 event.setEndOfBatch(queue.isEmpty());
225                 boolean success = false;
226                 for (final AppenderControl<?> control : appenders) {
227                     try {
228                         control.callAppender(event);
229                         success = true;
230                     } catch (final Exception ex) {
231                         // If no appender is successful the error appender will get it.
232                     }
233                 }
234                 if (!success && errorAppender != null) {
235                     try {
236                         errorAppender.callAppender(event);
237                     } catch (final Exception ex) {
238                         // Silently accept the error.
239                     }
240                 }
241             }
242             // Process any remaining items in the queue.
243             while (!queue.isEmpty()) {
244                 try {
245                     Serializable s = queue.take();
246                     if (s instanceof Log4jLogEvent) {
247                         final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
248                         event.setEndOfBatch(queue.isEmpty());
249                         for (final AppenderControl<?> control : appenders) {
250                             control.callAppender(event);
251                         }
252                     }
253                 } catch (final InterruptedException ex) {
254                     // May have been interrupted to shut down.
255                 }
256             }
257         }
258 
259         public void shutdown() {
260             shutdown = true;
261             if (queue.isEmpty()) {
262                 queue.offer(SHUTDOWN);
263             }
264         }
265     }
266 }