001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.ArrayList;
020    import java.util.List;
021    import javax.xml.bind.annotation.XmlAccessType;
022    import javax.xml.bind.annotation.XmlAccessorType;
023    import javax.xml.bind.annotation.XmlElement;
024    import javax.xml.bind.annotation.XmlElementRef;
025    import javax.xml.bind.annotation.XmlElements;
026    import javax.xml.bind.annotation.XmlRootElement;
027    import javax.xml.bind.annotation.XmlTransient;
028    
029    import org.apache.camel.Expression;
030    import org.apache.camel.Processor;
031    import org.apache.camel.model.config.BatchResequencerConfig;
032    import org.apache.camel.model.config.ResequencerConfig;
033    import org.apache.camel.model.config.StreamResequencerConfig;
034    import org.apache.camel.model.language.ExpressionDefinition;
035    import org.apache.camel.processor.CamelInternalProcessor;
036    import org.apache.camel.processor.Resequencer;
037    import org.apache.camel.processor.StreamResequencer;
038    import org.apache.camel.processor.resequencer.ExpressionResultComparator;
039    import org.apache.camel.spi.Required;
040    import org.apache.camel.spi.RouteContext;
041    import org.apache.camel.util.CamelContextHelper;
042    import org.apache.camel.util.ObjectHelper;
043    
044    /**
045     * Represents an XML <resequence/> element
046     *
047     * @version 
048     */
049    @XmlRootElement(name = "resequence")
050    @XmlAccessorType(XmlAccessType.FIELD)
051    public class ResequenceDefinition extends ProcessorDefinition<ResequenceDefinition> {
052        @XmlElements({
053        @XmlElement(required = false, name = "batch-config", type = BatchResequencerConfig.class),
054        @XmlElement(required = false, name = "stream-config", type = StreamResequencerConfig.class)}
055        )
056        private ResequencerConfig resequencerConfig;
057        @XmlTransient
058        private BatchResequencerConfig batchConfig;
059        @XmlTransient
060        private StreamResequencerConfig streamConfig;
061        @XmlElementRef
062        @Required
063        private ExpressionDefinition expression;
064        @XmlElementRef
065        private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>();
066    
067        public ResequenceDefinition() {
068        }
069    
070        @Override
071        public String getShortName() {
072            return "resequence";
073        }
074    
075        public List<ProcessorDefinition<?>> getOutputs() {
076            return outputs;
077        }
078    
079        public void setOutputs(List<ProcessorDefinition<?>> outputs) {
080            this.outputs = outputs;
081        }
082    
083        @Override
084        public boolean isOutputSupported() {
085            return true;
086        }
087    
088        // Fluent API
089        // -------------------------------------------------------------------------
090        /**
091         * Configures the stream-based resequencing algorithm using the default
092         * configuration.
093         *
094         * @return the builder
095         */
096        public ResequenceDefinition stream() {
097            return stream(StreamResequencerConfig.getDefault());
098        }
099    
100        /**
101         * Configures the batch-based resequencing algorithm using the default
102         * configuration.
103         *
104         * @return the builder
105         */
106        public ResequenceDefinition batch() {
107            return batch(BatchResequencerConfig.getDefault());
108        }
109    
110        /**
111         * Configures the stream-based resequencing algorithm using the given
112         * {@link StreamResequencerConfig}.
113         *
114         * @param config  the config
115         * @return the builder
116         */
117        public ResequenceDefinition stream(StreamResequencerConfig config) {
118            this.streamConfig = config;
119            this.batchConfig = null;
120            return this;
121        }
122    
123        /**
124         * Configures the batch-based resequencing algorithm using the given
125         * {@link BatchResequencerConfig}.
126         *
127         * @param config  the config
128         * @return the builder
129         */
130        public ResequenceDefinition batch(BatchResequencerConfig config) {
131            this.batchConfig = config;
132            this.streamConfig = null;
133            return this;
134        }
135    
136        /**
137         * Sets the timeout
138         * @param timeout  timeout in millis
139         * @return the builder
140         */
141        public ResequenceDefinition timeout(long timeout) {
142            if (streamConfig != null) {
143                streamConfig.setTimeout(timeout);
144            } else {
145                // initialize batch mode as its default mode
146                if (batchConfig == null) {
147                    batch();
148                }
149                batchConfig.setBatchTimeout(timeout);
150            }
151            return this;
152        }
153    
154        /**
155         * Sets the rejectOld flag to throw an error when a message older than the last delivered message is processed
156         * @return the builder
157         */
158        public ResequenceDefinition rejectOld() {
159            if (streamConfig == null) {
160                throw new IllegalStateException("rejectOld() only supported for stream resequencer");
161            }
162            streamConfig.setRejectOld(true);
163            return this;
164        }
165    
166        /**
167         * Sets the in batch size for number of exchanges received
168         * @param batchSize  the batch size
169         * @return the builder
170         */
171        public ResequenceDefinition size(int batchSize) {
172            if (streamConfig != null) {
173                throw new IllegalStateException("size() only supported for batch resequencer");
174            }
175            // initialize batch mode as its default mode
176            if (batchConfig == null) {
177                batch();
178            }
179            batchConfig.setBatchSize(batchSize);
180            return this;
181        }
182    
183        /**
184         * Sets the capacity for the stream resequencer
185         *
186         * @param capacity  the capacity
187         * @return the builder
188         */
189        public ResequenceDefinition capacity(int capacity) {
190            if (streamConfig == null) {
191                throw new IllegalStateException("capacity() only supported for stream resequencer");
192            }
193            streamConfig.setCapacity(capacity);
194            return this;
195    
196        }
197    
198        /**
199         * Enables duplicates for the batch resequencer mode
200         * @return the builder
201         */
202        public ResequenceDefinition allowDuplicates() {
203            if (streamConfig != null) {
204                throw new IllegalStateException("allowDuplicates() only supported for batch resequencer");
205            }
206            // initialize batch mode as its default mode
207            if (batchConfig == null) {
208                batch();
209            }
210            batchConfig.setAllowDuplicates(true);
211            return this;
212        }
213    
214        /**
215         * Enables reverse mode for the batch resequencer mode.
216         * <p/>
217         * This means the expression for determine the sequence order will be reversed.
218         * Can be used for Z..A or 9..0 ordering.
219         *
220         * @return the builder
221         */
222        public ResequenceDefinition reverse() {
223            if (streamConfig != null) {
224                throw new IllegalStateException("reverse() only supported for batch resequencer");
225            }
226            // initialize batch mode as its default mode
227            if (batchConfig == null) {
228                batch();
229            }
230            batchConfig.setReverse(true);
231            return this;
232        }
233    
234        /**
235         * If an incoming {@link org.apache.camel.Exchange} is invalid, then it will be ignored.
236         *
237         * @return builder
238         */
239        public ResequenceDefinition ignoreInvalidExchanges() {
240            if (streamConfig != null) {
241                streamConfig.setIgnoreInvalidExchanges(true);
242            } else {
243                // initialize batch mode as its default mode
244                if (batchConfig == null) {
245                    batch();
246                }
247                batchConfig.setIgnoreInvalidExchanges(true);
248            }
249            return this;
250        }
251    
252        /**
253         * Sets the comparator to use for stream resequencer
254         *
255         * @param comparator  the comparator
256         * @return the builder
257         */
258        public ResequenceDefinition comparator(ExpressionResultComparator comparator) {
259            if (streamConfig == null) {
260                throw new IllegalStateException("comparator() only supported for stream resequencer");
261            }
262            streamConfig.setComparator(comparator);
263            return this;
264        }
265    
266        @Override
267        public String toString() {
268            return "Resequencer[" + getExpression() + " -> " + getOutputs() + "]";
269        }
270        
271        @Override
272        public String getLabel() {
273            return "resequencer[" + (getExpression() != null ? getExpression().getLabel() : "") + "]";
274        }
275    
276        public ResequencerConfig getResequencerConfig() {
277            return resequencerConfig;
278        }
279    
280        public void setResequencerConfig(ResequencerConfig resequencerConfig) {
281            this.resequencerConfig = resequencerConfig;
282        }
283    
284        public BatchResequencerConfig getBatchConfig() {
285            if (batchConfig == null && resequencerConfig != null && resequencerConfig instanceof BatchResequencerConfig) {
286                return (BatchResequencerConfig) resequencerConfig;
287            }
288            return batchConfig;
289        }
290    
291        public StreamResequencerConfig getStreamConfig() {
292            if (streamConfig == null && resequencerConfig != null && resequencerConfig instanceof StreamResequencerConfig) {
293                return (StreamResequencerConfig) resequencerConfig;
294            }
295            return streamConfig;
296        }
297    
298        public void setBatchConfig(BatchResequencerConfig batchConfig) {
299            this.batchConfig = batchConfig;
300        }
301    
302        public void setStreamConfig(StreamResequencerConfig streamConfig) {
303            this.streamConfig = streamConfig;
304        }
305    
306        public ExpressionDefinition getExpression() {
307            return expression;
308        }
309    
310        public void setExpression(ExpressionDefinition expression) {
311            this.expression = expression;
312        }
313    
314        @Override
315        public Processor createProcessor(RouteContext routeContext) throws Exception {
316            // if configured from XML then streamConfig has been set with the configuration
317            if (resequencerConfig != null) {
318                if (resequencerConfig instanceof StreamResequencerConfig) {
319                    streamConfig = (StreamResequencerConfig) resequencerConfig;
320                } else {
321                    batchConfig = (BatchResequencerConfig) resequencerConfig;
322                }
323            }
324    
325            if (streamConfig != null) {
326                return createStreamResequencer(routeContext, streamConfig);
327            } else {
328                if (batchConfig == null) {
329                    // default as batch mode
330                    batch();
331                }
332                return createBatchResequencer(routeContext, batchConfig);
333            }
334        }
335    
336        /**
337         * Creates a batch {@link Resequencer} instance applying the given <code>config</code>.
338         * 
339         * @param routeContext route context.
340         * @param config batch resequencer configuration.
341         * @return the configured batch resequencer.
342         * @throws Exception can be thrown
343         */
344        @SuppressWarnings("deprecation")
345        protected Resequencer createBatchResequencer(RouteContext routeContext,
346                                                     BatchResequencerConfig config) throws Exception {
347            Processor processor = this.createChildProcessor(routeContext, true);
348            Expression expression = getExpression().createExpression(routeContext);
349    
350            // and wrap in unit of work
351            String routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
352            CamelInternalProcessor internal = new CamelInternalProcessor(processor);
353            internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeId));
354            internal.addAdvice(new CamelInternalProcessor.RouteContextAdvice(routeContext));
355    
356            ObjectHelper.notNull(config, "config", this);
357            ObjectHelper.notNull(expression, "expression", this);
358    
359            Resequencer resequencer = new Resequencer(routeContext.getCamelContext(), internal, expression,
360                    config.isAllowDuplicates(), config.isReverse());
361            resequencer.setBatchSize(config.getBatchSize());
362            resequencer.setBatchTimeout(config.getBatchTimeout());
363            if (config.getIgnoreInvalidExchanges() != null) {
364                resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges());
365            }
366            return resequencer;
367        }
368    
369        /**
370         * Creates a {@link StreamResequencer} instance applying the given <code>config</code>.
371         * 
372         * @param routeContext route context.
373         * @param config stream resequencer configuration.
374         * @return the configured stream resequencer.
375         * @throws Exception can be thrwon
376         */
377        protected StreamResequencer createStreamResequencer(RouteContext routeContext,
378                                                            StreamResequencerConfig config) throws Exception {
379            Processor processor = this.createChildProcessor(routeContext, true);
380            Expression expression = getExpression().createExpression(routeContext);
381    
382            // and wrap in unit of work
383            String routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
384            CamelInternalProcessor internal = new CamelInternalProcessor(processor);
385            internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeId));
386            internal.addAdvice(new CamelInternalProcessor.RouteContextAdvice(routeContext));
387    
388            ObjectHelper.notNull(config, "config", this);
389            ObjectHelper.notNull(expression, "expression", this);
390    
391            ExpressionResultComparator comparator;
392            if (config.getComparatorRef() != null) {
393                comparator = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), config.getComparatorRef(), ExpressionResultComparator.class);
394            } else {
395                comparator = config.getComparator();
396            }
397            comparator.setExpression(expression);
398    
399            StreamResequencer resequencer = new StreamResequencer(routeContext.getCamelContext(), internal, comparator);
400            resequencer.setTimeout(config.getTimeout());
401            resequencer.setCapacity(config.getCapacity());
402            resequencer.setRejectOld(config.getRejectOld());
403            if (config.getIgnoreInvalidExchanges() != null) {
404                resequencer.setIgnoreInvalidExchanges(config.getIgnoreInvalidExchanges());
405            }
406            return resequencer;
407        }
408    
409    }