001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.timer;
018    
019    import java.util.Date;
020    import java.util.Timer;
021    import java.util.TimerTask;
022    import java.util.concurrent.atomic.AtomicLong;
023    
024    import org.apache.camel.AsyncCallback;
025    import org.apache.camel.Exchange;
026    import org.apache.camel.Processor;
027    import org.apache.camel.impl.DefaultConsumer;
028    import org.slf4j.Logger;
029    import org.slf4j.LoggerFactory;
030    
031    /**
032     * The timer consumer.
033     *
034     * @version 
035     */
036    public class TimerConsumer extends DefaultConsumer {
037        private static final Logger LOG = LoggerFactory.getLogger(TimerConsumer.class);
038        private final TimerEndpoint endpoint;
039        private volatile TimerTask task;
040    
041        public TimerConsumer(TimerEndpoint endpoint, Processor processor) {
042            super(endpoint, processor);
043            this.endpoint = endpoint;
044        }
045    
046        @Override
047        protected void doStart() throws Exception {
048            task = new TimerTask() {
049                // counter
050                private final AtomicLong counter = new AtomicLong();
051    
052                @Override
053                public void run() {
054                    if (!isTaskRunAllowed()) {
055                        // do not run timer task as it was not allowed
056                        return;
057                    }
058    
059                    try {
060                        long count = counter.incrementAndGet();
061    
062                        boolean fire = endpoint.getRepeatCount() <= 0 || count <= endpoint.getRepeatCount();
063                        if (fire) {
064                            sendTimerExchange(count);
065                        } else {
066                            // no need to fire anymore as we exceeded repeat count
067                            LOG.debug("Cancelling {} timer as repeat count limit reached after {} counts.", endpoint.getTimerName(), endpoint.getRepeatCount());
068                            cancel();
069                        }
070                    } catch (Throwable e) {
071                        // catch all to avoid the JVM closing the thread and not firing again
072                        LOG.warn("Error processing exchange. This exception will be ignored, to let the timer be able to trigger again.", e);
073                    }
074                }
075            };
076    
077            Timer timer = endpoint.getTimer();
078            configureTask(task, timer);
079        }
080    
081        @Override
082        protected void doStop() throws Exception {
083            if (task != null) {
084                task.cancel();
085            }
086            task = null;
087        }
088    
089        /**
090         * Whether the timer task is allow to run or not
091         */
092        protected boolean isTaskRunAllowed() {
093            // only allow running the timer task if we can run and are not suspended,
094            // and CamelContext must have been fully started
095            return endpoint.getCamelContext().getStatus().isStarted() && isRunAllowed() && !isSuspended();
096        }
097    
098        protected void configureTask(TimerTask task, Timer timer) {
099            if (endpoint.isFixedRate()) {
100                if (endpoint.getTime() != null) {
101                    timer.scheduleAtFixedRate(task, endpoint.getTime(), endpoint.getPeriod());
102                } else {
103                    timer.scheduleAtFixedRate(task, endpoint.getDelay(), endpoint.getPeriod());
104                }
105            } else {
106                if (endpoint.getTime() != null) {
107                    if (endpoint.getPeriod() > 0) {
108                        timer.schedule(task, endpoint.getTime(), endpoint.getPeriod());
109                    } else {
110                        timer.schedule(task, endpoint.getTime());
111                    }
112                } else {
113                    if (endpoint.getPeriod() > 0) {
114                        timer.schedule(task, endpoint.getDelay(), endpoint.getPeriod());
115                    } else {
116                        timer.schedule(task, endpoint.getDelay());
117                    }
118                }
119            }
120        }
121    
122        protected void sendTimerExchange(long counter) {
123            final Exchange exchange = endpoint.createExchange();
124            exchange.setProperty(Exchange.TIMER_COUNTER, counter);
125            exchange.setProperty(Exchange.TIMER_NAME, endpoint.getTimerName());
126            exchange.setProperty(Exchange.TIMER_TIME, endpoint.getTime());
127            exchange.setProperty(Exchange.TIMER_PERIOD, endpoint.getPeriod());
128    
129            Date now = new Date();
130            exchange.setProperty(Exchange.TIMER_FIRED_TIME, now);
131            // also set now on in header with same key as quartz to be consistent
132            exchange.getIn().setHeader("firedTime", now);
133    
134            if (LOG.isTraceEnabled()) {
135                LOG.trace("Timer {} is firing #{} count", endpoint.getTimerName(), counter);
136            }
137    
138            if (!endpoint.isSynchronous()) {
139                getAsyncProcessor().process(exchange, new AsyncCallback() {
140                    @Override
141                    public void done(boolean doneSync) {
142                        // handle any thrown exception
143                        if (exchange.getException() != null) {
144                            getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
145                        }
146                    }
147                });
148            } else {
149                try {
150                    getProcessor().process(exchange);
151                } catch (Exception e) {
152                    exchange.setException(e);
153                }
154    
155                // handle any thrown exception
156                if (exchange.getException() != null) {
157                    getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
158                }
159            }
160        }
161    }