001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import java.util.ArrayList; 020 import java.util.Collection; 021 import java.util.HashMap; 022 import java.util.LinkedHashSet; 023 import java.util.List; 024 import java.util.Map; 025 import java.util.Set; 026 import java.util.concurrent.atomic.AtomicBoolean; 027 028 import org.apache.camel.CamelContext; 029 import org.apache.camel.Channel; 030 import org.apache.camel.Consumer; 031 import org.apache.camel.Processor; 032 import org.apache.camel.Route; 033 import org.apache.camel.RouteAware; 034 import org.apache.camel.Service; 035 import org.apache.camel.model.OnCompletionDefinition; 036 import org.apache.camel.model.OnExceptionDefinition; 037 import org.apache.camel.model.ProcessorDefinition; 038 import org.apache.camel.model.RouteDefinition; 039 import org.apache.camel.processor.ErrorHandler; 040 import org.apache.camel.spi.LifecycleStrategy; 041 import org.apache.camel.spi.RouteContext; 042 import org.apache.camel.spi.RoutePolicy; 043 import org.apache.camel.support.ChildServiceSupport; 044 import org.apache.camel.util.EventHelper; 045 import org.apache.camel.util.ServiceHelper; 046 import org.slf4j.Logger; 047 import org.slf4j.LoggerFactory; 048 049 /** 050 * Represents the runtime objects for a given {@link RouteDefinition} so that it can be stopped independently 051 * of other routes 052 * 053 * @version 054 */ 055 public class RouteService extends ChildServiceSupport { 056 057 private static final Logger LOG = LoggerFactory.getLogger(RouteService.class); 058 059 private final DefaultCamelContext camelContext; 060 private final RouteDefinition routeDefinition; 061 private final List<RouteContext> routeContexts; 062 private final List<Route> routes; 063 private final String id; 064 private boolean removingRoutes; 065 private final Map<Route, Consumer> inputs = new HashMap<Route, Consumer>(); 066 private final AtomicBoolean warmUpDone = new AtomicBoolean(false); 067 private final AtomicBoolean endpointDone = new AtomicBoolean(false); 068 069 public RouteService(DefaultCamelContext camelContext, RouteDefinition routeDefinition, List<RouteContext> routeContexts, List<Route> routes) { 070 this.camelContext = camelContext; 071 this.routeDefinition = routeDefinition; 072 this.routeContexts = routeContexts; 073 this.routes = routes; 074 this.id = routeDefinition.idOrCreate(camelContext.getNodeIdFactory()); 075 } 076 077 public String getId() { 078 return id; 079 } 080 081 public CamelContext getCamelContext() { 082 return camelContext; 083 } 084 085 public List<RouteContext> getRouteContexts() { 086 return routeContexts; 087 } 088 089 public RouteDefinition getRouteDefinition() { 090 return routeDefinition; 091 } 092 093 public Collection<Route> getRoutes() { 094 return routes; 095 } 096 097 /** 098 * Gets the inputs to the routes. 099 * 100 * @return list of {@link Consumer} as inputs for the routes 101 */ 102 public Map<Route, Consumer> getInputs() { 103 return inputs; 104 } 105 106 public boolean isRemovingRoutes() { 107 return removingRoutes; 108 } 109 110 public void setRemovingRoutes(boolean removingRoutes) { 111 this.removingRoutes = removingRoutes; 112 } 113 114 public synchronized void warmUp() throws Exception { 115 if (endpointDone.compareAndSet(false, true)) { 116 // endpoints should only be started once as they can be reused on other routes 117 // and whatnot, thus their lifecycle is to start once, and only to stop when Camel shutdown 118 for (Route route : routes) { 119 // ensure endpoint is started first (before the route services, such as the consumer) 120 ServiceHelper.startService(route.getEndpoint()); 121 } 122 } 123 124 if (warmUpDone.compareAndSet(false, true)) { 125 126 for (Route route : routes) { 127 // warm up the route first 128 route.warmUp(); 129 130 LOG.debug("Starting services on route: {}", route.getId()); 131 List<Service> services = route.getServices(); 132 133 // callback that we are staring these services 134 route.onStartingServices(services); 135 136 // gather list of services to start as we need to start child services as well 137 Set<Service> list = new LinkedHashSet<Service>(); 138 for (Service service : services) { 139 list.addAll(ServiceHelper.getChildServices(service)); 140 } 141 142 // split into consumers and child services as we need to start the consumers 143 // afterwards to avoid them being active while the others start 144 List<Service> childServices = new ArrayList<Service>(); 145 for (Service service : list) { 146 147 // inject the route 148 if (service instanceof RouteAware) { 149 ((RouteAware) service).setRoute(route); 150 } 151 152 if (service instanceof Consumer) { 153 inputs.put(route, (Consumer) service); 154 } else { 155 childServices.add(service); 156 } 157 } 158 startChildService(route, childServices); 159 } 160 161 // ensure lifecycle strategy is invoked which among others enlist the route in JMX 162 for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { 163 strategy.onRoutesAdd(routes); 164 } 165 166 // add routes to camel context 167 camelContext.addRouteCollection(routes); 168 } 169 } 170 171 protected void doStart() throws Exception { 172 // ensure we are warmed up before starting the route 173 warmUp(); 174 175 for (Route route : routes) { 176 // start the route itself 177 ServiceHelper.startService(route); 178 179 // invoke callbacks on route policy 180 if (route.getRouteContext().getRoutePolicyList() != null) { 181 for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) { 182 routePolicy.onStart(route); 183 } 184 } 185 186 // fire event 187 EventHelper.notifyRouteStarted(camelContext, route); 188 } 189 } 190 191 protected void doStop() throws Exception { 192 193 // if we are stopping CamelContext then we are shutting down 194 boolean isShutdownCamelContext = camelContext.isStopping(); 195 196 if (isShutdownCamelContext || isRemovingRoutes()) { 197 // need to call onRoutesRemove when the CamelContext is shutting down or Route is shutdown 198 for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { 199 strategy.onRoutesRemove(routes); 200 } 201 } 202 203 for (Route route : routes) { 204 LOG.debug("Stopping services on route: {}", route.getId()); 205 206 // gather list of services to stop as we need to start child services as well 207 List<Service> services = new ArrayList<Service>(); 208 services.addAll(route.getServices()); 209 // also get route scoped services 210 doGetRouteScopedServices(services, route); 211 Set<Service> list = new LinkedHashSet<Service>(); 212 for (Service service : services) { 213 list.addAll(ServiceHelper.getChildServices(service)); 214 } 215 // also get route scoped error handler (which must be done last) 216 doGetRouteScopedErrorHandler(list, route); 217 218 // stop services 219 stopChildService(route, list, isShutdownCamelContext); 220 221 // stop the route itself 222 if (isShutdownCamelContext) { 223 ServiceHelper.stopAndShutdownServices(route); 224 } else { 225 ServiceHelper.stopServices(route); 226 } 227 228 // invoke callbacks on route policy 229 if (route.getRouteContext().getRoutePolicyList() != null) { 230 for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) { 231 routePolicy.onStop(route); 232 } 233 } 234 // fire event 235 EventHelper.notifyRouteStopped(camelContext, route); 236 } 237 if (isRemovingRoutes()) { 238 camelContext.removeRouteCollection(routes); 239 } 240 // need to warm up again 241 warmUpDone.set(false); 242 } 243 244 @Override 245 protected void doShutdown() throws Exception { 246 for (Route route : routes) { 247 LOG.debug("Shutting down services on route: {}", route.getId()); 248 249 // gather list of services to stop as we need to start child services as well 250 List<Service> services = new ArrayList<Service>(); 251 services.addAll(route.getServices()); 252 // also get route scoped services 253 doGetRouteScopedServices(services, route); 254 Set<Service> list = new LinkedHashSet<Service>(); 255 for (Service service : services) { 256 list.addAll(ServiceHelper.getChildServices(service)); 257 } 258 // also get route scoped error handler (which must be done last) 259 doGetRouteScopedErrorHandler(list, route); 260 261 // shutdown services 262 stopChildService(route, list, true); 263 264 // shutdown the route itself 265 ServiceHelper.stopAndShutdownServices(route); 266 267 // endpoints should only be stopped when Camel is shutting down 268 // see more details in the warmUp method 269 ServiceHelper.stopAndShutdownServices(route.getEndpoint()); 270 // invoke callbacks on route policy 271 if (route.getRouteContext().getRoutePolicyList() != null) { 272 for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) { 273 routePolicy.onRemove(route); 274 } 275 } 276 } 277 278 // need to call onRoutesRemove when the CamelContext is shutting down or Route is shutdown 279 for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { 280 strategy.onRoutesRemove(routes); 281 } 282 283 // remove the routes from the inflight registry 284 for (Route route : routes) { 285 camelContext.getInflightRepository().removeRoute(route.getId()); 286 } 287 288 // remove the routes from the collections 289 camelContext.removeRouteCollection(routes); 290 291 // clear inputs on shutdown 292 inputs.clear(); 293 warmUpDone.set(false); 294 endpointDone.set(false); 295 } 296 297 @Override 298 protected void doSuspend() throws Exception { 299 // suspend and resume logic is provided by DefaultCamelContext which leverages ShutdownStrategy 300 // to safely suspend and resume 301 for (Route route : routes) { 302 if (route.getRouteContext().getRoutePolicyList() != null) { 303 for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) { 304 routePolicy.onSuspend(route); 305 } 306 } 307 } 308 } 309 310 @Override 311 protected void doResume() throws Exception { 312 // suspend and resume logic is provided by DefaultCamelContext which leverages ShutdownStrategy 313 // to safely suspend and resume 314 for (Route route : routes) { 315 if (route.getRouteContext().getRoutePolicyList() != null) { 316 for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) { 317 routePolicy.onResume(route); 318 } 319 } 320 } 321 } 322 323 protected void startChildService(Route route, List<Service> services) throws Exception { 324 for (Service service : services) { 325 LOG.debug("Starting child service on route: {} -> {}", route.getId(), service); 326 for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { 327 strategy.onServiceAdd(camelContext, service, route); 328 } 329 ServiceHelper.startService(service); 330 addChildService(service); 331 } 332 } 333 334 protected void stopChildService(Route route, Set<Service> services, boolean shutdown) throws Exception { 335 for (Service service : services) { 336 LOG.debug("{} child service on route: {} -> {}", new Object[]{shutdown ? "Shutting down" : "Stopping", route.getId(), service}); 337 if (service instanceof ErrorHandler) { 338 // special for error handlers 339 for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { 340 strategy.onErrorHandlerRemove(route.getRouteContext(), (Processor) service, route.getRouteContext().getRoute().getErrorHandlerBuilder()); 341 } 342 } else { 343 for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { 344 strategy.onServiceRemove(camelContext, service, route); 345 } 346 } 347 if (shutdown) { 348 ServiceHelper.stopAndShutdownService(service); 349 } else { 350 ServiceHelper.stopService(service); 351 } 352 removeChildService(service); 353 } 354 } 355 356 /** 357 * Gather the route scoped error handler from the given route 358 */ 359 private void doGetRouteScopedErrorHandler(Set<Service> services, Route route) { 360 // only include error handlers if they are route scoped 361 boolean includeErrorHandler = !routeDefinition.isContextScopedErrorHandler(route.getRouteContext().getCamelContext()); 362 List<Service> extra = new ArrayList<Service>(); 363 if (includeErrorHandler) { 364 for (Service service : services) { 365 if (service instanceof Channel) { 366 Processor eh = ((Channel) service).getErrorHandler(); 367 if (eh != null && eh instanceof Service) { 368 extra.add((Service) eh); 369 } 370 } 371 } 372 } 373 if (!extra.isEmpty()) { 374 services.addAll(extra); 375 } 376 } 377 378 /** 379 * Gather all other kind of route scoped services from the given route, except error handler 380 */ 381 private void doGetRouteScopedServices(List<Service> services, Route route) { 382 for (ProcessorDefinition<?> output : route.getRouteContext().getRoute().getOutputs()) { 383 if (output instanceof OnExceptionDefinition) { 384 OnExceptionDefinition onExceptionDefinition = (OnExceptionDefinition) output; 385 if (onExceptionDefinition.isRouteScoped()) { 386 Processor errorHandler = onExceptionDefinition.getErrorHandler(route.getId()); 387 if (errorHandler != null && errorHandler instanceof Service) { 388 services.add((Service) errorHandler); 389 } 390 } 391 } else if (output instanceof OnCompletionDefinition) { 392 OnCompletionDefinition onCompletionDefinition = (OnCompletionDefinition) output; 393 if (onCompletionDefinition.isRouteScoped()) { 394 Processor onCompletionProcessor = onCompletionDefinition.getOnCompletion(route.getId()); 395 if (onCompletionProcessor != null && onCompletionProcessor instanceof Service) { 396 services.add((Service) onCompletionProcessor); 397 } 398 } 399 } 400 } 401 } 402 403 }