View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.logging.log4j.core.appender;
18  
19  import org.apache.logging.log4j.core.Appender;
20  import org.apache.logging.log4j.core.Filter;
21  import org.apache.logging.log4j.core.Layout;
22  import org.apache.logging.log4j.core.LogEvent;
23  import org.apache.logging.log4j.core.config.AppenderControl;
24  import org.apache.logging.log4j.core.config.AppenderRef;
25  import org.apache.logging.log4j.core.config.Configuration;
26  import org.apache.logging.log4j.core.config.ConfigurationException;
27  import org.apache.logging.log4j.core.config.plugins.Plugin;
28  import org.apache.logging.log4j.core.config.plugins.PluginAttr;
29  import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
30  import org.apache.logging.log4j.core.config.plugins.PluginElement;
31  import org.apache.logging.log4j.core.config.plugins.PluginFactory;
32  import org.apache.logging.log4j.core.impl.Log4jLogEvent;
33  
34  import java.io.Serializable;
35  import java.util.ArrayList;
36  import java.util.List;
37  import java.util.Map;
38  import java.util.concurrent.ArrayBlockingQueue;
39  import java.util.concurrent.BlockingQueue;
40  
41  /**
42   * Appends to one or more Appenders asynchronously.  You can configure an AsynchAppender with one
43   * or more Appenders and an Appender to append to if the queue is full. The AsynchAppender does not allow
44   * a filter to be specified on the Appender references.
45   *
46   * @param <T> The {@link Layout}'s {@link Serializable} type.
47   */
48  @Plugin(name = "Asynch", type = "Core", elementType = "appender", printObject = true)
49  public final class AsynchAppender<T extends Serializable> extends AbstractAppender<T> {
50  
51      private static final int DEFAULT_QUEUE_SIZE = 128;
52      private static final String SHUTDOWN = "Shutdown";
53  
54      private final BlockingQueue<Serializable> queue;
55      private final boolean blocking;
56      private final Configuration config;
57      private final AppenderRef[] appenderRefs;
58      private final String errorRef;
59      private final boolean includeLocation;
60      private AppenderControl errorAppender;
61      private AsynchThread thread;
62  
63      private AsynchAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs,
64                             final String errorRef, final int queueSize, final boolean blocking,
65                             final boolean handleExceptions, final Configuration config,
66                             final boolean includeLocation) {
67          super(name, filter, null, handleExceptions);
68          this.queue = new ArrayBlockingQueue<Serializable>(queueSize);
69          this.blocking = blocking;
70          this.config = config;
71          this.appenderRefs = appenderRefs;
72          this.errorRef = errorRef;
73          this.includeLocation = includeLocation;
74      }
75  
76      @Override
77      public void start() {
78          final Map<String, Appender<?>> map = config.getAppenders();
79          final List<AppenderControl> appenders = new ArrayList<AppenderControl>();
80          for (final AppenderRef appenderRef : appenderRefs) {
81              if (map.containsKey(appenderRef.getRef())) {
82                  appenders.add(new AppenderControl(map.get(appenderRef.getRef()), appenderRef.getLevel(),
83                      appenderRef.getFilter()));
84              } else {
85                  LOGGER.error("No appender named {} was configured", appenderRef);
86              }
87          }
88          if (errorRef != null) {
89              if (map.containsKey(errorRef)) {
90                  errorAppender = new AppenderControl(map.get(errorRef), null, null);
91              } else {
92                  LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef);
93              }
94          }
95          if (appenders.size() > 0) {
96              thread = new AsynchThread(appenders, queue);
97          } else if (errorRef == null) {
98              throw new ConfigurationException("No appenders are available for AsynchAppender " + getName());
99          }
100 
101         thread.start();
102         super.start();
103     }
104 
105     @Override
106     public void stop() {
107         super.stop();
108         thread.shutdown();
109         try {
110             thread.join();
111         } catch (final InterruptedException ex) {
112             LOGGER.warn("Interrupted while stopping AsynchAppender {}", getName());
113         }
114     }
115 
116     /**
117      * Actual writing occurs here.
118      * <p/>
119      * @param event The LogEvent.
120      */
121     public void append(final LogEvent event) {
122         if (!isStarted()) {
123             throw new IllegalStateException("AsynchAppender " + getName() + " is not active");
124         }
125         if (event instanceof Log4jLogEvent) {
126             boolean appendSuccessful = false;
127             if (blocking){
128                 try {
129                     // wait for free slots in the queue
130                     queue.put(Log4jLogEvent.serialize((Log4jLogEvent) event, includeLocation));
131                     appendSuccessful = true;
132                 } catch (InterruptedException e) {
133                     LOGGER.warn("Interrupted while waiting for a free slots in the LogEvent-queue at the AsynchAppender {}", getName());
134                 }
135             } else {
136                 appendSuccessful = queue.offer(Log4jLogEvent.serialize((Log4jLogEvent) event, includeLocation));
137                 if (!appendSuccessful) {
138                     error("Appender " + getName() + " is unable to write primary appenders. queue is full");
139                 }
140             }
141             if ((!appendSuccessful) && (errorAppender != null)){
142                 errorAppender.callAppender(event);
143             }
144         }
145     }
146 
147     /**
148      * Create an AsynchAppender.
149      * @param appenderRefs The Appenders to reference.
150      * @param errorRef An optional Appender to write to if the queue is full or other errors occur.
151      * @param blocking True if the Appender should wait when the queue is full. The default is true.
152      * @param size The size of the event queue. The default is 128.
153      * @param name The name of the Appender.
154      * @param includeLocation whether to include location information. The default is false.
155      * @param filter The Filter or null.
156      * @param config The Configuration.
157      * @param suppress "true" if exceptions should be hidden from the application, "false" otherwise.
158      * The default is "true".
159      * @param <S> The actual type of the Serializable.
160      * @return The AsynchAppender.
161      */
162     @PluginFactory
163     public static <S extends Serializable> AsynchAppender<S> createAppender(
164                 @PluginElement("appender-ref") final AppenderRef[] appenderRefs,
165                 @PluginAttr("error-ref") final String errorRef,
166                 @PluginAttr("blocking") final String blocking,
167                 @PluginAttr("bufferSize") final String size,
168                 @PluginAttr("name") final String name,
169                 @PluginAttr("includeLocation") final String includeLocation,
170                 @PluginElement("filter") final Filter filter,
171                 @PluginConfiguration final Configuration config,
172                 @PluginAttr("suppressExceptions") final String suppress) {
173         if (name == null) {
174             LOGGER.error("No name provided for AsynchAppender");
175             return null;
176         }
177         if (appenderRefs == null) {
178             LOGGER.error("No appender references provided to AsynchAppender {}", name);
179         }
180 
181         final boolean isBlocking = blocking == null ? true : Boolean.valueOf(blocking);
182         final int queueSize = size == null ? DEFAULT_QUEUE_SIZE : Integer.parseInt(size);
183         final boolean isIncludeLocation = includeLocation == null ? false :
184                 Boolean.parseBoolean(includeLocation);
185 
186         final boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
187 
188         return new AsynchAppender<S>(name, filter, appenderRefs, errorRef, 
189                 queueSize, isBlocking, handleExceptions, config, isIncludeLocation);
190     }
191 
192     /**
193      * Thread that calls the Appenders.
194      */
195     private class AsynchThread extends Thread {
196 
197         private volatile boolean shutdown = false;
198         private final List<AppenderControl> appenders;
199         private final BlockingQueue<Serializable> queue;
200 
201         public AsynchThread(final List<AppenderControl> appenders, final BlockingQueue<Serializable> queue) {
202             this.appenders = appenders;
203             this.queue = queue;
204         }
205 
206         @Override
207         public void run() {
208             while (!shutdown) {
209                 Serializable s;
210                 try {
211                     s = queue.take();
212                     if (s != null && s instanceof String && SHUTDOWN.equals(s.toString())) {
213                         shutdown = true;
214                         continue;
215                     }
216                 } catch (final InterruptedException ex) {
217                     // No good reason for this.
218                     continue;
219                 }
220                 final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
221                 event.setEndOfBatch(queue.isEmpty());
222                 boolean success = false;
223                 for (final AppenderControl<?> control : appenders) {
224                     try {
225                         control.callAppender(event);
226                         success = true;
227                     } catch (final Exception ex) {
228                         // If no appender is successful the error appender will get it.
229                     }
230                 }
231                 if (!success && errorAppender != null) {
232                     try {
233                         errorAppender.callAppender(event);
234                     } catch (final Exception ex) {
235                         // Silently accept the error.
236                     }
237                 }
238             }
239             // Process any remaining items in the queue.
240             while (!queue.isEmpty()) {
241                 try {
242                     Serializable s = queue.take();
243                     if (s instanceof Log4jLogEvent) {
244                         final Log4jLogEvent event = Log4jLogEvent.deserialize(s);
245                         event.setEndOfBatch(queue.isEmpty());
246                         for (final AppenderControl<?> control : appenders) {
247                             control.callAppender(event);
248                         }
249                     }
250                 } catch (final InterruptedException ex) {
251                     // May have been interrupted to shut down.
252                 }
253             }
254         }
255 
256         public void shutdown() {
257             shutdown = true;
258             if (queue.isEmpty()) {
259                 queue.offer(SHUTDOWN);
260             }
261         }
262     }
263 }