001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.converter.stream; 018 019 import java.io.BufferedOutputStream; 020 import java.io.ByteArrayOutputStream; 021 import java.io.File; 022 import java.io.FileNotFoundException; 023 import java.io.FileOutputStream; 024 import java.io.IOException; 025 import java.io.InputStream; 026 import java.io.OutputStream; 027 import java.security.GeneralSecurityException; 028 import javax.crypto.CipherOutputStream; 029 030 import org.apache.camel.Exchange; 031 import org.apache.camel.StreamCache; 032 import org.apache.camel.spi.StreamCachingStrategy; 033 import org.apache.camel.support.SynchronizationAdapter; 034 import org.apache.camel.util.FileUtil; 035 import org.apache.camel.util.ObjectHelper; 036 import org.slf4j.Logger; 037 import org.slf4j.LoggerFactory; 038 039 /** 040 * This output stream will store the content into a File if the stream context size is exceed the 041 * THRESHOLD value. The default THRESHOLD value is {@link StreamCache#DEFAULT_SPOOL_THRESHOLD} bytes . 042 * <p/> 043 * The temp file will store in the temp directory, you can configure it by setting the TEMP_DIR property. 044 * If you don't set the TEMP_DIR property, it will choose the directory which is set by the 045 * system property of "java.io.tmpdir". 046 * <p/> 047 * You can get a cached input stream of this stream. The temp file which is created with this 048 * output stream will be deleted when you close this output stream or the all cached 049 * fileInputStream is closed after the exchange is completed. 050 */ 051 public class CachedOutputStream extends OutputStream { 052 @Deprecated 053 public static final String THRESHOLD = "CamelCachedOutputStreamThreshold"; 054 @Deprecated 055 public static final String BUFFER_SIZE = "CamelCachedOutputStreamBufferSize"; 056 @Deprecated 057 public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory"; 058 @Deprecated 059 public static final String CIPHER_TRANSFORMATION = "CamelCachedOutputStreamCipherTransformation"; 060 private static final Logger LOG = LoggerFactory.getLogger(CachedOutputStream.class); 061 062 private final StreamCachingStrategy strategy; 063 private OutputStream currentStream; 064 private boolean inMemory = true; 065 private int totalLength; 066 private File tempFile; 067 private FileInputStreamCache fileInputStreamCache; 068 private CipherPair ciphers; 069 private final boolean closedOnCompletion; 070 071 public CachedOutputStream(Exchange exchange) { 072 this(exchange, true); 073 } 074 075 public CachedOutputStream(Exchange exchange, final boolean closedOnCompletion) { 076 this.closedOnCompletion = closedOnCompletion; 077 this.strategy = exchange.getContext().getStreamCachingStrategy(); 078 currentStream = new CachedByteArrayOutputStream(strategy.getBufferSize()); 079 080 // add on completion so we can cleanup after the exchange is done such as deleting temporary files 081 exchange.addOnCompletion(new SynchronizationAdapter() { 082 @Override 083 public void onDone(Exchange exchange) { 084 try { 085 if (fileInputStreamCache != null) { 086 fileInputStreamCache.close(); 087 } 088 if (closedOnCompletion) { 089 close(); 090 try { 091 cleanUpTempFile(); 092 } catch (Exception e) { 093 LOG.warn("Error deleting temporary cache file: " + tempFile + ". This exception will be ignored.", e); 094 } 095 } 096 } catch (Exception e) { 097 LOG.warn("Error closing streams. This exception will be ignored.", e); 098 } 099 } 100 101 @Override 102 public String toString() { 103 return "OnCompletion[CachedOutputStream]"; 104 } 105 }); 106 } 107 108 public void flush() throws IOException { 109 currentStream.flush(); 110 } 111 112 public void close() throws IOException { 113 currentStream.close(); 114 // need to clean up the temp file this time 115 if (!closedOnCompletion) { 116 try { 117 cleanUpTempFile(); 118 } catch (Exception e) { 119 LOG.warn("Error deleting temporary cache file: " + tempFile + ". This exception will be ignored.", e); 120 } 121 } 122 } 123 124 public boolean equals(Object obj) { 125 return currentStream.equals(obj); 126 } 127 128 public int hashCode() { 129 return currentStream.hashCode(); 130 } 131 132 public OutputStream getCurrentStream() { 133 return currentStream; 134 } 135 136 public String toString() { 137 return "CachedOutputStream[size: " + totalLength + "]"; 138 } 139 140 public void write(byte[] b, int off, int len) throws IOException { 141 this.totalLength += len; 142 if (inMemory && currentStream instanceof ByteArrayOutputStream && strategy.shouldSpoolCache(totalLength)) { 143 pageToFileStream(); 144 } 145 currentStream.write(b, off, len); 146 } 147 148 public void write(byte[] b) throws IOException { 149 this.totalLength += b.length; 150 if (inMemory && currentStream instanceof ByteArrayOutputStream && strategy.shouldSpoolCache(totalLength)) { 151 pageToFileStream(); 152 } 153 currentStream.write(b); 154 } 155 156 public void write(int b) throws IOException { 157 this.totalLength++; 158 if (inMemory && currentStream instanceof ByteArrayOutputStream && strategy.shouldSpoolCache(totalLength)) { 159 pageToFileStream(); 160 } 161 currentStream.write(b); 162 } 163 164 public InputStream getInputStream() throws IOException { 165 flush(); 166 167 if (inMemory) { 168 if (currentStream instanceof CachedByteArrayOutputStream) { 169 return ((CachedByteArrayOutputStream) currentStream).newInputStreamCache(); 170 } else { 171 throw new IllegalStateException("CurrentStream should be an instance of CachedByteArrayOutputStream but is: " + currentStream.getClass().getName()); 172 } 173 } else { 174 try { 175 if (fileInputStreamCache == null) { 176 fileInputStreamCache = new FileInputStreamCache(tempFile, ciphers); 177 } 178 return fileInputStreamCache; 179 } catch (FileNotFoundException e) { 180 throw new IOException("Cached file " + tempFile + " not found", e); 181 } 182 } 183 } 184 185 public InputStream getWrappedInputStream() throws IOException { 186 // The WrappedInputStream will close the CachedOutputStream when it is closed 187 return new WrappedInputStream(this, getInputStream()); 188 } 189 190 /** 191 * @deprecated use {@link #newStreamCache()} 192 */ 193 @Deprecated 194 public StreamCache getStreamCache() throws IOException { 195 return newStreamCache(); 196 } 197 198 /** 199 * Creates a new {@link StreamCache} from the data cached in this {@link OutputStream}. 200 */ 201 public StreamCache newStreamCache() throws IOException { 202 flush(); 203 204 if (inMemory) { 205 if (currentStream instanceof CachedByteArrayOutputStream) { 206 return ((CachedByteArrayOutputStream) currentStream).newInputStreamCache(); 207 } else { 208 throw new IllegalStateException("CurrentStream should be an instance of CachedByteArrayOutputStream but is: " + currentStream.getClass().getName()); 209 } 210 } else { 211 try { 212 if (fileInputStreamCache == null) { 213 fileInputStreamCache = new FileInputStreamCache(tempFile, ciphers); 214 } 215 return fileInputStreamCache; 216 } catch (FileNotFoundException e) { 217 throw new IOException("Cached file " + tempFile + " not found", e); 218 } 219 } 220 } 221 222 private void cleanUpTempFile() { 223 // cleanup temporary file 224 if (tempFile != null) { 225 FileUtil.deleteFile(tempFile); 226 tempFile = null; 227 } 228 } 229 230 private void pageToFileStream() throws IOException { 231 flush(); 232 233 ByteArrayOutputStream bout = (ByteArrayOutputStream)currentStream; 234 tempFile = FileUtil.createTempFile("cos", ".tmp", strategy.getSpoolDirectory()); 235 236 LOG.trace("Creating temporary stream cache file: {}", tempFile); 237 238 try { 239 currentStream = createOutputStream(tempFile); 240 bout.writeTo(currentStream); 241 } finally { 242 // ensure flag is flipped to file based 243 inMemory = false; 244 } 245 } 246 247 /** 248 * @deprecated use {@link #getStrategyBufferSize()} 249 */ 250 @Deprecated 251 public int getBufferSize() { 252 return getStrategyBufferSize(); 253 } 254 255 public int getStrategyBufferSize() { 256 return strategy.getBufferSize(); 257 } 258 259 // This class will close the CachedOutputStream when it is closed 260 private static class WrappedInputStream extends InputStream { 261 private CachedOutputStream cachedOutputStream; 262 private InputStream inputStream; 263 264 WrappedInputStream(CachedOutputStream cos, InputStream is) { 265 cachedOutputStream = cos; 266 inputStream = is; 267 } 268 269 @Override 270 public int read() throws IOException { 271 return inputStream.read(); 272 } 273 274 @Override 275 public int available() throws IOException { 276 return inputStream.available(); 277 } 278 279 @Override 280 public void reset() throws IOException { 281 inputStream.reset(); 282 } 283 284 @Override 285 public void close() throws IOException { 286 inputStream.close(); 287 cachedOutputStream.close(); 288 } 289 } 290 291 private OutputStream createOutputStream(File file) throws IOException { 292 OutputStream out = new BufferedOutputStream(new FileOutputStream(file)); 293 if (ObjectHelper.isNotEmpty(strategy.getSpoolChiper())) { 294 try { 295 if (ciphers == null) { 296 ciphers = new CipherPair(strategy.getSpoolChiper()); 297 } 298 } catch (GeneralSecurityException e) { 299 throw new IOException(e.getMessage(), e); 300 } 301 out = new CipherOutputStream(out, ciphers.getEncryptor()) { 302 boolean closed; 303 public void close() throws IOException { 304 if (!closed) { 305 super.close(); 306 closed = true; 307 } 308 } 309 }; 310 } 311 return out; 312 } 313 }