001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.util.ArrayList; 020 import java.util.Arrays; 021 import java.util.Collection; 022 import java.util.List; 023 024 import javax.xml.bind.annotation.XmlAccessType; 025 import javax.xml.bind.annotation.XmlAccessorType; 026 import javax.xml.bind.annotation.XmlAttribute; 027 import javax.xml.bind.annotation.XmlElement; 028 import javax.xml.bind.annotation.XmlElementRef; 029 import javax.xml.bind.annotation.XmlElements; 030 import javax.xml.bind.annotation.XmlRootElement; 031 032 import org.apache.camel.Expression; 033 import org.apache.camel.Processor; 034 import org.apache.camel.model.loadbalancer.CustomLoadBalancerDefinition; 035 import org.apache.camel.model.loadbalancer.FailoverLoadBalancerDefinition; 036 import org.apache.camel.model.loadbalancer.RandomLoadBalancerDefinition; 037 import org.apache.camel.model.loadbalancer.RoundRobinLoadBalancerDefinition; 038 import org.apache.camel.model.loadbalancer.StickyLoadBalancerDefinition; 039 import org.apache.camel.model.loadbalancer.TopicLoadBalancerDefinition; 040 import org.apache.camel.model.loadbalancer.WeightedLoadBalancerDefinition; 041 import org.apache.camel.processor.loadbalancer.FailOverLoadBalancer; 042 import org.apache.camel.processor.loadbalancer.LoadBalancer; 043 import org.apache.camel.processor.loadbalancer.RandomLoadBalancer; 044 import org.apache.camel.processor.loadbalancer.RoundRobinLoadBalancer; 045 import org.apache.camel.processor.loadbalancer.StickyLoadBalancer; 046 import org.apache.camel.processor.loadbalancer.TopicLoadBalancer; 047 import org.apache.camel.processor.loadbalancer.WeightedLoadBalancer; 048 import org.apache.camel.processor.loadbalancer.WeightedRandomLoadBalancer; 049 import org.apache.camel.processor.loadbalancer.WeightedRoundRobinLoadBalancer; 050 import org.apache.camel.spi.RouteContext; 051 import org.apache.camel.util.CollectionStringBuffer; 052 053 /** 054 * Represents an XML <loadBalance/> element 055 */ 056 @XmlRootElement(name = "loadBalance") 057 @XmlAccessorType(XmlAccessType.FIELD) 058 public class LoadBalanceDefinition extends ProcessorDefinition<LoadBalanceDefinition> { 059 @XmlAttribute 060 @Deprecated 061 private String ref; 062 @XmlElements({ 063 @XmlElement(required = false, name = "failover", type = FailoverLoadBalancerDefinition.class), 064 @XmlElement(required = false, name = "random", type = RandomLoadBalancerDefinition.class), 065 @XmlElement(required = false, name = "custom", type = CustomLoadBalancerDefinition.class), 066 @XmlElement(required = false, name = "roundRobin", type = RoundRobinLoadBalancerDefinition.class), 067 @XmlElement(required = false, name = "sticky", type = StickyLoadBalancerDefinition.class), 068 @XmlElement(required = false, name = "topic", type = TopicLoadBalancerDefinition.class), 069 @XmlElement(required = false, name = "weighted", type = WeightedLoadBalancerDefinition.class)} 070 ) 071 private LoadBalancerDefinition loadBalancerType; 072 @XmlElementRef 073 private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>(); 074 075 public LoadBalanceDefinition() { 076 } 077 078 @Override 079 public String getShortName() { 080 return "loadbalance"; 081 } 082 083 @Override 084 public List<ProcessorDefinition<?>> getOutputs() { 085 return outputs; 086 } 087 088 public void setOutputs(List<ProcessorDefinition<?>> outputs) { 089 this.outputs = outputs; 090 if (outputs != null) { 091 for (ProcessorDefinition<?> output : outputs) { 092 configureChild(output); 093 } 094 } 095 } 096 097 public boolean isOutputSupported() { 098 return true; 099 } 100 101 public String getRef() { 102 return ref; 103 } 104 105 public void setRef(String ref) { 106 this.ref = ref; 107 } 108 109 public LoadBalancerDefinition getLoadBalancerType() { 110 return loadBalancerType; 111 } 112 113 public void setLoadBalancerType(LoadBalancerDefinition loadbalancer) { 114 if (loadBalancerType != null) { 115 throw new IllegalArgumentException("Loadbalancer already configured to: " + loadBalancerType + ". Cannot set it to: " + loadbalancer); 116 } 117 loadBalancerType = loadbalancer; 118 } 119 120 protected Processor createOutputsProcessor(RouteContext routeContext, 121 Collection<ProcessorDefinition<?>> outputs) throws Exception { 122 123 LoadBalancer loadBalancer = LoadBalancerDefinition.getLoadBalancer(routeContext, loadBalancerType, ref); 124 for (ProcessorDefinition<?> processorType : outputs) { 125 Processor processor = createProcessor(routeContext, processorType); 126 loadBalancer.addProcessor(processor); 127 } 128 return loadBalancer; 129 } 130 131 @Override 132 public Processor createProcessor(RouteContext routeContext) throws Exception { 133 LoadBalancer loadBalancer = LoadBalancerDefinition.getLoadBalancer(routeContext, loadBalancerType, ref); 134 for (ProcessorDefinition<?> processorType : getOutputs()) { 135 // output must not be another load balancer 136 // check for instanceof as the code below as there is compilation errors on earlier versions of JDK6 137 // on Windows boxes or with IBM JDKs etc. 138 if (LoadBalanceDefinition.class.isInstance(processorType)) { 139 throw new IllegalArgumentException("Loadbalancer already configured to: " + loadBalancerType + ". Cannot set it to: " + processorType); 140 } 141 Processor processor = createProcessor(routeContext, processorType); 142 processor = wrapChannel(routeContext, processor, processorType); 143 loadBalancer.addProcessor(processor); 144 } 145 return loadBalancer; 146 } 147 148 // Fluent API 149 // ------------------------------------------------------------------------- 150 151 /** 152 * Uses a custom load balancer 153 * 154 * @param loadBalancer the load balancer 155 * @return the builder 156 */ 157 public LoadBalanceDefinition loadBalance(LoadBalancer loadBalancer) { 158 setLoadBalancerType(new LoadBalancerDefinition(loadBalancer)); 159 return this; 160 } 161 162 /** 163 * Uses fail over load balancer 164 * <p/> 165 * Will not round robin and inherit the error handler. 166 * 167 * @return the builder 168 */ 169 public LoadBalanceDefinition failover() { 170 return failover(-1, true, false); 171 } 172 173 /** 174 * Uses fail over load balancer 175 * <p/> 176 * Will not round robin and inherit the error handler. 177 * 178 * @param exceptions exception classes which we want to failover if one of them was thrown 179 * @return the builder 180 */ 181 public LoadBalanceDefinition failover(Class<?>... exceptions) { 182 return failover(-1, true, false, exceptions); 183 } 184 185 /** 186 * Uses fail over load balancer 187 * 188 * @param maximumFailoverAttempts maximum number of failover attempts before exhausting. 189 * Use -1 to newer exhaust when round robin is also enabled. 190 * If round robin is disabled then it will exhaust when there are no more endpoints to failover 191 * @param inheritErrorHandler whether or not to inherit error handler. 192 * If <tt>false</tt> then it will failover immediately in case of an exception 193 * @param roundRobin whether or not to use round robin (which keeps state) 194 * @param exceptions exception classes which we want to failover if one of them was thrown 195 * @return the builder 196 */ 197 public LoadBalanceDefinition failover(int maximumFailoverAttempts, boolean inheritErrorHandler, boolean roundRobin, Class<?>... exceptions) { 198 FailOverLoadBalancer failover = new FailOverLoadBalancer(Arrays.asList(exceptions)); 199 failover.setMaximumFailoverAttempts(maximumFailoverAttempts); 200 failover.setRoundRobin(roundRobin); 201 setLoadBalancerType(new LoadBalancerDefinition(failover)); 202 this.setInheritErrorHandler(inheritErrorHandler); 203 return this; 204 } 205 206 /** 207 * Uses weighted load balancer 208 * 209 * @param roundRobin used to set the processor selection algorithm. 210 * @param distributionRatio String of weighted ratios for distribution of messages. 211 * @return the builder 212 */ 213 public LoadBalanceDefinition weighted(boolean roundRobin, String distributionRatio) { 214 return weighted(roundRobin, distributionRatio, ","); 215 } 216 217 /** 218 * Uses weighted load balancer 219 * 220 * @param roundRobin used to set the processor selection algorithm. 221 * @param distributionRatio String of weighted ratios for distribution of messages. 222 * @param distributionRatioDelimiter String containing delimiter to be used for ratios 223 * @return the builder 224 */ 225 public LoadBalanceDefinition weighted(boolean roundRobin, String distributionRatio, String distributionRatioDelimiter) { 226 WeightedLoadBalancer weighted; 227 List<Integer> distributionRatioList = new ArrayList<Integer>(); 228 229 String[] ratios = distributionRatio.split(distributionRatioDelimiter); 230 for (String ratio : ratios) { 231 distributionRatioList.add(new Integer(ratio.trim())); 232 } 233 234 if (!roundRobin) { 235 weighted = new WeightedRandomLoadBalancer(distributionRatioList); 236 } else { 237 weighted = new WeightedRoundRobinLoadBalancer(distributionRatioList); 238 } 239 setLoadBalancerType(new LoadBalancerDefinition(weighted)); 240 return this; 241 } 242 243 /** 244 * Uses round robin load balancer 245 * 246 * @return the builder 247 */ 248 public LoadBalanceDefinition roundRobin() { 249 setLoadBalancerType(new LoadBalancerDefinition(new RoundRobinLoadBalancer())); 250 return this; 251 } 252 253 /** 254 * Uses random load balancer 255 * 256 * @return the builder 257 */ 258 public LoadBalanceDefinition random() { 259 setLoadBalancerType(new LoadBalancerDefinition(new RandomLoadBalancer())); 260 return this; 261 } 262 263 /** 264 * Uses the custom load balancer 265 * 266 * @param ref reference to lookup a custom load balancer from the {@link org.apache.camel.spi.Registry} to be used. 267 * @return the builder 268 */ 269 public LoadBalanceDefinition custom(String ref) { 270 CustomLoadBalancerDefinition balancer = new CustomLoadBalancerDefinition(); 271 balancer.setRef(ref); 272 setLoadBalancerType(balancer); 273 return this; 274 } 275 276 /** 277 * Uses sticky load balancer 278 * 279 * @param correlationExpression the expression for correlation 280 * @return the builder 281 */ 282 public LoadBalanceDefinition sticky(Expression correlationExpression) { 283 setLoadBalancerType(new LoadBalancerDefinition(new StickyLoadBalancer(correlationExpression))); 284 return this; 285 } 286 287 /** 288 * Uses topic load balancer 289 * 290 * @return the builder 291 */ 292 public LoadBalanceDefinition topic() { 293 setLoadBalancerType(new LoadBalancerDefinition(new TopicLoadBalancer())); 294 return this; 295 } 296 297 @Override 298 public String getLabel() { 299 CollectionStringBuffer buffer = new CollectionStringBuffer("loadBalance["); 300 List<ProcessorDefinition<?>> list = getOutputs(); 301 for (ProcessorDefinition<?> processorType : list) { 302 buffer.append(processorType.getLabel()); 303 } 304 buffer.append("]"); 305 return buffer.toString(); 306 } 307 308 @Override 309 public String toString() { 310 if (loadBalancerType != null) { 311 return "LoadBalanceType[" + loadBalancerType + ", " + getOutputs() + "]"; 312 } else { 313 return "LoadBalanceType[ref:" + ref + ", " + getOutputs() + "]"; 314 } 315 } 316 }