001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.ArrayList;
020    import java.util.Collection;
021    import java.util.HashMap;
022    import java.util.LinkedHashSet;
023    import java.util.List;
024    import java.util.Map;
025    import java.util.Set;
026    import java.util.concurrent.atomic.AtomicBoolean;
027    
028    import org.apache.camel.CamelContext;
029    import org.apache.camel.Channel;
030    import org.apache.camel.Consumer;
031    import org.apache.camel.Processor;
032    import org.apache.camel.Route;
033    import org.apache.camel.RouteAware;
034    import org.apache.camel.Service;
035    import org.apache.camel.model.OnCompletionDefinition;
036    import org.apache.camel.model.OnExceptionDefinition;
037    import org.apache.camel.model.ProcessorDefinition;
038    import org.apache.camel.model.RouteDefinition;
039    import org.apache.camel.processor.ErrorHandler;
040    import org.apache.camel.spi.LifecycleStrategy;
041    import org.apache.camel.spi.RouteContext;
042    import org.apache.camel.spi.RoutePolicy;
043    import org.apache.camel.support.ChildServiceSupport;
044    import org.apache.camel.util.EventHelper;
045    import org.apache.camel.util.ServiceHelper;
046    import org.slf4j.Logger;
047    import org.slf4j.LoggerFactory;
048    
049    /**
050     * Represents the runtime objects for a given {@link RouteDefinition} so that it can be stopped independently
051     * of other routes
052     *
053     * @version 
054     */
055    public class RouteService extends ChildServiceSupport {
056    
057        private static final Logger LOG = LoggerFactory.getLogger(RouteService.class);
058    
059        private final DefaultCamelContext camelContext;
060        private final RouteDefinition routeDefinition;
061        private final List<RouteContext> routeContexts;
062        private final List<Route> routes;
063        private final String id;
064        private boolean removingRoutes;
065        private final Map<Route, Consumer> inputs = new HashMap<Route, Consumer>();
066        private final AtomicBoolean warmUpDone = new AtomicBoolean(false);
067        private final AtomicBoolean endpointDone = new AtomicBoolean(false);
068    
069        public RouteService(DefaultCamelContext camelContext, RouteDefinition routeDefinition, List<RouteContext> routeContexts, List<Route> routes) {
070            this.camelContext = camelContext;
071            this.routeDefinition = routeDefinition;
072            this.routeContexts = routeContexts;
073            this.routes = routes;
074            this.id = routeDefinition.idOrCreate(camelContext.getNodeIdFactory());
075        }
076    
077        public String getId() {
078            return id;
079        }
080    
081        public CamelContext getCamelContext() {
082            return camelContext;
083        }
084    
085        public List<RouteContext> getRouteContexts() {
086            return routeContexts;
087        }
088    
089        public RouteDefinition getRouteDefinition() {
090            return routeDefinition;
091        }
092    
093        public Collection<Route> getRoutes() {
094            return routes;
095        }
096    
097        /**
098         * Gets the inputs to the routes.
099         *
100         * @return list of {@link Consumer} as inputs for the routes
101         */
102        public Map<Route, Consumer> getInputs() {
103            return inputs;
104        }
105    
106        public boolean isRemovingRoutes() {
107            return removingRoutes;
108        }
109    
110        public void setRemovingRoutes(boolean removingRoutes) {
111            this.removingRoutes = removingRoutes;
112        }
113    
114        public synchronized void warmUp() throws Exception {
115            if (endpointDone.compareAndSet(false, true)) {
116                // endpoints should only be started once as they can be reused on other routes
117                // and whatnot, thus their lifecycle is to start once, and only to stop when Camel shutdown
118                for (Route route : routes) {
119                    // ensure endpoint is started first (before the route services, such as the consumer)
120                    ServiceHelper.startService(route.getEndpoint());
121                }
122            }
123    
124            if (warmUpDone.compareAndSet(false, true)) {
125    
126                for (Route route : routes) {
127                    // warm up the route first
128                    route.warmUp();
129    
130                    LOG.debug("Starting services on route: {}", route.getId());
131                    List<Service> services = route.getServices();
132    
133                    // callback that we are staring these services
134                    route.onStartingServices(services);
135    
136                    // gather list of services to start as we need to start child services as well
137                    Set<Service> list = new LinkedHashSet<Service>();
138                    for (Service service : services) {
139                        list.addAll(ServiceHelper.getChildServices(service));
140                    }
141    
142                    // split into consumers and child services as we need to start the consumers
143                    // afterwards to avoid them being active while the others start
144                    List<Service> childServices = new ArrayList<Service>();
145                    for (Service service : list) {
146    
147                        // inject the route
148                        if (service instanceof RouteAware) {
149                            ((RouteAware) service).setRoute(route);
150                        }
151    
152                        if (service instanceof Consumer) {
153                            inputs.put(route, (Consumer) service);
154                        } else {
155                            childServices.add(service);
156                        }
157                    }
158                    startChildService(route, childServices);
159                }
160    
161                // ensure lifecycle strategy is invoked which among others enlist the route in JMX
162                for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
163                    strategy.onRoutesAdd(routes);
164                }
165    
166                // add routes to camel context
167                camelContext.addRouteCollection(routes);
168            }
169        }
170    
171        protected void doStart() throws Exception {
172            // ensure we are warmed up before starting the route
173            warmUp();
174    
175            for (Route route : routes) {
176                // start the route itself
177                ServiceHelper.startService(route);
178    
179                // invoke callbacks on route policy
180                if (route.getRouteContext().getRoutePolicyList() != null) {
181                    for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) {
182                        routePolicy.onStart(route);
183                    }
184                }
185    
186                // fire event
187                EventHelper.notifyRouteStarted(camelContext, route);
188            }
189        }
190    
191        protected void doStop() throws Exception {
192    
193            // if we are stopping CamelContext then we are shutting down
194            boolean isShutdownCamelContext = camelContext.isStopping();
195    
196            if (isShutdownCamelContext || isRemovingRoutes()) {
197                // need to call onRoutesRemove when the CamelContext is shutting down or Route is shutdown
198                for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
199                    strategy.onRoutesRemove(routes);
200                }
201            }
202            
203            for (Route route : routes) {
204                LOG.debug("Stopping services on route: {}", route.getId());
205    
206                // gather list of services to stop as we need to start child services as well
207                List<Service> services = new ArrayList<Service>();
208                services.addAll(route.getServices());
209                // also get route scoped services
210                doGetRouteScopedServices(services, route);
211                Set<Service> list = new LinkedHashSet<Service>();
212                for (Service service : services) {
213                    list.addAll(ServiceHelper.getChildServices(service));
214                }
215                // also get route scoped error handler (which must be done last)
216                doGetRouteScopedErrorHandler(list, route);
217    
218                // stop services
219                stopChildService(route, list, isShutdownCamelContext);
220    
221                // stop the route itself
222                if (isShutdownCamelContext) {
223                    ServiceHelper.stopAndShutdownServices(route);
224                } else {
225                    ServiceHelper.stopServices(route);
226                }
227    
228                // invoke callbacks on route policy
229                if (route.getRouteContext().getRoutePolicyList() != null) {
230                    for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) {
231                        routePolicy.onStop(route);
232                    }
233                }
234                // fire event
235                EventHelper.notifyRouteStopped(camelContext, route);
236            }
237            if (isRemovingRoutes()) {
238                camelContext.removeRouteCollection(routes);
239            }
240            // need to warm up again
241            warmUpDone.set(false);
242        }
243    
244        @Override
245        protected void doShutdown() throws Exception {
246            for (Route route : routes) {
247                LOG.debug("Shutting down services on route: {}", route.getId());
248    
249                // gather list of services to stop as we need to start child services as well
250                List<Service> services = new ArrayList<Service>();
251                services.addAll(route.getServices());
252                // also get route scoped services
253                doGetRouteScopedServices(services, route);
254                Set<Service> list = new LinkedHashSet<Service>();
255                for (Service service : services) {
256                    list.addAll(ServiceHelper.getChildServices(service));
257                }
258                // also get route scoped error handler (which must be done last)
259                doGetRouteScopedErrorHandler(list, route);
260    
261                // shutdown services
262                stopChildService(route, list, true);
263    
264                // shutdown the route itself
265                ServiceHelper.stopAndShutdownServices(route);
266    
267                // endpoints should only be stopped when Camel is shutting down
268                // see more details in the warmUp method
269                ServiceHelper.stopAndShutdownServices(route.getEndpoint());
270                // invoke callbacks on route policy
271                if (route.getRouteContext().getRoutePolicyList() != null) {
272                    for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) {
273                        routePolicy.onRemove(route);
274                    }
275                }
276            }
277    
278            // need to call onRoutesRemove when the CamelContext is shutting down or Route is shutdown
279            for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
280                strategy.onRoutesRemove(routes);
281            }
282            
283            // remove the routes from the inflight registry
284            for (Route route : routes) {
285                camelContext.getInflightRepository().removeRoute(route.getId());
286            }
287    
288            // remove the routes from the collections
289            camelContext.removeRouteCollection(routes);
290            
291            // clear inputs on shutdown
292            inputs.clear();
293            warmUpDone.set(false);
294            endpointDone.set(false);
295        }
296    
297        @Override
298        protected void doSuspend() throws Exception {
299            // suspend and resume logic is provided by DefaultCamelContext which leverages ShutdownStrategy
300            // to safely suspend and resume
301            for (Route route : routes) {
302                if (route.getRouteContext().getRoutePolicyList() != null) {
303                    for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) {
304                        routePolicy.onSuspend(route);
305                    }
306                }
307            }
308        }
309    
310        @Override
311        protected void doResume() throws Exception {
312            // suspend and resume logic is provided by DefaultCamelContext which leverages ShutdownStrategy
313            // to safely suspend and resume
314            for (Route route : routes) {
315                if (route.getRouteContext().getRoutePolicyList() != null) {
316                    for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) {
317                        routePolicy.onResume(route);
318                    }
319                }
320            }
321        }
322    
323        protected void startChildService(Route route, List<Service> services) throws Exception {
324            for (Service service : services) {
325                LOG.debug("Starting child service on route: {} -> {}", route.getId(), service);
326                for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
327                    strategy.onServiceAdd(camelContext, service, route);
328                }
329                ServiceHelper.startService(service);
330                addChildService(service);
331            }
332        }
333    
334        protected void stopChildService(Route route, Set<Service> services, boolean shutdown) throws Exception {
335            for (Service service : services) {
336                LOG.debug("{} child service on route: {} -> {}", new Object[]{shutdown ? "Shutting down" : "Stopping", route.getId(), service});
337                if (service instanceof ErrorHandler) {
338                    // special for error handlers
339                    for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
340                        strategy.onErrorHandlerRemove(route.getRouteContext(), (Processor) service, route.getRouteContext().getRoute().getErrorHandlerBuilder());
341                    }
342                } else {
343                    for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
344                        strategy.onServiceRemove(camelContext, service, route);
345                    }
346                }
347                if (shutdown) {
348                    ServiceHelper.stopAndShutdownService(service);
349                } else {
350                    ServiceHelper.stopService(service);
351                }
352                removeChildService(service);
353            }
354        }
355    
356        /**
357         * Gather the route scoped error handler from the given route
358         */
359        private void doGetRouteScopedErrorHandler(Set<Service> services, Route route) {
360            // only include error handlers if they are route scoped
361            boolean includeErrorHandler = !routeDefinition.isContextScopedErrorHandler(route.getRouteContext().getCamelContext());
362            List<Service> extra = new ArrayList<Service>();
363            if (includeErrorHandler) {
364                for (Service service : services) {
365                    if (service instanceof Channel) {
366                        Processor eh = ((Channel) service).getErrorHandler();
367                        if (eh != null && eh instanceof Service) {
368                            extra.add((Service) eh);
369                        }
370                    }
371                }
372            }
373            if (!extra.isEmpty()) {
374                services.addAll(extra);
375            }
376        }
377    
378        /**
379         * Gather all other kind of route scoped services from the given route, except error handler
380         */
381        private void doGetRouteScopedServices(List<Service> services, Route route) {
382            for (ProcessorDefinition<?> output : route.getRouteContext().getRoute().getOutputs()) {
383                if (output instanceof OnExceptionDefinition) {
384                    OnExceptionDefinition onExceptionDefinition = (OnExceptionDefinition) output;
385                    if (onExceptionDefinition.isRouteScoped()) {
386                        Processor errorHandler = onExceptionDefinition.getErrorHandler(route.getId());
387                        if (errorHandler != null && errorHandler instanceof Service) {
388                            services.add((Service) errorHandler);
389                        }
390                    }
391                } else if (output instanceof OnCompletionDefinition) {
392                    OnCompletionDefinition onCompletionDefinition = (OnCompletionDefinition) output;
393                    if (onCompletionDefinition.isRouteScoped()) {
394                        Processor onCompletionProcessor = onCompletionDefinition.getOnCompletion(route.getId());
395                        if (onCompletionProcessor != null && onCompletionProcessor instanceof Service) {
396                            services.add((Service) onCompletionProcessor);
397                        }
398                    }
399                }
400            }
401        }
402    
403    }