001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.util.concurrent.ExecutorService; 020 import java.util.concurrent.ScheduledExecutorService; 021 import javax.xml.bind.annotation.XmlAccessType; 022 import javax.xml.bind.annotation.XmlAccessorType; 023 import javax.xml.bind.annotation.XmlAttribute; 024 import javax.xml.bind.annotation.XmlRootElement; 025 import javax.xml.bind.annotation.XmlTransient; 026 027 import org.apache.camel.Expression; 028 import org.apache.camel.Processor; 029 import org.apache.camel.builder.ExpressionBuilder; 030 import org.apache.camel.processor.Throttler; 031 import org.apache.camel.spi.RouteContext; 032 import org.apache.camel.util.ObjectHelper; 033 034 /** 035 * Represents an XML <throttle/> element 036 * 037 * @version 038 */ 039 @XmlRootElement(name = "throttle") 040 @XmlAccessorType(XmlAccessType.FIELD) 041 public class ThrottleDefinition extends ExpressionNode implements ExecutorServiceAwareDefinition<ThrottleDefinition> { 042 // TODO: Camel 3.0 Should not support outputs 043 044 @XmlTransient 045 private ExecutorService executorService; 046 @XmlAttribute 047 private String executorServiceRef; 048 @XmlAttribute 049 private Long timePeriodMillis; 050 @XmlAttribute 051 private Boolean asyncDelayed; 052 @XmlAttribute 053 private Boolean callerRunsWhenRejected; 054 055 public ThrottleDefinition() { 056 } 057 058 public ThrottleDefinition(Expression maximumRequestsPerPeriod) { 059 super(maximumRequestsPerPeriod); 060 } 061 062 @Override 063 public String toString() { 064 return "Throttle[" + description() + " -> " + getOutputs() + "]"; 065 } 066 067 protected String description() { 068 return getExpression() + " request per " + getTimePeriodMillis() + " millis"; 069 } 070 071 @Override 072 public String getShortName() { 073 return "throttle"; 074 } 075 076 @Override 077 public String getLabel() { 078 return "throttle[" + description() + "]"; 079 } 080 081 @Override 082 public Processor createProcessor(RouteContext routeContext) throws Exception { 083 Processor childProcessor = this.createChildProcessor(routeContext, true); 084 085 boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isAsyncDelayed()); 086 ScheduledExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Throttle", this, isAsyncDelayed()); 087 088 // should be default 1000 millis 089 long period = getTimePeriodMillis() != null ? getTimePeriodMillis() : 1000L; 090 091 // max requests per period is mandatory 092 Expression maxRequestsExpression = createMaxRequestsPerPeriodExpression(routeContext); 093 if (maxRequestsExpression == null) { 094 throw new IllegalArgumentException("MaxRequestsPerPeriod expression must be provided on " + this); 095 } 096 097 Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool); 098 099 if (getAsyncDelayed() != null) { 100 answer.setAsyncDelayed(getAsyncDelayed()); 101 } 102 103 if (getCallerRunsWhenRejected() == null) { 104 // should be true by default 105 answer.setCallerRunsWhenRejected(true); 106 } else { 107 answer.setCallerRunsWhenRejected(getCallerRunsWhenRejected()); 108 } 109 return answer; 110 } 111 112 private Expression createMaxRequestsPerPeriodExpression(RouteContext routeContext) { 113 if (getExpression() != null) { 114 if (ObjectHelper.isNotEmpty(getExpression().getExpression()) || getExpression().getExpressionValue() != null) { 115 return getExpression().createExpression(routeContext); 116 } 117 } 118 return null; 119 } 120 121 // Fluent API 122 // ------------------------------------------------------------------------- 123 /** 124 * Sets the time period during which the maximum request count is valid for 125 * 126 * @param timePeriodMillis period in millis 127 * @return the builder 128 */ 129 public ThrottleDefinition timePeriodMillis(long timePeriodMillis) { 130 setTimePeriodMillis(timePeriodMillis); 131 return this; 132 } 133 134 /** 135 * Sets the time period during which the maximum request count per period 136 * 137 * @param maximumRequestsPerPeriod the maximum request count number per time period 138 * @return the builder 139 */ 140 public ThrottleDefinition maximumRequestsPerPeriod(Long maximumRequestsPerPeriod) { 141 setExpression(ExpressionNodeHelper.toExpressionDefinition(ExpressionBuilder.constantExpression(maximumRequestsPerPeriod))); 142 return this; 143 } 144 145 /** 146 * Whether or not the caller should run the task when it was rejected by the thread pool. 147 * <p/> 148 * Is by default <tt>true</tt> 149 * 150 * @param callerRunsWhenRejected whether or not the caller should run 151 * @return the builder 152 */ 153 public ThrottleDefinition callerRunsWhenRejected(boolean callerRunsWhenRejected) { 154 setCallerRunsWhenRejected(callerRunsWhenRejected); 155 return this; 156 } 157 158 /** 159 * Enables asynchronous delay which means the thread will <b>noy</b> block while delaying. 160 * 161 * @return the builder 162 */ 163 public ThrottleDefinition asyncDelayed() { 164 setAsyncDelayed(true); 165 return this; 166 } 167 168 public ThrottleDefinition executorService(ExecutorService executorService) { 169 setExecutorService(executorService); 170 return this; 171 } 172 173 public ThrottleDefinition executorServiceRef(String executorServiceRef) { 174 setExecutorServiceRef(executorServiceRef); 175 return this; 176 } 177 178 // Properties 179 // ------------------------------------------------------------------------- 180 181 public Long getTimePeriodMillis() { 182 return timePeriodMillis; 183 } 184 185 public void setTimePeriodMillis(Long timePeriodMillis) { 186 this.timePeriodMillis = timePeriodMillis; 187 } 188 189 public Boolean getAsyncDelayed() { 190 return asyncDelayed; 191 } 192 193 public void setAsyncDelayed(Boolean asyncDelayed) { 194 this.asyncDelayed = asyncDelayed; 195 } 196 197 public boolean isAsyncDelayed() { 198 return asyncDelayed != null && asyncDelayed; 199 } 200 201 public Boolean getCallerRunsWhenRejected() { 202 return callerRunsWhenRejected; 203 } 204 205 public void setCallerRunsWhenRejected(Boolean callerRunsWhenRejected) { 206 this.callerRunsWhenRejected = callerRunsWhenRejected; 207 } 208 209 public ExecutorService getExecutorService() { 210 return executorService; 211 } 212 213 public void setExecutorService(ExecutorService executorService) { 214 this.executorService = executorService; 215 } 216 217 public String getExecutorServiceRef() { 218 return executorServiceRef; 219 } 220 221 public void setExecutorServiceRef(String executorServiceRef) { 222 this.executorServiceRef = executorServiceRef; 223 } 224 }