001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.concurrent.ExecutorService;
020    import java.util.concurrent.ScheduledExecutorService;
021    import javax.xml.bind.annotation.XmlAccessType;
022    import javax.xml.bind.annotation.XmlAccessorType;
023    import javax.xml.bind.annotation.XmlAttribute;
024    import javax.xml.bind.annotation.XmlRootElement;
025    import javax.xml.bind.annotation.XmlTransient;
026    
027    import org.apache.camel.Expression;
028    import org.apache.camel.Processor;
029    import org.apache.camel.builder.ExpressionBuilder;
030    import org.apache.camel.processor.Throttler;
031    import org.apache.camel.spi.RouteContext;
032    import org.apache.camel.util.ObjectHelper;
033    
034    /**
035     * Represents an XML <throttle/> element
036     *
037     * @version 
038     */
039    @XmlRootElement(name = "throttle")
040    @XmlAccessorType(XmlAccessType.FIELD)
041    public class ThrottleDefinition extends ExpressionNode implements ExecutorServiceAwareDefinition<ThrottleDefinition> {
042        // TODO: Camel 3.0 Should not support outputs
043    
044        @XmlTransient
045        private ExecutorService executorService;
046        @XmlAttribute
047        private String executorServiceRef;
048        @XmlAttribute
049        private Long timePeriodMillis;
050        @XmlAttribute
051        private Boolean asyncDelayed;
052        @XmlAttribute
053        private Boolean callerRunsWhenRejected;
054        
055        public ThrottleDefinition() {
056        }
057    
058        public ThrottleDefinition(Expression maximumRequestsPerPeriod) {
059            super(maximumRequestsPerPeriod);
060        }
061    
062        @Override
063        public String toString() {
064            return "Throttle[" + description() + " -> " + getOutputs() + "]";
065        }
066        
067        protected String description() {
068            return getExpression() + " request per " + getTimePeriodMillis() + " millis";
069        }
070    
071        @Override
072        public String getShortName() {
073            return "throttle";
074        }
075    
076        @Override
077        public String getLabel() {
078            return "throttle[" + description() + "]";
079        }
080    
081        @Override
082        public Processor createProcessor(RouteContext routeContext) throws Exception {
083            Processor childProcessor = this.createChildProcessor(routeContext, true);
084    
085            boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isAsyncDelayed());
086            ScheduledExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Throttle", this, isAsyncDelayed());
087    
088            // should be default 1000 millis
089            long period = getTimePeriodMillis() != null ? getTimePeriodMillis() : 1000L;
090    
091            // max requests per period is mandatory
092            Expression maxRequestsExpression = createMaxRequestsPerPeriodExpression(routeContext);
093            if (maxRequestsExpression == null) {
094                throw new IllegalArgumentException("MaxRequestsPerPeriod expression must be provided on " + this);
095            }
096    
097            Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool);
098    
099            if (getAsyncDelayed() != null) {
100                answer.setAsyncDelayed(getAsyncDelayed());
101            }
102            
103            if (getCallerRunsWhenRejected() == null) {
104                // should be true by default
105                answer.setCallerRunsWhenRejected(true);
106            } else {
107                answer.setCallerRunsWhenRejected(getCallerRunsWhenRejected());
108            }
109            return answer;
110        }
111    
112        private Expression createMaxRequestsPerPeriodExpression(RouteContext routeContext) {
113            if (getExpression() != null) {
114                if (ObjectHelper.isNotEmpty(getExpression().getExpression()) || getExpression().getExpressionValue() != null) {
115                    return getExpression().createExpression(routeContext);
116                } 
117            } 
118            return null;
119        }
120        
121        // Fluent API
122        // -------------------------------------------------------------------------
123        /**
124         * Sets the time period during which the maximum request count is valid for
125         *
126         * @param timePeriodMillis  period in millis
127         * @return the builder
128         */
129        public ThrottleDefinition timePeriodMillis(long timePeriodMillis) {
130            setTimePeriodMillis(timePeriodMillis);
131            return this;
132        }
133        
134        /**
135         * Sets the time period during which the maximum request count per period
136         *
137         * @param maximumRequestsPerPeriod  the maximum request count number per time period
138         * @return the builder
139         */
140        public ThrottleDefinition maximumRequestsPerPeriod(Long maximumRequestsPerPeriod) {
141            setExpression(ExpressionNodeHelper.toExpressionDefinition(ExpressionBuilder.constantExpression(maximumRequestsPerPeriod)));
142            return this;
143        }
144    
145        /**
146         * Whether or not the caller should run the task when it was rejected by the thread pool.
147         * <p/>
148         * Is by default <tt>true</tt>
149         *
150         * @param callerRunsWhenRejected whether or not the caller should run
151         * @return the builder
152         */
153        public ThrottleDefinition callerRunsWhenRejected(boolean callerRunsWhenRejected) {
154            setCallerRunsWhenRejected(callerRunsWhenRejected);
155            return this;
156        }
157    
158        /**
159         * Enables asynchronous delay which means the thread will <b>noy</b> block while delaying.
160         *
161         * @return the builder
162         */
163        public ThrottleDefinition asyncDelayed() {
164            setAsyncDelayed(true);
165            return this;
166        }
167    
168        public ThrottleDefinition executorService(ExecutorService executorService) {
169            setExecutorService(executorService);
170            return this;
171        }
172    
173        public ThrottleDefinition executorServiceRef(String executorServiceRef) {
174            setExecutorServiceRef(executorServiceRef);
175            return this;
176        }
177    
178        // Properties
179        // -------------------------------------------------------------------------
180    
181        public Long getTimePeriodMillis() {
182            return timePeriodMillis;
183        }
184    
185        public void setTimePeriodMillis(Long timePeriodMillis) {
186            this.timePeriodMillis = timePeriodMillis;
187        }
188    
189        public Boolean getAsyncDelayed() {
190            return asyncDelayed;
191        }
192    
193        public void setAsyncDelayed(Boolean asyncDelayed) {
194            this.asyncDelayed = asyncDelayed;
195        }
196    
197        public boolean isAsyncDelayed() {
198            return asyncDelayed != null && asyncDelayed;
199        }
200    
201        public Boolean getCallerRunsWhenRejected() {
202            return callerRunsWhenRejected;
203        }
204    
205        public void setCallerRunsWhenRejected(Boolean callerRunsWhenRejected) {
206            this.callerRunsWhenRejected = callerRunsWhenRejected;
207        }
208    
209        public ExecutorService getExecutorService() {
210            return executorService;
211        }
212    
213        public void setExecutorService(ExecutorService executorService) {
214            this.executorService = executorService;
215        }
216    
217        public String getExecutorServiceRef() {
218            return executorServiceRef;
219        }
220    
221        public void setExecutorServiceRef(String executorServiceRef) {
222            this.executorServiceRef = executorServiceRef;
223        }
224    }