/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.file;

import java.io.File;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.util.FileUtil;
import org.apache.camel.util.LRUCache;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.StringHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Generic file producer.
 * <p/>
 * Writes the content of an {@link Exchange} to a file through the configured
 * {@link GenericFileOperations}. Writes to the same target file name are serialized
 * using a per-name {@link Lock} kept in a bounded cache.
 */
public class GenericFileProducer<T> extends DefaultProducer {
    protected final Logger log = LoggerFactory.getLogger(getClass());
    protected final GenericFileEndpoint<T> endpoint;
    protected GenericFileOperations<T> operations;
    // assume writing to 100 different files concurrently at most for the same file producer
    private final LRUCache<String, Lock> locks = new LRUCache<String, Lock>(100);

    /**
     * Creates the producer.
     *
     * @param endpoint   the endpoint this producer belongs to
     * @param operations the file operations used to check, delete, rename and store files
     */
    protected GenericFileProducer(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> operations) {
        super(endpoint);
        this.endpoint = endpoint;
        this.operations = operations;
    }

    /**
     * Returns the separator appended between the endpoint directory and the file name.
     *
     * @return {@link File#separator}
     */
    public String getFileSeparator() {
        return File.separator;
    }

    /**
     * Normalizes the given path by delegating to {@link FileUtil#normalizePath(String)}.
     *
     * @param name the path to normalize
     * @return the normalized path
     */
    public String normalizePath(String name) {
        return FileUtil.normalizePath(name);
    }

    /**
     * Writes the exchange to the file computed by {@link #createFileName(Exchange)}.
     * <p/>
     * The write is performed while holding a lock dedicated to the target file name.
     * When done (successfully or not) the {@link Exchange#OVERRULE_FILE_NAME} header is removed
     * and the {@link Exchange#FILE_NAME} header is set back to the value it had on entry
     * (which may be <tt>null</tt>).
     *
     * @param exchange the exchange to write
     * @throws Exception is thrown if the file could not be written
     */
    public void process(Exchange exchange) throws Exception {
        // store any existing file header which we want to keep and propagate
        final String existing = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);

        // create the target file name
        String target = createFileName(exchange);

        // use lock for same file name to avoid concurrent writes to the same file
        // for example when you concurrently append to the same file
        Lock lock;
        synchronized (locks) {
            lock = locks.get(target);
            if (lock == null) {
                lock = new ReentrantLock();
                locks.put(target, lock);
            }
        }

        lock.lock();
        try {
            processExchange(exchange, target);
        } finally {
            // do not remove the lock from the cache as the cache has an upper bound
            // this ensures the locks are appropriately reused
            lock.unlock();
            // and remove the write file name header as we only want to use it once (by design)
            exchange.getIn().removeHeader(Exchange.OVERRULE_FILE_NAME);
            // and restore existing file name
            exchange.getIn().setHeader(Exchange.FILE_NAME, existing);
        }
    }

    /**
     * Sets the operations to be used.
     * <p/>
     * Can be used to set a fresh operations in case of recovery attempts
     *
     * @param operations the operations
     */
    public void setOperations(GenericFileOperations<T> operations) {
        this.operations = operations;
    }

    /**
     * Perform the work to process the fileExchange
     * <p/>
     * The steps are: {@link #preWriteCheck()}, optionally prepare a temporary file name,
     * write the file, optionally rename the temporary file to the target, optionally write
     * the done file, store the produced name in the {@link Exchange#FILE_NAME_PRODUCED} header
     * and finally {@link #postWriteCheck()}. Any exception thrown along the way is handed to
     * {@link #handleFailedWrite(Exchange, Exception)}.
     *
     * @param exchange fileExchange
     * @param target   the target filename
     * @throws Exception is thrown if some error
     */
    protected void processExchange(Exchange exchange, String target) throws Exception {
        log.trace("Processing file: {} for exchange: {}", target, exchange);

        try {
            preWriteCheck();

            // should we write to a temporary name and then afterwards rename to real target
            boolean writeAsTempAndRename = ObjectHelper.isNotEmpty(endpoint.getTempFileName());
            String tempTarget = null;
            if (writeAsTempAndRename) {
                // compute temporary name with the temp prefix
                tempTarget = createTempFileName(exchange, target);

                log.trace("Writing using tempNameFile: {}", tempTarget);

                // if we should eager delete target file before deploying temporary file
                if (endpoint.getFileExist() != GenericFileExist.TryRename && endpoint.isEagerDeleteTargetFile()) {
                    // cater for file exists option on the real target as
                    // the file operations code will work on the temp file
                    if (operations.existsFile(target)) {
                        log.trace("EagerDeleteTargetFile, target exists");
                        if (!handleExistingTarget(target, "Eagerly deleting existing file: {}")) {
                            // NOTE(review): this early return also skips postWriteCheck()
                            return;
                        }
                    }
                }

                // delete any pre existing temp file
                if (operations.existsFile(tempTarget)) {
                    log.trace("Deleting existing temp file: {}", tempTarget);
                    if (!operations.deleteFile(tempTarget)) {
                        throw new GenericFileOperationFailedException("Cannot delete file: " + tempTarget);
                    }
                }
            }

            // write/upload the file
            writeFile(exchange, tempTarget != null ? tempTarget : target);

            // if we did write to a temporary name then rename it to the real
            // name after we have written the file
            if (tempTarget != null) {
                // if we did not eager delete the target file
                if (endpoint.getFileExist() != GenericFileExist.TryRename && !endpoint.isEagerDeleteTargetFile()) {
                    if (operations.existsFile(target)) {
                        log.trace("Not using EagerDeleteTargetFile, target exists");
                        if (!handleExistingTarget(target, "Deleting existing file: {}")) {
                            // NOTE(review): this early return skips postWriteCheck() and the temp file
                            // written above is neither renamed nor deleted by this method
                            return;
                        }
                    }
                }

                // now we are ready to rename the temp file to the target file
                log.trace("Renaming file: [{}] to: [{}]", tempTarget, target);
                boolean renamed = operations.renameFile(tempTarget, target);
                if (!renamed) {
                    throw new GenericFileOperationFailedException("Cannot rename file from: " + tempTarget + " to: " + target);
                }
            }

            // any done file to write?
            if (endpoint.getDoneFileName() != null) {
                String doneFileName = endpoint.createDoneFileName(target);
                ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint);

                // create empty exchange with empty body to write as the done file
                Exchange empty = new DefaultExchange(exchange);
                empty.getIn().setBody("");

                log.trace("Writing done file: [{}]", doneFileName);
                // delete any existing done file
                if (operations.existsFile(doneFileName) && !operations.deleteFile(doneFileName)) {
                    throw new GenericFileOperationFailedException("Cannot delete existing done file: " + doneFileName);
                }
                writeFile(empty, doneFileName);
            }

            // let's store the name we really used in the header, so end-users
            // can retrieve it
            exchange.getIn().setHeader(Exchange.FILE_NAME_PRODUCED, target);
        } catch (Exception e) {
            handleFailedWrite(exchange, e);
        }

        postWriteCheck();
    }

    /**
     * Applies the endpoint's <tt>fileExist</tt> option to a target file that is already present.
     * <p/>
     * For {@link GenericFileExist#Override} the existing target is deleted so that the temporary
     * file can be renamed to it afterwards. Options other than Ignore, Fail and Override are left
     * untouched here.
     *
     * @param target             the existing target file name
     * @param deleteTraceMessage the trace message (with one <tt>{}</tt> placeholder for the target)
     *                           logged before deleting the target
     * @return <tt>false</tt> if the option is Ignore and the caller must stop processing,
     *         <tt>true</tt> to continue
     * @throws GenericFileOperationFailedException if the option is Fail, or the target could not be deleted
     */
    private boolean handleExistingTarget(String target, String deleteTraceMessage) throws GenericFileOperationFailedException {
        GenericFileExist fileExist = endpoint.getFileExist();
        if (fileExist == GenericFileExist.Ignore) {
            // ignore but indicate that the file was written
            log.trace("An existing file already exists: {}. Ignore and do not override it.", target);
            return false;
        } else if (fileExist == GenericFileExist.Fail) {
            throw new GenericFileOperationFailedException("File already exist: " + target + ". Cannot write new file.");
        } else if (fileExist == GenericFileExist.Override) {
            // we override the target so we do this by deleting it so the temp file can be renamed later
            // with success as the existing target file have been deleted
            log.trace(deleteTraceMessage, target);
            if (!operations.deleteFile(target)) {
                throw new GenericFileOperationFailedException("Cannot delete file: " + target);
            }
        }
        return true;
    }

    /**
     * If we fail writing out a file, we will call this method. This hook is
     * provided to disconnect from servers or clean up files we created (if needed).
     * <p/>
     * This implementation simply rethrows the given exception.
     *
     * @param exchange  the exchange that failed to be written
     * @param exception the exception that caused the failure
     * @throws Exception the given exception
     */
    public void handleFailedWrite(Exchange exchange, Exception exception) throws Exception {
        throw exception;
    }

    /**
     * Perform any actions that need to occur before we write such as connecting to an FTP server etc.
     * <p/>
     * This implementation does nothing.
     *
     * @throws Exception can be thrown by overriding implementations
     */
    public void preWriteCheck() throws Exception {
        // nothing needed to check
    }

    /**
     * Perform any actions that need to occur after we are done such as disconnecting.
     * <p/>
     * This implementation does nothing.
     */
    public void postWriteCheck() {
        // nothing needed to check
    }

    /**
     * Stores the exchange as the given file.
     * <p/>
     * If the endpoint has auto create enabled, the parent directory of the file is built first;
     * a failure to build the directory is only logged and the store is still attempted.
     *
     * @param exchange the exchange holding the content to write
     * @param fileName the name of the file to write
     * @throws GenericFileOperationFailedException if the operations report that the file was not stored
     */
    public void writeFile(Exchange exchange, String fileName) throws GenericFileOperationFailedException {
        // build directory if auto create is enabled
        if (endpoint.isAutoCreate()) {
            // we must normalize it (to avoid having both \ and / in the name which confuses java.io.File)
            String name = FileUtil.normalizePath(fileName);

            // use java.io.File to compute the file path
            File file = new File(name);
            String directory = file.getParent();
            boolean absolute = FileUtil.isAbsolute(file);
            if (directory != null) {
                if (!operations.buildDirectory(directory, absolute)) {
                    log.debug("Cannot build directory [{}] (could be because of denied permissions)", directory);
                }
            }
        }

        // upload
        if (log.isTraceEnabled()) {
            log.trace("About to write [{}] to [{}] from exchange [{}]", new Object[]{fileName, getEndpoint(), exchange});
        }

        boolean success = operations.storeFile(fileName, exchange);
        if (!success) {
            throw new GenericFileOperationFailedException("Error writing file [" + fileName + "]");
        }
        log.debug("Wrote [{}] to [{}]", fileName, getEndpoint());
    }

    /**
     * Computes the target file name (including the endpoint directory) for the exchange.
     * <p/>
     * The name is resolved as follows:
     * <ul>
     *   <li>the {@link Exchange#OVERRULE_FILE_NAME} header takes precedence over the
     *       {@link Exchange#FILE_NAME} header; when present its value is also stored
     *       in the {@link Exchange#FILE_NAME} header</li>
     *   <li>if the header value is an {@link Expression} it is evaluated, otherwise the
     *       endpoint file name expression is evaluated if configured, otherwise the header
     *       value is converted to a String</li>
     *   <li>if no name could be resolved a generated file name from the endpoint is used</li>
     * </ul>
     * If the endpoint is configured to flatten, any leading path is stripped from the name.
     *
     * @param exchange the exchange
     * @return the file name prefixed with the endpoint directory, normalized if the
     *         endpoint configuration requires it
     */
    public String createFileName(Exchange exchange) {
        String answer;

        // overrule takes precedence
        Object value;

        Object overrule = exchange.getIn().getHeader(Exchange.OVERRULE_FILE_NAME);
        if (overrule != null) {
            if (overrule instanceof Expression) {
                value = overrule;
            } else {
                value = exchange.getContext().getTypeConverter().convertTo(String.class, exchange, overrule);
            }
            // override the existing header to use the overrule computed name from this point forward
            exchange.getIn().setHeader(Exchange.FILE_NAME, value);
        } else {
            value = exchange.getIn().getHeader(Exchange.FILE_NAME);
        }

        if (value instanceof String && StringHelper.hasStartToken((String) value, "simple")) {
            log.warn("Simple expression: {} detected in header: {} of type String. This feature has been removed (see CAMEL-6748).", value, Exchange.FILE_NAME);
        }

        // expression support: an expression given as header wins over the endpoint's expression
        Expression expression = endpoint.getFileName();
        if (value instanceof Expression) {
            expression = (Expression) value;
        }

        // evaluate the name as a String from the value
        String name;
        if (expression != null) {
            log.trace("Filename evaluated as expression: {}", expression);
            name = expression.evaluate(exchange, String.class);
        } else {
            name = exchange.getContext().getTypeConverter().convertTo(String.class, exchange, value);
        }

        // flatten name
        if (name != null && endpoint.isFlatten()) {
            // check for both windows and unix separators
            int pos = Math.max(name.lastIndexOf("/"), name.lastIndexOf("\\"));
            if (pos != -1) {
                name = name.substring(pos + 1);
            }
        }

        // compute path by adding endpoint starting directory
        String endpointPath = endpoint.getConfiguration().getDirectory();
        String baseDir = "";
        if (endpointPath.length() > 0) {
            // Its a directory so we should use it as a base path for the filename
            // If the path isn't empty, we need to add a trailing / if it isn't already there
            baseDir = endpointPath;
            boolean trailingSlash = endpointPath.endsWith("/") || endpointPath.endsWith("\\");
            if (!trailingSlash) {
                baseDir += getFileSeparator();
            }
        }
        if (name != null) {
            answer = baseDir + name;
        } else {
            // use a generated filename if no name provided
            answer = baseDir + endpoint.getGeneratedFileName(exchange.getIn());
        }

        if (endpoint.getConfiguration().needToNormalize()) {
            // must normalize path to cater for Windows and other OS
            answer = normalizePath(answer);
        }

        return answer;
    }

    /**
     * Computes the temporary file name to write to before renaming to the given file name.
     * <p/>
     * The endpoint temp file name expression is evaluated against the exchange. If the exchange
     * has no {@link Exchange#FILE_NAME} header (the file name was generated), the header is set
     * to the file name without path for the duration of the evaluation only, and is removed
     * again afterwards even if the evaluation fails.
     *
     * @param exchange the exchange
     * @param fileName the target file name, optionally including a path
     * @return the evaluated temp name, prefixed with the path of the given file name (if any),
     *         normalized if the endpoint configuration requires it
     */
    public String createTempFileName(Exchange exchange, String fileName) {
        String answer = fileName;

        String tempName;
        if (exchange.getIn().getHeader(Exchange.FILE_NAME) == null) {
            // its a generated filename then add it to header so we can evaluate the expression
            exchange.getIn().setHeader(Exchange.FILE_NAME, FileUtil.stripPath(fileName));
            try {
                tempName = endpoint.getTempFileName().evaluate(exchange, String.class);
            } finally {
                // and remove it again after evaluation, also when the evaluation failed,
                // so the synthetic header never leaks to the caller
                exchange.getIn().removeHeader(Exchange.FILE_NAME);
            }
        } else {
            tempName = endpoint.getTempFileName().evaluate(exchange, String.class);
        }

        // check for both windows and unix separators
        int pos = Math.max(answer.lastIndexOf("/"), answer.lastIndexOf("\\"));
        if (pos == -1) {
            // no path so use temp name as calculated
            answer = tempName;
        } else {
            // path should be prefixed before the temp name
            answer = answer.substring(0, pos + 1) + tempName;
        }

        if (endpoint.getConfiguration().needToNormalize()) {
            // must normalize path to cater for Windows and other OS
            answer = normalizePath(answer);
        }

        return answer;
    }

    /**
     * Starts this producer and then the locks cache.
     */
    @Override
    protected void doStart() throws Exception {
        super.doStart();
        ServiceHelper.startService(locks);
    }

    /**
     * Stops the locks cache and then this producer.
     */
    @Override
    protected void doStop() throws Exception {
        ServiceHelper.stopService(locks);
        super.doStop();
    }
}