001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.concurrent.ExecutorService; 020 import java.util.concurrent.RejectedExecutionException; 021 import java.util.concurrent.atomic.AtomicBoolean; 022 023 import org.apache.camel.AsyncCallback; 024 import org.apache.camel.AsyncProcessor; 025 import org.apache.camel.CamelContext; 026 import org.apache.camel.Exchange; 027 import org.apache.camel.Rejectable; 028 import org.apache.camel.ThreadPoolRejectedPolicy; 029 import org.apache.camel.support.ServiceSupport; 030 import org.apache.camel.util.AsyncProcessorHelper; 031 import org.apache.camel.util.ObjectHelper; 032 import org.slf4j.Logger; 033 import org.slf4j.LoggerFactory; 034 035 /** 036 * Threads processor that leverage a thread pool for continue processing the {@link Exchange}s 037 * using the asynchronous routing engine. 038 * <p/> 039 * <b>Notice:</b> For transacted routes then this {@link ThreadsProcessor} is not in use, as we want to 040 * process messages using the same thread to support all work done in the same transaction. The reason 041 * is that the transaction manager that orchestrate the transaction, requires all the work to be done 042 * on the same thread. 043 * <p/> 044 * Pay attention to how this processor handles rejected tasks. 045 * <ul> 046 * <li>Abort - The current exchange will be set with a {@link RejectedExecutionException} exception, 047 * and marked to stop continue routing. 048 * The {@link org.apache.camel.spi.UnitOfWork} will be regarded as <b>failed</b>, due the exception.</li> 049 * <li>Discard - The current exchange will be marked to stop continue routing (notice no exception is set). 050 * The {@link org.apache.camel.spi.UnitOfWork} will be regarded as <b>successful</b>, due no exception being set.</li> 051 * <li>DiscardOldest - The oldest exchange will be marked to stop continue routing (notice no exception is set). 052 * The {@link org.apache.camel.spi.UnitOfWork} will be regarded as <b>successful</b>, due no exception being set. 053 * And the current exchange will be added to the task queue.</li> 054 * <li>CallerRuns - The current exchange will be processed by the current thread. Which mean the current thread 055 * will not be free to process a new exchange, as its processing the current exchange.</li> 056 * </ul> 057 */ 058 public class ThreadsProcessor extends ServiceSupport implements AsyncProcessor { 059 060 private static final Logger LOG = LoggerFactory.getLogger(ThreadsProcessor.class); 061 private final CamelContext camelContext; 062 private final ExecutorService executorService; 063 private volatile boolean shutdownExecutorService; 064 private final AtomicBoolean shutdown = new AtomicBoolean(true); 065 private boolean callerRunsWhenRejected = true; 066 private ThreadPoolRejectedPolicy rejectedPolicy; 067 068 private final class ProcessCall implements Runnable, Rejectable { 069 private final Exchange exchange; 070 private final AsyncCallback callback; 071 072 public ProcessCall(Exchange exchange, AsyncCallback callback) { 073 this.exchange = exchange; 074 this.callback = callback; 075 } 076 077 @Override 078 public void run() { 079 LOG.trace("Continue routing exchange {} ", exchange); 080 if (shutdown.get()) { 081 exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running.")); 082 } 083 callback.done(false); 084 } 085 086 @Override 087 public void reject() { 088 // abort should mark the exchange with an rejected exception 089 boolean abort = ThreadPoolRejectedPolicy.Abort == rejectedPolicy; 090 if (abort) { 091 exchange.setException(new RejectedExecutionException()); 092 } 093 094 LOG.trace("{} routing exchange {} ", abort ? "Aborted" : "Rejected", exchange); 095 // we should not continue routing, and no redelivery should be performed 096 exchange.setProperty(Exchange.ROUTE_STOP, true); 097 exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, true); 098 099 if (shutdown.get()) { 100 exchange.setException(new RejectedExecutionException("ThreadsProcessor is not running.")); 101 } 102 callback.done(false); 103 } 104 105 @Override 106 public String toString() { 107 return "ProcessCall[" + exchange + "]"; 108 } 109 } 110 111 public ThreadsProcessor(CamelContext camelContext, ExecutorService executorService, boolean shutdownExecutorService) { 112 ObjectHelper.notNull(camelContext, "camelContext"); 113 ObjectHelper.notNull(executorService, "executorService"); 114 this.camelContext = camelContext; 115 this.executorService = executorService; 116 this.shutdownExecutorService = shutdownExecutorService; 117 } 118 119 public void process(final Exchange exchange) throws Exception { 120 AsyncProcessorHelper.process(this, exchange); 121 } 122 123 public boolean process(Exchange exchange, AsyncCallback callback) { 124 if (shutdown.get()) { 125 throw new IllegalStateException("ThreadsProcessor is not running."); 126 } 127 128 // we cannot execute this asynchronously for transacted exchanges, as the transaction manager doesn't support 129 // using different threads in the same transaction 130 if (exchange.isTransacted()) { 131 LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange); 132 callback.done(true); 133 return true; 134 } 135 136 ProcessCall call = new ProcessCall(exchange, callback); 137 try { 138 LOG.trace("Submitting task {}", call); 139 executorService.submit(call); 140 // tell Camel routing engine we continue routing asynchronous 141 return false; 142 } catch (RejectedExecutionException e) { 143 boolean callerRuns = isCallerRunsWhenRejected(); 144 if (!callerRuns) { 145 exchange.setException(e); 146 } 147 148 LOG.trace("{} executing task {}", callerRuns ? "CallerRuns" : "Aborted", call); 149 if (shutdown.get()) { 150 exchange.setException(new RejectedExecutionException()); 151 } 152 callback.done(true); 153 return true; 154 } 155 } 156 157 public boolean isCallerRunsWhenRejected() { 158 return callerRunsWhenRejected; 159 } 160 161 public void setCallerRunsWhenRejected(boolean callerRunsWhenRejected) { 162 this.callerRunsWhenRejected = callerRunsWhenRejected; 163 } 164 165 public ThreadPoolRejectedPolicy getRejectedPolicy() { 166 return rejectedPolicy; 167 } 168 169 public void setRejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) { 170 this.rejectedPolicy = rejectedPolicy; 171 } 172 173 public String toString() { 174 return "Threads"; 175 } 176 177 protected void doStart() throws Exception { 178 shutdown.set(false); 179 } 180 181 protected void doStop() throws Exception { 182 shutdown.set(true); 183 } 184 185 protected void doShutdown() throws Exception { 186 if (shutdownExecutorService) { 187 camelContext.getExecutorServiceManager().shutdownNow(executorService); 188 } 189 super.doShutdown(); 190 } 191 192 }