001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.file;
018    
019    import java.util.ArrayList;
020    import java.util.Collections;
021    import java.util.Deque;
022    import java.util.LinkedList;
023    import java.util.List;
024    import java.util.Queue;
025    
026    import org.apache.camel.AsyncCallback;
027    import org.apache.camel.Exchange;
028    import org.apache.camel.Processor;
029    import org.apache.camel.ShutdownRunningTask;
030    import org.apache.camel.impl.ScheduledBatchPollingConsumer;
031    import org.apache.camel.spi.UriParam;
032    import org.apache.camel.util.CastUtils;
033    import org.apache.camel.util.ObjectHelper;
034    import org.apache.camel.util.StopWatch;
035    import org.apache.camel.util.TimeUtils;
036    import org.slf4j.Logger;
037    import org.slf4j.LoggerFactory;
038    
039    /**
040     * Base class for file consumers.
041     */
042    public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsumer {
043        protected final Logger log = LoggerFactory.getLogger(getClass()); // logger obtained for the runtime class of this consumer
044        protected GenericFileEndpoint<T> endpoint; // endpoint passed to the constructor; source of configuration, strategies and repositories
045        protected GenericFileOperations<T> operations; // file operations; replaceable through setOperations
046        protected volatile boolean loggedIn; // not referenced in this class; presumably maintained by subclasses - TODO confirm
047        protected String fileExpressionResult; // cached result of the endpoint file name expression; cleared at the start of each poll
048        protected volatile ShutdownRunningTask shutdownRunningTask; // cleared at the start of each poll; not otherwise used in this class
049        protected volatile int pendingExchanges; // exchanges left in the current batch; reset in poll and updated in processBatch
050        protected Processor customProcessor; // optional; when set, processBatch uses it instead of the regular processing
051        @UriParam
052        protected boolean eagerLimitMaxMessagesPerPoll = true; // true: limit applied while gathering files; false: limit applied in poll after sorting
053        protected volatile boolean prepareOnStartup; // set once the process strategy has been prepared in poll; reset in doStop
054    
/**
 * Creates a consumer for the given endpoint.
 *
 * @param endpoint   the file endpoint this consumer polls on behalf of
 * @param processor  the processor handed to the parent consumer
 * @param operations the file operations used by this consumer
 */
public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations) {
    super(endpoint, processor);
    // keep typed references for use by this class and its subclasses
    this.operations = operations;
    this.endpoint = endpoint;
}
060    
061        public Processor getCustomProcessor() {
062            return customProcessor;
063        }
064    
065        /**
066         * Use a custom processor to process the exchange.
067         * <p/>
068         * Only set this if you need to do custom processing, instead of the regular processing.
069         * <p/>
070         * This is for example used to browse file endpoints by leveraging the file consumer to poll
071         * the directory to gather the list of exchanges. But to avoid processing the files regularly
072         * we can use a custom processor.
073         *
074         * @param processor a custom processor
075         */
076        public void setCustomProcessor(Processor processor) {
077            this.customProcessor = processor;
078        }
079    
080        public boolean isEagerLimitMaxMessagesPerPoll() {
081            return eagerLimitMaxMessagesPerPoll;
082        }
083    
084        public void setEagerLimitMaxMessagesPerPoll(boolean eagerLimitMaxMessagesPerPoll) {
085            this.eagerLimitMaxMessagesPerPoll = eagerLimitMaxMessagesPerPoll;
086        }
087    
088        /**
089         * Poll for files
090         */
091        protected int poll() throws Exception {
092            // must prepare on startup the very first time
093            if (!prepareOnStartup) {
094                // prepare on startup
095                endpoint.getGenericFileProcessStrategy().prepareOnStartup(operations, endpoint);
096                prepareOnStartup = true;
097            }
098    
099            // must reset for each poll
100            fileExpressionResult = null;
101            shutdownRunningTask = null;
102            pendingExchanges = 0;
103    
104            // before we poll is there anything we need to check?
105            // such as are we connected to the FTP Server still?
106            if (!prePollCheck()) {
107                log.debug("Skipping poll as pre poll check returned false");
108                return 0;
109            }
110    
111            // gather list of files to process
112            List<GenericFile<T>> files = new ArrayList<GenericFile<T>>();
113            String name = endpoint.getConfiguration().getDirectory();
114    
115            // time how long time it takes to poll
116            StopWatch stop = new StopWatch();
117            boolean limitHit = !pollDirectory(name, files, 0);
118            long delta = stop.stop();
119            if (log.isDebugEnabled()) {
120                log.debug("Took {} to poll: {}", TimeUtils.printDuration(delta), name);
121            }
122    
123            // log if we hit the limit
124            if (limitHit) {
125                log.debug("Limiting maximum messages to poll at {} files as there was more messages in this poll.", maxMessagesPerPoll);
126            }
127    
128            // sort files using file comparator if provided
129            if (endpoint.getSorter() != null) {
130                Collections.sort(files, endpoint.getSorter());
131            }
132    
133            // sort using build in sorters so we can use expressions
134            // use a linked list so we can dequeue the exchanges
135            LinkedList<Exchange> exchanges = new LinkedList<Exchange>();
136            for (GenericFile<T> file : files) {
137                Exchange exchange = endpoint.createExchange(file);
138                endpoint.configureExchange(exchange);
139                endpoint.configureMessage(file, exchange.getIn());
140                exchanges.add(exchange);
141            }
142            // sort files using exchange comparator if provided
143            if (endpoint.getSortBy() != null) {
144                Collections.sort(exchanges, endpoint.getSortBy());
145            }
146    
147            // use a queue for the exchanges
148            Deque<Exchange> q = exchanges;
149    
150            // we are not eager limiting, but we have configured a limit, so cut the list of files
151            if (!eagerLimitMaxMessagesPerPoll && maxMessagesPerPoll > 0) {
152                if (files.size() > maxMessagesPerPoll) {
153                    log.debug("Limiting maximum messages to poll at {} files as there was more messages in this poll.", maxMessagesPerPoll);
154                    // must first remove excessive files from the in progress repository
155                    removeExcessiveInProgressFiles(q, maxMessagesPerPoll);
156                }
157            }
158    
159            // consume files one by one
160            int total = exchanges.size();
161            if (total > 0) {
162                log.debug("Total {} files to consume", total);
163            }
164    
165            int polledMessages = processBatch(CastUtils.cast(q));
166    
167            postPollCheck();
168    
169            return polledMessages;
170        }
171    
172        public int processBatch(Queue<Object> exchanges) {
173            int total = exchanges.size();
174            int answer = total;
175    
176            // limit if needed
177            if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) {
178                log.debug("Limiting to maximum messages to poll {} as there was {} messages in this poll.", maxMessagesPerPoll, total);
179                total = maxMessagesPerPoll;
180            }
181    
182            for (int index = 0; index < total && isBatchAllowed(); index++) {
183                // only loop if we are started (allowed to run)
184                // use poll to remove the head so it does not consume memory even after we have processed it
185                Exchange exchange = (Exchange) exchanges.poll();
186                // add current index and total as properties
187                exchange.setProperty(Exchange.BATCH_INDEX, index);
188                exchange.setProperty(Exchange.BATCH_SIZE, total);
189                exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
190    
191                // update pending number of exchanges
192                pendingExchanges = total - index - 1;
193    
194                // process the current exchange
195                boolean started;
196                if (customProcessor != null) {
197                    // use a custom processor
198                    started = customProcessExchange(exchange, customProcessor);
199                } else {
200                    // process the exchange regular
201                    started = processExchange(exchange);
202                }
203    
204                // if we did not start process the file then decrement the counter
205                if (!started) {
206                    answer--;
207                }
208            }
209    
210            // drain any in progress files as we are done with this batch
211            removeExcessiveInProgressFiles(CastUtils.cast((Deque<?>) exchanges, Exchange.class), 0);
212    
213            return answer;
214        }
215    
216        /**
217         * Drain any in progress files as we are done with this batch
218         *
219         * @param exchanges  the exchanges
220         * @param limit      the limit
221         */
222        protected void removeExcessiveInProgressFiles(Deque<Exchange> exchanges, int limit) {
223            // remove the file from the in progress list in case the batch was limited by max messages per poll
224            while (exchanges.size() > limit) {
225                // must remove last
226                Exchange exchange = exchanges.removeLast();
227                GenericFile<?> file = exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE, GenericFile.class);
228                String key = file.getAbsoluteFilePath();
229                endpoint.getInProgressRepository().remove(key);
230            }
231        }
232    
233        /**
234         * Whether or not we can continue polling for more files
235         *
236         * @param fileList  the current list of gathered files
237         * @return <tt>true</tt> to continue, <tt>false</tt> to stop due hitting maxMessagesPerPoll limit
238         */
239        public boolean canPollMoreFiles(List<?> fileList) {
240            // at this point we should not limit if we are not eager
241            if (!eagerLimitMaxMessagesPerPoll) {
242                return true;
243            }
244    
245            if (maxMessagesPerPoll <= 0) {
246                // no limitation
247                return true;
248            }
249    
250            // then only poll if we haven't reached the max limit
251            return fileList.size() < maxMessagesPerPoll;
252        }
253    
254        /**
255         * Override if required. Perform some checks (and perhaps actions) before we poll.
256         *
257         * @return <tt>true</tt> to poll, <tt>false</tt> to skip this poll.
258         */
259        protected boolean prePollCheck() throws Exception {
260            return true;
261        }
262    
263        /**
264         * Override if required. Perform some checks (and perhaps actions) after we have polled.
265         */
266        protected void postPollCheck() {
267            // noop
268        }
269    
270        /**
271         * Polls the given directory for files to process
272         *
273         * @param fileName current directory or file
274         * @param fileList current list of files gathered
275         * @param depth the current depth of the directory (will start from 0)
276         * @return whether or not to continue polling, <tt>false</tt> means the maxMessagesPerPoll limit has been hit
277         */
278        protected abstract boolean pollDirectory(String fileName, List<GenericFile<T>> fileList, int depth);
279    
280        /**
281         * Sets the operations to be used.
282         * <p/>
283         * Can be used to set a fresh operations in case of recovery attempts
284         *
285         * @param operations the operations
286         */
287        public void setOperations(GenericFileOperations<T> operations) {
288            this.operations = operations;
289        }
290    
291        /**
292         * Whether to ignore if the file cannot be retrieved.
293         * <p/>
294         * By default an {@link GenericFileOperationFailedException} is thrown if the file cannot be retrieved.
295         * <p/>
296         * This method allows to suppress this and just ignore that.
297         *
298         * @param name        the file name
299         * @param exchange    the exchange
300         * @param cause       optional exception occurred during retrieving file
301         * @return <tt>true</tt> to ignore, <tt>false</tt> is the default.
302         */
303        protected boolean ignoreCannotRetrieveFile(String name, Exchange exchange, Exception cause) {
304            return false;
305        }
306    
307        /**
308         * Processes the exchange
309         *
310         * @param exchange the exchange
311         * @return <tt>true</tt> if the file was started to be processed, <tt>false</tt> if the file was not started
312         * to be processed, for some reason (not found, or aborted etc)
313         */
314        protected boolean processExchange(final Exchange exchange) {
315            GenericFile<T> file = getExchangeFileProperty(exchange);
316            log.trace("Processing file: {}", file);
317    
318            // must extract the absolute name before the begin strategy as the file could potentially be pre moved
319            // and then the file name would be changed
320            String absoluteFileName = file.getAbsoluteFilePath();
321    
322            // check if we can begin processing the file
323            try {
324                final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy();
325    
326                boolean begin = processStrategy.begin(operations, endpoint, exchange, file);
327                if (!begin) {
328                    log.debug("{} cannot begin processing file: {}", endpoint, file);
329                    try {
330                        // abort
331                        processStrategy.abort(operations, endpoint, exchange, file);
332                    } finally {
333                        // begin returned false, so remove file from the in progress list as its no longer in progress
334                        endpoint.getInProgressRepository().remove(absoluteFileName);
335                    }
336                    return false;
337                }
338            } catch (Exception e) {
339                // remove file from the in progress list due to failure
340                endpoint.getInProgressRepository().remove(absoluteFileName);
341    
342                String msg = endpoint + " cannot begin processing file: " + file + " due to: " + e.getMessage();
343                handleException(msg, e);
344                return false;
345            }
346    
347            // must use file from exchange as it can be updated due the
348            // preMoveNamePrefix/preMoveNamePostfix options
349            final GenericFile<T> target = getExchangeFileProperty(exchange);
350            // must use full name when downloading so we have the correct path
351            final String name = target.getAbsoluteFilePath();
352            try {
353                
354                if (isRetrieveFile()) {
355                    // retrieve the file using the stream
356                    log.trace("Retrieving file: {} from: {}", name, endpoint);
357        
358                    // retrieve the file and check it was a success
359                    boolean retrieved;
360                    Exception cause = null;
361                    try {
362                        retrieved = operations.retrieveFile(name, exchange);
363                    } catch (Exception e) {
364                        retrieved = false;
365                        cause = e;
366                    }
367    
368                    if (!retrieved) {
369                        if (ignoreCannotRetrieveFile(name, exchange, cause)) {
370                            log.trace("Cannot retrieve file {} maybe it does not exists. Ignoring.", name);
371                            // remove file from the in progress list as we could not retrieve it, but should ignore
372                            endpoint.getInProgressRepository().remove(absoluteFileName);
373                            return false;
374                        } else {
375                            // throw exception to handle the problem with retrieving the file
376                            // then if the method return false or throws an exception is handled the same in here
377                            // as in both cases an exception is being thrown
378                            if (cause != null && cause instanceof GenericFileOperationFailedException) {
379                                throw cause;
380                            } else {
381                                throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint, cause);
382                            }
383                        }
384                    }
385        
386                    log.trace("Retrieved file: {} from: {}", name, endpoint);                
387                } else {
388                    log.trace("Skipped retrieval of file: {} from: {}", name, endpoint);
389                    exchange.getIn().setBody(null);
390                }
391    
392                // register on completion callback that does the completion strategies
393                // (for instance to move the file after we have processed it)
394                exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations, target, absoluteFileName));
395    
396                log.debug("About to process file: {} using exchange: {}", target, exchange);
397    
398                // process the exchange using the async consumer to support async routing engine
399                // which can be supported by this file consumer as all the done work is
400                // provided in the GenericFileOnCompletion
401                getAsyncProcessor().process(exchange, new AsyncCallback() {
402                    public void done(boolean doneSync) {
403                        // noop
404                        if (log.isTraceEnabled()) {
405                            log.trace("Done processing file: {} {}", target, doneSync ? "synchronously" : "asynchronously");
406                        }
407                    }
408                });
409    
410            } catch (Exception e) {
411                // remove file from the in progress list due to failure
412                // (cannot be in finally block due to GenericFileOnCompletion will remove it
413                // from in progress when it takes over and processes the file, which may happen
414                // by another thread at a later time. So its only safe to remove it if there was an exception)
415                endpoint.getInProgressRepository().remove(absoluteFileName);
416    
417                String msg = "Error processing file " + file + " due to " + e.getMessage();
418                handleException(msg, e);
419            }
420    
421            return true;
422        }
423    
424        /**
425         * Override if required.  Files are retrieved / returns true by default
426         *
427         * @return <tt>true</tt> to retrieve files, <tt>false</tt> to skip retrieval of files.
428         */
429        protected boolean isRetrieveFile() {
430            return true;
431        }
432    
433        /**
434         * Processes the exchange using a custom processor.
435         *
436         * @param exchange the exchange
437         * @param processor the custom processor
438         */
439        protected boolean customProcessExchange(final Exchange exchange, final Processor processor) {
440            GenericFile<T> file = getExchangeFileProperty(exchange);
441            log.trace("Custom processing file: {}", file);
442    
443            // must extract the absolute name before the begin strategy as the file could potentially be pre moved
444            // and then the file name would be changed
445            String absoluteFileName = file.getAbsoluteFilePath();
446    
447            try {
448                // process using the custom processor
449                processor.process(exchange);
450            } catch (Exception e) {
451                if (log.isDebugEnabled()) {
452                    log.debug(endpoint + " error custom processing: " + file + " due to: " + e.getMessage() + ". This exception will be ignored.", e);
453                }
454                handleException(e);
455            } finally {
456                // always remove file from the in progress list as its no longer in progress
457                // use the original file name that was used to add it to the repository
458                // as the name can be different when using preMove option
459                endpoint.getInProgressRepository().remove(absoluteFileName);
460            }
461    
462            return true;
463        }
464    
465        /**
466         * Strategy for validating if the given remote file should be included or not
467         *
468         * @param file        the file
469         * @param isDirectory whether the file is a directory or a file
470         * @param files       files in the directory
471         * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it
472         */
473        protected boolean isValidFile(GenericFile<T> file, boolean isDirectory, List<T> files) {
474            String absoluteFilePath = file.getAbsoluteFilePath();
475    
476            if (!isMatched(file, isDirectory, files)) {
477                log.trace("File did not match. Will skip this file: {}", file);
478                return false;
479            }
480    
481            // directory is always valid
482            if (isDirectory) {
483                return true;
484            }
485    
486            // check if file is already in progress
487            if (endpoint.getInProgressRepository().contains(absoluteFilePath)) {
488                if (log.isTraceEnabled()) {
489                    log.trace("Skipping as file is already in progress: {}", file.getFileName());
490                }
491                return false;
492            }
493    
494            // if its a file then check we have the file in the idempotent registry already
495            if (endpoint.isIdempotent()) {
496                // use absolute file path as default key, but evaluate if an expression key was configured
497                String key = file.getAbsoluteFilePath();
498                if (endpoint.getIdempotentKey() != null) {
499                    Exchange dummy = endpoint.createExchange(file);
500                    key = endpoint.getIdempotentKey().evaluate(dummy, String.class);
501                }
502                if (key != null && endpoint.getIdempotentRepository().contains(key)) {
503                    log.trace("This consumer is idempotent and the file has been consumed before matching idempotentKey: {}. Will skip this file: {}", key, file);
504                    return false;
505                }
506            }
507    
508            // okay so final step is to be able to add atomic as in-progress, so we are the
509            // only thread processing this file
510            return endpoint.getInProgressRepository().add(absoluteFilePath);
511        }
512    
513        /**
514         * Strategy to perform file matching based on endpoint configuration.
515         * <p/>
516         * Will always return <tt>false</tt> for certain files/folders:
517         * <ul>
518         * <li>Starting with a dot</li>
519         * <li>lock files</li>
520         * </ul>
521         * And then <tt>true</tt> for directories.
522         *
523         * @param file        the file
524         * @param isDirectory whether the file is a directory or a file
525         * @param files       files in the directory
526         * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not
527         */
528        protected boolean isMatched(GenericFile<T> file, boolean isDirectory, List<T> files) {
529            String name = file.getFileNameOnly();
530    
531            // folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock")
532            if (name.startsWith(".")) {
533                return false;
534            }
535    
536            // lock files should be skipped
537            if (name.endsWith(FileComponent.DEFAULT_LOCK_FILE_POSTFIX)) {
538                return false;
539            }
540    
541            if (endpoint.getFilter() != null) {
542                if (!endpoint.getFilter().accept(file)) {
543                    return false;
544                }
545            }
546    
547            if (endpoint.getAntFilter() != null) {
548                if (!endpoint.getAntFilter().accept(file)) {
549                    return false;
550                }
551            }
552    
553            // directories are regarded as matched if filter accepted them
554            if (isDirectory) {
555                return true;
556            }
557    
558            if (ObjectHelper.isNotEmpty(endpoint.getExclude())) {
559                if (name.matches(endpoint.getExclude())) {
560                    return false;
561                }
562            }
563    
564            if (ObjectHelper.isNotEmpty(endpoint.getInclude())) {
565                if (!name.matches(endpoint.getInclude())) {
566                    return false;
567                }
568            }
569    
570            // use file expression for a simple dynamic file filter
571            if (endpoint.getFileName() != null) {
572                fileExpressionResult = evaluateFileExpression();
573                if (fileExpressionResult != null) {
574                    if (!name.equals(fileExpressionResult)) {
575                        return false;
576                    }
577                }
578            }
579    
580            // if done file name is enabled, then the file is only valid if a done file exists
581            if (endpoint.getDoneFileName() != null) {
582                // done file must be in same path as the file
583                String doneFileName = endpoint.createDoneFileName(file.getAbsoluteFilePath());
584                ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint);
585    
586                // is it a done file name?
587                if (endpoint.isDoneFile(file.getFileNameOnly())) {
588                    log.trace("Skipping done file: {}", file);
589                    return false;
590                }
591    
592                if (!isMatched(file, doneFileName, files)) {
593                    return false;
594                }
595            }
596    
597            return true;
598        }
599    
600        /**
601         * Strategy to perform file matching based on endpoint configuration in terms of done file name.
602         *
603         * @param file         the file
604         * @param doneFileName the done file name (without any paths)
605         * @param files        files in the directory
606         * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not
607         */
608        protected abstract boolean isMatched(GenericFile<T> file, String doneFileName, List<T> files);
609    
610        /**
611         * Is the given file already in progress.
612         *
613         * @param file the file
614         * @return <tt>true</tt> if the file is already in progress
615         * @deprecated no longer in use, use {@link org.apache.camel.component.file.GenericFileEndpoint#getInProgressRepository()} instead.
616         */
617        @Deprecated
618        protected boolean isInProgress(GenericFile<T> file) {
619            String key = file.getAbsoluteFilePath();
620            // must use add, to have operation as atomic
621            return !endpoint.getInProgressRepository().add(key);
622        }
623    
624        protected String evaluateFileExpression() {
625            if (fileExpressionResult == null && endpoint.getFileName() != null) {
626                // create a dummy exchange as Exchange is needed for expression evaluation
627                Exchange dummy = endpoint.createExchange();
628                fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class);
629            }
630            return fileExpressionResult;
631        }
632    
633        @SuppressWarnings("unchecked")
634        private GenericFile<T> getExchangeFileProperty(Exchange exchange) {
635            return (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
636        }
637    
638        @Override
639        protected void doStart() throws Exception {
640            super.doStart();
641        }
642    
643        @Override
644        protected void doStop() throws Exception {
645            prepareOnStartup = false;
646            super.doStop();
647        }
648    }