View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.appender;
18  
19  import java.io.IOException;
20  import java.io.OutputStream;
21  import java.io.Serializable;
22  import java.nio.Buffer;
23  import java.nio.ByteBuffer;
24  import java.util.Objects;
25  import java.util.concurrent.TimeUnit;
26  
27  import org.apache.logging.log4j.core.Layout;
28  import org.apache.logging.log4j.core.LoggerContext;
29  import org.apache.logging.log4j.core.layout.ByteBufferDestination;
30  import org.apache.logging.log4j.core.layout.ByteBufferDestinationHelper;
31  import org.apache.logging.log4j.core.util.Constants;
32  
33  /**
34   * Manages an OutputStream so that it can be shared by multiple Appenders and will
35   * allow appenders to reconfigure without requiring a new stream.
36   */
37  public class OutputStreamManager extends AbstractManager implements ByteBufferDestination {
38      protected final Layout<?> layout;
39      protected ByteBuffer byteBuffer;
40      private volatile OutputStream outputStream;
41      private boolean skipFooter;
42  
43      protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
44              final boolean writeHeader) {
45          this(os, streamName, layout, writeHeader, Constants.ENCODER_BYTE_BUFFER_SIZE);
46      }
47  
48      protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
49              final boolean writeHeader, final int bufferSize) {
50          this(os, streamName, layout, writeHeader, ByteBuffer.wrap(new byte[bufferSize]));
51      }
52  
53      /**
54       * @since 2.6
55       * @deprecated
56       */
57      @Deprecated
58      protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
59              final boolean writeHeader, final ByteBuffer byteBuffer) {
60          super(null, streamName);
61          this.outputStream = os;
62          this.layout = layout;
63          if (writeHeader) {
64              writeHeader(os);
65          }
66          this.byteBuffer = Objects.requireNonNull(byteBuffer, "byteBuffer");
67      }
68  
69      /**
70       * @since 2.7
71       */
72      protected OutputStreamManager(final LoggerContext loggerContext, final OutputStream os, final String streamName,
73              final boolean createOnDemand, final Layout<? extends Serializable> layout, final boolean writeHeader,
74              final ByteBuffer byteBuffer) {
75          super(loggerContext, streamName);
76          if (createOnDemand && os != null) {
77              LOGGER.error(
78                      "Invalid OutputStreamManager configuration for '{}': You cannot both set the OutputStream and request on-demand.",
79                      streamName);
80          }
81          this.layout = layout;
82          this.byteBuffer = Objects.requireNonNull(byteBuffer, "byteBuffer");
83          this.outputStream = os;
84          if (writeHeader) {
85              writeHeader(os);
86          }
87      }
88  
89      /**
90       * Creates a Manager.
91       *
92       * @param name The name of the stream to manage.
93       * @param data The data to pass to the Manager.
94       * @param factory The factory to use to create the Manager.
95       * @param <T> The type of the OutputStreamManager.
96       * @return An OutputStreamManager.
97       */
98      public static <T> OutputStreamManager getManager(final String name, final T data,
99                                                   final ManagerFactory<? extends OutputStreamManager, T> factory) {
100         return AbstractManager.getManager(name, factory, data);
101     }
102 
103     @SuppressWarnings("unused")
104     protected OutputStream createOutputStream() throws IOException {
105         throw new IllegalStateException(getClass().getCanonicalName() + " must implement createOutputStream()");
106     }
107 
108     /**
109      * Indicate whether the footer should be skipped or not.
110      * @param skipFooter true if the footer should be skipped.
111      */
112     public void skipFooter(final boolean skipFooter) {
113         this.skipFooter = skipFooter;
114     }
115 
116     /**
117      * Default hook to write footer during close.
118      */
119     @Override
120     public boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
121         writeFooter();
122         return closeOutputStream();
123     }
124 
125     protected void writeHeader(OutputStream os) {
126         if (layout != null && os != null) {
127             final byte[] header = layout.getHeader();
128             if (header != null) {
129                 try {
130                     os.write(header, 0, header.length);
131                 } catch (final IOException e) {
132                     logError("Unable to write header", e);
133                 }
134             }
135         }
136     }
137 
138     /**
139      * Writes the footer.
140      */
141     protected void writeFooter() {
142         if (layout == null || skipFooter) {
143             return;
144         }
145         final byte[] footer = layout.getFooter();
146         if (footer != null) {
147             write(footer);
148         }
149     }
150 
151     /**
152      * Returns the status of the stream.
153      * @return true if the stream is open, false if it is not.
154      */
155     public boolean isOpen() {
156         return getCount() > 0;
157     }
158 
159     public boolean hasOutputStream() {
160         return outputStream != null;
161     }
162 
163     protected OutputStream getOutputStream() throws IOException {
164         if (outputStream == null) {
165             outputStream = createOutputStream();
166         }
167         return outputStream;
168     }
169 
170     protected void setOutputStream(final OutputStream os) {
171         this.outputStream = os;
172     }
173 
174     /**
175      * Some output streams synchronize writes while others do not.
176      * @param bytes The serialized Log event.
177      * @throws AppenderLoggingException if an error occurs.
178      */
179     protected void write(final byte[] bytes)  {
180         write(bytes, 0, bytes.length, false);
181     }
182 
183     /**
184      * Some output streams synchronize writes while others do not.
185      * @param bytes The serialized Log event.
186      * @param immediateFlush If true, flushes after writing.
187      * @throws AppenderLoggingException if an error occurs.
188      */
189     protected void write(final byte[] bytes, final boolean immediateFlush)  {
190         write(bytes, 0, bytes.length, immediateFlush);
191     }
192 
193     @Override
194     public void writeBytes(final byte[] data, final int offset, final int length) {
195         write(data, offset, length, false);
196     }
197 
198     /**
199      * Some output streams synchronize writes while others do not. Synchronizing here insures that
200      * log events won't be intertwined.
201      * @param bytes The serialized Log event.
202      * @param offset The offset into the byte array.
203      * @param length The number of bytes to write.
204      * @throws AppenderLoggingException if an error occurs.
205      */
206     protected void write(final byte[] bytes, final int offset, final int length) {
207         writeBytes(bytes, offset, length);
208     }
209 
210     /**
211      * Some output streams synchronize writes while others do not. Synchronizing here insures that
212      * log events won't be intertwined.
213      * @param bytes The serialized Log event.
214      * @param offset The offset into the byte array.
215      * @param length The number of bytes to write.
216      * @param immediateFlush flushes immediately after writing.
217      * @throws AppenderLoggingException if an error occurs.
218      */
219     protected synchronized void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) {
220         if (immediateFlush && byteBuffer.position() == 0) {
221             writeToDestination(bytes, offset, length);
222             flushDestination();
223             return;
224         }
225         if (length >= byteBuffer.capacity()) {
226             // if request length exceeds buffer capacity, flush the buffer and write the data directly
227             flush();
228             writeToDestination(bytes, offset, length);
229         } else {
230             if (length > byteBuffer.remaining()) {
231                 flush();
232             }
233             byteBuffer.put(bytes, offset, length);
234         }
235         if (immediateFlush) {
236             flush();
237         }
238     }
239 
240     /**
241      * Writes the specified section of the specified byte array to the stream.
242      *
243      * @param bytes the array containing data
244      * @param offset from where to write
245      * @param length how many bytes to write
246      * @since 2.6
247      */
248     protected synchronized void writeToDestination(final byte[] bytes, final int offset, final int length) {
249         try {
250             getOutputStream().write(bytes, offset, length);
251         } catch (final IOException ex) {
252             throw new AppenderLoggingException("Error writing to stream " + getName(), ex);
253         }
254     }
255 
256     /**
257      * Calls {@code flush()} on the underlying output stream.
258      * @since 2.6
259      */
260     protected synchronized void flushDestination() {
261         final OutputStream stream = outputStream; // access volatile field only once per method
262         if (stream != null) {
263             try {
264                 stream.flush();
265             } catch (final IOException ex) {
266                 throw new AppenderLoggingException("Error flushing stream " + getName(), ex);
267             }
268         }
269     }
270 
271     /**
272      * Drains the ByteBufferDestination's buffer into the destination. By default this calls
273      * {@link OutputStreamManager#write(byte[], int, int, boolean)} with the buffer contents.
274      * The underlying stream is not {@linkplain OutputStream#flush() flushed}.
275      *
276      * @see #flushDestination()
277      * @since 2.6
278      */
279     protected synchronized void flushBuffer(final ByteBuffer buf) {
280         ((Buffer) buf).flip();
281         if (buf.remaining() > 0) {
282             writeToDestination(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
283         }
284         buf.clear();
285     }
286 
287     /**
288      * Flushes any buffers.
289      */
290     public synchronized void flush() {
291         flushBuffer(byteBuffer);
292         flushDestination();
293     }
294 
295     protected synchronized boolean closeOutputStream() {
296         flush();
297         final OutputStream stream = outputStream; // access volatile field only once per method
298         if (stream == null || stream == System.out || stream == System.err) {
299             return true;
300         }
301         try {
302             stream.close();
303             LOGGER.debug("OutputStream closed");
304         } catch (final IOException ex) {
305             logError("Unable to close stream", ex);
306             return false;
307         }
308         return true;
309     }
310 
311     /**
312      * Returns this {@code ByteBufferDestination}'s buffer.
313      * @return the buffer
314      * @since 2.6
315      */
316     @Override
317     public ByteBuffer getByteBuffer() {
318         return byteBuffer;
319     }
320 
321     /**
322      * Drains the ByteBufferDestination's buffer into the destination. By default this calls
323      * {@link #flushBuffer(ByteBuffer)} with the specified buffer. Subclasses may override.
324      * <p>
325      * Do not call this method lightly! For some subclasses this is a very expensive operation. For example,
326      * {@link MemoryMappedFileManager} will assume this method was called because the end of the mapped region
327      * was reached during a text encoding operation and will {@linkplain MemoryMappedFileManager#remap() remap} its
328      * buffer.
329      * </p><p>
330      * To just flush the buffered contents to the underlying stream, call
331      * {@link #flushBuffer(ByteBuffer)} directly instead.
332      * </p>
333      *
334      * @param buf the buffer whose contents to write the the destination
335      * @return the specified buffer
336      * @since 2.6
337      */
338     @Override
339     public ByteBuffer drain(final ByteBuffer buf) {
340         flushBuffer(buf);
341         return buf;
342     }
343 
344     @Override
345     public void writeBytes(final ByteBuffer data) {
346         if (data.remaining() == 0) {
347           return;
348         }
349         synchronized (this) {
350           ByteBufferDestinationHelper.writeToUnsynchronized(data, this);
351         }
352     }
353 }