/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.file;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.impl.ScheduledBatchPollingConsumer;
import org.apache.camel.spi.UriParam;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for file consumers.
 * <p/>
 * Each poll gathers the candidate files through {@link #pollDirectory(String, List, int)},
 * wraps every file in an {@link Exchange}, optionally sorts them, and then hands the
 * exchanges over to {@link #processBatch(Queue)}. Files that were gathered are tracked in the
 * endpoint's in progress repository, and are removed from it again whenever this consumer
 * decides not to (or fails to) process them.
 *
 * @param <T> the type of the underlying file object wrapped by {@link GenericFile}
 */
public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsumer {
    protected final Logger log = LoggerFactory.getLogger(getClass());
    protected GenericFileEndpoint<T> endpoint;
    protected GenericFileOperations<T> operations;
    protected volatile boolean loggedIn;
    // cached result of the endpoint's file name expression, reset at the start of every poll
    protected String fileExpressionResult;
    protected volatile ShutdownRunningTask shutdownRunningTask;
    // number of exchanges left in the current batch, updated while the batch is being processed
    protected volatile int pendingExchanges;
    // optional processor used instead of the regular processing, see setCustomProcessor
    protected Processor customProcessor;
    @UriParam
    protected boolean eagerLimitMaxMessagesPerPoll = true;
    // whether the process strategy has been prepared; cleared again when the consumer stops
    protected volatile boolean prepareOnStartup;

    /**
     * Creates the consumer.
     *
     * @param endpoint   the endpoint this consumer polls for
     * @param processor  the processor the exchanges are routed to
     * @param operations the file operations to use
     */
    public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations) {
        super(endpoint, processor);
        this.endpoint = endpoint;
        this.operations = operations;
    }

    /**
     * Gets the custom processor, if any.
     *
     * @return the custom processor, or <tt>null</tt> if regular processing is used
     */
    public Processor getCustomProcessor() {
        return customProcessor;
    }

    /**
     * Use a custom processor to process the exchange.
     * <p/>
     * Only set this if you need to do custom processing, instead of the regular processing.
     * <p/>
     * This is for example used to browse file endpoints by leveraging the file consumer to poll
     * the directory to gather the list of exchanges. But to avoid processing the files regularly
     * we can use a custom processor.
     *
     * @param processor a custom processor
     */
    public void setCustomProcessor(Processor processor) {
        this.customProcessor = processor;
    }

    /**
     * Whether the max messages per poll limit is applied eagerly, that is while the files are
     * being gathered (see {@link #canPollMoreFiles(List)}), instead of after sorting.
     *
     * @return <tt>true</tt> if the limit is applied eagerly (the default)
     */
    public boolean isEagerLimitMaxMessagesPerPoll() {
        return eagerLimitMaxMessagesPerPoll;
    }

    /**
     * Sets whether the max messages per poll limit is applied eagerly while gathering files.
     *
     * @param eagerLimitMaxMessagesPerPoll <tt>true</tt> to limit eagerly, <tt>false</tt> to gather
     *                                     all files first and cut the list afterwards
     */
    public void setEagerLimitMaxMessagesPerPoll(boolean eagerLimitMaxMessagesPerPoll) {
        this.eagerLimitMaxMessagesPerPoll = eagerLimitMaxMessagesPerPoll;
    }

    /**
     * Poll for files
     *
     * @return the number of messages reported by {@link #processBatch(Queue)}, or <tt>0</tt>
     *         if the poll was skipped because {@link #prePollCheck()} returned <tt>false</tt>
     * @throws Exception can be thrown by the process strategy, the pre poll check or the polling itself
     */
    protected int poll() throws Exception {
        // must prepare on startup the very first time
        if (!prepareOnStartup) {
            // prepare on startup
            endpoint.getGenericFileProcessStrategy().prepareOnStartup(operations, endpoint);
            prepareOnStartup = true;
        }

        // must reset for each poll
        fileExpressionResult = null;
        shutdownRunningTask = null;
        pendingExchanges = 0;

        // before we poll is there anything we need to check?
        // such as are we connected to the FTP Server still?
        if (!prePollCheck()) {
            log.debug("Skipping poll as pre poll check returned false");
            return 0;
        }

        // gather list of files to process
        List<GenericFile<T>> files = new ArrayList<GenericFile<T>>();
        String name = endpoint.getConfiguration().getDirectory();

        // time how long time it takes to poll
        StopWatch stop = new StopWatch();
        boolean limitHit = !pollDirectory(name, files, 0);
        long delta = stop.stop();
        if (log.isDebugEnabled()) {
            log.debug("Took {} to poll: {}", TimeUtils.printDuration(delta), name);
        }

        // log if we hit the limit
        if (limitHit) {
            log.debug("Limiting maximum messages to poll at {} files as there was more messages in this poll.", maxMessagesPerPoll);
        }

        // sort files using file comparator if provided
        if (endpoint.getSorter() != null) {
            Collections.sort(files, endpoint.getSorter());
        }

        // sort using built-in sorters so we can use expressions
        // use a linked list so we can dequeue the exchanges
        LinkedList<Exchange> exchanges = new LinkedList<Exchange>();
        for (GenericFile<T> file : files) {
            Exchange exchange = endpoint.createExchange(file);
            endpoint.configureExchange(exchange);
            endpoint.configureMessage(file, exchange.getIn());
            exchanges.add(exchange);
        }
        // sort files using exchange comparator if provided
        if (endpoint.getSortBy() != null) {
            Collections.sort(exchanges, endpoint.getSortBy());
        }

        // use a queue for the exchanges (same instance as the linked list above)
        Deque<Exchange> q = exchanges;

        // we are not eager limiting, but we have configured a limit, so cut the list of files
        if (!eagerLimitMaxMessagesPerPoll && maxMessagesPerPoll > 0) {
            if (files.size() > maxMessagesPerPoll) {
                log.debug("Limiting maximum messages to poll at {} files as there was more messages in this poll.", maxMessagesPerPoll);
                // must first remove excessive files from the in progress repository
                removeExcessiveInProgressFiles(q, maxMessagesPerPoll);
            }
        }

        // consume files one by one
        int total = exchanges.size();
        if (total > 0) {
            log.debug("Total {} files to consume", total);
        }

        int polledMessages = processBatch(CastUtils.cast(q));

        postPollCheck();

        return polledMessages;
    }

    /**
     * Processes the given batch of exchanges, one by one, from the head of the queue.
     * <p/>
     * At most <tt>maxMessagesPerPoll</tt> exchanges are processed (when that limit is positive),
     * and processing stops early when the batch is no longer allowed to run. Whatever is left
     * in the queue afterwards is removed from the in progress repository.
     *
     * @param exchanges the exchanges to process; each element must be an {@link Exchange}
     * @return the initial size of the queue, minus the number of exchanges that were
     *         attempted but not started
     */
    public int processBatch(Queue<Object> exchanges) {
        int total = exchanges.size();
        int answer = total;

        // limit if needed
        if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) {
            log.debug("Limiting to maximum messages to poll {} as there was {} messages in this poll.", maxMessagesPerPoll, total);
            total = maxMessagesPerPoll;
        }

        for (int index = 0; index < total && isBatchAllowed(); index++) {
            // only loop if we are started (allowed to run)
            // use poll to remove the head so it does not consume memory even after we have processed it
            Exchange exchange = (Exchange) exchanges.poll();
            // add current index and total as properties
            exchange.setProperty(Exchange.BATCH_INDEX, index);
            exchange.setProperty(Exchange.BATCH_SIZE, total);
            exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);

            // update pending number of exchanges
            pendingExchanges = total - index - 1;

            // process the current exchange
            boolean started;
            if (customProcessor != null) {
                // use a custom processor
                started = customProcessExchange(exchange, customProcessor);
            } else {
                // process the exchange regular
                started = processExchange(exchange);
            }

            // if we did not start process the file then decrement the counter
            if (!started) {
                answer--;
            }
        }

        // drain any in progress files as we are done with this batch
        if (exchanges instanceof Deque) {
            removeExcessiveInProgressFiles(CastUtils.cast((Deque<?>) exchanges, Exchange.class), 0);
        } else {
            // the caller gave us a queue which is not a deque, so drain it from the head
            // instead of failing with a ClassCastException
            Object pending;
            while ((pending = exchanges.poll()) != null) {
                removeFromInProgress((Exchange) pending);
            }
        }

        return answer;
    }

    /**
     * Drain any in progress files as we are done with this batch
     * <p/>
     * Exchanges are removed from the tail of the deque until only <tt>limit</tt> remain.
     *
     * @param exchanges  the exchanges
     * @param limit      the limit
     */
    protected void removeExcessiveInProgressFiles(Deque<Exchange> exchanges, int limit) {
        // remove the file from the in progress list in case the batch was limited by max messages per poll
        while (exchanges.size() > limit) {
            // must remove last
            Exchange exchange = exchanges.removeLast();
            GenericFile<?> file = exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE, GenericFile.class);
            String key = file.getAbsoluteFilePath();
            endpoint.getInProgressRepository().remove(key);
        }
    }

    /**
     * Removes the file carried by the given exchange from the in progress repository.
     *
     * @param exchange the exchange which holds the file as the {@link FileComponent#FILE_EXCHANGE_FILE} property
     */
    private void removeFromInProgress(Exchange exchange) {
        GenericFile<?> file = exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE, GenericFile.class);
        String key = file.getAbsoluteFilePath();
        endpoint.getInProgressRepository().remove(key);
    }

    /**
     * Whether or not we can continue polling for more files
     *
     * @param fileList  the current list of gathered files
     * @return <tt>true</tt> to continue, <tt>false</tt> to stop due to hitting maxMessagesPerPoll limit
     */
    public boolean canPollMoreFiles(List<?> fileList) {
        // at this point we should not limit if we are not eager
        if (!eagerLimitMaxMessagesPerPoll) {
            return true;
        }

        if (maxMessagesPerPoll <= 0) {
            // no limitation
            return true;
        }

        // then only poll if we haven't reached the max limit
        return fileList.size() < maxMessagesPerPoll;
    }

    /**
     * Override if required. Perform some checks (and perhaps actions) before we poll.
     *
     * @return <tt>true</tt> to poll, <tt>false</tt> to skip this poll.
     */
    protected boolean prePollCheck() throws Exception {
        return true;
    }

    /**
     * Override if required. Perform some checks (and perhaps actions) after we have polled.
     */
    protected void postPollCheck() {
        // noop
    }

    /**
     * Polls the given directory for files to process
     *
     * @param fileName current directory or file
     * @param fileList current list of files gathered
     * @param depth the current depth of the directory (will start from 0)
     * @return whether or not to continue polling, <tt>false</tt> means the maxMessagesPerPoll limit has been hit
     */
    protected abstract boolean pollDirectory(String fileName, List<GenericFile<T>> fileList, int depth);

    /**
     * Sets the operations to be used.
     * <p/>
     * Can be used to set a fresh operations in case of recovery attempts
     *
     * @param operations the operations
     */
    public void setOperations(GenericFileOperations<T> operations) {
        this.operations = operations;
    }

    /**
     * Whether to ignore if the file cannot be retrieved.
     * <p/>
     * By default an {@link GenericFileOperationFailedException} is thrown if the file cannot be retrieved.
     * <p/>
     * This method allows to suppress this and just ignore that.
     *
     * @param name        the file name
     * @param exchange    the exchange
     * @param cause       optional exception occurred during retrieving file
     * @return <tt>true</tt> to ignore, <tt>false</tt> is the default.
     */
    protected boolean ignoreCannotRetrieveFile(String name, Exchange exchange, Exception cause) {
        return false;
    }

    /**
     * Processes the exchange
     * <p/>
     * The process strategy is first asked whether processing can begin. If so, the file is
     * retrieved (unless {@link #isRetrieveFile()} returns <tt>false</tt>), a
     * {@link GenericFileOnCompletion} is registered on the exchange, and the exchange is
     * handed to the async processor. On failure the file is removed from the in progress
     * repository and the exception is passed to <tt>handleException</tt>.
     *
     * @param exchange  the exchange
     * @return <tt>true</tt> if the file was started to be processed, <tt>false</tt> if the file was not started
     * to be processed, for some reason (not found, or aborted etc)
     */
    protected boolean processExchange(final Exchange exchange) {
        GenericFile<T> file = getExchangeFileProperty(exchange);
        log.trace("Processing file: {}", file);

        // must extract the absolute name before the begin strategy as the file could potentially be pre moved
        // and then the file name would be changed
        String absoluteFileName = file.getAbsoluteFilePath();

        // check if we can begin processing the file
        try {
            final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy();

            boolean begin = processStrategy.begin(operations, endpoint, exchange, file);
            if (!begin) {
                log.debug("{} cannot begin processing file: {}", endpoint, file);
                try {
                    // abort
                    processStrategy.abort(operations, endpoint, exchange, file);
                } finally {
                    // begin returned false, so remove file from the in progress list as its no longer in progress
                    endpoint.getInProgressRepository().remove(absoluteFileName);
                }
                return false;
            }
        } catch (Exception e) {
            // remove file from the in progress list due to failure
            endpoint.getInProgressRepository().remove(absoluteFileName);

            String msg = endpoint + " cannot begin processing file: " + file + " due to: " + e.getMessage();
            handleException(msg, e);
            return false;
        }

        // must use file from exchange as it can be updated due the
        // preMoveNamePrefix/preMoveNamePostfix options
        final GenericFile<T> target = getExchangeFileProperty(exchange);
        // must use full name when downloading so we have the correct path
        final String name = target.getAbsoluteFilePath();
        try {

            if (isRetrieveFile()) {
                // retrieve the file using the stream
                log.trace("Retrieving file: {} from: {}", name, endpoint);

                // retrieve the file and check it was a success
                boolean retrieved;
                Exception cause = null;
                try {
                    retrieved = operations.retrieveFile(name, exchange);
                } catch (Exception e) {
                    retrieved = false;
                    cause = e;
                }

                if (!retrieved) {
                    if (ignoreCannotRetrieveFile(name, exchange, cause)) {
                        log.trace("Cannot retrieve file {} maybe it does not exists. Ignoring.", name);
                        // remove file from the in progress list as we could not retrieve it, but should ignore
                        endpoint.getInProgressRepository().remove(absoluteFileName);
                        return false;
                    } else {
                        // throw exception to handle the problem with retrieving the file
                        // then if the method return false or throws an exception is handled the same in here
                        // as in both cases an exception is being thrown
                        // (instanceof is false for null, so no separate null check is needed)
                        if (cause instanceof GenericFileOperationFailedException) {
                            throw cause;
                        } else {
                            throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint, cause);
                        }
                    }
                }

                log.trace("Retrieved file: {} from: {}", name, endpoint);
            } else {
                log.trace("Skipped retrieval of file: {} from: {}", name, endpoint);
                exchange.getIn().setBody(null);
            }

            // register on completion callback that does the completion strategies
            // (for instance to move the file after we have processed it)
            exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations, target, absoluteFileName));

            log.debug("About to process file: {} using exchange: {}", target, exchange);

            // process the exchange using the async consumer to support async routing engine
            // which can be supported by this file consumer as all the done work is
            // provided in the GenericFileOnCompletion
            getAsyncProcessor().process(exchange, new AsyncCallback() {
                public void done(boolean doneSync) {
                    // noop
                    if (log.isTraceEnabled()) {
                        log.trace("Done processing file: {} {}", target, doneSync ? "synchronously" : "asynchronously");
                    }
                }
            });

        } catch (Exception e) {
            // remove file from the in progress list due to failure
            // (cannot be in finally block due to GenericFileOnCompletion will remove it
            // from in progress when it takes over and processes the file, which may happen
            // by another thread at a later time. So its only safe to remove it if there was an exception)
            endpoint.getInProgressRepository().remove(absoluteFileName);

            String msg = "Error processing file " + file + " due to " + e.getMessage();
            handleException(msg, e);
        }

        return true;
    }

    /**
     * Override if required. Files are retrieved / returns true by default
     *
     * @return <tt>true</tt> to retrieve files, <tt>false</tt> to skip retrieval of files.
     */
    protected boolean isRetrieveFile() {
        return true;
    }

    /**
     * Processes the exchange using a custom processor.
     * <p/>
     * Any exception thrown by the processor is passed to <tt>handleException</tt>, and the
     * file is always removed from the in progress repository afterwards.
     *
     * @param exchange  the exchange
     * @param processor the custom processor
     * @return always <tt>true</tt>
     */
    protected boolean customProcessExchange(final Exchange exchange, final Processor processor) {
        GenericFile<T> file = getExchangeFileProperty(exchange);
        log.trace("Custom processing file: {}", file);

        // must extract the absolute name before the begin strategy as the file could potentially be pre moved
        // and then the file name would be changed
        String absoluteFileName = file.getAbsoluteFilePath();

        try {
            // process using the custom processor
            processor.process(exchange);
        } catch (Exception e) {
            if (log.isDebugEnabled()) {
                log.debug(endpoint + " error custom processing: " + file + " due to: " + e.getMessage() + ". This exception will be ignored.", e);
            }
            handleException(e);
        } finally {
            // always remove file from the in progress list as its no longer in progress
            // use the original file name that was used to add it to the repository
            // as the name can be different when using preMove option
            endpoint.getInProgressRepository().remove(absoluteFileName);
        }

        return true;
    }

    /**
     * Strategy for validating if the given remote file should be included or not
     * <p/>
     * Note that for a regular file which passes all the checks, this method adds the file
     * to the in progress repository, and the outcome of that add operation is the result.
     *
     * @param file        the file
     * @param isDirectory whether the file is a directory or a file
     * @param files       files in the directory
     * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it
     */
    protected boolean isValidFile(GenericFile<T> file, boolean isDirectory, List<T> files) {
        String absoluteFilePath = file.getAbsoluteFilePath();

        if (!isMatched(file, isDirectory, files)) {
            log.trace("File did not match. Will skip this file: {}", file);
            return false;
        }

        // directory is always valid
        if (isDirectory) {
            return true;
        }

        // check if file is already in progress
        if (endpoint.getInProgressRepository().contains(absoluteFilePath)) {
            if (log.isTraceEnabled()) {
                log.trace("Skipping as file is already in progress: {}", file.getFileName());
            }
            return false;
        }

        // if its a file then check we have the file in the idempotent registry already
        if (endpoint.isIdempotent()) {
            // use absolute file path as default key, but evaluate if an expression key was configured
            String key = file.getAbsoluteFilePath();
            if (endpoint.getIdempotentKey() != null) {
                Exchange dummy = endpoint.createExchange(file);
                key = endpoint.getIdempotentKey().evaluate(dummy, String.class);
            }
            if (key != null && endpoint.getIdempotentRepository().contains(key)) {
                log.trace("This consumer is idempotent and the file has been consumed before matching idempotentKey: {}. Will skip this file: {}", key, file);
                return false;
            }
        }

        // okay so final step is to be able to add atomic as in-progress, so we are the
        // only thread processing this file
        return endpoint.getInProgressRepository().add(absoluteFilePath);
    }

    /**
     * Strategy to perform file matching based on endpoint configuration.
     * <p/>
     * Will always return <tt>false</tt> for certain files/folders:
     * <ul>
     *   <li>Starting with a dot</li>
     *   <li>lock files</li>
     * </ul>
     * And then <tt>true</tt> for directories.
     *
     * @param file        the file
     * @param isDirectory whether the file is a directory or a file
     * @param files       files in the directory
     * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not
     */
    protected boolean isMatched(GenericFile<T> file, boolean isDirectory, List<T> files) {
        String name = file.getFileNameOnly();

        // folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock")
        if (name.startsWith(".")) {
            return false;
        }

        // lock files should be skipped
        if (name.endsWith(FileComponent.DEFAULT_LOCK_FILE_POSTFIX)) {
            return false;
        }

        if (endpoint.getFilter() != null) {
            if (!endpoint.getFilter().accept(file)) {
                return false;
            }
        }

        if (endpoint.getAntFilter() != null) {
            if (!endpoint.getAntFilter().accept(file)) {
                return false;
            }
        }

        // directories are regarded as matched if filter accepted them
        if (isDirectory) {
            return true;
        }

        // the exclude/include options are regular expressions matched against the file name only
        if (ObjectHelper.isNotEmpty(endpoint.getExclude())) {
            if (name.matches(endpoint.getExclude())) {
                return false;
            }
        }

        if (ObjectHelper.isNotEmpty(endpoint.getInclude())) {
            if (!name.matches(endpoint.getInclude())) {
                return false;
            }
        }

        // use file expression for a simple dynamic file filter
        if (endpoint.getFileName() != null) {
            fileExpressionResult = evaluateFileExpression();
            if (fileExpressionResult != null) {
                if (!name.equals(fileExpressionResult)) {
                    return false;
                }
            }
        }

        // if done file name is enabled, then the file is only valid if a done file exists
        if (endpoint.getDoneFileName() != null) {
            // done file must be in same path as the file
            String doneFileName = endpoint.createDoneFileName(file.getAbsoluteFilePath());
            ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint);

            // is it a done file name?
            if (endpoint.isDoneFile(file.getFileNameOnly())) {
                log.trace("Skipping done file: {}", file);
                return false;
            }

            if (!isMatched(file, doneFileName, files)) {
                return false;
            }
        }

        return true;
    }

    /**
     * Strategy to perform file matching based on endpoint configuration in terms of done file name.
     *
     * @param file         the file
     * @param doneFileName the done file name (without any paths)
     * @param files        files in the directory
     * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not
     */
    protected abstract boolean isMatched(GenericFile<T> file, String doneFileName, List<T> files);

    /**
     * Is the given file already in progress.
     * <p/>
     * Note that this method adds the file to the in progress repository as a side effect
     * when it was not already there.
     *
     * @param file the file
     * @return <tt>true</tt> if the file is already in progress
     * @deprecated no longer in use, use {@link org.apache.camel.component.file.GenericFileEndpoint#getInProgressRepository()} instead.
     */
    @Deprecated
    protected boolean isInProgress(GenericFile<T> file) {
        String key = file.getAbsoluteFilePath();
        // must use add, to have operation as atomic
        return !endpoint.getInProgressRepository().add(key);
    }

    /**
     * Evaluates the endpoint's file name expression, if one is configured.
     * <p/>
     * The result is cached in {@link #fileExpressionResult}, which is reset on every poll,
     * so the expression is evaluated again only while the cached value is <tt>null</tt>.
     *
     * @return the evaluated file name, or <tt>null</tt> if no expression is configured
     *         or it evaluated to <tt>null</tt>
     */
    protected String evaluateFileExpression() {
        if (fileExpressionResult == null && endpoint.getFileName() != null) {
            // create a dummy exchange as Exchange is needed for expression evaluation
            Exchange dummy = endpoint.createExchange();
            fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class);
        }
        return fileExpressionResult;
    }

    /**
     * Gets the file stored on the exchange as the {@link FileComponent#FILE_EXCHANGE_FILE} property.
     *
     * @param exchange the exchange
     * @return the file, or <tt>null</tt> if the exchange has no such property
     */
    @SuppressWarnings("unchecked")
    private GenericFile<T> getExchangeFileProperty(Exchange exchange) {
        return (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
    }

    @Override
    protected void doStart() throws Exception {
        super.doStart();
    }

    @Override
    protected void doStop() throws Exception {
        // force the process strategy to be prepared again on the first poll after a restart
        prepareOnStartup = false;
        super.doStop();
    }
}