001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.util.concurrent.ExecutorService; 020 021 import javax.xml.bind.annotation.XmlAccessType; 022 import javax.xml.bind.annotation.XmlAccessorType; 023 import javax.xml.bind.annotation.XmlAttribute; 024 import javax.xml.bind.annotation.XmlRootElement; 025 import javax.xml.bind.annotation.XmlTransient; 026 027 import org.apache.camel.CamelContextAware; 028 import org.apache.camel.Expression; 029 import org.apache.camel.Processor; 030 import org.apache.camel.model.language.ExpressionDefinition; 031 import org.apache.camel.processor.CamelInternalProcessor; 032 import org.apache.camel.processor.Splitter; 033 import org.apache.camel.processor.aggregate.AggregationStrategy; 034 import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; 035 import org.apache.camel.spi.RouteContext; 036 import org.apache.camel.util.CamelContextHelper; 037 038 /** 039 * Represents an XML <split/> element 040 * 041 * @version 042 */ 043 @XmlRootElement(name = "split") 044 @XmlAccessorType(XmlAccessType.FIELD) 045 public class SplitDefinition extends ExpressionNode implements ExecutorServiceAwareDefinition<SplitDefinition> { 046 @XmlTransient 047 private AggregationStrategy aggregationStrategy; 048 @XmlTransient 049 private ExecutorService executorService; 050 @XmlAttribute 051 private Boolean parallelProcessing; 052 @XmlAttribute 053 private String strategyRef; 054 @XmlAttribute 055 private String strategyMethodName; 056 @XmlAttribute 057 private Boolean strategyMethodAllowNull; 058 @XmlAttribute 059 private String executorServiceRef; 060 @XmlAttribute 061 private Boolean streaming; 062 @XmlAttribute 063 private Boolean stopOnException; 064 @XmlAttribute 065 private Long timeout; 066 @XmlAttribute 067 private String onPrepareRef; 068 @XmlTransient 069 private Processor onPrepare; 070 @XmlAttribute 071 private Boolean shareUnitOfWork; 072 073 public SplitDefinition() { 074 } 075 076 public SplitDefinition(Expression expression) { 077 super(expression); 078 } 079 080 public SplitDefinition(ExpressionDefinition expression) { 081 super(expression); 082 } 083 084 @Override 085 public String toString() { 086 return "Split[" + getExpression() + " -> " + getOutputs() + "]"; 087 } 088 089 @Override 090 public String getShortName() { 091 return "split"; 092 } 093 094 @Override 095 public String getLabel() { 096 return "split[" + getExpression() + "]"; 097 } 098 099 @Override 100 public Processor createProcessor(RouteContext routeContext) throws Exception { 101 Processor childProcessor = this.createChildProcessor(routeContext, true); 102 aggregationStrategy = createAggregationStrategy(routeContext); 103 104 boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing()); 105 ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Split", this, isParallelProcessing()); 106 107 long timeout = getTimeout() != null ? getTimeout() : 0; 108 if (timeout > 0 && !isParallelProcessing()) { 109 throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled."); 110 } 111 if (onPrepareRef != null) { 112 onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class); 113 } 114 115 Expression exp = getExpression().createExpression(routeContext); 116 117 Splitter answer = new Splitter(routeContext.getCamelContext(), exp, childProcessor, aggregationStrategy, 118 isParallelProcessing(), threadPool, shutdownThreadPool, isStreaming(), isStopOnException(), 119 timeout, onPrepare, isShareUnitOfWork()); 120 if (isShareUnitOfWork()) { 121 // wrap answer in a sub unit of work, since we share the unit of work 122 CamelInternalProcessor internalProcessor = new CamelInternalProcessor(answer); 123 internalProcessor.addAdvice(new CamelInternalProcessor.SubUnitOfWorkProcessorAdvice()); 124 return internalProcessor; 125 } 126 return answer; 127 } 128 129 private AggregationStrategy createAggregationStrategy(RouteContext routeContext) { 130 AggregationStrategy strategy = getAggregationStrategy(); 131 if (strategy == null && strategyRef != null) { 132 Object aggStrategy = routeContext.lookup(strategyRef, Object.class); 133 if (aggStrategy instanceof AggregationStrategy) { 134 strategy = (AggregationStrategy) aggStrategy; 135 } else if (aggStrategy != null) { 136 AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getStrategyMethodName()); 137 if (getStrategyMethodAllowNull() != null) { 138 adapter.setAllowNullNewExchange(getStrategyMethodAllowNull()); 139 adapter.setAllowNullOldExchange(getStrategyMethodAllowNull()); 140 } 141 strategy = adapter; 142 } else { 143 throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef); 144 } 145 } 146 147 if (strategy != null && strategy instanceof CamelContextAware) { 148 ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext()); 149 } 150 151 return strategy; 152 } 153 154 // Fluent API 155 // ------------------------------------------------------------------------- 156 157 /** 158 * Set the aggregationStrategy 159 * 160 * @return the builder 161 */ 162 public SplitDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) { 163 setAggregationStrategy(aggregationStrategy); 164 return this; 165 } 166 167 /** 168 * Set the aggregationStrategy 169 * 170 * @param aggregationStrategyRef a reference to a strategy to lookup 171 * @return the builder 172 */ 173 public SplitDefinition aggregationStrategyRef(String aggregationStrategyRef) { 174 setStrategyRef(aggregationStrategyRef); 175 return this; 176 } 177 178 /** 179 * Sets the method name to use when using a POJO as {@link AggregationStrategy}. 180 * 181 * @param methodName the method name to call 182 * @return the builder 183 */ 184 public SplitDefinition aggregationStrategyMethodName(String methodName) { 185 setStrategyMethodName(methodName); 186 return this; 187 } 188 189 /** 190 * Sets allowing null when using a POJO as {@link AggregationStrategy}. 191 * 192 * @return the builder 193 */ 194 public SplitDefinition aggregationStrategyMethodAllowNull() { 195 setStrategyMethodAllowNull(true); 196 return this; 197 } 198 199 /** 200 * Doing the splitting work in parallel 201 * 202 * @return the builder 203 */ 204 public SplitDefinition parallelProcessing() { 205 setParallelProcessing(true); 206 return this; 207 } 208 209 /** 210 * Enables streaming. 211 * See {@link org.apache.camel.model.SplitDefinition#isStreaming()} for more information 212 * 213 * @return the builder 214 */ 215 public SplitDefinition streaming() { 216 setStreaming(true); 217 return this; 218 } 219 220 /** 221 * Will now stop further processing if an exception or failure occurred during processing of an 222 * {@link org.apache.camel.Exchange} and the caused exception will be thrown. 223 * <p/> 224 * Will also stop if processing the exchange failed (has a fault message) or an exception 225 * was thrown and handled by the error handler (such as using onException). In all situations 226 * the splitter will stop further processing. This is the same behavior as in pipeline, which 227 * is used by the routing engine. 228 * <p/> 229 * The default behavior is to <b>not</b> stop but continue processing till the end 230 * 231 * @return the builder 232 */ 233 public SplitDefinition stopOnException() { 234 setStopOnException(true); 235 return this; 236 } 237 238 public SplitDefinition executorService(ExecutorService executorService) { 239 setExecutorService(executorService); 240 return this; 241 } 242 243 public SplitDefinition executorServiceRef(String executorServiceRef) { 244 setExecutorServiceRef(executorServiceRef); 245 return this; 246 } 247 248 /** 249 * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send. 250 * This can be used to deep-clone messages that should be send, or any custom logic needed before 251 * the exchange is send. 252 * 253 * @param onPrepare the processor 254 * @return the builder 255 */ 256 public SplitDefinition onPrepare(Processor onPrepare) { 257 setOnPrepare(onPrepare); 258 return this; 259 } 260 261 /** 262 * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send. 263 * This can be used to deep-clone messages that should be send, or any custom logic needed before 264 * the exchange is send. 265 * 266 * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry} 267 * @return the builder 268 */ 269 public SplitDefinition onPrepareRef(String onPrepareRef) { 270 setOnPrepareRef(onPrepareRef); 271 return this; 272 } 273 274 /** 275 * Sets a timeout value in millis to use when using parallelProcessing. 276 * 277 * @param timeout timeout in millis 278 * @return the builder 279 */ 280 public SplitDefinition timeout(long timeout) { 281 setTimeout(timeout); 282 return this; 283 } 284 285 /** 286 * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and each of the sub messages. 287 * 288 * @return the builder. 289 * @see org.apache.camel.spi.SubUnitOfWork 290 */ 291 public SplitDefinition shareUnitOfWork() { 292 setShareUnitOfWork(true); 293 return this; 294 } 295 296 // Properties 297 //------------------------------------------------------------------------- 298 299 public AggregationStrategy getAggregationStrategy() { 300 return aggregationStrategy; 301 } 302 303 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 304 this.aggregationStrategy = aggregationStrategy; 305 } 306 307 public Boolean getParallelProcessing() { 308 return parallelProcessing; 309 } 310 311 public void setParallelProcessing(Boolean parallelProcessing) { 312 this.parallelProcessing = parallelProcessing; 313 } 314 315 public boolean isParallelProcessing() { 316 return parallelProcessing != null && parallelProcessing; 317 } 318 319 public Boolean getStreaming() { 320 return streaming; 321 } 322 323 public void setStreaming(Boolean streaming) { 324 this.streaming = streaming; 325 } 326 327 /** 328 * The splitter should use streaming -- exchanges are being sent as the data for them becomes available. 329 * This improves throughput and memory usage, but it has a drawback: 330 * - the sent exchanges will no longer contain the {@link org.apache.camel.Exchange#SPLIT_SIZE} header property 331 * 332 * @return whether or not streaming should be used 333 */ 334 public boolean isStreaming() { 335 return streaming != null && streaming; 336 } 337 338 public Boolean getStopOnException() { 339 return stopOnException; 340 } 341 342 public void setStopOnException(Boolean stopOnException) { 343 this.stopOnException = stopOnException; 344 } 345 346 public Boolean isStopOnException() { 347 return stopOnException != null && stopOnException; 348 } 349 350 public ExecutorService getExecutorService() { 351 return executorService; 352 } 353 354 public void setExecutorService(ExecutorService executorService) { 355 this.executorService = executorService; 356 } 357 358 public String getStrategyRef() { 359 return strategyRef; 360 } 361 362 public void setStrategyRef(String strategyRef) { 363 this.strategyRef = strategyRef; 364 } 365 366 public String getStrategyMethodName() { 367 return strategyMethodName; 368 } 369 370 public void setStrategyMethodName(String strategyMethodName) { 371 this.strategyMethodName = strategyMethodName; 372 } 373 374 public Boolean getStrategyMethodAllowNull() { 375 return strategyMethodAllowNull; 376 } 377 378 public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) { 379 this.strategyMethodAllowNull = strategyMethodAllowNull; 380 } 381 382 public String getExecutorServiceRef() { 383 return executorServiceRef; 384 } 385 386 public void setExecutorServiceRef(String executorServiceRef) { 387 this.executorServiceRef = executorServiceRef; 388 } 389 390 public Long getTimeout() { 391 return timeout; 392 } 393 394 public void setTimeout(Long timeout) { 395 this.timeout = timeout; 396 } 397 398 public String getOnPrepareRef() { 399 return onPrepareRef; 400 } 401 402 public void setOnPrepareRef(String onPrepareRef) { 403 this.onPrepareRef = onPrepareRef; 404 } 405 406 public Processor getOnPrepare() { 407 return onPrepare; 408 } 409 410 public void setOnPrepare(Processor onPrepare) { 411 this.onPrepare = onPrepare; 412 } 413 414 public Boolean getShareUnitOfWork() { 415 return shareUnitOfWork; 416 } 417 418 public void setShareUnitOfWork(Boolean shareUnitOfWork) { 419 this.shareUnitOfWork = shareUnitOfWork; 420 } 421 422 public boolean isShareUnitOfWork() { 423 return shareUnitOfWork != null && shareUnitOfWork; 424 } 425 }