View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache license, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License. You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the license for the specific language governing permissions and
15   * limitations under the license.
16   */
17  package org.apache.logging.log4j.core.appender;
18  
19  import java.io.IOException;
20  import java.io.OutputStream;
21  import java.io.Serializable;
22  import java.nio.Buffer;
23  import java.nio.ByteBuffer;
24  import java.util.Objects;
25  import java.util.concurrent.TimeUnit;
26  
27  import org.apache.logging.log4j.core.Layout;
28  import org.apache.logging.log4j.core.LoggerContext;
29  import org.apache.logging.log4j.core.layout.ByteBufferDestination;
30  import org.apache.logging.log4j.core.layout.ByteBufferDestinationHelper;
31  import org.apache.logging.log4j.core.util.Constants;
32  
33  /**
34   * Manages an OutputStream so that it can be shared by multiple Appenders and will
35   * allow appenders to reconfigure without requiring a new stream.
36   */
37  public class OutputStreamManager extends AbstractManager implements ByteBufferDestination {
38      protected final Layout<?> layout;
39      protected ByteBuffer byteBuffer;
40      private volatile OutputStream outputStream;
41      private boolean skipFooter;
42  
43      protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
44              final boolean writeHeader) {
45          // Can't use new ctor because it throws an exception
46          this(os, streamName, layout, writeHeader, Constants.ENCODER_BYTE_BUFFER_SIZE);
47      }
48  
49      protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
50              final boolean writeHeader, final int bufferSize) {
51          // Can't use new ctor because it throws an exception
52          this(os, streamName, layout, writeHeader, ByteBuffer.wrap(new byte[bufferSize]));
53      }
54  
55      /**
56       * @since 2.6
57       * @deprecated
58       */
59      @Deprecated
60      protected OutputStreamManager(final OutputStream os, final String streamName, final Layout<?> layout,
61              final boolean writeHeader, final ByteBuffer byteBuffer) {
62          super(null, streamName);
63          this.outputStream = os;
64          this.layout = layout;
65          if (writeHeader && layout != null) {
66              final byte[] header = layout.getHeader();
67              if (header != null) {
68                  try {
69                      getOutputStream().write(header, 0, header.length);
70                  } catch (final IOException e) {
71                      logError("Unable to write header", e);
72                  }
73              }
74          }
75          this.byteBuffer = Objects.requireNonNull(byteBuffer, "byteBuffer");
76      }
77  
78      /**
79       * @since 2.7
80       */
81      protected OutputStreamManager(final LoggerContext loggerContext, final OutputStream os, final String streamName,
82              final boolean createOnDemand, final Layout<? extends Serializable> layout, final boolean writeHeader,
83              final ByteBuffer byteBuffer) {
84          super(loggerContext, streamName);
85          if (createOnDemand && os != null) {
86              LOGGER.error(
87                      "Invalid OutputStreamManager configuration for '{}': You cannot both set the OutputStream and request on-demand.",
88                      streamName);
89          }
90          this.layout = layout;
91          this.byteBuffer = Objects.requireNonNull(byteBuffer, "byteBuffer");
92          this.outputStream = os;
93          if (writeHeader && layout != null) {
94              final byte[] header = layout.getHeader();
95              if (header != null) {
96                  try {
97                      getOutputStream().write(header, 0, header.length);
98                  } catch (final IOException e) {
99                      logError("Unable to write header for " + streamName, e);
100                 }
101             }
102         }
103     }
104 
105     /**
106      * Creates a Manager.
107      *
108      * @param name The name of the stream to manage.
109      * @param data The data to pass to the Manager.
110      * @param factory The factory to use to create the Manager.
111      * @param <T> The type of the OutputStreamManager.
112      * @return An OutputStreamManager.
113      */
114     public static <T> OutputStreamManager getManager(final String name, final T data,
115                                                  final ManagerFactory<? extends OutputStreamManager, T> factory) {
116         return AbstractManager.getManager(name, factory, data);
117     }
118 
119     @SuppressWarnings("unused")
120     protected OutputStream createOutputStream() throws IOException {
121         throw new IllegalStateException(getClass().getCanonicalName() + " must implement createOutputStream()");
122     }
123 
124     /**
125      * Indicate whether the footer should be skipped or not.
126      * @param skipFooter true if the footer should be skipped.
127      */
128     public void skipFooter(final boolean skipFooter) {
129         this.skipFooter = skipFooter;
130     }
131 
132     /**
133      * Default hook to write footer during close.
134      */
135     @Override
136     public boolean releaseSub(final long timeout, final TimeUnit timeUnit) {
137         writeFooter();
138         return closeOutputStream();
139     }
140 
141     /**
142      * Writes the footer.
143      */
144     protected void writeFooter() {
145         if (layout == null || skipFooter) {
146             return;
147         }
148         final byte[] footer = layout.getFooter();
149         if (footer != null) {
150             write(footer);
151         }
152     }
153 
154     /**
155      * Returns the status of the stream.
156      * @return true if the stream is open, false if it is not.
157      */
158     public boolean isOpen() {
159         return getCount() > 0;
160     }
161 
162     public boolean hasOutputStream() {
163         return outputStream != null;
164     }
165 
166     protected OutputStream getOutputStream() throws IOException {
167         if (outputStream == null) {
168             outputStream = createOutputStream();
169         }
170         return outputStream;
171     }
172 
173     protected void setOutputStream(final OutputStream os) {
174         final byte[] header = layout.getHeader();
175         if (header != null) {
176             try {
177                 os.write(header, 0, header.length);
178                 this.outputStream = os; // only update field if os.write() succeeded
179             } catch (final IOException ioe) {
180                 logError("Unable to write header", ioe);
181             }
182         } else {
183             this.outputStream = os;
184         }
185     }
186 
187     /**
188      * Some output streams synchronize writes while others do not.
189      * @param bytes The serialized Log event.
190      * @throws AppenderLoggingException if an error occurs.
191      */
192     protected void write(final byte[] bytes)  {
193         write(bytes, 0, bytes.length, false);
194     }
195 
196     /**
197      * Some output streams synchronize writes while others do not.
198      * @param bytes The serialized Log event.
199      * @param immediateFlush If true, flushes after writing.
200      * @throws AppenderLoggingException if an error occurs.
201      */
202     protected void write(final byte[] bytes, final boolean immediateFlush)  {
203         write(bytes, 0, bytes.length, immediateFlush);
204     }
205 
206     @Override
207     public void writeBytes(final byte[] data, final int offset, final int length) {
208         write(data, offset, length, false);
209     }
210 
211     /**
212      * Some output streams synchronize writes while others do not. Synchronizing here insures that
213      * log events won't be intertwined.
214      * @param bytes The serialized Log event.
215      * @param offset The offset into the byte array.
216      * @param length The number of bytes to write.
217      * @throws AppenderLoggingException if an error occurs.
218      */
219     protected void write(final byte[] bytes, final int offset, final int length) {
220         writeBytes(bytes, offset, length);
221     }
222 
223     /**
224      * Some output streams synchronize writes while others do not. Synchronizing here insures that
225      * log events won't be intertwined.
226      * @param bytes The serialized Log event.
227      * @param offset The offset into the byte array.
228      * @param length The number of bytes to write.
229      * @param immediateFlush flushes immediately after writing.
230      * @throws AppenderLoggingException if an error occurs.
231      */
232     protected synchronized void write(final byte[] bytes, final int offset, final int length, final boolean immediateFlush) {
233         if (immediateFlush && byteBuffer.position() == 0) {
234             writeToDestination(bytes, offset, length);
235             flushDestination();
236             return;
237         }
238         if (length >= byteBuffer.capacity()) {
239             // if request length exceeds buffer capacity, flush the buffer and write the data directly
240             flush();
241             writeToDestination(bytes, offset, length);
242         } else {
243             if (length > byteBuffer.remaining()) {
244                 flush();
245             }
246             byteBuffer.put(bytes, offset, length);
247         }
248         if (immediateFlush) {
249             flush();
250         }
251     }
252 
253     /**
254      * Writes the specified section of the specified byte array to the stream.
255      *
256      * @param bytes the array containing data
257      * @param offset from where to write
258      * @param length how many bytes to write
259      * @since 2.6
260      */
261     protected synchronized void writeToDestination(final byte[] bytes, final int offset, final int length) {
262         try {
263             getOutputStream().write(bytes, offset, length);
264         } catch (final IOException ex) {
265             throw new AppenderLoggingException("Error writing to stream " + getName(), ex);
266         }
267     }
268 
269     /**
270      * Calls {@code flush()} on the underlying output stream.
271      * @since 2.6
272      */
273     protected synchronized void flushDestination() {
274         final OutputStream stream = outputStream; // access volatile field only once per method
275         if (stream != null) {
276             try {
277                 stream.flush();
278             } catch (final IOException ex) {
279                 throw new AppenderLoggingException("Error flushing stream " + getName(), ex);
280             }
281         }
282     }
283 
284     /**
285      * Drains the ByteBufferDestination's buffer into the destination. By default this calls
286      * {@link OutputStreamManager#write(byte[], int, int, boolean)} with the buffer contents.
287      * The underlying stream is not {@linkplain OutputStream#flush() flushed}.
288      *
289      * @see #flushDestination()
290      * @since 2.6
291      */
292     protected synchronized void flushBuffer(final ByteBuffer buf) {
293         ((Buffer) buf).flip();
294         if (buf.remaining() > 0) {
295             writeToDestination(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
296         }
297         buf.clear();
298     }
299 
300     /**
301      * Flushes any buffers.
302      */
303     public synchronized void flush() {
304         flushBuffer(byteBuffer);
305         flushDestination();
306     }
307 
308     protected synchronized boolean closeOutputStream() {
309         flush();
310         final OutputStream stream = outputStream; // access volatile field only once per method
311         if (stream == null || stream == System.out || stream == System.err) {
312             return true;
313         }
314         try {
315             stream.close();
316         } catch (final IOException ex) {
317             logError("Unable to close stream", ex);
318             return false;
319         }
320         return true;
321     }
322 
323     /**
324      * Returns this {@code ByteBufferDestination}'s buffer.
325      * @return the buffer
326      * @since 2.6
327      */
328     @Override
329     public ByteBuffer getByteBuffer() {
330         return byteBuffer;
331     }
332 
333     /**
334      * Drains the ByteBufferDestination's buffer into the destination. By default this calls
335      * {@link #flushBuffer(ByteBuffer)} with the specified buffer. Subclasses may override.
336      * <p>
337      * Do not call this method lightly! For some subclasses this is a very expensive operation. For example,
338      * {@link MemoryMappedFileManager} will assume this method was called because the end of the mapped region
339      * was reached during a text encoding operation and will {@linkplain MemoryMappedFileManager#remap() remap} its
340      * buffer.
341      * </p><p>
342      * To just flush the buffered contents to the underlying stream, call
343      * {@link #flushBuffer(ByteBuffer)} directly instead.
344      * </p>
345      *
346      * @param buf the buffer whose contents to write the the destination
347      * @return the specified buffer
348      * @since 2.6
349      */
350     @Override
351     public ByteBuffer drain(final ByteBuffer buf) {
352         flushBuffer(buf);
353         return buf;
354     }
355 
356     @Override
357     public void writeBytes(final ByteBuffer data) {
358         if (data.remaining() == 0) {
359           return;
360         }
361         synchronized (this) {
362           ByteBufferDestinationHelper.writeToUnsynchronized(data, this);
363         }
364     }
365 }