001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.util.ArrayList; 020 import java.util.List; 021 import java.util.concurrent.ExecutorService; 022 import javax.xml.bind.annotation.XmlAccessType; 023 import javax.xml.bind.annotation.XmlAccessorType; 024 import javax.xml.bind.annotation.XmlAttribute; 025 import javax.xml.bind.annotation.XmlRootElement; 026 import javax.xml.bind.annotation.XmlTransient; 027 028 import org.apache.camel.CamelContextAware; 029 import org.apache.camel.Processor; 030 import org.apache.camel.processor.CamelInternalProcessor; 031 import org.apache.camel.processor.MulticastProcessor; 032 import org.apache.camel.processor.aggregate.AggregationStrategy; 033 import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; 034 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; 035 import org.apache.camel.spi.RouteContext; 036 import org.apache.camel.util.CamelContextHelper; 037 038 /** 039 * Represents an XML <multicast/> element 040 * 041 * @version 042 */ 043 @XmlRootElement(name = "multicast") 044 @XmlAccessorType(XmlAccessType.FIELD) 045 public class MulticastDefinition extends OutputDefinition<MulticastDefinition> implements ExecutorServiceAwareDefinition<MulticastDefinition> { 046 @XmlAttribute 047 private Boolean parallelProcessing; 048 @XmlAttribute 049 private String strategyRef; 050 @XmlAttribute 051 private String strategyMethodName; 052 @XmlAttribute 053 private Boolean strategyMethodAllowNull; 054 @XmlTransient 055 private ExecutorService executorService; 056 @XmlAttribute 057 private String executorServiceRef; 058 @XmlAttribute 059 private Boolean streaming; 060 @XmlAttribute 061 private Boolean stopOnException; 062 @XmlAttribute 063 private Long timeout; 064 @XmlTransient 065 private AggregationStrategy aggregationStrategy; 066 @XmlAttribute 067 private String onPrepareRef; 068 @XmlTransient 069 private Processor onPrepare; 070 @XmlAttribute 071 private Boolean shareUnitOfWork; 072 073 public MulticastDefinition() { 074 } 075 076 @Override 077 public String toString() { 078 return "Multicast[" + getOutputs() + "]"; 079 } 080 081 @Override 082 public String getLabel() { 083 return "multicast"; 084 } 085 086 @Override 087 public String getShortName() { 088 return "multicast"; 089 } 090 091 @Override 092 public Processor createProcessor(RouteContext routeContext) throws Exception { 093 Processor answer = this.createChildProcessor(routeContext, true); 094 095 // force the answer as a multicast processor even if there is only one child processor in the multicast 096 if (!(answer instanceof MulticastProcessor)) { 097 List<Processor> list = new ArrayList<Processor>(1); 098 list.add(answer); 099 answer = createCompositeProcessor(routeContext, list); 100 } 101 return answer; 102 } 103 104 // Fluent API 105 // ------------------------------------------------------------------------- 106 107 /** 108 * Set the multicasting aggregationStrategy 109 * 110 * @return the builder 111 */ 112 public MulticastDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) { 113 setAggregationStrategy(aggregationStrategy); 114 return this; 115 } 116 117 /** 118 * Set the aggregationStrategy 119 * 120 * @param aggregationStrategyRef a reference to a strategy to lookup 121 * @return the builder 122 */ 123 public MulticastDefinition aggregationStrategyRef(String aggregationStrategyRef) { 124 setStrategyRef(aggregationStrategyRef); 125 return this; 126 } 127 128 /** 129 * Sets the method name to use when using a POJO as {@link AggregationStrategy}. 130 * 131 * @param methodName the method name to call 132 * @return the builder 133 */ 134 public MulticastDefinition aggregationStrategyMethodName(String methodName) { 135 setStrategyMethodName(methodName); 136 return this; 137 } 138 139 /** 140 * Sets allowing null when using a POJO as {@link AggregationStrategy}. 141 * 142 * @return the builder 143 */ 144 public MulticastDefinition aggregationStrategyMethodAllowNull() { 145 setStrategyMethodAllowNull(true); 146 return this; 147 } 148 149 /** 150 * Uses the {@link java.util.concurrent.ExecutorService} to do the multicasting work 151 * 152 * @return the builder 153 */ 154 public MulticastDefinition parallelProcessing() { 155 setParallelProcessing(true); 156 return this; 157 } 158 159 /** 160 * Aggregates the responses as the are done (e.g. out of order sequence) 161 * 162 * @return the builder 163 */ 164 public MulticastDefinition streaming() { 165 setStreaming(true); 166 return this; 167 } 168 169 /** 170 * Will now stop further processing if an exception or failure occurred during processing of an 171 * {@link org.apache.camel.Exchange} and the caused exception will be thrown. 172 * <p/> 173 * Will also stop if processing the exchange failed (has a fault message) or an exception 174 * was thrown and handled by the error handler (such as using onException). In all situations 175 * the multicast will stop further processing. This is the same behavior as in pipeline, which 176 * is used by the routing engine. 177 * <p/> 178 * The default behavior is to <b>not</b> stop but continue processing till the end 179 * 180 * @return the builder 181 */ 182 public MulticastDefinition stopOnException() { 183 setStopOnException(true); 184 return this; 185 } 186 187 public MulticastDefinition executorService(ExecutorService executorService) { 188 setExecutorService(executorService); 189 return this; 190 } 191 192 public MulticastDefinition executorServiceRef(String executorServiceRef) { 193 setExecutorServiceRef(executorServiceRef); 194 return this; 195 } 196 197 /** 198 * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send. 199 * This can be used to deep-clone messages that should be send, or any custom logic needed before 200 * the exchange is send. 201 * 202 * @param onPrepare the processor 203 * @return the builder 204 */ 205 public MulticastDefinition onPrepare(Processor onPrepare) { 206 setOnPrepare(onPrepare); 207 return this; 208 } 209 210 /** 211 * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send. 212 * This can be used to deep-clone messages that should be send, or any custom logic needed before 213 * the exchange is send. 214 * 215 * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry} 216 * @return the builder 217 */ 218 public MulticastDefinition onPrepareRef(String onPrepareRef) { 219 setOnPrepareRef(onPrepareRef); 220 return this; 221 } 222 223 /** 224 * Sets a timeout value in millis to use when using parallelProcessing. 225 * 226 * @param timeout timeout in millis 227 * @return the builder 228 */ 229 public MulticastDefinition timeout(long timeout) { 230 setTimeout(timeout); 231 return this; 232 } 233 234 /** 235 * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and each of the sub messages. 236 * 237 * @return the builder. 238 * @see org.apache.camel.spi.SubUnitOfWork 239 */ 240 public MulticastDefinition shareUnitOfWork() { 241 setShareUnitOfWork(true); 242 return this; 243 } 244 245 protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) throws Exception { 246 AggregationStrategy strategy = createAggregationStrategy(routeContext); 247 if (strategy == null) { 248 // default to use latest aggregation strategy 249 strategy = new UseLatestAggregationStrategy(); 250 } 251 252 boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing()); 253 ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Multicast", this, isParallelProcessing()); 254 255 long timeout = getTimeout() != null ? getTimeout() : 0; 256 if (timeout > 0 && !isParallelProcessing()) { 257 throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled."); 258 } 259 if (onPrepareRef != null) { 260 onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class); 261 } 262 263 MulticastProcessor answer = new MulticastProcessor(routeContext.getCamelContext(), list, strategy, isParallelProcessing(), 264 threadPool, shutdownThreadPool, isStreaming(), isStopOnException(), timeout, onPrepare, isShareUnitOfWork()); 265 if (isShareUnitOfWork()) { 266 // wrap answer in a sub unit of work, since we share the unit of work 267 CamelInternalProcessor internalProcessor = new CamelInternalProcessor(answer); 268 internalProcessor.addAdvice(new CamelInternalProcessor.SubUnitOfWorkProcessorAdvice()); 269 return internalProcessor; 270 } 271 return answer; 272 } 273 274 private AggregationStrategy createAggregationStrategy(RouteContext routeContext) { 275 AggregationStrategy strategy = getAggregationStrategy(); 276 if (strategy == null && strategyRef != null) { 277 Object aggStrategy = routeContext.lookup(strategyRef, Object.class); 278 if (aggStrategy instanceof AggregationStrategy) { 279 strategy = (AggregationStrategy) aggStrategy; 280 } else if (aggStrategy != null) { 281 AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getStrategyMethodName()); 282 if (getStrategyMethodAllowNull() != null) { 283 adapter.setAllowNullNewExchange(getStrategyMethodAllowNull()); 284 adapter.setAllowNullOldExchange(getStrategyMethodAllowNull()); 285 } 286 strategy = adapter; 287 } else { 288 throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef); 289 } 290 } 291 292 if (strategy != null && strategy instanceof CamelContextAware) { 293 ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext()); 294 } 295 296 return strategy; 297 } 298 299 300 public AggregationStrategy getAggregationStrategy() { 301 return aggregationStrategy; 302 } 303 304 public MulticastDefinition setAggregationStrategy(AggregationStrategy aggregationStrategy) { 305 this.aggregationStrategy = aggregationStrategy; 306 return this; 307 } 308 309 public Boolean getParallelProcessing() { 310 return parallelProcessing; 311 } 312 313 public void setParallelProcessing(Boolean parallelProcessing) { 314 this.parallelProcessing = parallelProcessing; 315 } 316 317 public boolean isParallelProcessing() { 318 return parallelProcessing != null && parallelProcessing; 319 } 320 321 public Boolean getStreaming() { 322 return streaming; 323 } 324 325 public void setStreaming(Boolean streaming) { 326 this.streaming = streaming; 327 } 328 329 public boolean isStreaming() { 330 return streaming != null && streaming; 331 } 332 333 public Boolean getStopOnException() { 334 return stopOnException; 335 } 336 337 public void setStopOnException(Boolean stopOnException) { 338 this.stopOnException = stopOnException; 339 } 340 341 public Boolean isStopOnException() { 342 return stopOnException != null && stopOnException; 343 } 344 345 public ExecutorService getExecutorService() { 346 return executorService; 347 } 348 349 public void setExecutorService(ExecutorService executorService) { 350 this.executorService = executorService; 351 } 352 353 public String getStrategyRef() { 354 return strategyRef; 355 } 356 357 public void setStrategyRef(String strategyRef) { 358 this.strategyRef = strategyRef; 359 } 360 361 public String getStrategyMethodName() { 362 return strategyMethodName; 363 } 364 365 public void setStrategyMethodName(String strategyMethodName) { 366 this.strategyMethodName = strategyMethodName; 367 } 368 369 public Boolean getStrategyMethodAllowNull() { 370 return strategyMethodAllowNull; 371 } 372 373 public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) { 374 this.strategyMethodAllowNull = strategyMethodAllowNull; 375 } 376 377 public String getExecutorServiceRef() { 378 return executorServiceRef; 379 } 380 381 public void setExecutorServiceRef(String executorServiceRef) { 382 this.executorServiceRef = executorServiceRef; 383 } 384 385 public Long getTimeout() { 386 return timeout; 387 } 388 389 public void setTimeout(Long timeout) { 390 this.timeout = timeout; 391 } 392 393 public String getOnPrepareRef() { 394 return onPrepareRef; 395 } 396 397 public void setOnPrepareRef(String onPrepareRef) { 398 this.onPrepareRef = onPrepareRef; 399 } 400 401 public Processor getOnPrepare() { 402 return onPrepare; 403 } 404 405 public void setOnPrepare(Processor onPrepare) { 406 this.onPrepare = onPrepare; 407 } 408 409 public Boolean getShareUnitOfWork() { 410 return shareUnitOfWork; 411 } 412 413 public void setShareUnitOfWork(Boolean shareUnitOfWork) { 414 this.shareUnitOfWork = shareUnitOfWork; 415 } 416 417 public boolean isShareUnitOfWork() { 418 return shareUnitOfWork != null && shareUnitOfWork; 419 } 420 421 }