001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.converter.stream;
018    
019    import java.io.BufferedOutputStream;
020    import java.io.ByteArrayOutputStream;
021    import java.io.File;
022    import java.io.FileNotFoundException;
023    import java.io.FileOutputStream;
024    import java.io.IOException;
025    import java.io.InputStream;
026    import java.io.OutputStream;
027    import java.security.GeneralSecurityException;
028    import javax.crypto.CipherOutputStream;
029    
030    import org.apache.camel.Exchange;
031    import org.apache.camel.StreamCache;
032    import org.apache.camel.spi.StreamCachingStrategy;
033    import org.apache.camel.support.SynchronizationAdapter;
034    import org.apache.camel.util.FileUtil;
035    import org.apache.camel.util.ObjectHelper;
036    import org.slf4j.Logger;
037    import org.slf4j.LoggerFactory;
038    
039    /**
040     * This output stream will store the content into a File if the stream context size is exceed the
041     * THRESHOLD value. The default THRESHOLD value is {@link StreamCache#DEFAULT_SPOOL_THRESHOLD} bytes .
042     * <p/>
043     * The temp file will store in the temp directory, you can configure it by setting the TEMP_DIR property.
044     * If you don't set the TEMP_DIR property, it will choose the directory which is set by the
045     * system property of "java.io.tmpdir".
046     * <p/>
047     * You can get a cached input stream of this stream. The temp file which is created with this 
048     * output stream will be deleted when you close this output stream or the all cached 
049     * fileInputStream is closed after the exchange is completed.
050     */
051    public class CachedOutputStream extends OutputStream {
052        @Deprecated
053        public static final String THRESHOLD = "CamelCachedOutputStreamThreshold";
054        @Deprecated
055        public static final String BUFFER_SIZE = "CamelCachedOutputStreamBufferSize";
056        @Deprecated
057        public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory";
058        @Deprecated
059        public static final String CIPHER_TRANSFORMATION = "CamelCachedOutputStreamCipherTransformation";
060        private static final Logger LOG = LoggerFactory.getLogger(CachedOutputStream.class);
061    
062        private final StreamCachingStrategy strategy;
063        private OutputStream currentStream;
064        private boolean inMemory = true;
065        private int totalLength;
066        private File tempFile;
067        private FileInputStreamCache fileInputStreamCache;
068        private CipherPair ciphers;
069        private final boolean closedOnCompletion;
070    
071        public CachedOutputStream(Exchange exchange) {
072            this(exchange, true);
073        }
074    
075        public CachedOutputStream(Exchange exchange, final boolean closedOnCompletion) {
076            this.closedOnCompletion = closedOnCompletion;
077            this.strategy = exchange.getContext().getStreamCachingStrategy();
078            currentStream = new CachedByteArrayOutputStream(strategy.getBufferSize());
079            
080            // add on completion so we can cleanup after the exchange is done such as deleting temporary files
081            exchange.addOnCompletion(new SynchronizationAdapter() {
082                @Override
083                public void onDone(Exchange exchange) {
084                    try {
085                        if (fileInputStreamCache != null) {
086                            fileInputStreamCache.close();
087                        }
088                        if (closedOnCompletion) {
089                            close();
090                            try {
091                                cleanUpTempFile();
092                            } catch (Exception e) {
093                                LOG.warn("Error deleting temporary cache file: " + tempFile + ". This exception will be ignored.", e);
094                            }
095                        }
096                    } catch (Exception e) {
097                        LOG.warn("Error closing streams. This exception will be ignored.", e);
098                    }
099                }
100        
101                @Override
102                public String toString() {
103                    return "OnCompletion[CachedOutputStream]";
104                }
105            });
106        }
107    
108        public void flush() throws IOException {
109            currentStream.flush();       
110        }
111    
112        public void close() throws IOException {
113            currentStream.close();
114            // need to clean up the temp file this time
115            if (!closedOnCompletion) {
116                try {
117                    cleanUpTempFile();
118                } catch (Exception e) {
119                    LOG.warn("Error deleting temporary cache file: " + tempFile + ". This exception will be ignored.", e);
120                }
121            }
122        }
123    
124        public boolean equals(Object obj) {
125            return currentStream.equals(obj);
126        }
127    
128        public int hashCode() {
129            return currentStream.hashCode();
130        }
131    
132        public OutputStream getCurrentStream() {
133            return currentStream;
134        }
135    
136        public String toString() {
137            return "CachedOutputStream[size: " + totalLength + "]";
138        }
139    
140        public void write(byte[] b, int off, int len) throws IOException {
141            this.totalLength += len;
142            if (inMemory && currentStream instanceof ByteArrayOutputStream && strategy.shouldSpoolCache(totalLength)) {
143                pageToFileStream();
144            }
145            currentStream.write(b, off, len);
146        }
147    
148        public void write(byte[] b) throws IOException {
149            this.totalLength += b.length;
150            if (inMemory && currentStream instanceof ByteArrayOutputStream && strategy.shouldSpoolCache(totalLength)) {
151                pageToFileStream();
152            }
153            currentStream.write(b);
154        }
155    
156        public void write(int b) throws IOException {
157            this.totalLength++;
158            if (inMemory && currentStream instanceof ByteArrayOutputStream && strategy.shouldSpoolCache(totalLength)) {
159                pageToFileStream();
160            }
161            currentStream.write(b);
162        }
163    
164        public InputStream getInputStream() throws IOException {
165            flush();
166    
167            if (inMemory) {
168                if (currentStream instanceof CachedByteArrayOutputStream) {
169                    return ((CachedByteArrayOutputStream) currentStream).newInputStreamCache();
170                } else {
171                    throw new IllegalStateException("CurrentStream should be an instance of CachedByteArrayOutputStream but is: " + currentStream.getClass().getName());
172                }
173            } else {
174                try {
175                    if (fileInputStreamCache == null) {
176                        fileInputStreamCache = new FileInputStreamCache(tempFile, ciphers);
177                    }
178                    return fileInputStreamCache;
179                } catch (FileNotFoundException e) {
180                    throw new IOException("Cached file " + tempFile + " not found", e);
181                }
182            }
183        }    
184    
185        public InputStream getWrappedInputStream() throws IOException {
186            // The WrappedInputStream will close the CachedOutputStream when it is closed
187            return new WrappedInputStream(this, getInputStream());
188        }
189    
190        /**
191         * @deprecated  use {@link #newStreamCache()}
192         */
193        @Deprecated
194        public StreamCache getStreamCache() throws IOException {
195            return newStreamCache();
196        }
197    
198        /**
199         * Creates a new {@link StreamCache} from the data cached in this {@link OutputStream}.
200         */
201        public StreamCache newStreamCache() throws IOException {
202            flush();
203    
204            if (inMemory) {
205                if (currentStream instanceof CachedByteArrayOutputStream) {
206                    return ((CachedByteArrayOutputStream) currentStream).newInputStreamCache();
207                } else {
208                    throw new IllegalStateException("CurrentStream should be an instance of CachedByteArrayOutputStream but is: " + currentStream.getClass().getName());
209                }
210            } else {
211                try {
212                    if (fileInputStreamCache == null) {
213                        fileInputStreamCache = new FileInputStreamCache(tempFile, ciphers);
214                    }
215                    return fileInputStreamCache;
216                } catch (FileNotFoundException e) {
217                    throw new IOException("Cached file " + tempFile + " not found", e);
218                }
219            }
220        }
221    
222        private void cleanUpTempFile() {
223            // cleanup temporary file
224            if (tempFile != null) {
225                FileUtil.deleteFile(tempFile);
226                tempFile = null;
227            }
228        }
229    
230        private void pageToFileStream() throws IOException {
231            flush();
232    
233            ByteArrayOutputStream bout = (ByteArrayOutputStream)currentStream;
234            tempFile = FileUtil.createTempFile("cos", ".tmp", strategy.getSpoolDirectory());
235    
236            LOG.trace("Creating temporary stream cache file: {}", tempFile);
237    
238            try {
239                currentStream = createOutputStream(tempFile);
240                bout.writeTo(currentStream);
241            } finally {
242                // ensure flag is flipped to file based
243                inMemory = false;
244            }
245        }
246    
247        /**
248         * @deprecated  use {@link #getStrategyBufferSize()}
249         */
250        @Deprecated
251        public int getBufferSize() {
252            return getStrategyBufferSize();
253        }
254        
255        public int getStrategyBufferSize() {
256            return strategy.getBufferSize();
257        }
258    
259        // This class will close the CachedOutputStream when it is closed
260        private static class WrappedInputStream extends InputStream {
261            private CachedOutputStream cachedOutputStream;
262            private InputStream inputStream;
263            
264            WrappedInputStream(CachedOutputStream cos, InputStream is) {
265                cachedOutputStream = cos;
266                inputStream = is;
267            }
268            
269            @Override
270            public int read() throws IOException {
271                return inputStream.read();
272            }
273            
274            @Override
275            public int available() throws IOException {
276                return inputStream.available();
277            }
278            
279            @Override
280            public void reset() throws IOException {
281                inputStream.reset();
282            }
283            
284            @Override
285            public void close() throws IOException {
286                inputStream.close();
287                cachedOutputStream.close();
288            }
289        }
290    
291        private OutputStream createOutputStream(File file) throws IOException {
292            OutputStream out = new BufferedOutputStream(new FileOutputStream(file));
293            if (ObjectHelper.isNotEmpty(strategy.getSpoolChiper())) {
294                try {
295                    if (ciphers == null) {
296                        ciphers = new CipherPair(strategy.getSpoolChiper());
297                    }
298                } catch (GeneralSecurityException e) {
299                    throw new IOException(e.getMessage(), e);
300                }
301                out = new CipherOutputStream(out, ciphers.getEncryptor()) {
302                    boolean closed;
303                    public void close() throws IOException {
304                        if (!closed) {
305                            super.close();
306                            closed = true;
307                        }
308                    }
309                };
310            }
311            return out;
312        }
313    }