View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.appender;
18  
19  import java.io.IOException;
20  import java.io.OutputStream;
21  import java.io.Serializable;
22  import java.nio.ByteBuffer;
23  import java.util.Objects;
24  import java.util.concurrent.TimeUnit;
25  
26  import org.apache.logging.log4j.core.Layout;
27  import org.apache.logging.log4j.core.LoggerContext;
28  import org.apache.logging.log4j.core.layout.ByteBufferDestination;
29  import org.apache.logging.log4j.core.util.Constants;
30  
31  /**
32   * Manages an OutputStream so that it can be shared by multiple Appenders and will
33   * allow appenders to reconfigure without requiring a new stream.
34   */
35  public class OutputStreamManager extends AbstractManager implements ByteBufferDestination {
36      protected final Layout<?> layout;
37      protected ByteBuffer byteBuffer;
38      private volatile OutputStream os;
39      private boolean skipFooter;
40  
41      protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
42              final boolean writeHeader) {
43          // Can't use new ctor because it throws an exception
44          this(os, streamName, layout, writeHeader, Constants.ENCODER_BYTE_BUFFER_SIZE);
45      }
46  
47      protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
48              final boolean writeHeader, final int bufferSize) {
49          // Can't use new ctor because it throws an exception
50          this(os, streamName, layout, writeHeader, ByteBuffer.wrap(new byte[bufferSize]));
51      }
52  
53      /**
54       * @since 2.6
55       * @deprecated
56       */
57      @Deprecated
58      protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
59              final boolean writeHeader, final ByteBuffer byteBuffer) {
60          super(null, streamName);
61          this.os = os;
62          this.layout = layout;
63          if (writeHeader && layout != null) {
64              final byte[] header = layout.getHeader();
65              if (header != null) {
66                  try {
67                      getOutputStream().write(header, 0, header.length);
68                  } catch (final IOException e) {
69                      logError("Unable to write header", e);
70                  }
71              }
72          }
73          this.byteBuffer = Objects.requireNonNull(byteBuffer, "byteBuffer");
74      }
75  
76      /**
77       * @since 2.7
78       */
79      protected OutputStreamManager(final LoggerContext loggerContext, final OutputStream os, final String streamName,
80              final boolean createOnDemand, final Layout<? extends Serializable> layout, final boolean writeHeader,
81              final ByteBuffer byteBuffer) {
82          super(loggerContext, streamName);
83          if (createOnDemand && os != null) {
84              LOGGER.error(
85                      "Invalid OutputStreamManager configuration for '{}': You cannot both set the OutputStream and request on-demand.",
86                      streamName);
87          }
88          this.layout = layout;
89          this.byteBuffer = Objects.requireNonNull(byteBuffer, "byteBuffer");
90          this.os = os;
91          if (writeHeader && layout != null) {
92              final byte[] header = layout.getHeader();
93              if (header != null) {
94                  try {
95                      getOutputStream().write(header, 0, header.length);
96                  } catch (final IOException e) {
97                      logError("Unable to write header for " + streamName, e);
98                  }
99              }
100         }
101     }
102 
103     /**
104      * Creates a Manager.
105      *
106      * @param name The name of the stream to manage.
107      * @param data The data to pass to the Manager.
108      * @param factory The factory to use to create the Manager.
109      * @param <T> The type of the OutputStreamManager.
110      * @return An OutputStreamManager.
111      */
112     public static <T> OutputStreamManager getManager(final String name, final T data,
113                                                  final ManagerFactory<? extends OutputStreamManager, T> factory) {
114         return AbstractManager.getManager(name, factory, data);
115     }
116 
117     @SuppressWarnings("unused")
118     protected OutputStream createOutputStream() throws IOException {
119         throw new IllegalStateException(getClass().getCanonicalName() + " must implement createOutputStream()");
120     }
121     
122     /**
123      * Indicate whether the footer should be skipped or not.
124      * @param skipFooter true if the footer should be skipped.
125      */
126     public void skipFooter(final boolean skipFooter) {
127         this.skipFooter = skipFooter;
128     }
129 
130     /**
131      * Default hook to write footer during close.
132      */
133     @Override
134     public boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
135         writeFooter();
136         return closeOutputStream();
137     }
138 
139     /**
140      * Writes the footer.
141      */
142     protected void writeFooter() {
143         if (layout == null || skipFooter) {
144             return;
145         }
146         final byte[] footer = layout.getFooter();
147         if (footer != null) {
148             write(footer);
149         }
150     }
151 
152     /**
153      * Returns the status of the stream.
154      * @return true if the stream is open, false if it is not.
155      */
156     public boolean isOpen() {
157         return getCount() > 0;
158     }
159 
160     protected OutputStream getOutputStream() throws IOException {
161         if (os == null) {
162             os = createOutputStream();
163         }
164         return os;
165     }
166 
167     protected void setOutputStream(final OutputStream os) {
168         final byte[] header = layout.getHeader();
169         if (header != null) {
170             try {
171                 os.write(header, 0, header.length);
172                 this.os = os; // only update field if os.write() succeeded
173             } catch (final IOException ioe) {
174                 logError("Unable to write header", ioe);
175             }
176         } else {
177             this.os = os;
178         }
179     }
180 
181     /**
182      * Some output streams synchronize writes while others do not.
183      * @param bytes The serialized Log event.
184      * @throws AppenderLoggingException if an error occurs.
185      */
186     protected void write(final byte[] bytes)  {
187         write(bytes, 0, bytes.length, false);
188     }
189 
190     /**
191      * Some output streams synchronize writes while others do not.
192      * @param bytes The serialized Log event.
193      * @param immediateFlush If true, flushes after writing.
194      * @throws AppenderLoggingException if an error occurs.
195      */
196     protected void write(final byte[] bytes, final boolean immediateFlush)  {
197         write(bytes, 0, bytes.length, immediateFlush);
198     }
199 
200     /**
201      * Some output streams synchronize writes while others do not. Synchronizing here insures that
202      * log events won't be intertwined.
203      * @param bytes The serialized Log event.
204      * @param offset The offset into the byte array.
205      * @param length The number of bytes to write.
206      * @throws AppenderLoggingException if an error occurs.
207      */
208     protected void write(final byte[] bytes, final int offset, final int length) {
209         write(bytes, offset, length, false);
210     }
211 
212     /**
213      * Some output streams synchronize writes while others do not. Synchronizing here insures that
214      * log events won't be intertwined.
215      * @param bytes The serialized Log event.
216      * @param offset The offset into the byte array.
217      * @param length The number of bytes to write.
218      * @param immediateFlush flushes immediately after writing.
219      * @throws AppenderLoggingException if an error occurs.
220      */
221     protected synchronized void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) {
222         if (immediateFlush && byteBuffer.position() == 0) {
223             writeToDestination(bytes, offset, length);
224             flushDestination();
225             return;
226         }
227         if (length >= byteBuffer.capacity()) {
228             // if request length exceeds buffer capacity, flush the buffer and write the data directly
229             flush();
230             writeToDestination(bytes, offset, length);
231         } else {
232             if (length > byteBuffer.remaining()) {
233                 flush();
234             }
235             byteBuffer.put(bytes, offset, length);
236         }
237         if (immediateFlush) {
238             flush();
239         }
240     }
241 
242     /**
243      * Writes the specified section of the specified byte array to the stream.
244      *
245      * @param bytes the array containing data
246      * @param offset from where to write
247      * @param length how many bytes to write
248      * @since 2.6
249      */
250     protected synchronized void writeToDestination(final byte[] bytes, final int offset, final int length) {
251         try {
252             getOutputStream().write(bytes, offset, length);
253         } catch (final IOException ex) {
254             throw new AppenderLoggingException("Error writing to stream " + getName(), ex);
255         }
256     }
257 
258     /**
259      * Calls {@code flush()} on the underlying output stream.
260      * @since 2.6
261      */
262     protected synchronized void flushDestination() {
263         final OutputStream stream = os; // access volatile field only once per method
264         if (stream != null) {
265             try {
266                 stream.flush();
267             } catch (final IOException ex) {
268                 throw new AppenderLoggingException("Error flushing stream " + getName(), ex);
269             }
270         }
271     }
272 
273     /**
274      * Drains the ByteBufferDestination's buffer into the destination. By default this calls
275      * {@link OutputStreamManager#write(byte[], int, int, boolean)} with the buffer contents.
276      * The underlying stream is not {@linkplain OutputStream#flush() flushed}.
277      *
278      * @see #flushDestination()
279      * @since 2.6
280      */
281     protected synchronized void flushBuffer(final ByteBuffer buf) {
282         buf.flip();
283         if (buf.limit() > 0) {
284             writeToDestination(buf.array(), 0, buf.limit());
285         }
286         buf.clear();
287     }
288 
289     /**
290      * Flushes any buffers.
291      */
292     public synchronized void flush() {
293         flushBuffer(byteBuffer);
294         flushDestination();
295     }
296 
297     protected synchronized boolean closeOutputStream() {
298         flush();
299         final OutputStream stream = os; // access volatile field only once per method
300         if (stream == null || stream == System.out || stream == System.err) {
301             return true;
302         }
303         try {
304             stream.close();
305         } catch (final IOException ex) {
306             logError("Unable to close stream", ex);
307             return false;
308         }
309         return true;
310     }
311 
312     /**
313      * Returns this {@code ByteBufferDestination}'s buffer.
314      * @return the buffer
315      * @since 2.6
316      */
317     @Override
318     public ByteBuffer getByteBuffer() {
319         return byteBuffer;
320     }
321 
322     /**
323      * Drains the ByteBufferDestination's buffer into the destination. By default this calls
324      * {@link #flushBuffer(ByteBuffer)} with the specified buffer. Subclasses may override.
325      * <p>
326      * Do not call this method lightly! For some subclasses this is a very expensive operation. For example,
327      * {@link MemoryMappedFileManager} will assume this method was called because the end of the mapped region
328      * was reached during a text encoding operation and will {@linkplain MemoryMappedFileManager#remap() remap} its
329      * buffer.
330      * </p><p>
331      * To just flush the buffered contents to the underlying stream, call
332      * {@link #flushBuffer(ByteBuffer)} directly instead.
333      * </p>
334      *
335      * @param buf the buffer whose contents to write the the destination
336      * @return the specified buffer
337      * @since 2.6
338      */
339     @Override
340     public ByteBuffer drain(final ByteBuffer buf) {
341         flushBuffer(buf);
342         return buf;
343     }
344 }