001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.ArrayList;
020    import java.util.Arrays;
021    import java.util.Collection;
022    import java.util.List;
023    
024    import javax.xml.bind.annotation.XmlAccessType;
025    import javax.xml.bind.annotation.XmlAccessorType;
026    import javax.xml.bind.annotation.XmlAttribute;
027    import javax.xml.bind.annotation.XmlElement;
028    import javax.xml.bind.annotation.XmlElementRef;
029    import javax.xml.bind.annotation.XmlElements;
030    import javax.xml.bind.annotation.XmlRootElement;
031    
032    import org.apache.camel.Expression;
033    import org.apache.camel.Processor;
034    import org.apache.camel.model.loadbalancer.CustomLoadBalancerDefinition;
035    import org.apache.camel.model.loadbalancer.FailoverLoadBalancerDefinition;
036    import org.apache.camel.model.loadbalancer.RandomLoadBalancerDefinition;
037    import org.apache.camel.model.loadbalancer.RoundRobinLoadBalancerDefinition;
038    import org.apache.camel.model.loadbalancer.StickyLoadBalancerDefinition;
039    import org.apache.camel.model.loadbalancer.TopicLoadBalancerDefinition;
040    import org.apache.camel.model.loadbalancer.WeightedLoadBalancerDefinition;
041    import org.apache.camel.processor.loadbalancer.FailOverLoadBalancer;
042    import org.apache.camel.processor.loadbalancer.LoadBalancer;
043    import org.apache.camel.processor.loadbalancer.RandomLoadBalancer;
044    import org.apache.camel.processor.loadbalancer.RoundRobinLoadBalancer;
045    import org.apache.camel.processor.loadbalancer.StickyLoadBalancer;
046    import org.apache.camel.processor.loadbalancer.TopicLoadBalancer;
047    import org.apache.camel.processor.loadbalancer.WeightedLoadBalancer;
048    import org.apache.camel.processor.loadbalancer.WeightedRandomLoadBalancer;
049    import org.apache.camel.processor.loadbalancer.WeightedRoundRobinLoadBalancer;
050    import org.apache.camel.spi.RouteContext;
051    import org.apache.camel.util.CollectionStringBuffer;
052    
053    /**
054     * Represents an XML <loadBalance/> element
055     */
056    @XmlRootElement(name = "loadBalance")
057    @XmlAccessorType(XmlAccessType.FIELD)
058    public class LoadBalanceDefinition extends ProcessorDefinition<LoadBalanceDefinition> {
059        @XmlAttribute
060        @Deprecated
061        private String ref;
062        @XmlElements({
063                @XmlElement(required = false, name = "failover", type = FailoverLoadBalancerDefinition.class),
064                @XmlElement(required = false, name = "random", type = RandomLoadBalancerDefinition.class),
065                @XmlElement(required = false, name = "custom", type = CustomLoadBalancerDefinition.class),
066                @XmlElement(required = false, name = "roundRobin", type = RoundRobinLoadBalancerDefinition.class),
067                @XmlElement(required = false, name = "sticky", type = StickyLoadBalancerDefinition.class),
068                @XmlElement(required = false, name = "topic", type = TopicLoadBalancerDefinition.class),
069                @XmlElement(required = false, name = "weighted", type = WeightedLoadBalancerDefinition.class)}
070        )
071        private LoadBalancerDefinition loadBalancerType;
072        @XmlElementRef
073        private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>();
074    
075        public LoadBalanceDefinition() {
076        }
077    
078        @Override
079        public String getShortName() {
080            return "loadbalance";
081        }
082    
083        @Override
084        public List<ProcessorDefinition<?>> getOutputs() {
085            return outputs;
086        }
087    
088        public void setOutputs(List<ProcessorDefinition<?>> outputs) {
089            this.outputs = outputs;
090            if (outputs != null) {
091                for (ProcessorDefinition<?> output : outputs) {
092                    configureChild(output);
093                }
094            }
095        }
096    
097        public boolean isOutputSupported() {
098            return true;
099        }
100    
101        public String getRef() {
102            return ref;
103        }
104    
105        public void setRef(String ref) {
106            this.ref = ref;
107        }
108    
109        public LoadBalancerDefinition getLoadBalancerType() {
110            return loadBalancerType;
111        }
112    
113        public void setLoadBalancerType(LoadBalancerDefinition loadbalancer) {
114            if (loadBalancerType != null) {
115                throw new IllegalArgumentException("Loadbalancer already configured to: " + loadBalancerType + ". Cannot set it to: " + loadbalancer);
116            }
117            loadBalancerType = loadbalancer;
118        }
119    
120        protected Processor createOutputsProcessor(RouteContext routeContext,
121            Collection<ProcessorDefinition<?>> outputs) throws Exception {
122    
123            LoadBalancer loadBalancer = LoadBalancerDefinition.getLoadBalancer(routeContext, loadBalancerType, ref);
124            for (ProcessorDefinition<?> processorType : outputs) {
125                Processor processor = createProcessor(routeContext, processorType);
126                loadBalancer.addProcessor(processor);
127            }
128            return loadBalancer;
129        }
130        
131        @Override
132        public Processor createProcessor(RouteContext routeContext) throws Exception {
133            LoadBalancer loadBalancer = LoadBalancerDefinition.getLoadBalancer(routeContext, loadBalancerType, ref);
134            for (ProcessorDefinition<?> processorType : getOutputs()) {
135                // output must not be another load balancer
136                // check for instanceof as the code below as there is compilation errors on earlier versions of JDK6
137                // on Windows boxes or with IBM JDKs etc.
138                if (LoadBalanceDefinition.class.isInstance(processorType)) {
139                    throw new IllegalArgumentException("Loadbalancer already configured to: " + loadBalancerType + ". Cannot set it to: " + processorType);
140                }
141                Processor processor = createProcessor(routeContext, processorType);
142                processor = wrapChannel(routeContext, processor, processorType);
143                loadBalancer.addProcessor(processor);
144            }
145            return loadBalancer;
146        }
147        
148        // Fluent API
149        // -------------------------------------------------------------------------
150    
151        /**
152         * Uses a custom load balancer
153         *
154         * @param loadBalancer  the load balancer
155         * @return the builder
156         */
157        public LoadBalanceDefinition loadBalance(LoadBalancer loadBalancer) {
158            setLoadBalancerType(new LoadBalancerDefinition(loadBalancer));
159            return this;
160        }
161        
162        /**
163         * Uses fail over load balancer
164         * <p/>
165         * Will not round robin and inherit the error handler.
166         *
167         * @return the builder
168         */
169        public LoadBalanceDefinition failover() {
170            return failover(-1, true, false);
171        }
172        
173        /**
174         * Uses fail over load balancer
175         * <p/>
176         * Will not round robin and inherit the error handler.
177         *
178         * @param exceptions exception classes which we want to failover if one of them was thrown
179         * @return the builder
180         */
181        public LoadBalanceDefinition failover(Class<?>... exceptions) {
182            return failover(-1, true, false, exceptions);
183        }
184    
185        /**
186         * Uses fail over load balancer
187         *
188         * @param maximumFailoverAttempts  maximum number of failover attempts before exhausting.
189         *                                 Use -1 to newer exhaust when round robin is also enabled.
190         *                                 If round robin is disabled then it will exhaust when there are no more endpoints to failover
191         * @param inheritErrorHandler      whether or not to inherit error handler.
192         *                                 If <tt>false</tt> then it will failover immediately in case of an exception
193         * @param roundRobin               whether or not to use round robin (which keeps state)
194         * @param exceptions               exception classes which we want to failover if one of them was thrown
195         * @return the builder
196         */
197        public LoadBalanceDefinition failover(int maximumFailoverAttempts, boolean inheritErrorHandler, boolean roundRobin, Class<?>... exceptions) {
198            FailOverLoadBalancer failover = new FailOverLoadBalancer(Arrays.asList(exceptions));
199            failover.setMaximumFailoverAttempts(maximumFailoverAttempts);
200            failover.setRoundRobin(roundRobin);
201            setLoadBalancerType(new LoadBalancerDefinition(failover));
202            this.setInheritErrorHandler(inheritErrorHandler);
203            return this;
204        }
205    
206        /**
207         * Uses weighted load balancer
208         *
209         * @param roundRobin                   used to set the processor selection algorithm.
210         * @param distributionRatio            String of weighted ratios for distribution of messages.
211         * @return the builder
212         */
213        public LoadBalanceDefinition weighted(boolean roundRobin, String distributionRatio) {
214            return weighted(roundRobin, distributionRatio, ",");
215        }
216        
217        /**
218         * Uses weighted load balancer
219         *
220         * @param roundRobin                   used to set the processor selection algorithm.
221         * @param distributionRatio            String of weighted ratios for distribution of messages.
222         * @param distributionRatioDelimiter   String containing delimiter to be used for ratios
223         * @return the builder
224         */
225        public LoadBalanceDefinition weighted(boolean roundRobin, String distributionRatio, String distributionRatioDelimiter) {
226            WeightedLoadBalancer weighted;
227            List<Integer> distributionRatioList = new ArrayList<Integer>();
228            
229            String[] ratios = distributionRatio.split(distributionRatioDelimiter);
230            for (String ratio : ratios) {
231                distributionRatioList.add(new Integer(ratio.trim()));
232            }
233            
234            if (!roundRobin) {
235                weighted = new WeightedRandomLoadBalancer(distributionRatioList);
236            } else {
237                weighted = new WeightedRoundRobinLoadBalancer(distributionRatioList);
238            }
239            setLoadBalancerType(new LoadBalancerDefinition(weighted));
240            return this;
241        }
242        
243        /**
244         * Uses round robin load balancer
245         *
246         * @return the builder
247         */
248        public LoadBalanceDefinition roundRobin() {
249            setLoadBalancerType(new LoadBalancerDefinition(new RoundRobinLoadBalancer()));
250            return this;
251        }
252    
253        /**
254         * Uses random load balancer
255         *
256         * @return the builder
257         */
258        public LoadBalanceDefinition random() {
259            setLoadBalancerType(new LoadBalancerDefinition(new RandomLoadBalancer()));
260            return this;
261        }
262    
263        /**
264         * Uses the custom load balancer
265         *
266         * @param ref reference to lookup a custom load balancer from the {@link org.apache.camel.spi.Registry} to be used.
267         * @return the builder
268         */
269        public LoadBalanceDefinition custom(String ref) {
270            CustomLoadBalancerDefinition balancer = new CustomLoadBalancerDefinition();
271            balancer.setRef(ref);
272            setLoadBalancerType(balancer);
273            return this;
274        }
275    
276        /**
277         * Uses sticky load balancer
278         *
279         * @param correlationExpression  the expression for correlation
280         * @return  the builder
281         */
282        public LoadBalanceDefinition sticky(Expression correlationExpression) {
283            setLoadBalancerType(new LoadBalancerDefinition(new StickyLoadBalancer(correlationExpression)));
284            return this;
285        }
286    
287        /**
288         * Uses topic load balancer
289         * 
290         * @return the builder
291         */
292        public LoadBalanceDefinition topic() {
293            setLoadBalancerType(new LoadBalancerDefinition(new TopicLoadBalancer()));
294            return this;
295        }
296    
297        @Override
298        public String getLabel() {
299            CollectionStringBuffer buffer = new CollectionStringBuffer("loadBalance[");
300            List<ProcessorDefinition<?>> list = getOutputs();
301            for (ProcessorDefinition<?> processorType : list) {
302                buffer.append(processorType.getLabel());
303            }
304            buffer.append("]");
305            return buffer.toString();
306        }
307    
308        @Override
309        public String toString() {
310            if (loadBalancerType != null) {
311                return "LoadBalanceType[" + loadBalancerType + ", " + getOutputs() + "]";
312            } else {
313                return "LoadBalanceType[ref:" + ref + ", " + getOutputs() + "]";
314            }
315        }
316    }