001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.concurrent.ScheduledExecutorService;
020    import java.util.concurrent.atomic.AtomicLong;
021    
022    import org.apache.camel.CamelContext;
023    import org.apache.camel.Exchange;
024    import org.apache.camel.Expression;
025    import org.apache.camel.Processor;
026    import org.apache.camel.RuntimeExchangeException;
027    import org.apache.camel.Traceable;
028    import org.apache.camel.util.ObjectHelper;
029    
030    /**
031     * A <a href="http://camel.apache.org/throttler.html">Throttler</a>
032     * will set a limit on the maximum number of message exchanges which can be sent
033     * to a processor within a specific time period. <p/> This pattern can be
034     * extremely useful if you have some external system which meters access; such
035     * as only allowing 100 requests per second; or if huge load can cause a
036     * particular system to malfunction or to reduce its throughput you might want
037     * to introduce some throttling.
038     *
039     * @version
040     */
041    public class Throttler extends DelayProcessorSupport implements Traceable {
042        private volatile long maximumRequestsPerPeriod;
043        private Expression maxRequestsPerPeriodExpression;
044        private AtomicLong timePeriodMillis = new AtomicLong(1000);
045        private volatile TimeSlot slot;
046    
047        public Throttler(CamelContext camelContext, Processor processor, Expression maxRequestsPerPeriodExpression, long timePeriodMillis,
048                         ScheduledExecutorService executorService, boolean shutdownExecutorService) {
049            super(camelContext, processor, executorService, shutdownExecutorService);
050    
051            ObjectHelper.notNull(maxRequestsPerPeriodExpression, "maxRequestsPerPeriodExpression");
052            this.maxRequestsPerPeriodExpression = maxRequestsPerPeriodExpression;
053    
054            if (timePeriodMillis <= 0) {
055                throw new IllegalArgumentException("TimePeriodMillis should be a positive number, was: " + timePeriodMillis);
056            }
057            this.timePeriodMillis.set(timePeriodMillis);
058        }
059    
060        @Override
061        public String toString() {
062            return "Throttler[requests: " + maxRequestsPerPeriodExpression + " per: " + timePeriodMillis + " (ms) to: "
063                   + getProcessor() + "]";
064        }
065    
066        public String getTraceLabel() {
067            return "throttle[" + maxRequestsPerPeriodExpression + " per: " + timePeriodMillis + "]";
068        }
069    
070        // Properties
071        // -----------------------------------------------------------------------
072    
073        /**
074         * Sets the maximum number of requests per time period expression
075         */
076        public void setMaximumRequestsPerPeriodExpression(Expression maxRequestsPerPeriodExpression) {
077            this.maxRequestsPerPeriodExpression = maxRequestsPerPeriodExpression;
078        }
079    
080        public Expression getMaximumRequestsPerPeriodExpression() {
081            return maxRequestsPerPeriodExpression;
082        }
083    
084        public long getTimePeriodMillis() {
085            return timePeriodMillis.get();
086        }
087    
088        /**
089         * Gets the current maximum request per period value.
090         */
091        public long getCurrentMaximumRequestsPerPeriod() {
092            return maximumRequestsPerPeriod;
093        }
094    
095        /**
096         * Sets the time period during which the maximum number of requests apply
097         */
098        public void setTimePeriodMillis(long timePeriodMillis) {
099            this.timePeriodMillis.set(timePeriodMillis);
100        }
101    
102        // Implementation methods
103        // -----------------------------------------------------------------------
104    
105        protected long calculateDelay(Exchange exchange) {
106            // evaluate as Object first to see if we get any result at all
107            Object result = maxRequestsPerPeriodExpression.evaluate(exchange, Object.class);
108            if (result == null) {
109                throw new RuntimeExchangeException("The max requests per period expression was evaluated as null: " + maxRequestsPerPeriodExpression, exchange);
110            }
111    
112            // then must convert value to long
113            Long longValue = exchange.getContext().getTypeConverter().convertTo(Long.class, result);
114            if (longValue != null) {
115                // log if we changed max period after initial setting
116                if (maximumRequestsPerPeriod > 0 && longValue.longValue() != maximumRequestsPerPeriod) {
117                    log.debug("Throttler changed maximum requests per period from {} to {}", maximumRequestsPerPeriod, longValue);
118                }
119                if (maximumRequestsPerPeriod > longValue) {
120                    slot.capacity = 0;
121                }
122                maximumRequestsPerPeriod = longValue;
123            }
124    
125            if (maximumRequestsPerPeriod <= 0) {
126                throw new IllegalStateException("The maximumRequestsPerPeriod must be a positive number, was: " + maximumRequestsPerPeriod);
127            }
128    
129            TimeSlot slot = nextSlot();
130            if (!slot.isActive()) {
131                long delay = slot.startTime - currentSystemTime();
132                return delay;
133            } else {
134                return 0;
135            }
136        }
137    
138        /*
139         * Determine what the next available time slot is for handling an Exchange
140         */
141        protected synchronized TimeSlot nextSlot() {
142            if (slot == null) {
143                slot = new TimeSlot();
144            }
145            if (slot.isFull() || !slot.isPast()) {
146                slot = slot.next();
147            }
148            slot.assign();
149            return slot;
150        }
151    
152        /*
153        * A time slot is capable of handling a number of exchanges within a certain period of time.
154        */
155        protected class TimeSlot {
156    
157            private volatile long capacity = Throttler.this.maximumRequestsPerPeriod;
158            private final long duration = Throttler.this.timePeriodMillis.get();
159            private final long startTime;
160    
161            protected TimeSlot() {
162                this(System.currentTimeMillis());
163            }
164    
165            protected TimeSlot(long startTime) {
166                this.startTime = startTime;
167            }
168    
169            protected void assign() {
170                capacity--;
171            }
172    
173            /*
174             * Start the next time slot either now or in the future
175             * (no time slots are being created in the past)
176             */
177            protected TimeSlot next() {
178                return new TimeSlot(Math.max(System.currentTimeMillis(), this.startTime + this.duration));
179            }
180    
181            protected boolean isPast() {
182                long current = System.currentTimeMillis();
183                return current < (startTime + duration);
184            }
185    
186            protected boolean isActive() {
187                long current = System.currentTimeMillis();
188                return startTime <= current && current < (startTime + duration);
189            }
190    
191            protected boolean isFull() {
192                return capacity <= 0;
193            }
194        }
195    
196        TimeSlot getSlot() {
197            return this.slot;
198        }
199    }