/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.timer;

import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Consumer that schedules a {@link TimerTask} on the endpoint's {@link Timer}
 * and, on every tick, creates an exchange and hands it to the processor.
 * <p/>
 * The task is created and scheduled in {@link #doStart()} and cancelled in
 * {@link #doStop()}.
 */
public class TimerConsumer extends DefaultConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(TimerConsumer.class);
    private final TimerEndpoint endpoint;
    private volatile TimerTask task;

    public TimerConsumer(TimerEndpoint endpoint, Processor processor) {
        super(endpoint, processor);
        this.endpoint = endpoint;
    }

    /**
     * Creates a fresh timer task and schedules it on the endpoint's timer.
     */
    @Override
    protected void doStart() throws Exception {
        task = createTimerTask();

        Timer scheduler = endpoint.getTimer();
        configureTask(task, scheduler);
    }

    /**
     * Cancels the current timer task (if there is one) and forgets it.
     */
    @Override
    protected void doStop() throws Exception {
        if (task != null) {
            task.cancel();
        }
        task = null;
    }

    /**
     * Whether the timer task is allowed to run or not.
     *
     * @return <tt>true</tt> only when the CamelContext status reports started,
     *         this consumer is allowed to run, and it is not suspended
     */
    protected boolean isTaskRunAllowed() {
        // the CamelContext must have been fully started
        boolean contextStarted = endpoint.getCamelContext().getStatus().isStarted();
        if (!contextStarted) {
            return false;
        }
        // we must be allowed to run ...
        if (!isRunAllowed()) {
            return false;
        }
        // ... and must not be suspended
        return !isSuspended();
    }

    /**
     * Schedules the given task on the given timer according to the endpoint
     * settings (fixed rate, absolute start time or relative delay, period).
     *
     * @param task  the task to schedule
     * @param timer the timer to schedule the task on
     */
    protected void configureTask(TimerTask task, Timer timer) {
        final boolean fixedRate = endpoint.isFixedRate();
        final Date startAt = endpoint.getTime();
        final long period = endpoint.getPeriod();

        if (fixedRate) {
            // fixed-rate scheduling always passes the period along
            if (startAt != null) {
                timer.scheduleAtFixedRate(task, startAt, period);
            } else {
                timer.scheduleAtFixedRate(task, endpoint.getDelay(), period);
            }
            return;
        }

        // fixed-delay scheduling: only a positive period makes the task repeat
        final boolean repeating = period > 0;
        if (startAt != null) {
            if (repeating) {
                timer.schedule(task, startAt, period);
            } else {
                timer.schedule(task, startAt);
            }
            return;
        }

        if (repeating) {
            timer.schedule(task, endpoint.getDelay(), period);
        } else {
            timer.schedule(task, endpoint.getDelay());
        }
    }

    /**
     * Creates the exchange for a single timer tick, populates the timer
     * properties and routes it, either synchronously or asynchronously
     * depending on the endpoint.
     * <p/>
     * Any exception found on the exchange after processing is passed to the
     * exception handler.
     *
     * @param counter the number of this tick
     */
    protected void sendTimerExchange(long counter) {
        final Exchange exchange = endpoint.createExchange();
        exchange.setProperty(Exchange.TIMER_COUNTER, counter);
        exchange.setProperty(Exchange.TIMER_NAME, endpoint.getTimerName());
        exchange.setProperty(Exchange.TIMER_TIME, endpoint.getTime());
        exchange.setProperty(Exchange.TIMER_PERIOD, endpoint.getPeriod());

        Date firedAt = new Date();
        exchange.setProperty(Exchange.TIMER_FIRED_TIME, firedAt);
        // the header uses the same key as quartz, to be consistent
        exchange.getIn().setHeader("firedTime", firedAt);

        if (LOG.isTraceEnabled()) {
            LOG.trace("Timer {} is firing #{} count", endpoint.getTimerName(), counter);
        }

        if (endpoint.isSynchronous()) {
            try {
                getProcessor().process(exchange);
            } catch (Exception e) {
                // keep the failure on the exchange so it is reported below
                exchange.setException(e);
            }
            reportFailure(exchange);
            return;
        }

        getAsyncProcessor().process(exchange, new AsyncCallback() {
            @Override
            public void done(boolean doneSync) {
                reportFailure(exchange);
            }
        });
    }

    /**
     * Builds the task run by the timer: it counts ticks, sends an exchange
     * per tick, and cancels itself once a positive repeat count is exceeded.
     */
    private TimerTask createTimerTask() {
        return new TimerTask() {
            // number of ticks seen so far
            private final AtomicLong ticks = new AtomicLong();

            @Override
            public void run() {
                if (isTaskRunAllowed()) {
                    try {
                        long count = ticks.incrementAndGet();

                        // a repeat count of zero or less means no limit
                        boolean limitExceeded = endpoint.getRepeatCount() > 0 && count > endpoint.getRepeatCount();
                        if (limitExceeded) {
                            // nothing more to fire, so stop this task
                            LOG.debug("Cancelling {} timer as repeat count limit reached after {} counts.", endpoint.getTimerName(), endpoint.getRepeatCount());
                            cancel();
                        } else {
                            sendTimerExchange(count);
                        }
                    } catch (Throwable e) {
                        // catch all to avoid the JVM closing the thread and not firing again
                        LOG.warn("Error processing exchange. This exception will be ignored, to let the timer be able to trigger again.", e);
                    }
                }
                // otherwise: do not run the timer task as it was not allowed
            }
        };
    }

    /**
     * Hands the exception carried by the exchange, if any, to the exception handler.
     */
    private void reportFailure(Exchange exchange) {
        if (exchange.getException() != null) {
            getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
        }
    }
}