001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.file;
018    
019    import java.io.File;
020    import java.util.concurrent.locks.Lock;
021    import java.util.concurrent.locks.ReentrantLock;
022    
023    import org.apache.camel.Exchange;
024    import org.apache.camel.Expression;
025    import org.apache.camel.impl.DefaultExchange;
026    import org.apache.camel.impl.DefaultProducer;
027    import org.apache.camel.util.FileUtil;
028    import org.apache.camel.util.LRUCache;
029    import org.apache.camel.util.ObjectHelper;
030    import org.apache.camel.util.ServiceHelper;
031    import org.apache.camel.util.StringHelper;
032    import org.slf4j.Logger;
033    import org.slf4j.LoggerFactory;
034    
035    /**
036     * Generic file producer
037     */
038    public class GenericFileProducer<T> extends DefaultProducer {
039        protected final Logger log = LoggerFactory.getLogger(getClass());
040        protected final GenericFileEndpoint<T> endpoint;
041        protected GenericFileOperations<T> operations;
042        // assume writing to 100 different files concurrently at most for the same file producer
043        private final LRUCache<String, Lock> locks = new LRUCache<String, Lock>(100);
044    
045        protected GenericFileProducer(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> operations) {
046            super(endpoint);
047            this.endpoint = endpoint;
048            this.operations = operations;
049        }
050        
051        public String getFileSeparator() {
052            return File.separator;
053        }
054    
055        public String normalizePath(String name) {
056            return FileUtil.normalizePath(name);
057        }
058    
059        public void process(Exchange exchange) throws Exception {
060            // store any existing file header which we want to keep and propagate
061            final String existing = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
062    
063            // create the target file name
064            String target = createFileName(exchange);
065    
066            // use lock for same file name to avoid concurrent writes to the same file
067            // for example when you concurrently append to the same file
068            Lock lock;
069            synchronized (locks) {
070                lock = locks.get(target);
071                if (lock == null) {
072                    lock = new ReentrantLock();
073                    locks.put(target, lock);
074                }
075            }
076    
077            lock.lock();
078            try {
079                processExchange(exchange, target);
080            } finally {
081                // do not remove as the locks cache has an upper bound
082                // this ensure the locks is appropriate reused
083                lock.unlock();
084                // and remove the write file name header as we only want to use it once (by design)
085                exchange.getIn().removeHeader(Exchange.OVERRULE_FILE_NAME);
086                // and restore existing file name
087                exchange.getIn().setHeader(Exchange.FILE_NAME, existing);
088            }
089        }
090    
091        /**
092         * Sets the operations to be used.
093         * <p/>
094         * Can be used to set a fresh operations in case of recovery attempts
095         *
096         * @param operations the operations
097         */
098        public void setOperations(GenericFileOperations<T> operations) {
099            this.operations = operations;
100        }
101    
102        /**
103         * Perform the work to process the fileExchange
104         *
105         * @param exchange fileExchange
106         * @param target   the target filename
107         * @throws Exception is thrown if some error
108         */
109        protected void processExchange(Exchange exchange, String target) throws Exception {
110            log.trace("Processing file: {} for exchange: {}", target, exchange);
111    
112            try {
113                preWriteCheck();
114    
115                // should we write to a temporary name and then afterwards rename to real target
116                boolean writeAsTempAndRename = ObjectHelper.isNotEmpty(endpoint.getTempFileName());
117                String tempTarget = null;
118                // remember if target exists to avoid checking twice
119                Boolean targetExists = null;
120                if (writeAsTempAndRename) {
121                    // compute temporary name with the temp prefix
122                    tempTarget = createTempFileName(exchange, target);
123    
124                    log.trace("Writing using tempNameFile: {}", tempTarget);
125                   
126                    //if we should eager delete target file before deploying temporary file
127                    if (endpoint.getFileExist() != GenericFileExist.TryRename && endpoint.isEagerDeleteTargetFile()) {
128                        
129                        // cater for file exists option on the real target as
130                        // the file operations code will work on the temp file
131    
132                        // if an existing file already exists what should we do?
133                        targetExists = operations.existsFile(target);
134                        if (targetExists) {
135                            
136                            log.trace("EagerDeleteTargetFile, target exists");
137                            
138                            if (endpoint.getFileExist() == GenericFileExist.Ignore) {
139                                // ignore but indicate that the file was written
140                                log.trace("An existing file already exists: {}. Ignore and do not override it.", target);
141                                return;
142                            } else if (endpoint.getFileExist() == GenericFileExist.Fail) {
143                                throw new GenericFileOperationFailedException("File already exist: " + target + ". Cannot write new file.");
144                            } else if (endpoint.isEagerDeleteTargetFile() && endpoint.getFileExist() == GenericFileExist.Override) {
145                                // we override the target so we do this by deleting it so the temp file can be renamed later
146                                // with success as the existing target file have been deleted
147                                log.trace("Eagerly deleting existing file: {}", target);
148                                if (!operations.deleteFile(target)) {
149                                    throw new GenericFileOperationFailedException("Cannot delete file: " + target);
150                                }
151                            }
152                        }
153                    }
154    
155                    // delete any pre existing temp file
156                    if (operations.existsFile(tempTarget)) {
157                        log.trace("Deleting existing temp file: {}", tempTarget);
158                        if (!operations.deleteFile(tempTarget)) {
159                            throw new GenericFileOperationFailedException("Cannot delete file: " + tempTarget);
160                        }
161                    }
162                }
163    
164                // write/upload the file
165                writeFile(exchange, tempTarget != null ? tempTarget : target);
166    
167                // if we did write to a temporary name then rename it to the real
168                // name after we have written the file
169                if (tempTarget != null) {
170                    // if we did not eager delete the target file
171                    if (endpoint.getFileExist() != GenericFileExist.TryRename && !endpoint.isEagerDeleteTargetFile()) {
172    
173                        // if an existing file already exists what should we do?
174                        targetExists = operations.existsFile(target);
175                        if (targetExists) {
176    
177                            log.trace("Not using EagerDeleteTargetFile, target exists");
178    
179                            if (endpoint.getFileExist() == GenericFileExist.Ignore) {
180                                // ignore but indicate that the file was written
181                                log.trace("An existing file already exists: {}. Ignore and do not override it.", target);
182                                return;
183                            } else if (endpoint.getFileExist() == GenericFileExist.Fail) {
184                                throw new GenericFileOperationFailedException("File already exist: " + target + ". Cannot write new file.");
185                            } else if (endpoint.getFileExist() == GenericFileExist.Override) {
186                                // we override the target so we do this by deleting it so the temp file can be renamed later
187                                // with success as the existing target file have been deleted
188                                log.trace("Deleting existing file: {}", target);
189                                if (!operations.deleteFile(target)) {
190                                    throw new GenericFileOperationFailedException("Cannot delete file: " + target);
191                                }
192                            }
193                        }
194                    }
195    
196                    // now we are ready to rename the temp file to the target file
197                    log.trace("Renaming file: [{}] to: [{}]", tempTarget, target);
198                    boolean renamed = operations.renameFile(tempTarget, target);
199                    if (!renamed) {
200                        throw new GenericFileOperationFailedException("Cannot rename file from: " + tempTarget + " to: " + target);
201                    }
202                }
203    
204                // any done file to write?
205                if (endpoint.getDoneFileName() != null) {
206                    String doneFileName = endpoint.createDoneFileName(target);
207                    ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint);
208    
209                    // create empty exchange with empty body to write as the done file
210                    Exchange empty = new DefaultExchange(exchange);
211                    empty.getIn().setBody("");
212    
213                    log.trace("Writing done file: [{}]", doneFileName);
214                    // delete any existing done file
215                    if (operations.existsFile(doneFileName)) {
216                        if (!operations.deleteFile(doneFileName)) {
217                            throw new GenericFileOperationFailedException("Cannot delete existing done file: " + doneFileName);
218                        }
219                    }
220                    writeFile(empty, doneFileName);
221                }
222    
223                // let's store the name we really used in the header, so end-users
224                // can retrieve it
225                exchange.getIn().setHeader(Exchange.FILE_NAME_PRODUCED, target);
226            } catch (Exception e) {
227                handleFailedWrite(exchange, e);
228            }
229    
230            postWriteCheck();
231        }
232    
233        /**
234         * If we fail writing out a file, we will call this method. This hook is
235         * provided to disconnect from servers or clean up files we created (if needed).
236         */
237        public void handleFailedWrite(Exchange exchange, Exception exception) throws Exception {
238            throw exception;
239        }
240    
241        /**
242         * Perform any actions that need to occur before we write such as connecting to an FTP server etc.
243         */
244        public void preWriteCheck() throws Exception {
245            // nothing needed to check
246        }
247    
248        /**
249         * Perform any actions that need to occur after we are done such as disconnecting.
250         */
251        public void postWriteCheck() {
252            // nothing needed to check
253        }
254    
255        public void writeFile(Exchange exchange, String fileName) throws GenericFileOperationFailedException {
256            // build directory if auto create is enabled
257            if (endpoint.isAutoCreate()) {
258                // we must normalize it (to avoid having both \ and / in the name which confuses java.io.File)
259                String name = FileUtil.normalizePath(fileName);
260    
261                // use java.io.File to compute the file path
262                File file = new File(name);
263                String directory = file.getParent();
264                boolean absolute = FileUtil.isAbsolute(file);
265                if (directory != null) {
266                    if (!operations.buildDirectory(directory, absolute)) {
267                        log.debug("Cannot build directory [{}] (could be because of denied permissions)", directory);
268                    }
269                }
270            }
271    
272            // upload
273            if (log.isTraceEnabled()) {
274                log.trace("About to write [{}] to [{}] from exchange [{}]", new Object[]{fileName, getEndpoint(), exchange});
275            }
276    
277            boolean success = operations.storeFile(fileName, exchange);
278            if (!success) {
279                throw new GenericFileOperationFailedException("Error writing file [" + fileName + "]");
280            }
281            log.debug("Wrote [{}] to [{}]", fileName, getEndpoint());
282        }
283    
284        public String createFileName(Exchange exchange) {
285            String answer;
286    
287            // overrule takes precedence
288            Object value;
289    
290            Object overrule = exchange.getIn().getHeader(Exchange.OVERRULE_FILE_NAME);
291            if (overrule != null) {
292                if (overrule instanceof Expression) {
293                    value = overrule;
294                } else {
295                    value = exchange.getContext().getTypeConverter().convertTo(String.class, exchange, overrule);
296                }
297            } else {
298                value = exchange.getIn().getHeader(Exchange.FILE_NAME);
299            }
300    
301            // if we have an overrule then override the existing header to use the overrule computed name from this point forward
302            if (overrule != null) {
303                exchange.getIn().setHeader(Exchange.FILE_NAME, value);
304            }
305    
306            if (value != null && value instanceof String && StringHelper.hasStartToken((String) value, "simple")) {
307                log.warn("Simple expression: {} detected in header: {} of type String. This feature has been removed (see CAMEL-6748).", value, Exchange.FILE_NAME);
308            }
309    
310            // expression support
311            Expression expression = endpoint.getFileName();
312            if (value != null && value instanceof Expression) {
313                expression = (Expression) value;
314            }
315    
316            // evaluate the name as a String from the value
317            String name;
318            if (expression != null) {
319                log.trace("Filename evaluated as expression: {}", expression);
320                name = expression.evaluate(exchange, String.class);
321            } else {
322                name = exchange.getContext().getTypeConverter().convertTo(String.class, exchange, value);
323            }
324    
325            // flatten name
326            if (name != null && endpoint.isFlatten()) {
327                // check for both windows and unix separators
328                int pos = Math.max(name.lastIndexOf("/"), name.lastIndexOf("\\"));
329                if (pos != -1) {
330                    name = name.substring(pos + 1);
331                }
332            }
333    
334            // compute path by adding endpoint starting directory
335            String endpointPath = endpoint.getConfiguration().getDirectory();
336            String baseDir = "";
337            if (endpointPath.length() > 0) {
338                // Its a directory so we should use it as a base path for the filename
339                // If the path isn't empty, we need to add a trailing / if it isn't already there
340                baseDir = endpointPath;
341                boolean trailingSlash = endpointPath.endsWith("/") || endpointPath.endsWith("\\");
342                if (!trailingSlash) {
343                    baseDir += getFileSeparator();
344                }
345            }
346            if (name != null) {
347                answer = baseDir + name;
348            } else {
349                // use a generated filename if no name provided
350                answer = baseDir + endpoint.getGeneratedFileName(exchange.getIn());
351            }
352    
353            if (endpoint.getConfiguration().needToNormalize()) {
354                // must normalize path to cater for Windows and other OS
355                answer = normalizePath(answer);
356            }
357    
358            return answer;
359        }
360    
361        public String createTempFileName(Exchange exchange, String fileName) {
362            String answer = fileName;
363    
364            String tempName;
365            if (exchange.getIn().getHeader(Exchange.FILE_NAME) == null) {
366                // its a generated filename then add it to header so we can evaluate the expression
367                exchange.getIn().setHeader(Exchange.FILE_NAME, FileUtil.stripPath(fileName));
368                tempName = endpoint.getTempFileName().evaluate(exchange, String.class);
369                // and remove it again after evaluation
370                exchange.getIn().removeHeader(Exchange.FILE_NAME);
371            } else {
372                tempName = endpoint.getTempFileName().evaluate(exchange, String.class);
373            }
374    
375            // check for both windows and unix separators
376            int pos = Math.max(answer.lastIndexOf("/"), answer.lastIndexOf("\\"));
377            if (pos == -1) {
378                // no path so use temp name as calculated
379                answer = tempName;
380            } else {
381                // path should be prefixed before the temp name
382                StringBuilder sb = new StringBuilder(answer.substring(0, pos + 1));
383                sb.append(tempName);
384                answer = sb.toString();
385            }
386    
387            if (endpoint.getConfiguration().needToNormalize()) {
388                // must normalize path to cater for Windows and other OS
389                answer = normalizePath(answer);
390            }
391    
392            return answer;
393        }
394    
395        @Override
396        protected void doStart() throws Exception {
397            super.doStart();
398            ServiceHelper.startService(locks);
399        }
400    
401        @Override
402        protected void doStop() throws Exception {
403            ServiceHelper.stopService(locks);
404            super.doStop();
405        }
406    }