001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.file;
018    
019    import org.apache.camel.Exchange;
020    import org.apache.camel.impl.LoggingExceptionHandler;
021    import org.apache.camel.spi.ExceptionHandler;
022    import org.apache.camel.spi.Synchronization;
023    import org.apache.camel.util.ObjectHelper;
024    import org.slf4j.Logger;
025    import org.slf4j.LoggerFactory;
026    
027    /**
028     * On completion strategy that performs the required work after the {@link Exchange} has been processed.
029     * <p/>
030     * The work is for example to move the processed file into a backup folder, delete the file or
031     * in case of processing failure do a rollback. 
032     *
033     * @version 
034     */
035    public class GenericFileOnCompletion<T> implements Synchronization {
036    
037        private final Logger log = LoggerFactory.getLogger(GenericFileOnCompletion.class);
038        private GenericFileEndpoint<T> endpoint;
039        private GenericFileOperations<T> operations;
040        private ExceptionHandler exceptionHandler;
041        private GenericFile<T> file;
042        private String absoluteFileName;
043    
044        public GenericFileOnCompletion(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> operations,
045                                       GenericFile<T> file, String absoluteFileName) {
046            this.endpoint = endpoint;
047            this.operations = operations;
048            this.file = file;
049            this.absoluteFileName = absoluteFileName;
050            this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
051        }
052    
053        public void onComplete(Exchange exchange) {
054            onCompletion(exchange);
055        }
056    
057        public void onFailure(Exchange exchange) {
058            onCompletion(exchange);
059        }
060    
061        public ExceptionHandler getExceptionHandler() {
062            return exceptionHandler;
063        }
064    
065        public void setExceptionHandler(ExceptionHandler exceptionHandler) {
066            this.exceptionHandler = exceptionHandler;
067        }
068    
069        protected void onCompletion(Exchange exchange) {
070            GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy();
071    
072            log.debug("Done processing file: {} using exchange: {}", file, exchange);
073    
074            // commit or rollback
075            boolean committed = false;
076            try {
077                boolean failed = exchange.isFailed();
078                if (!failed) {
079                    // commit the file strategy if there was no failure or already handled by the DeadLetterChannel
080                    processStrategyCommit(processStrategy, exchange, file);
081                    committed = true;
082                }
083                // if we failed, then it will be handled by the rollback in the finally block below
084            } finally {
085                if (!committed) {
086                    processStrategyRollback(processStrategy, exchange, file);
087                }
088    
089                // remove file from the in progress list as its no longer in progress
090                // use the original file name that was used to add it to the repository
091                // as the name can be different when using preMove option
092                endpoint.getInProgressRepository().remove(absoluteFileName);
093            }
094        }
095    
096        /**
097         * Strategy when the file was processed and a commit should be executed.
098         *
099         * @param processStrategy the strategy to perform the commit
100         * @param exchange        the exchange
101         * @param file            the file processed
102         */
103        protected void processStrategyCommit(GenericFileProcessStrategy<T> processStrategy,
104                                             Exchange exchange, GenericFile<T> file) {
105            if (endpoint.isIdempotent()) {
106    
107                // use absolute file path as default key, but evaluate if an expression key was configured
108                String key = absoluteFileName;
109                if (endpoint.getIdempotentKey() != null) {
110                    Exchange dummy = endpoint.createExchange(file);
111                    key = endpoint.getIdempotentKey().evaluate(dummy, String.class);
112                }
113    
114                // only add to idempotent repository if we could process the file
115                if (key != null) {
116                    endpoint.getIdempotentRepository().add(key);
117                }
118            }
119    
120            handleDoneFile(exchange);
121    
122            try {
123                log.trace("Commit file strategy: {} for file: {}", processStrategy, file);
124                processStrategy.commit(operations, endpoint, exchange, file);
125            } catch (Exception e) {
126                handleException("Error during commit", exchange, e);
127            }
128        }
129    
130        /**
131         * Strategy when the file was not processed and a rollback should be executed.
132         *
133         * @param processStrategy the strategy to perform the commit
134         * @param exchange        the exchange
135         * @param file            the file processed
136         */
137        protected void processStrategyRollback(GenericFileProcessStrategy<T> processStrategy,
138                                               Exchange exchange, GenericFile<T> file) {
139    
140            if (log.isWarnEnabled()) {
141                log.warn("Rollback file strategy: " + processStrategy + " for file: " + file);
142            }
143    
144            // only delete done file if moveFailed option is enabled, as otherwise on rollback,
145            // we should leave the done file so we can retry
146            if (endpoint.getMoveFailed() != null) {
147                handleDoneFile(exchange);
148            }
149    
150            try {
151                processStrategy.rollback(operations, endpoint, exchange, file);
152            } catch (Exception e) {
153                handleException("Error during rollback", exchange, e);
154            }
155        }
156    
157        protected void handleDoneFile(Exchange exchange) {
158            // must be last in batch to delete the done file name
159            // delete done file if used (and not noop=true)
160            boolean complete = exchange.getProperty(Exchange.BATCH_COMPLETE, false, Boolean.class);
161            if (endpoint.getDoneFileName() != null && !endpoint.isNoop()) {
162                // done file must be in same path as the original input file
163                String doneFileName = endpoint.createDoneFileName(absoluteFileName);
164                ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint);
165                // we should delete the dynamic done file
166                if (endpoint.getDoneFileName().indexOf("{file:name") > 0 || complete) {
167                    try {
168                        // delete done file
169                        boolean deleted = operations.deleteFile(doneFileName);
170                        log.trace("Done file: {} was deleted: {}", doneFileName, deleted);
171                        if (!deleted) {
172                            log.warn("Done file: " + doneFileName + " could not be deleted");
173                        }
174                    } catch (Exception e) {
175                        handleException("Error deleting done file: " + doneFileName, exchange, e);
176                    }
177                }
178            }
179        }
180    
181        protected void handleException(String message, Exchange exchange, Throwable t) {
182            Throwable newt = (t == null) ? new IllegalArgumentException("Handling [null] exception") : t;
183            getExceptionHandler().handleException(message, exchange, newt);
184        }
185    
186        @Override
187        public String toString() {
188            return "GenericFileOnCompletion";
189        }
190    }