001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.Comparator;
020    import java.util.Set;
021    import java.util.TreeSet;
022    
023    import org.apache.camel.CamelContext;
024    import org.apache.camel.Exchange;
025    import org.apache.camel.Expression;
026    import org.apache.camel.Processor;
027    import org.apache.camel.Traceable;
028    import org.apache.camel.util.ExpressionComparator;
029    
030    /**
031     * An implementation of the <a href="http://camel.apache.org/resequencer.html">Resequencer</a>
032     * which can reorder messages within a batch.
033     *
034     * @version 
035     */
036    @SuppressWarnings("deprecation")
037    public class Resequencer extends BatchProcessor implements Traceable {
038    
039        // TODO: Rework to avoid using BatchProcessor
040    
041        public Resequencer(CamelContext camelContext, Processor processor, Expression expression) {
042            this(camelContext, processor, createSet(expression, false, false), expression);
043        }
044    
045        public Resequencer(CamelContext camelContext, Processor processor, Expression expression,
046                           boolean allowDuplicates, boolean reverse) {
047            this(camelContext, processor, createSet(expression, allowDuplicates, reverse), expression);
048        }
049    
050        public Resequencer(CamelContext camelContext, Processor processor, Set<Exchange> collection, Expression expression) {
051            super(camelContext, processor, collection, expression);
052        }
053    
054        @Override
055        public String toString() {
056            return "Resequencer[to: " + getProcessor() + "]";
057        }
058    
059        public String getTraceLabel() {
060            return "resequencer";
061        }
062    
063        // Implementation methods
064        //-------------------------------------------------------------------------
065    
066        protected static Set<Exchange> createSet(Expression expression, boolean allowDuplicates, boolean reverse) {
067            return createSet(new ExpressionComparator(expression), allowDuplicates, reverse);
068        }
069    
070        protected static Set<Exchange> createSet(final Comparator<? super Exchange> comparator, boolean allowDuplicates, boolean reverse) {
071            Comparator<? super Exchange> answer = comparator;
072    
073            if (reverse) {
074                answer = new Comparator<Exchange>() {
075                    public int compare(Exchange o1, Exchange o2) {
076                        int answer = comparator.compare(o1, o2);
077                        // reverse it
078                        return answer * -1;
079                    }
080                };
081            }
082    
083            // if we allow duplicates then we need to cater for that in the comparator
084            final Comparator<? super Exchange> forAllowDuplicates = answer;
085            if (allowDuplicates) {
086                answer = new Comparator<Exchange>() {
087                    public int compare(Exchange o1, Exchange o2) {
088                        int answer = forAllowDuplicates.compare(o1, o2);
089                        if (answer == 0) {
090                            // they are equal but we should allow duplicates so say that o2 is higher
091                            // so it will come next
092                            return 1;
093                        }
094                        return answer;
095                    }
096                };
097            }
098    
099            return new TreeSet<Exchange>(answer);
100        }
101    
102    }