/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor;

import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.impl.ProducerCache;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;

import static org.apache.camel.util.ObjectHelper.notNull;

/**
 * Implements a dynamic <a
 * href="http://camel.apache.org/recipient-list.html">Recipient List</a>
 * pattern where the list of actual endpoints to send a message exchange to is
 * dependent on some dynamic expression.
 * <p/>
 * For every exchange a fresh {@link RecipientListProcessor} is created, started and
 * asked to process the exchange. The aggregate executor service requested by those
 * per-exchange processors is created once, kept on this instance and handed to each
 * of them; it is released again when this service is shut down.
 *
 * @version
 */
public class RecipientList extends ServiceSupport implements AsyncProcessor {
    // a delimiter with this value (compared ignoring case) means "pass no delimiter to the iterator"
    private static final String IGNORE_DELIMITER_MARKER = "false";
    private final CamelContext camelContext;
    private ProducerCache producerCache;
    private Expression expression;
    private final String delimiter;
    private boolean parallelProcessing;
    private boolean stopOnException;
    private boolean ignoreInvalidEndpoints;
    private boolean streaming;
    private long timeout;
    private Processor onPrepare;
    private boolean shareUnitOfWork;
    private ExecutorService executorService;
    private boolean shutdownExecutorService;
    // shared by all per-exchange RecipientListProcessor instances; guarded by aggregateExecutorServiceLock
    private ExecutorService aggregateExecutorService;
    // A new RecipientListProcessor is created per exchange, so the monitor of that processor
    // cannot protect the shared field above. This lock is owned by the RecipientList itself.
    private final Object aggregateExecutorServiceLock = new Object();
    private AggregationStrategy aggregationStrategy = new UseLatestAggregationStrategy();

    /**
     * Creates a recipient list without an expression, using comma as delimiter.
     *
     * @param camelContext the camel context, must not be <tt>null</tt>
     */
    public RecipientList(CamelContext camelContext) {
        // use comma by default as delimiter
        this(camelContext, ",");
    }

    /**
     * Creates a recipient list without an expression.
     *
     * @param camelContext the camel context, must not be <tt>null</tt>
     * @param delimiter    the delimiter handed to the recipient iterator, must not be empty;
     *                     the value <tt>false</tt> (ignoring case) makes
     *                     {@link #sendToRecipientList(Exchange, Object, AsyncCallback)} pass no delimiter at all
     */
    public RecipientList(CamelContext camelContext, String delimiter) {
        notNull(camelContext, "camelContext");
        ObjectHelper.notEmpty(delimiter, "delimiter");
        this.camelContext = camelContext;
        this.delimiter = delimiter;
    }

    /**
     * Creates a recipient list using comma as delimiter.
     *
     * @param camelContext the camel context, must not be <tt>null</tt>
     * @param expression   the expression evaluated to obtain the recipients, must not be <tt>null</tt>
     */
    public RecipientList(CamelContext camelContext, Expression expression) {
        // use comma by default as delimiter
        this(camelContext, expression, ",");
    }

    /**
     * Creates a recipient list.
     *
     * @param camelContext the camel context, must not be <tt>null</tt>
     * @param expression   the expression evaluated to obtain the recipients, must not be <tt>null</tt>
     * @param delimiter    the delimiter handed to the recipient iterator, must not be empty;
     *                     the value <tt>false</tt> (ignoring case) makes
     *                     {@link #sendToRecipientList(Exchange, Object, AsyncCallback)} pass no delimiter at all
     */
    public RecipientList(CamelContext camelContext, Expression expression, String delimiter) {
        notNull(camelContext, "camelContext");
        ObjectHelper.notNull(expression, "expression");
        ObjectHelper.notEmpty(delimiter, "delimiter");
        this.camelContext = camelContext;
        this.expression = expression;
        this.delimiter = delimiter;
    }

    @Override
    public String toString() {
        return "RecipientList[" + (expression != null ? expression : "") + "]";
    }

    /**
     * Processes the exchange by delegating to
     * {@link AsyncProcessorHelper#process(AsyncProcessor, Exchange)} with this processor.
     */
    public void process(Exchange exchange) throws Exception {
        AsyncProcessorHelper.process(this, exchange);
    }

    /**
     * Determines the recipients for the exchange and sends the exchange to them.
     * <p/>
     * If the exchange carries the {@link Exchange#EVALUATE_EXPRESSION_RESULT} property, that
     * value is used (and the property is removed); otherwise the configured expression, if any,
     * is evaluated.
     *
     * @param exchange the exchange to send
     * @param callback the callback handed on to the recipient list processing
     * @return the value returned by {@link #sendToRecipientList(Exchange, Object, AsyncCallback)}
     * @throws IllegalStateException if this service has not been started
     */
    public boolean process(Exchange exchange, AsyncCallback callback) {
        if (!isStarted()) {
            throw new IllegalStateException("RecipientList has not been started: " + this);
        }

        // use the evaluate expression result if exists
        Object recipientList = exchange.removeProperty(Exchange.EVALUATE_EXPRESSION_RESULT);
        if (recipientList == null && expression != null) {
            // fallback and evaluate the expression
            recipientList = expression.evaluate(exchange, Object.class);
        }

        return sendToRecipientList(exchange, recipientList, callback);
    }

    /**
     * Sends the given exchange to the recipient list.
     * <p/>
     * A new {@link RecipientListProcessor} is created and started for the exchange. If starting
     * it fails, the exception is set on the exchange, the callback is signalled with
     * <tt>done(true)</tt> and <tt>true</tt> is returned.
     *
     * @param exchange      the exchange to send
     * @param recipientList the recipients; turned into an iterator using the configured delimiter
     * @param callback      the callback handed to the processor that does the sending
     * @return <tt>true</tt> if starting the processor failed, otherwise the value returned by
     *         the processor's <tt>process(exchange, callback)</tt>
     */
    public boolean sendToRecipientList(Exchange exchange, Object recipientList, AsyncCallback callback) {
        Iterator<Object> iter;

        if (delimiter != null && delimiter.equalsIgnoreCase(IGNORE_DELIMITER_MARKER)) {
            // the marker value means: do not hand any delimiter to the iterator
            iter = ObjectHelper.createIterator(recipientList, null);
        } else {
            iter = ObjectHelper.createIterator(recipientList, delimiter);
        }

        RecipientListProcessor rlp = new RecipientListProcessor(exchange.getContext(), producerCache, iter, getAggregationStrategy(),
                isParallelProcessing(), getExecutorService(), isShutdownExecutorService(),
                isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork()) {
            @Override
            protected synchronized ExecutorService createAggregateExecutorService(String name) {
                // use a shared executor service to avoid creating new thread pools
                //
                // The synchronized modifier above only locks this per-exchange processor, so
                // the shared field must additionally be guarded by a lock owned by the
                // enclosing RecipientList. Otherwise concurrent exchanges could each create
                // their own executor service and all but the last reference would be lost.
                synchronized (RecipientList.this.aggregateExecutorServiceLock) {
                    if (RecipientList.this.aggregateExecutorService == null) {
                        RecipientList.this.aggregateExecutorService = super.createAggregateExecutorService("RecipientList-AggregateTask");
                    }
                    return RecipientList.this.aggregateExecutorService;
                }
            }
        };
        rlp.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints());

        // start the service
        try {
            ServiceHelper.startService(rlp);
        } catch (Exception e) {
            exchange.setException(e);
            callback.done(true);
            return true;
        }

        AsyncProcessor target = rlp;
        if (isShareUnitOfWork()) {
            // wrap answer in a sub unit of work, since we share the unit of work
            CamelInternalProcessor internalProcessor = new CamelInternalProcessor(rlp);
            internalProcessor.addAdvice(new CamelInternalProcessor.SubUnitOfWorkProcessorAdvice());
            target = internalProcessor;
        }

        // now let the multicast process the exchange
        return target.process(exchange, callback);
    }

    /**
     * Resolves a single recipient to an endpoint via
     * {@link ExchangeHelper#resolveEndpoint(Exchange, Object)}.
     *
     * @param exchange  the current exchange
     * @param recipient the recipient; a <tt>String</tt> is trimmed before being resolved
     * @return the resolved endpoint
     */
    protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
        // trim strings as end users might have added spaces between separators
        if (recipient instanceof String) {
            recipient = ((String)recipient).trim();
        }
        return ExchangeHelper.resolveEndpoint(exchange, recipient);
    }

    /**
     * Creates the producer cache on first start and starts the aggregation strategy and the
     * producer cache.
     */
    protected void doStart() throws Exception {
        if (producerCache == null) {
            producerCache = new ProducerCache(this, camelContext);
        }
        ServiceHelper.startServices(aggregationStrategy, producerCache);
    }

    /**
     * Stops the producer cache and the aggregation strategy.
     */
    protected void doStop() throws Exception {
        ServiceHelper.stopServices(producerCache, aggregationStrategy);
    }

    /**
     * Stops and shuts down the producer cache and the aggregation strategy, shuts down the
     * executor service when {@link #isShutdownExecutorService()} is set, and releases the
     * shared aggregate executor service if one was created while processing exchanges.
     */
    protected void doShutdown() throws Exception {
        ServiceHelper.stopAndShutdownServices(producerCache, aggregationStrategy);

        if (shutdownExecutorService && executorService != null) {
            camelContext.getExecutorServiceManager().shutdownNow(executorService);
        }

        // The aggregate executor service is created lazily by this class and is not handed
        // out to anybody else, so this class is also responsible for releasing it.
        synchronized (aggregateExecutorServiceLock) {
            if (aggregateExecutorService != null) {
                camelContext.getExecutorServiceManager().shutdownNow(aggregateExecutorService);
                // clear the reference so a later exchange would create a fresh one instead of
                // being handed an executor service that has already been shut down
                aggregateExecutorService = null;
            }
        }
    }

    /**
     * Returns the streaming flag handed to each {@link RecipientListProcessor}.
     */
    public boolean isStreaming() {
        return streaming;
    }

    /**
     * Sets the streaming flag handed to each {@link RecipientListProcessor}.
     */
    public void setStreaming(boolean streaming) {
        this.streaming = streaming;
    }

    /**
     * Returns the ignore-invalid-endpoints flag set on each {@link RecipientListProcessor}.
     */
    public boolean isIgnoreInvalidEndpoints() {
        return ignoreInvalidEndpoints;
    }

    /**
     * Sets the ignore-invalid-endpoints flag set on each {@link RecipientListProcessor}.
     */
    public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) {
        this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
    }

    /**
     * Returns the parallel-processing flag handed to each {@link RecipientListProcessor}.
     */
    public boolean isParallelProcessing() {
        return parallelProcessing;
    }

    /**
     * Sets the parallel-processing flag handed to each {@link RecipientListProcessor}.
     */
    public void setParallelProcessing(boolean parallelProcessing) {
        this.parallelProcessing = parallelProcessing;
    }

    /**
     * Returns the stop-on-exception flag handed to each {@link RecipientListProcessor}.
     */
    public boolean isStopOnException() {
        return stopOnException;
    }

    /**
     * Sets the stop-on-exception flag handed to each {@link RecipientListProcessor}.
     */
    public void setStopOnException(boolean stopOnException) {
        this.stopOnException = stopOnException;
    }

    /**
     * Returns the executor service handed to each {@link RecipientListProcessor}, may be <tt>null</tt>.
     */
    public ExecutorService getExecutorService() {
        return executorService;
    }

    /**
     * Sets the executor service handed to each {@link RecipientListProcessor}.
     */
    public void setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }

    /**
     * Returns whether the executor service is shut down when this service is shut down.
     */
    public boolean isShutdownExecutorService() {
        return shutdownExecutorService;
    }

    /**
     * Sets whether the executor service is shut down when this service is shut down.
     */
    public void setShutdownExecutorService(boolean shutdownExecutorService) {
        this.shutdownExecutorService = shutdownExecutorService;
    }

    /**
     * Returns the aggregation strategy; defaults to a {@link UseLatestAggregationStrategy}.
     */
    public AggregationStrategy getAggregationStrategy() {
        return aggregationStrategy;
    }

    /**
     * Sets the aggregation strategy handed to each {@link RecipientListProcessor}.
     */
    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
        this.aggregationStrategy = aggregationStrategy;
    }

    /**
     * Returns the timeout value handed to each {@link RecipientListProcessor}.
     */
    public long getTimeout() {
        return timeout;
    }

    /**
     * Sets the timeout value handed to each {@link RecipientListProcessor}.
     */
    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    /**
     * Returns the on-prepare processor handed to each {@link RecipientListProcessor}, may be <tt>null</tt>.
     */
    public Processor getOnPrepare() {
        return onPrepare;
    }

    /**
     * Sets the on-prepare processor handed to each {@link RecipientListProcessor}.
     */
    public void setOnPrepare(Processor onPrepare) {
        this.onPrepare = onPrepare;
    }

    /**
     * Returns whether the per-exchange processor is wrapped so that it runs in a sub unit of work.
     */
    public boolean isShareUnitOfWork() {
        return shareUnitOfWork;
    }

    /**
     * Sets whether the per-exchange processor is wrapped so that it runs in a sub unit of work.
     */
    public void setShareUnitOfWork(boolean shareUnitOfWork) {
        this.shareUnitOfWork = shareUnitOfWork;
    }
}