001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.file; 018 019 import org.apache.camel.Exchange; 020 import org.apache.camel.impl.LoggingExceptionHandler; 021 import org.apache.camel.spi.ExceptionHandler; 022 import org.apache.camel.spi.Synchronization; 023 import org.apache.camel.util.ObjectHelper; 024 import org.slf4j.Logger; 025 import org.slf4j.LoggerFactory; 026 027 /** 028 * On completion strategy that performs the required work after the {@link Exchange} has been processed. 029 * <p/> 030 * The work is for example to move the processed file into a backup folder, delete the file or 031 * in case of processing failure do a rollback. 032 * 033 * @version 034 */ 035 public class GenericFileOnCompletion<T> implements Synchronization { 036 037 private final Logger log = LoggerFactory.getLogger(GenericFileOnCompletion.class); 038 private GenericFileEndpoint<T> endpoint; 039 private GenericFileOperations<T> operations; 040 private ExceptionHandler exceptionHandler; 041 private GenericFile<T> file; 042 private String absoluteFileName; 043 044 public GenericFileOnCompletion(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> operations, 045 GenericFile<T> file, String absoluteFileName) { 046 this.endpoint = endpoint; 047 this.operations = operations; 048 this.file = file; 049 this.absoluteFileName = absoluteFileName; 050 this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass()); 051 } 052 053 public void onComplete(Exchange exchange) { 054 onCompletion(exchange); 055 } 056 057 public void onFailure(Exchange exchange) { 058 onCompletion(exchange); 059 } 060 061 public ExceptionHandler getExceptionHandler() { 062 return exceptionHandler; 063 } 064 065 public void setExceptionHandler(ExceptionHandler exceptionHandler) { 066 this.exceptionHandler = exceptionHandler; 067 } 068 069 protected void onCompletion(Exchange exchange) { 070 GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy(); 071 072 log.debug("Done processing file: {} using exchange: {}", file, exchange); 073 074 // commit or rollback 075 boolean committed = false; 076 try { 077 boolean failed = exchange.isFailed(); 078 if (!failed) { 079 // commit the file strategy if there was no failure or already handled by the DeadLetterChannel 080 processStrategyCommit(processStrategy, exchange, file); 081 committed = true; 082 } 083 // if we failed, then it will be handled by the rollback in the finally block below 084 } finally { 085 if (!committed) { 086 processStrategyRollback(processStrategy, exchange, file); 087 } 088 089 // remove file from the in progress list as its no longer in progress 090 // use the original file name that was used to add it to the repository 091 // as the name can be different when using preMove option 092 endpoint.getInProgressRepository().remove(absoluteFileName); 093 } 094 } 095 096 /** 097 * Strategy when the file was processed and a commit should be executed. 098 * 099 * @param processStrategy the strategy to perform the commit 100 * @param exchange the exchange 101 * @param file the file processed 102 */ 103 protected void processStrategyCommit(GenericFileProcessStrategy<T> processStrategy, 104 Exchange exchange, GenericFile<T> file) { 105 if (endpoint.isIdempotent()) { 106 107 // use absolute file path as default key, but evaluate if an expression key was configured 108 String key = absoluteFileName; 109 if (endpoint.getIdempotentKey() != null) { 110 Exchange dummy = endpoint.createExchange(file); 111 key = endpoint.getIdempotentKey().evaluate(dummy, String.class); 112 } 113 114 // only add to idempotent repository if we could process the file 115 if (key != null) { 116 endpoint.getIdempotentRepository().add(key); 117 } 118 } 119 120 handleDoneFile(exchange); 121 122 try { 123 log.trace("Commit file strategy: {} for file: {}", processStrategy, file); 124 processStrategy.commit(operations, endpoint, exchange, file); 125 } catch (Exception e) { 126 handleException("Error during commit", exchange, e); 127 } 128 } 129 130 /** 131 * Strategy when the file was not processed and a rollback should be executed. 132 * 133 * @param processStrategy the strategy to perform the commit 134 * @param exchange the exchange 135 * @param file the file processed 136 */ 137 protected void processStrategyRollback(GenericFileProcessStrategy<T> processStrategy, 138 Exchange exchange, GenericFile<T> file) { 139 140 if (log.isWarnEnabled()) { 141 log.warn("Rollback file strategy: " + processStrategy + " for file: " + file); 142 } 143 144 // only delete done file if moveFailed option is enabled, as otherwise on rollback, 145 // we should leave the done file so we can retry 146 if (endpoint.getMoveFailed() != null) { 147 handleDoneFile(exchange); 148 } 149 150 try { 151 processStrategy.rollback(operations, endpoint, exchange, file); 152 } catch (Exception e) { 153 handleException("Error during rollback", exchange, e); 154 } 155 } 156 157 protected void handleDoneFile(Exchange exchange) { 158 // must be last in batch to delete the done file name 159 // delete done file if used (and not noop=true) 160 boolean complete = exchange.getProperty(Exchange.BATCH_COMPLETE, false, Boolean.class); 161 if (endpoint.getDoneFileName() != null && !endpoint.isNoop()) { 162 // done file must be in same path as the original input file 163 String doneFileName = endpoint.createDoneFileName(absoluteFileName); 164 ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint); 165 // we should delete the dynamic done file 166 if (endpoint.getDoneFileName().indexOf("{file:name") > 0 || complete) { 167 try { 168 // delete done file 169 boolean deleted = operations.deleteFile(doneFileName); 170 log.trace("Done file: {} was deleted: {}", doneFileName, deleted); 171 if (!deleted) { 172 log.warn("Done file: " + doneFileName + " could not be deleted"); 173 } 174 } catch (Exception e) { 175 handleException("Error deleting done file: " + doneFileName, exchange, e); 176 } 177 } 178 } 179 } 180 181 protected void handleException(String message, Exchange exchange, Throwable t) { 182 Throwable newt = (t == null) ? new IllegalArgumentException("Handling [null] exception") : t; 183 getExceptionHandler().handleException(message, exchange, newt); 184 } 185 186 @Override 187 public String toString() { 188 return "GenericFileOnCompletion"; 189 } 190 }