001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.concurrent.RejectedExecutionException; 020 import java.util.concurrent.ScheduledExecutorService; 021 import java.util.concurrent.TimeUnit; 022 import java.util.concurrent.atomic.AtomicInteger; 023 024 import org.apache.camel.AsyncCallback; 025 import org.apache.camel.CamelContext; 026 import org.apache.camel.Exchange; 027 import org.apache.camel.Processor; 028 import org.apache.camel.util.ObjectHelper; 029 import org.slf4j.Logger; 030 import org.slf4j.LoggerFactory; 031 032 /** 033 * A useful base class for any processor which provides some kind of throttling 034 * or delayed processing. 035 * <p/> 036 * This implementation will block while waiting. 037 * 038 * @version 039 */ 040 public abstract class DelayProcessorSupport extends DelegateAsyncProcessor { 041 protected final Logger log = LoggerFactory.getLogger(getClass()); 042 private final CamelContext camelContext; 043 private final ScheduledExecutorService executorService; 044 private final boolean shutdownExecutorService; 045 private boolean asyncDelayed; 046 private boolean callerRunsWhenRejected = true; 047 private final AtomicInteger delayedCount = new AtomicInteger(0); 048 049 // TODO: Add option to cancel tasks on shutdown so we can stop fast 050 051 private final class ProcessCall implements Runnable { 052 private final Exchange exchange; 053 private final AsyncCallback callback; 054 055 public ProcessCall(Exchange exchange, AsyncCallback callback) { 056 this.exchange = exchange; 057 this.callback = callback; 058 } 059 060 public void run() { 061 // we are running now so decrement the counter 062 delayedCount.decrementAndGet(); 063 064 log.trace("Delayed task woke up and continues routing for exchangeId: {}", exchange.getExchangeId()); 065 if (!isRunAllowed()) { 066 exchange.setException(new RejectedExecutionException("Run is not allowed")); 067 } 068 069 // process the exchange now that we woke up 070 DelayProcessorSupport.this.processor.process(exchange, new AsyncCallback() { 071 @Override 072 public void done(boolean doneSync) { 073 log.trace("Delayed task done for exchangeId: {}", exchange.getExchangeId()); 074 // we must done the callback from this async callback as well, to ensure callback is done correctly 075 // must invoke done on callback with false, as that is what the original caller would 076 // expect as we returned false in the process method 077 callback.done(false); 078 } 079 }); 080 } 081 } 082 083 public DelayProcessorSupport(CamelContext camelContext, Processor processor) { 084 this(camelContext, processor, null, false); 085 } 086 087 public DelayProcessorSupport(CamelContext camelContext, Processor processor, ScheduledExecutorService executorService, boolean shutdownExecutorService) { 088 super(processor); 089 this.camelContext = camelContext; 090 this.executorService = executorService; 091 this.shutdownExecutorService = shutdownExecutorService; 092 } 093 094 @Override 095 public boolean process(Exchange exchange, AsyncCallback callback) { 096 if (!isRunAllowed()) { 097 exchange.setException(new RejectedExecutionException("Run is not allowed")); 098 callback.done(true); 099 return true; 100 } 101 102 // calculate delay and wait 103 long delay; 104 try { 105 delay = calculateDelay(exchange); 106 if (delay <= 0) { 107 // no delay then continue routing 108 log.trace("No delay for exchangeId: {}", exchange.getExchangeId()); 109 return processor.process(exchange, callback); 110 } 111 } catch (Throwable e) { 112 exchange.setException(e); 113 callback.done(true); 114 return true; 115 } 116 117 if (!isAsyncDelayed() || exchange.isTransacted()) { 118 // use synchronous delay (also required if using transactions) 119 try { 120 delay(delay, exchange); 121 // then continue routing 122 return processor.process(exchange, callback); 123 } catch (Exception e) { 124 // exception occurred so we are done 125 exchange.setException(e); 126 callback.done(true); 127 return true; 128 } 129 } else { 130 // asynchronous delay so schedule a process call task 131 // and increment the counter (we decrement the counter when we run the ProcessCall) 132 delayedCount.incrementAndGet(); 133 ProcessCall call = new ProcessCall(exchange, callback); 134 try { 135 log.trace("Scheduling delayed task to run in {} millis for exchangeId: {}", 136 delay, exchange.getExchangeId()); 137 executorService.schedule(call, delay, TimeUnit.MILLISECONDS); 138 // tell Camel routing engine we continue routing asynchronous 139 return false; 140 } catch (RejectedExecutionException e) { 141 // we were not allowed to run the ProcessCall, so need to decrement the counter here 142 delayedCount.decrementAndGet(); 143 if (isCallerRunsWhenRejected()) { 144 if (!isRunAllowed()) { 145 exchange.setException(new RejectedExecutionException()); 146 } else { 147 log.debug("Scheduling rejected task, so letting caller run, delaying at first for {} millis for exchangeId: {}", delay, exchange.getExchangeId()); 148 // let caller run by processing 149 try { 150 delay(delay, exchange); 151 } catch (InterruptedException ie) { 152 exchange.setException(ie); 153 } 154 // then continue routing 155 return processor.process(exchange, callback); 156 } 157 } else { 158 exchange.setException(e); 159 } 160 // caller don't run the task so we are done 161 callback.done(true); 162 return true; 163 } 164 } 165 } 166 167 public boolean isAsyncDelayed() { 168 return asyncDelayed; 169 } 170 171 public void setAsyncDelayed(boolean asyncDelayed) { 172 this.asyncDelayed = asyncDelayed; 173 } 174 175 public boolean isCallerRunsWhenRejected() { 176 return callerRunsWhenRejected; 177 } 178 179 public void setCallerRunsWhenRejected(boolean callerRunsWhenRejected) { 180 this.callerRunsWhenRejected = callerRunsWhenRejected; 181 } 182 183 protected abstract long calculateDelay(Exchange exchange); 184 185 /** 186 * Gets the current number of {@link Exchange}s being delayed (hold back due throttle limit hit) 187 */ 188 public int getDelayedCount() { 189 return delayedCount.get(); 190 } 191 192 /** 193 * Delays the given time before continuing. 194 * <p/> 195 * This implementation will block while waiting 196 * 197 * @param delay the delay time in millis 198 * @param exchange the exchange being processed 199 */ 200 protected void delay(long delay, Exchange exchange) throws InterruptedException { 201 // only run is we are started 202 if (!isRunAllowed()) { 203 return; 204 } 205 206 if (delay < 0) { 207 return; 208 } else { 209 try { 210 // keep track on delayer counter while we sleep 211 delayedCount.incrementAndGet(); 212 sleep(delay); 213 } catch (InterruptedException e) { 214 handleSleepInterruptedException(e, exchange); 215 } finally { 216 delayedCount.decrementAndGet(); 217 } 218 } 219 } 220 221 /** 222 * Called when a sleep is interrupted; allows derived classes to handle this case differently 223 */ 224 protected void handleSleepInterruptedException(InterruptedException e, Exchange exchange) throws InterruptedException { 225 if (log.isDebugEnabled()) { 226 log.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped()); 227 } 228 Thread.currentThread().interrupt(); 229 throw e; 230 } 231 232 protected long currentSystemTime() { 233 return System.currentTimeMillis(); 234 } 235 236 private void sleep(long delay) throws InterruptedException { 237 if (delay <= 0) { 238 return; 239 } 240 log.trace("Sleeping for: {} millis", delay); 241 Thread.sleep(delay); 242 } 243 244 @Override 245 protected void doStart() throws Exception { 246 if (isAsyncDelayed()) { 247 ObjectHelper.notNull(executorService, "executorService", this); 248 } 249 super.doStart(); 250 } 251 252 @Override 253 protected void doShutdown() throws Exception { 254 if (shutdownExecutorService && executorService != null) { 255 camelContext.getExecutorServiceManager().shutdownNow(executorService); 256 } 257 super.doShutdown(); 258 } 259 }