001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.util.ArrayList; 020 import java.util.List; 021 import javax.xml.bind.annotation.XmlAccessType; 022 import javax.xml.bind.annotation.XmlAccessorType; 023 import javax.xml.bind.annotation.XmlElement; 024 import javax.xml.bind.annotation.XmlElementRef; 025 import javax.xml.bind.annotation.XmlElements; 026 import javax.xml.bind.annotation.XmlRootElement; 027 import javax.xml.bind.annotation.XmlTransient; 028 029 import org.apache.camel.Expression; 030 import org.apache.camel.Processor; 031 import org.apache.camel.model.config.BatchResequencerConfig; 032 import org.apache.camel.model.config.ResequencerConfig; 033 import org.apache.camel.model.config.StreamResequencerConfig; 034 import org.apache.camel.model.language.ExpressionDefinition; 035 import org.apache.camel.processor.CamelInternalProcessor; 036 import org.apache.camel.processor.Resequencer; 037 import org.apache.camel.processor.StreamResequencer; 038 import org.apache.camel.processor.resequencer.ExpressionResultComparator; 039 import org.apache.camel.spi.Required; 040 import org.apache.camel.spi.RouteContext; 041 import org.apache.camel.util.CamelContextHelper; 042 import org.apache.camel.util.ObjectHelper; 043 044 /** 045 * Represents an XML <resequence/> element 046 * 047 * @version 048 */ 049 @XmlRootElement(name = "resequence") 050 @XmlAccessorType(XmlAccessType.FIELD) 051 public class ResequenceDefinition extends ProcessorDefinition<ResequenceDefinition> { 052 @XmlElements({ 053 @XmlElement(required = false, name = "batch-config", type = BatchResequencerConfig.class), 054 @XmlElement(required = false, name = "stream-config", type = StreamResequencerConfig.class)} 055 ) 056 private ResequencerConfig resequencerConfig; 057 @XmlTransient 058 private BatchResequencerConfig batchConfig; 059 @XmlTransient 060 private StreamResequencerConfig streamConfig; 061 @XmlElementRef 062 @Required 063 private ExpressionDefinition expression; 064 @XmlElementRef 065 private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>(); 066 067 public ResequenceDefinition() { 068 } 069 070 @Override 071 public String getShortName() { 072 return "resequence"; 073 } 074 075 public List<ProcessorDefinition<?>> getOutputs() { 076 return outputs; 077 } 078 079 public void setOutputs(List<ProcessorDefinition<?>> outputs) { 080 this.outputs = outputs; 081 } 082 083 @Override 084 public boolean isOutputSupported() { 085 return true; 086 } 087 088 // Fluent API 089 // ------------------------------------------------------------------------- 090 /** 091 * Configures the stream-based resequencing algorithm using the default 092 * configuration. 093 * 094 * @return the builder 095 */ 096 public ResequenceDefinition stream() { 097 return stream(StreamResequencerConfig.getDefault()); 098 } 099 100 /** 101 * Configures the batch-based resequencing algorithm using the default 102 * configuration. 103 * 104 * @return the builder 105 */ 106 public ResequenceDefinition batch() { 107 return batch(BatchResequencerConfig.getDefault()); 108 } 109 110 /** 111 * Configures the stream-based resequencing algorithm using the given 112 * {@link StreamResequencerConfig}. 113 * 114 * @param config the config 115 * @return the builder 116 */ 117 public ResequenceDefinition stream(StreamResequencerConfig config) { 118 this.streamConfig = config; 119 this.batchConfig = null; 120 return this; 121 } 122 123 /** 124 * Configures the batch-based resequencing algorithm using the given 125 * {@link BatchResequencerConfig}. 126 * 127 * @param config the config 128 * @return the builder 129 */ 130 public ResequenceDefinition batch(BatchResequencerConfig config) { 131 this.batchConfig = config; 132 this.streamConfig = null; 133 return this; 134 } 135 136 /** 137 * Sets the timeout 138 * @param timeout timeout in millis 139 * @return the builder 140 */ 141 public ResequenceDefinition timeout(long timeout) { 142 if (streamConfig != null) { 143 streamConfig.setTimeout(timeout); 144 } else { 145 // initialize batch mode as its default mode 146 if (batchConfig == null) { 147 batch(); 148 } 149 batchConfig.setBatchTimeout(timeout); 150 } 151 return this; 152 } 153 154 /** 155 * Sets the rejectOld flag to throw an error when a message older than the last delivered message is processed 156 * @return the builder 157 */ 158 public ResequenceDefinition rejectOld() { 159 if (streamConfig == null) { 160 throw new IllegalStateException("rejectOld() only supported for stream resequencer"); 161 } 162 streamConfig.setRejectOld(true); 163 return this; 164 } 165 166 /** 167 * Sets the in batch size for number of exchanges received 168 * @param batchSize the batch size 169 * @return the builder 170 */ 171 public ResequenceDefinition size(int batchSize) { 172 if (streamConfig != null) { 173 throw new IllegalStateException("size() only supported for batch resequencer"); 174 } 175 // initialize batch mode as its default mode 176 if (batchConfig == null) { 177 batch(); 178 } 179 batchConfig.setBatchSize(batchSize); 180 return this; 181 } 182 183 /** 184 * Sets the capacity for the stream resequencer 185 * 186 * @param capacity the capacity 187 * @return the builder 188 */ 189 public ResequenceDefinition capacity(int capacity) { 190 if (streamConfig == null) { 191 throw new IllegalStateException("capacity() only supported for stream resequencer"); 192 } 193 streamConfig.setCapacity(capacity); 194 return this; 195 196 } 197 198 /** 199 * Enables duplicates for the batch resequencer mode 200 * @return the builder 201 */ 202 public ResequenceDefinition allowDuplicates() { 203 if (streamConfig != null) { 204 throw new IllegalStateException("allowDuplicates() only supported for batch resequencer"); 205 } 206 // initialize batch mode as its default mode 207 if (batchConfig == null) { 208 batch(); 209 } 210 batchConfig.setAllowDuplicates(true); 211 return this; 212 } 213 214 /** 215 * Enables reverse mode for the batch resequencer mode. 216 * <p/> 217 * This means the expression for determine the sequence order will be reversed. 218 * Can be used for Z..A or 9..0 ordering. 219 * 220 * @return the builder 221 */ 222 public ResequenceDefinition reverse() { 223 if (streamConfig != null) { 224 throw new IllegalStateException("reverse() only supported for batch resequencer"); 225 } 226 // initialize batch mode as its default mode 227 if (batchConfig == null) { 228 batch(); 229 } 230 batchConfig.setReverse(true); 231 return this; 232 } 233 234 /** 235 * If an incoming {@link org.apache.camel.Exchange} is invalid, then it will be ignored. 236 * 237 * @return builder 238 */ 239 public ResequenceDefinition ignoreInvalidExchanges() { 240 if (streamConfig != null) { 241 streamConfig.setIgnoreInvalidExchanges(true); 242 } else { 243 // initialize batch mode as its default mode 244 if (batchConfig == null) { 245 batch(); 246 } 247 batchConfig.setIgnoreInvalidExchanges(true); 248 } 249 return this; 250 } 251 252 /** 253 * Sets the comparator to use for stream resequencer 254 * 255 * @param comparator the comparator 256 * @return the builder 257 */ 258 public ResequenceDefinition comparator(ExpressionResultComparator comparator) { 259 if (streamConfig == null) { 260 throw new IllegalStateException("comparator() only supported for stream resequencer"); 261 } 262 streamConfig.setComparator(comparator); 263 return this; 264 } 265 266 @Override 267 public String toString() { 268 return "Resequencer[" + getExpression() + " -> " + getOutputs() + "]"; 269 } 270 271 @Override 272 public String getLabel() { 273 return "resequencer[" + (getExpression() != null ? getExpression().getLabel() : "") + "]"; 274 } 275 276 public ResequencerConfig getResequencerConfig() { 277 return resequencerConfig; 278 } 279 280 public void setResequencerConfig(ResequencerConfig resequencerConfig) { 281 this.resequencerConfig = resequencerConfig; 282 } 283 284 public BatchResequencerConfig getBatchConfig() { 285 if (batchConfig == null && resequencerConfig != null && resequencerConfig instanceof BatchResequencerConfig) { 286 return (BatchResequencerConfig) resequencerConfig; 287 } 288 return batchConfig; 289 } 290 291 public StreamResequencerConfig getStreamConfig() { 292 if (streamConfig == null && resequencerConfig != null && resequencerConfig instanceof StreamResequencerConfig) { 293 return (StreamResequencerConfig) resequencerConfig; 294 } 295 return streamConfig; 296 } 297 298 public void setBatchConfig(BatchResequencerConfig batchConfig) { 299 this.batchConfig = batchConfig; 300 } 301 302 public void setStreamConfig(StreamResequencerConfig streamConfig) { 303 this.streamConfig = streamConfig; 304 } 305 306 public ExpressionDefinition getExpression() { 307 return expression; 308 } 309 310 public void setExpression(ExpressionDefinition expression) { 311 this.expression = expression; 312 } 313 314 @Override 315 public Processor createProcessor(RouteContext routeContext) throws Exception { 316 // if configured from XML then streamConfig has been set with the configuration 317 if (resequencerConfig != null) { 318 if (resequencerConfig instanceof StreamResequencerConfig) { 319 streamConfig = (StreamResequencerConfig) resequencerConfig; 320 } else { 321 batchConfig = (BatchResequencerConfig) resequencerConfig; 322 } 323 } 324 325 if (streamConfig != null) { 326 return createStreamResequencer(routeContext, streamConfig); 327 } else { 328 if (batchConfig == null) { 329 // default as batch mode 330 batch(); 331 } 332 return createBatchResequencer(routeContext, batchConfig); 333 } 334 } 335 336 /** 337 * Creates a batch {@link Resequencer} instance applying the given <code>config</code>. 338 * 339 * @param routeContext route context. 340 * @param config batch resequencer configuration. 341 * @return the configured batch resequencer. 342 * @throws Exception can be thrown 343 */ 344 @SuppressWarnings("deprecation") 345 protected Resequencer createBatchResequencer(RouteContext routeContext, 346 BatchResequencerConfig config) throws Exception { 347 Processor processor = this.createChildProcessor(routeContext, true); 348 Expression expression = getExpression().createExpression(routeContext); 349 350 // and wrap in unit of work 351 String routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory()); 352 CamelInternalProcessor internal = new CamelInternalProcessor(processor); 353 internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeId)); 354 internal.addAdvice(new CamelInternalProcessor.RouteContextAdvice(routeContext)); 355 356 ObjectHelper.notNull(config, "config", this); 357 ObjectHelper.notNull(expression, "expression", this); 358 359 Resequencer resequencer = new Resequencer(routeContext.getCamelContext(), internal, expression, 360 config.isAllowDuplicates(), config.isReverse()); 361 resequencer.setBatchSize(config.getBatchSize()); 362 resequencer.setBatchTimeout(config.getBatchTimeout()); 363 if (config.getIgnoreInvalidExchanges() != null) { 364 resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges()); 365 } 366 return resequencer; 367 } 368 369 /** 370 * Creates a {@link StreamResequencer} instance applying the given <code>config</code>. 371 * 372 * @param routeContext route context. 373 * @param config stream resequencer configuration. 374 * @return the configured stream resequencer. 375 * @throws Exception can be thrwon 376 */ 377 protected StreamResequencer createStreamResequencer(RouteContext routeContext, 378 StreamResequencerConfig config) throws Exception { 379 Processor processor = this.createChildProcessor(routeContext, true); 380 Expression expression = getExpression().createExpression(routeContext); 381 382 // and wrap in unit of work 383 String routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory()); 384 CamelInternalProcessor internal = new CamelInternalProcessor(processor); 385 internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeId)); 386 internal.addAdvice(new CamelInternalProcessor.RouteContextAdvice(routeContext)); 387 388 ObjectHelper.notNull(config, "config", this); 389 ObjectHelper.notNull(expression, "expression", this); 390 391 ExpressionResultComparator comparator; 392 if (config.getComparatorRef() != null) { 393 comparator = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), config.getComparatorRef(), ExpressionResultComparator.class); 394 } else { 395 comparator = config.getComparator(); 396 } 397 comparator.setExpression(expression); 398 399 StreamResequencer resequencer = new StreamResequencer(routeContext.getCamelContext(), internal, comparator); 400 resequencer.setTimeout(config.getTimeout()); 401 resequencer.setCapacity(config.getCapacity()); 402 resequencer.setRejectOld(config.getRejectOld()); 403 if (config.getIgnoreInvalidExchanges() != null) { 404 resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges()); 405 } 406 return resequencer; 407 } 408 409 }