001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.EventObject;
020    import java.util.LinkedHashSet;
021    import java.util.Set;
022    import java.util.concurrent.locks.Lock;
023    import java.util.concurrent.locks.ReentrantLock;
024    
025    import org.apache.camel.CamelContext;
026    import org.apache.camel.CamelContextAware;
027    import org.apache.camel.Consumer;
028    import org.apache.camel.Exchange;
029    import org.apache.camel.LoggingLevel;
030    import org.apache.camel.Route;
031    import org.apache.camel.management.event.ExchangeCompletedEvent;
032    import org.apache.camel.support.EventNotifierSupport;
033    import org.apache.camel.util.CamelLogger;
034    import org.apache.camel.util.ObjectHelper;
035    import org.apache.camel.util.ServiceHelper;
036    import org.slf4j.LoggerFactory;
037    
038    /**
039     * A throttle based {@link org.apache.camel.spi.RoutePolicy} which is capable of dynamic
040     * throttling a route based on number of current inflight exchanges.
041     * <p/>
042     * This implementation supports two scopes {@link ThrottlingScope#Context} and {@link ThrottlingScope#Route} (is default).
043     * If context scope is selected then this implementation will use a {@link org.apache.camel.spi.EventNotifier} to listen
044     * for events when {@link Exchange}s is done, and trigger the {@link #throttle(org.apache.camel.Route, org.apache.camel.Exchange)}
045     * method. If the route scope is selected then <b>no</b> {@link org.apache.camel.spi.EventNotifier} is in use, as there is already
046     * a {@link org.apache.camel.spi.Synchronization} callback on the current {@link Exchange} which triggers the
047     * {@link #throttle(org.apache.camel.Route, org.apache.camel.Exchange)} when the current {@link Exchange} is done.
048     *
049     * @version 
050     */
051    public class ThrottlingInflightRoutePolicy extends RoutePolicySupport implements CamelContextAware {
052    
053        public enum ThrottlingScope {
054            Context, Route
055        }
056    
057        private final Set<Route> routes = new LinkedHashSet<Route>();
058        private ContextScopedEventNotifier eventNotifier;
059        private CamelContext camelContext;
060        private final Lock lock = new ReentrantLock();
061        private ThrottlingScope scope = ThrottlingScope.Route;
062        private int maxInflightExchanges = 1000;
063        private int resumePercentOfMax = 70;
064        private int resumeInflightExchanges = 700;
065        private LoggingLevel loggingLevel = LoggingLevel.INFO;
066        private CamelLogger logger;
067    
068        public ThrottlingInflightRoutePolicy() {
069        }
070    
071        @Override
072        public String toString() {
073            return "ThrottlingInflightRoutePolicy[" + maxInflightExchanges + " / " + resumePercentOfMax + "% using scope " + scope + "]";
074        }
075    
076        public CamelContext getCamelContext() {
077            return camelContext;
078        }
079    
080        public void setCamelContext(CamelContext camelContext) {
081            this.camelContext = camelContext;
082        }
083    
084        @Override
085        public void onInit(Route route) {
086            // we need to remember the routes we apply for
087            routes.add(route);
088        }
089    
090        @Override
091        public void onExchangeDone(Route route, Exchange exchange) {
092            // if route scoped then throttle directly
093            // as context scoped is handled using an EventNotifier instead
094            if (scope == ThrottlingScope.Route) {
095                throttle(route, exchange);
096            }
097        }
098    
099        /**
100         * Throttles the route when {@link Exchange}s is done.
101         *
102         * @param route  the route
103         * @param exchange the exchange
104         */
105        protected void throttle(Route route, Exchange exchange) {
106            // this works the best when this logic is executed when the exchange is done
107            Consumer consumer = route.getConsumer();
108    
109            int size = getSize(route, exchange);
110            boolean stop = maxInflightExchanges > 0 && size > maxInflightExchanges;
111            if (log.isTraceEnabled()) {
112                log.trace("{} > 0 && {} > {} evaluated as {}", new Object[]{maxInflightExchanges, size, maxInflightExchanges, stop});
113            }
114            if (stop) {
115                try {
116                    lock.lock();
117                    stopConsumer(size, consumer);
118                } catch (Exception e) {
119                    handleException(e);
120                } finally {
121                    lock.unlock();
122                }
123            }
124    
125            // reload size in case a race condition with too many at once being invoked
126            // so we need to ensure that we read the most current size and start the consumer if we are already to low
127            size = getSize(route, exchange);
128            boolean start = size <= resumeInflightExchanges;
129            if (log.isTraceEnabled()) {
130                log.trace("{} <= {} evaluated as {}", new Object[]{size, resumeInflightExchanges, start});
131            }
132            if (start) {
133                try {
134                    lock.lock();
135                    startConsumer(size, consumer);
136                } catch (Exception e) {
137                    handleException(e);
138                } finally {
139                    lock.unlock();
140                }
141            }
142        }
143    
144        public int getMaxInflightExchanges() {
145            return maxInflightExchanges;
146        }
147    
148        /**
149         * Sets the upper limit of number of concurrent inflight exchanges at which point reached
150         * the throttler should suspend the route.
151         * <p/>
152         * Is default 1000.
153         *
154         * @param maxInflightExchanges the upper limit of concurrent inflight exchanges
155         */
156        public void setMaxInflightExchanges(int maxInflightExchanges) {
157            this.maxInflightExchanges = maxInflightExchanges;
158            // recalculate, must be at least at 1
159            this.resumeInflightExchanges = Math.max(resumePercentOfMax * maxInflightExchanges / 100, 1);
160        }
161    
162        public int getResumePercentOfMax() {
163            return resumePercentOfMax;
164        }
165    
166        /**
167         * Sets at which percentage of the max the throttler should start resuming the route.
168         * <p/>
169         * Will by default use 70%.
170         *
171         * @param resumePercentOfMax the percentage must be between 0 and 100
172         */
173        public void setResumePercentOfMax(int resumePercentOfMax) {
174            if (resumePercentOfMax < 0 || resumePercentOfMax > 100) {
175                throw new IllegalArgumentException("Must be a percentage between 0 and 100, was: " + resumePercentOfMax);
176            }
177    
178            this.resumePercentOfMax = resumePercentOfMax;
179            // recalculate, must be at least at 1
180            this.resumeInflightExchanges = Math.max(resumePercentOfMax * maxInflightExchanges / 100, 1);
181        }
182    
183        public ThrottlingScope getScope() {
184            return scope;
185        }
186    
187        /**
188         * Sets which scope the throttling should be based upon, either route or total scoped.
189         *
190         * @param scope the scope
191         */
192        public void setScope(ThrottlingScope scope) {
193            this.scope = scope;
194        }
195    
196        public LoggingLevel getLoggingLevel() {
197            return loggingLevel;
198        }
199    
200        public CamelLogger getLogger() {
201            if (logger == null) {
202                logger = createLogger();
203            }
204            return logger;
205        }
206    
207        /**
208         * Sets the logger to use for logging throttling activity.
209         *
210         * @param logger the logger
211         */
212        public void setLogger(CamelLogger logger) {
213            this.logger = logger;
214        }
215    
216        /**
217         * Sets the logging level to report the throttling activity.
218         * <p/>
219         * Is default <tt>INFO</tt> level.
220         *
221         * @param loggingLevel the logging level
222         */
223        public void setLoggingLevel(LoggingLevel loggingLevel) {
224            this.loggingLevel = loggingLevel;
225        }
226    
227        protected CamelLogger createLogger() {
228            return new CamelLogger(LoggerFactory.getLogger(ThrottlingInflightRoutePolicy.class), getLoggingLevel());
229        }
230    
231        private int getSize(Route route, Exchange exchange) {
232            if (scope == ThrottlingScope.Context) {
233                return exchange.getContext().getInflightRepository().size();
234            } else {
235                return exchange.getContext().getInflightRepository().size(route.getId());
236            }
237        }
238    
239        private void startConsumer(int size, Consumer consumer) throws Exception {
240            boolean started = super.startConsumer(consumer);
241            if (started) {
242                getLogger().log("Throttling consumer: " + size + " <= " + resumeInflightExchanges + " inflight exchange by resuming consumer: " + consumer);
243            }
244        }
245    
246        private void stopConsumer(int size, Consumer consumer) throws Exception {
247            boolean stopped = super.stopConsumer(consumer);
248            if (stopped) {
249                getLogger().log("Throttling consumer: " + size + " > " + maxInflightExchanges + " inflight exchange by suspending consumer: " + consumer);
250            }
251        }
252    
253        @Override
254        protected void doStart() throws Exception {
255            ObjectHelper.notNull(camelContext, "CamelContext", this);
256            if (scope == ThrottlingScope.Context) {
257                eventNotifier = new ContextScopedEventNotifier();
258                // must start the notifier before it can be used
259                ServiceHelper.startService(eventNotifier);
260                // we are in context scope, so we need to use an event notifier to keep track
261                // when any exchanges is done on the camel context.
262                // This ensures we can trigger accordingly to context scope
263                camelContext.getManagementStrategy().addEventNotifier(eventNotifier);
264            }
265        }
266    
267        @Override
268        protected void doStop() throws Exception {
269            ObjectHelper.notNull(camelContext, "CamelContext", this);
270            if (scope == ThrottlingScope.Context) {
271                camelContext.getManagementStrategy().removeEventNotifier(eventNotifier);
272            }
273        }
274    
275        /**
276         * {@link org.apache.camel.spi.EventNotifier} to keep track on when {@link Exchange}
277         * is done, so we can throttle accordingly.
278         */
279        private class ContextScopedEventNotifier extends EventNotifierSupport {
280    
281            @Override
282            public void notify(EventObject event) throws Exception {
283                ExchangeCompletedEvent completedEvent = (ExchangeCompletedEvent) event;
284                for (Route route : routes) {
285                    throttle(route, completedEvent.getExchange());
286                }
287            }
288    
289            @Override
290            public boolean isEnabled(EventObject event) {
291                return event instanceof ExchangeCompletedEvent;
292            }
293    
294            @Override
295            protected void doStart() throws Exception {
296                // noop
297            }
298    
299            @Override
300            protected void doStop() throws Exception {
301                // noop
302            }
303    
304            @Override
305            public String toString() {
306                return "ContextScopedEventNotifier";
307            }
308        }
309    
310    }