/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.model;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;

import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;

import org.apache.camel.CamelContextAware;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.model.language.ExpressionDefinition;
import org.apache.camel.processor.EvaluateExpressionProcessor;
import org.apache.camel.processor.Pipeline;
import org.apache.camel.processor.RecipientList;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.CamelContextHelper;

/**
 * Represents an XML &lt;recipientList/&gt; element
 * <p/>
 * The definition holds the configured options and turns them into a
 * {@link RecipientList} processor in {@link #createProcessor(RouteContext)}.
 *
 * @version 
 */
@XmlRootElement(name = "recipientList")
@XmlAccessorType(XmlAccessType.FIELD)
public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> extends NoOutputExpressionNode implements ExecutorServiceAwareDefinition<RecipientListDefinition<Type>> {
    @XmlTransient
    private AggregationStrategy aggregationStrategy;
    @XmlTransient
    private ExecutorService executorService;
    @XmlAttribute
    private String delimiter;
    @XmlAttribute
    private Boolean parallelProcessing;
    @XmlAttribute
    private String strategyRef;
    @XmlAttribute
    private String strategyMethodName;
    @XmlAttribute
    private Boolean strategyMethodAllowNull;
    @XmlAttribute
    private String executorServiceRef;
    @XmlAttribute
    private Boolean stopOnException;
    @XmlAttribute
    private Boolean ignoreInvalidEndpoints;
    @XmlAttribute
    private Boolean streaming;
    @XmlAttribute
    private Long timeout;
    @XmlAttribute
    private String onPrepareRef;
    @XmlTransient
    private Processor onPrepare;
    @XmlAttribute
    private Boolean shareUnitOfWork;

    public RecipientListDefinition() {
    }

    public RecipientListDefinition(ExpressionDefinition expression) {
        super(expression);
    }

    public RecipientListDefinition(Expression expression) {
        super(expression);
    }

    @Override
    public String toString() {
        return "RecipientList[" + getExpression() + "]";
    }

    @Override
    public String getShortName() {
        return "recipientList";
    }

    @Override
    public String getLabel() {
        return "recipientList[" + getExpression() + "]";
    }

    /**
     * Creates the runtime processor for this definition.
     * <p/>
     * The returned processor is a {@link Pipeline} of two steps: an
     * {@link EvaluateExpressionProcessor} (wrapped in the error handler) followed by
     * the configured {@link RecipientList}.
     *
     * @param routeContext the route context
     * @return the pipeline which evaluates the expression and then runs the recipient list
     * @throws IllegalArgumentException if a timeout greater than zero is configured but
     *                                  parallel processing is not enabled, or if the configured
     *                                  aggregation strategy reference cannot be found
     * @throws Exception                if a referenced processor cannot be looked up or the
     *                                  processor otherwise cannot be created
     */
    @Override
    public Processor createProcessor(RouteContext routeContext) throws Exception {
        final Expression expression = getExpression().createExpression(routeContext);
        final boolean parallel = isParallelProcessing();

        RecipientList answer;
        if (delimiter != null) {
            answer = new RecipientList(routeContext.getCamelContext(), expression, delimiter);
        } else {
            answer = new RecipientList(routeContext.getCamelContext(), expression);
        }
        answer.setAggregationStrategy(createAggregationStrategy(routeContext));
        answer.setParallelProcessing(parallel);
        answer.setStreaming(isStreaming());
        answer.setShareUnitOfWork(isShareUnitOfWork());
        if (onPrepareRef != null) {
            // a reference takes precedence and replaces any processor set directly
            onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class);
        }
        if (onPrepare != null) {
            answer.setOnPrepare(onPrepare);
        }
        if (stopOnException != null) {
            answer.setStopOnException(isStopOnException());
        }
        if (ignoreInvalidEndpoints != null) {
            answer.setIgnoreInvalidEndpoints(ignoreInvalidEndpoints);
        }

        Long configuredTimeout = getTimeout();
        if (configuredTimeout != null) {
            // validate before the executor service is resolved below, so that a definition
            // which is going to be rejected does not first have an executor service resolved for it
            if (configuredTimeout > 0 && !parallel) {
                throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled.");
            }
            answer.setTimeout(configuredTimeout);
        }

        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, parallel);
        ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "RecipientList", this, parallel);
        answer.setExecutorService(threadPool);
        answer.setShutdownExecutorService(shutdownThreadPool);

        // create a pipeline with two processors
        // the first is the eval processor which evaluates the expression to use
        // the second is the recipient list
        List<Processor> pipe = new ArrayList<Processor>(2);

        // the eval processor must be wrapped in error handler, so in case there was an
        // error during evaluation, the error handler can deal with it
        // the recipient list is not in error handler, as it has its own special error handling
        // when sending to the recipients individually
        Processor evalProcessor = new EvaluateExpressionProcessor(expression);
        evalProcessor = super.wrapInErrorHandler(routeContext, evalProcessor);

        pipe.add(evalProcessor);
        pipe.add(answer);

        // wrap in nested pipeline so this appears as one processor
        // (threads definition does this as well)
        return new Pipeline(routeContext.getCamelContext(), pipe) {
            @Override
            public String toString() {
                return "RecipientList[" + expression + "]";
            }
        };
    }

    /**
     * Resolves the {@link AggregationStrategy} to use.
     * <p/>
     * An explicitly set strategy wins. Otherwise the strategy reference (if any) is looked up;
     * a bean which is not an {@link AggregationStrategy} is wrapped in an
     * {@link AggregationStrategyBeanAdapter}. If nothing is configured then
     * {@link UseLatestAggregationStrategy} is used.
     *
     * @param routeContext the route context used for the lookup
     * @return the strategy, never <tt>null</tt>
     * @throws IllegalArgumentException if a strategy reference is configured but the lookup returns <tt>null</tt>
     */
    private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
        AggregationStrategy strategy = getAggregationStrategy();
        if (strategy == null && strategyRef != null) {
            Object aggStrategy = routeContext.lookup(strategyRef, Object.class);
            if (aggStrategy instanceof AggregationStrategy) {
                strategy = (AggregationStrategy) aggStrategy;
            } else if (aggStrategy != null) {
                // a plain bean: adapt it so the configured method is used for aggregating
                AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getStrategyMethodName());
                if (getStrategyMethodAllowNull() != null) {
                    adapter.setAllowNullNewExchange(getStrategyMethodAllowNull());
                    adapter.setAllowNullOldExchange(getStrategyMethodAllowNull());
                }
                strategy = adapter;
            } else {
                throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef);
            }
        }
        if (strategy == null) {
            // fallback to use latest
            strategy = new UseLatestAggregationStrategy();
        }

        if (strategy instanceof CamelContextAware) {
            ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext());
        }

        return strategy;
    }

    // Fluent API
    // -------------------------------------------------------------------------

    @Override
    @SuppressWarnings("unchecked")
    public Type end() {
        // allow end() to return to previous type so you can continue in the DSL
        return (Type) super.end();
    }

    /**
     * Sets the delimiter
     *
     * @param delimiter the delimiter
     * @return the builder
     */
    public RecipientListDefinition<Type> delimiter(String delimiter) {
        setDelimiter(delimiter);
        return this;
    }

    /**
     * Sets the aggregation strategy
     *
     * @param aggregationStrategy the strategy
     * @return the builder
     */
    public RecipientListDefinition<Type> aggregationStrategy(AggregationStrategy aggregationStrategy) {
        setAggregationStrategy(aggregationStrategy);
        return this;
    }

    /**
     * Sets a reference to the aggregation strategy
     *
     * @param aggregationStrategyRef a reference to a strategy to lookup
     * @return the builder
     */
    public RecipientListDefinition<Type> aggregationStrategyRef(String aggregationStrategyRef) {
        setStrategyRef(aggregationStrategyRef);
        return this;
    }

    /**
     * Sets the method name to use when using a POJO as {@link AggregationStrategy}.
     *
     * @param methodName the method name to call
     * @return the builder
     */
    public RecipientListDefinition<Type> aggregationStrategyMethodName(String methodName) {
        setStrategyMethodName(methodName);
        return this;
    }

    /**
     * Allows <tt>null</tt> exchanges (both old and new) when using a POJO as {@link AggregationStrategy}.
     *
     * @return the builder
     */
    public RecipientListDefinition<Type> aggregationStrategyMethodAllowNull() {
        setStrategyMethodAllowNull(true);
        return this;
    }

    /**
     * Ignores the invalid endpoint exception when trying to create a producer with that endpoint
     *
     * @return the builder
     */
    public RecipientListDefinition<Type> ignoreInvalidEndpoints() {
        setIgnoreInvalidEndpoints(true);
        return this;
    }

    /**
     * Does the recipient list work in parallel
     *
     * @return the builder
     */
    public RecipientListDefinition<Type> parallelProcessing() {
        setParallelProcessing(true);
        return this;
    }

    /**
     * Does the recipient list work in streaming mode
     *
     * @return the builder
     */
    public RecipientListDefinition<Type> streaming() {
        setStreaming(true);
        return this;
    }

    /**
     * Will now stop further processing if an exception or failure occurred during processing of an
     * {@link org.apache.camel.Exchange} and the caused exception will be thrown.
     * <p/>
     * Will also stop if processing the exchange failed (has a fault message) or an exception
     * was thrown and handled by the error handler (such as using onException). In all situations
     * the recipient list will stop further processing. This is the same behavior as in pipeline, which
     * is used by the routing engine.
     * <p/>
     * The default behavior is to <b>not</b> stop but continue processing till the end
     *
     * @return the builder
     */
    public RecipientListDefinition<Type> stopOnException() {
        setStopOnException(true);
        return this;
    }

    /**
     * Sets the executor service to use
     *
     * @param executorService the executor service
     * @return the builder
     */
    public RecipientListDefinition<Type> executorService(ExecutorService executorService) {
        setExecutorService(executorService);
        return this;
    }

    /**
     * Sets a reference to the executor service to use
     *
     * @param executorServiceRef reference to the executor service
     * @return the builder
     */
    public RecipientListDefinition<Type> executorServiceRef(String executorServiceRef) {
        setExecutorServiceRef(executorServiceRef);
        return this;
    }

    /**
     * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be sent.
     * This can be used to deep-clone messages that should be sent, or any custom logic needed before
     * the exchange is sent.
     *
     * @param onPrepare the processor
     * @return the builder
     */
    public RecipientListDefinition<Type> onPrepare(Processor onPrepare) {
        setOnPrepare(onPrepare);
        return this;
    }

    /**
     * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be sent.
     * This can be used to deep-clone messages that should be sent, or any custom logic needed before
     * the exchange is sent.
     *
     * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry}
     * @return the builder
     */
    public RecipientListDefinition<Type> onPrepareRef(String onPrepareRef) {
        setOnPrepareRef(onPrepareRef);
        return this;
    }

    /**
     * Sets a timeout value in millis to use when using parallelProcessing.
     * <p/>
     * A timeout greater than zero requires parallel processing to be enabled,
     * otherwise creating the processor fails with an {@link IllegalArgumentException}.
     *
     * @param timeout timeout in millis
     * @return the builder
     */
    public RecipientListDefinition<Type> timeout(long timeout) {
        setTimeout(timeout);
        return this;
    }

    /**
     * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and each of the sub messages.
     *
     * @return the builder.
     * @see org.apache.camel.spi.SubUnitOfWork
     */
    public RecipientListDefinition<Type> shareUnitOfWork() {
        setShareUnitOfWork(true);
        return this;
    }

    // Properties
    //-------------------------------------------------------------------------

    public String getDelimiter() {
        return delimiter;
    }

    public void setDelimiter(String delimiter) {
        this.delimiter = delimiter;
    }

    public Boolean getParallelProcessing() {
        return parallelProcessing;
    }

    public void setParallelProcessing(Boolean parallelProcessing) {
        this.parallelProcessing = parallelProcessing;
    }

    public boolean isParallelProcessing() {
        return parallelProcessing != null && parallelProcessing;
    }

    public String getStrategyRef() {
        return strategyRef;
    }

    public void setStrategyRef(String strategyRef) {
        this.strategyRef = strategyRef;
    }

    public String getStrategyMethodName() {
        return strategyMethodName;
    }

    public void setStrategyMethodName(String strategyMethodName) {
        this.strategyMethodName = strategyMethodName;
    }

    public Boolean getStrategyMethodAllowNull() {
        return strategyMethodAllowNull;
    }

    public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) {
        this.strategyMethodAllowNull = strategyMethodAllowNull;
    }

    public String getExecutorServiceRef() {
        return executorServiceRef;
    }

    public void setExecutorServiceRef(String executorServiceRef) {
        this.executorServiceRef = executorServiceRef;
    }

    public Boolean getIgnoreInvalidEndpoints() {
        return ignoreInvalidEndpoints;
    }

    public void setIgnoreInvalidEndpoints(Boolean ignoreInvalidEndpoints) {
        this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
    }

    public boolean isIgnoreInvalidEndpoints() {
        return ignoreInvalidEndpoints != null && ignoreInvalidEndpoints;
    }

    public Boolean getStopOnException() {
        return stopOnException;
    }

    public void setStopOnException(Boolean stopOnException) {
        this.stopOnException = stopOnException;
    }

    public boolean isStopOnException() {
        return stopOnException != null && stopOnException;
    }

    public AggregationStrategy getAggregationStrategy() {
        return aggregationStrategy;
    }

    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
        this.aggregationStrategy = aggregationStrategy;
    }

    public ExecutorService getExecutorService() {
        return executorService;
    }

    public void setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }

    public Boolean getStreaming() {
        return streaming;
    }

    public void setStreaming(Boolean streaming) {
        this.streaming = streaming;
    }

    public boolean isStreaming() {
        return streaming != null && streaming;
    }

    public Long getTimeout() {
        return timeout;
    }

    public void setTimeout(Long timeout) {
        this.timeout = timeout;
    }

    public String getOnPrepareRef() {
        return onPrepareRef;
    }

    public void setOnPrepareRef(String onPrepareRef) {
        this.onPrepareRef = onPrepareRef;
    }

    public Processor getOnPrepare() {
        return onPrepare;
    }

    public void setOnPrepare(Processor onPrepare) {
        this.onPrepare = onPrepare;
    }

    public Boolean getShareUnitOfWork() {
        return shareUnitOfWork;
    }

    public void setShareUnitOfWork(Boolean shareUnitOfWork) {
        this.shareUnitOfWork = shareUnitOfWork;
    }

    public boolean isShareUnitOfWork() {
        return shareUnitOfWork != null && shareUnitOfWork;
    }

}