001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.file.strategy; 018 019 import java.io.File; 020 import java.io.IOException; 021 import java.io.RandomAccessFile; 022 import java.nio.channels.Channel; 023 import java.nio.channels.FileChannel; 024 import java.nio.channels.FileLock; 025 026 import org.apache.camel.Exchange; 027 import org.apache.camel.LoggingLevel; 028 import org.apache.camel.component.file.GenericFile; 029 import org.apache.camel.component.file.GenericFileEndpoint; 030 import org.apache.camel.component.file.GenericFileOperations; 031 import org.apache.camel.util.CamelLogger; 032 import org.apache.camel.util.IOHelper; 033 import org.apache.camel.util.StopWatch; 034 import org.slf4j.Logger; 035 import org.slf4j.LoggerFactory; 036 037 /** 038 * Acquires exclusive read lock to the given file. Will wait until the lock is granted. 039 * After granting the read lock it is released, we just want to make sure that when we start 040 * consuming the file its not currently in progress of being written by third party. 041 */ 042 public class FileLockExclusiveReadLockStrategy extends MarkerFileExclusiveReadLockStrategy { 043 private static final Logger LOG = LoggerFactory.getLogger(FileLockExclusiveReadLockStrategy.class); 044 private long timeout; 045 private long checkInterval = 1000; 046 private FileLock lock; 047 private String lockFileName; 048 private LoggingLevel readLockLoggingLevel = LoggingLevel.WARN; 049 050 @Override 051 public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) { 052 // noop 053 } 054 055 @Override 056 public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { 057 // must call super 058 if (!super.acquireExclusiveReadLock(operations, file, exchange)) { 059 return false; 060 } 061 062 File target = new File(file.getAbsoluteFilePath()); 063 064 LOG.trace("Waiting for exclusive read lock to file: {}", target); 065 066 FileChannel channel = null; 067 RandomAccessFile randomAccessFile = null; 068 try { 069 randomAccessFile = new RandomAccessFile(target, "rw"); 070 // try to acquire rw lock on the file before we can consume it 071 channel = randomAccessFile.getChannel(); 072 073 boolean exclusive = false; 074 StopWatch watch = new StopWatch(); 075 076 while (!exclusive) { 077 // timeout check 078 if (timeout > 0) { 079 long delta = watch.taken(); 080 if (delta > timeout) { 081 CamelLogger.log(LOG, readLockLoggingLevel, 082 "Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + target); 083 // we could not get the lock within the timeout period, so return false 084 return false; 085 } 086 } 087 088 // get the lock using either try lock or not depending on if we are using timeout or not 089 try { 090 lock = timeout > 0 ? channel.tryLock() : channel.lock(); 091 } catch (IllegalStateException ex) { 092 // Also catch the OverlappingFileLockException here. Do nothing here 093 } 094 if (lock != null) { 095 LOG.trace("Acquired exclusive read lock: {} to file: {}", lock, target); 096 lockFileName = target.getName(); 097 exclusive = true; 098 } else { 099 boolean interrupted = sleep(); 100 if (interrupted) { 101 // we were interrupted while sleeping, we are likely being shutdown so return false 102 return false; 103 } 104 } 105 } 106 } catch (IOException e) { 107 // must handle IOException as some apps on Windows etc. will still somehow hold a lock to a file 108 // such as AntiVirus or MS Office that has special locks for it's supported files 109 if (timeout == 0) { 110 // if not using timeout, then we cant retry, so rethrow 111 throw e; 112 } 113 LOG.debug("Cannot acquire read lock. Will try again.", e); 114 boolean interrupted = sleep(); 115 if (interrupted) { 116 // we were interrupted while sleeping, we are likely being shutdown so return false 117 return false; 118 } 119 } finally { 120 IOHelper.close(channel, "while acquiring exclusive read lock for file: " + lockFileName, LOG); 121 IOHelper.close(randomAccessFile, "while acquiring exclusive read lock for file: " + lockFileName, LOG); 122 } 123 124 return true; 125 } 126 127 @Override 128 public void releaseExclusiveReadLock(GenericFileOperations<File> operations, 129 GenericFile<File> file, Exchange exchange) throws Exception { 130 131 // must call super 132 super.releaseExclusiveReadLock(operations, file, exchange); 133 134 if (lock != null) { 135 Channel channel = lock.channel(); 136 try { 137 lock.release(); 138 } finally { 139 // must close channel first 140 IOHelper.close(channel, "while releasing exclusive read lock for file: " + lockFileName, LOG); 141 } 142 } 143 } 144 145 private boolean sleep() { 146 LOG.trace("Exclusive read lock not granted. Sleeping for {} millis.", checkInterval); 147 try { 148 Thread.sleep(checkInterval); 149 return false; 150 } catch (InterruptedException e) { 151 LOG.debug("Sleep interrupted while waiting for exclusive read lock, so breaking out"); 152 return true; 153 } 154 } 155 156 public long getTimeout() { 157 return timeout; 158 } 159 160 @Override 161 public void setTimeout(long timeout) { 162 this.timeout = timeout; 163 } 164 165 @Override 166 public void setCheckInterval(long checkInterval) { 167 this.checkInterval = checkInterval; 168 } 169 170 @Override 171 public void setReadLockLoggingLevel(LoggingLevel readLockLoggingLevel) { 172 this.readLockLoggingLevel = readLockLoggingLevel; 173 } 174 175 }