001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.file.strategy; 018 019 import java.io.File; 020 021 import org.apache.camel.Exchange; 022 import org.apache.camel.LoggingLevel; 023 import org.apache.camel.component.file.GenericFile; 024 import org.apache.camel.component.file.GenericFileOperations; 025 import org.apache.camel.util.CamelLogger; 026 import org.apache.camel.util.StopWatch; 027 import org.slf4j.Logger; 028 import org.slf4j.LoggerFactory; 029 030 /** 031 * Acquires exclusive read lock to the given file by checking whether the file is being 032 * changed by scanning the file at different intervals (to detect changes). 033 */ 034 public class FileChangedExclusiveReadLockStrategy extends MarkerFileExclusiveReadLockStrategy { 035 private static final Logger LOG = LoggerFactory.getLogger(FileChangedExclusiveReadLockStrategy.class); 036 private long timeout; 037 private long checkInterval = 1000; 038 private long minLength = 1; 039 private LoggingLevel readLockLoggingLevel = LoggingLevel.WARN; 040 041 @Override 042 public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { 043 // must call super 044 if (!super.acquireExclusiveReadLock(operations, file, exchange)) { 045 return false; 046 } 047 048 File target = new File(file.getAbsoluteFilePath()); 049 boolean exclusive = false; 050 051 LOG.trace("Waiting for exclusive read lock to file: {}", file); 052 053 long lastModified = Long.MIN_VALUE; 054 long length = Long.MIN_VALUE; 055 StopWatch watch = new StopWatch(); 056 057 while (!exclusive) { 058 // timeout check 059 if (timeout > 0) { 060 long delta = watch.taken(); 061 if (delta > timeout) { 062 CamelLogger.log(LOG, readLockLoggingLevel, 063 "Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file); 064 // we could not get the lock within the timeout period, so return false 065 return false; 066 } 067 } 068 069 long newLastModified = target.lastModified(); 070 long newLength = target.length(); 071 072 LOG.trace("Previous last modified: {}, new last modified: {}", lastModified, newLastModified); 073 LOG.trace("Previous length: {}, new length: {}", length, newLength); 074 075 if (length >= minLength && (newLastModified == lastModified && newLength == length)) { 076 LOG.trace("Read lock acquired."); 077 exclusive = true; 078 } else { 079 // set new base file change information 080 lastModified = newLastModified; 081 length = newLength; 082 083 boolean interrupted = sleep(); 084 if (interrupted) { 085 // we were interrupted while sleeping, we are likely being shutdown so return false 086 return false; 087 } 088 } 089 } 090 091 return exclusive; 092 } 093 094 private boolean sleep() { 095 LOG.trace("Exclusive read lock not granted. Sleeping for {} millis.", checkInterval); 096 try { 097 Thread.sleep(checkInterval); 098 return false; 099 } catch (InterruptedException e) { 100 LOG.debug("Sleep interrupted while waiting for exclusive read lock, so breaking out"); 101 return true; 102 } 103 } 104 105 public long getTimeout() { 106 return timeout; 107 } 108 109 @Override 110 public void setTimeout(long timeout) { 111 this.timeout = timeout; 112 } 113 114 public long getCheckInterval() { 115 return checkInterval; 116 } 117 118 @Override 119 public void setCheckInterval(long checkInterval) { 120 this.checkInterval = checkInterval; 121 } 122 123 @Override 124 public void setReadLockLoggingLevel(LoggingLevel readLockLoggingLevel) { 125 this.readLockLoggingLevel = readLockLoggingLevel; 126 } 127 128 public long getMinLength() { 129 return minLength; 130 } 131 132 public void setMinLength(long minLength) { 133 this.minLength = minLength; 134 } 135 }