001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.file.strategy;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.io.RandomAccessFile;
022    import java.nio.channels.Channel;
023    import java.nio.channels.FileChannel;
024    import java.nio.channels.FileLock;
025    
026    import org.apache.camel.Exchange;
027    import org.apache.camel.LoggingLevel;
028    import org.apache.camel.component.file.GenericFile;
029    import org.apache.camel.component.file.GenericFileEndpoint;
030    import org.apache.camel.component.file.GenericFileOperations;
031    import org.apache.camel.util.CamelLogger;
032    import org.apache.camel.util.IOHelper;
033    import org.apache.camel.util.StopWatch;
034    import org.slf4j.Logger;
035    import org.slf4j.LoggerFactory;
036    
037    /**
038     * Acquires exclusive read lock to the given file. Will wait until the lock is granted.
039     * After granting the read lock it is released, we just want to make sure that when we start
040     * consuming the file its not currently in progress of being written by third party.
041     */
042    public class FileLockExclusiveReadLockStrategy extends MarkerFileExclusiveReadLockStrategy {
043        private static final Logger LOG = LoggerFactory.getLogger(FileLockExclusiveReadLockStrategy.class);
044        private long timeout;
045        private long checkInterval = 1000;
046        private FileLock lock;
047        private String lockFileName;
048        private LoggingLevel readLockLoggingLevel = LoggingLevel.WARN;
049    
050        @Override
051        public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) {
052            // noop
053        }
054    
055        @Override
056        public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
057            // must call super
058            if (!super.acquireExclusiveReadLock(operations, file, exchange)) {
059                return false;
060            }
061    
062            File target = new File(file.getAbsoluteFilePath());
063    
064            LOG.trace("Waiting for exclusive read lock to file: {}", target);
065    
066            FileChannel channel = null;
067            RandomAccessFile randomAccessFile = null;
068            try {
069                randomAccessFile = new RandomAccessFile(target, "rw");
070                // try to acquire rw lock on the file before we can consume it
071                channel = randomAccessFile.getChannel();
072    
073                boolean exclusive = false;
074                StopWatch watch = new StopWatch();
075    
076                while (!exclusive) {
077                    // timeout check
078                    if (timeout > 0) {
079                        long delta = watch.taken();
080                        if (delta > timeout) {
081                            CamelLogger.log(LOG, readLockLoggingLevel,
082                                    "Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + target);
083                            // we could not get the lock within the timeout period, so return false
084                            return false;
085                        }
086                    }
087    
088                    // get the lock using either try lock or not depending on if we are using timeout or not
089                    try {
090                        lock = timeout > 0 ? channel.tryLock() : channel.lock();
091                    } catch (IllegalStateException ex) {
092                        // Also catch the OverlappingFileLockException here. Do nothing here                    
093                    }
094                    if (lock != null) {
095                        LOG.trace("Acquired exclusive read lock: {} to file: {}", lock, target);
096                        lockFileName = target.getName();
097                        exclusive = true;
098                    } else {
099                        boolean interrupted = sleep();
100                        if (interrupted) {
101                            // we were interrupted while sleeping, we are likely being shutdown so return false
102                            return false;
103                        }
104                    }
105                }
106            } catch (IOException e) {
107                // must handle IOException as some apps on Windows etc. will still somehow hold a lock to a file
108                // such as AntiVirus or MS Office that has special locks for it's supported files
109                if (timeout == 0) {
110                    // if not using timeout, then we cant retry, so rethrow
111                    throw e;
112                }
113                LOG.debug("Cannot acquire read lock. Will try again.", e);
114                boolean interrupted = sleep();
115                if (interrupted) {
116                    // we were interrupted while sleeping, we are likely being shutdown so return false
117                    return false;
118                }
119            } finally {
120                IOHelper.close(channel, "while acquiring exclusive read lock for file: " + lockFileName, LOG);
121                IOHelper.close(randomAccessFile, "while acquiring exclusive read lock for file: " + lockFileName, LOG);
122            }
123    
124            return true;
125        }
126    
127        @Override
128        public void releaseExclusiveReadLock(GenericFileOperations<File> operations,
129                                             GenericFile<File> file, Exchange exchange) throws Exception {
130    
131            // must call super
132            super.releaseExclusiveReadLock(operations, file, exchange);
133    
134            if (lock != null) {
135                Channel channel = lock.channel();
136                try {
137                    lock.release();
138                } finally {
139                    // must close channel first
140                    IOHelper.close(channel, "while releasing exclusive read lock for file: " + lockFileName, LOG);
141                }
142            }
143        }
144    
145        private boolean sleep() {
146            LOG.trace("Exclusive read lock not granted. Sleeping for {} millis.", checkInterval);
147            try {
148                Thread.sleep(checkInterval);
149                return false;
150            } catch (InterruptedException e) {
151                LOG.debug("Sleep interrupted while waiting for exclusive read lock, so breaking out");
152                return true;
153            }
154        }
155    
156        public long getTimeout() {
157            return timeout;
158        }
159    
160        @Override
161        public void setTimeout(long timeout) {
162            this.timeout = timeout;
163        }
164    
165        @Override
166        public void setCheckInterval(long checkInterval) {
167            this.checkInterval = checkInterval;
168        }
169    
170        @Override
171        public void setReadLockLoggingLevel(LoggingLevel readLockLoggingLevel) {
172            this.readLockLoggingLevel = readLockLoggingLevel;
173        }
174    
175    }