001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.management; 018 019 import java.util.ArrayList; 020 import java.util.Collection; 021 import java.util.HashMap; 022 import java.util.HashSet; 023 import java.util.Iterator; 024 import java.util.List; 025 import java.util.Map; 026 import java.util.Set; 027 import java.util.concurrent.ThreadPoolExecutor; 028 import javax.management.JMException; 029 import javax.management.MalformedObjectNameException; 030 import javax.management.ObjectName; 031 032 import org.apache.camel.CamelContext; 033 import org.apache.camel.CamelContextAware; 034 import org.apache.camel.Channel; 035 import org.apache.camel.Component; 036 import org.apache.camel.Consumer; 037 import org.apache.camel.Endpoint; 038 import org.apache.camel.ErrorHandlerFactory; 039 import org.apache.camel.ManagementStatisticsLevel; 040 import org.apache.camel.Processor; 041 import org.apache.camel.Producer; 042 import org.apache.camel.Route; 043 import org.apache.camel.Service; 044 import org.apache.camel.StartupListener; 045 import org.apache.camel.TimerListener; 046 import org.apache.camel.VetoCamelContextStartException; 047 import org.apache.camel.api.management.PerformanceCounter; 048 import org.apache.camel.impl.ConsumerCache; 049 import org.apache.camel.impl.DefaultCamelContext; 050 import org.apache.camel.impl.EndpointRegistry; 051 import org.apache.camel.impl.EventDrivenConsumerRoute; 052 import org.apache.camel.impl.ProducerCache; 053 import org.apache.camel.impl.ThrottlingInflightRoutePolicy; 054 import org.apache.camel.management.mbean.ManagedBacklogDebugger; 055 import org.apache.camel.management.mbean.ManagedBacklogTracer; 056 import org.apache.camel.management.mbean.ManagedCamelContext; 057 import org.apache.camel.management.mbean.ManagedConsumerCache; 058 import org.apache.camel.management.mbean.ManagedEndpoint; 059 import org.apache.camel.management.mbean.ManagedEndpointRegistry; 060 import org.apache.camel.management.mbean.ManagedProducerCache; 061 import org.apache.camel.management.mbean.ManagedRoute; 062 import org.apache.camel.management.mbean.ManagedService; 063 import org.apache.camel.management.mbean.ManagedStreamCachingStrategy; 064 import org.apache.camel.management.mbean.ManagedThrottlingInflightRoutePolicy; 065 import org.apache.camel.management.mbean.ManagedTracer; 066 import org.apache.camel.management.mbean.ManagedTypeConverterRegistry; 067 import org.apache.camel.model.AOPDefinition; 068 import org.apache.camel.model.InterceptDefinition; 069 import org.apache.camel.model.OnCompletionDefinition; 070 import org.apache.camel.model.OnExceptionDefinition; 071 import org.apache.camel.model.PolicyDefinition; 072 import org.apache.camel.model.ProcessorDefinition; 073 import org.apache.camel.model.ProcessorDefinitionHelper; 074 import org.apache.camel.model.RouteDefinition; 075 import org.apache.camel.processor.CamelInternalProcessor; 076 import org.apache.camel.processor.interceptor.BacklogDebugger; 077 import org.apache.camel.processor.interceptor.BacklogTracer; 078 import org.apache.camel.processor.interceptor.Tracer; 079 import org.apache.camel.spi.EventNotifier; 080 import org.apache.camel.spi.LifecycleStrategy; 081 import org.apache.camel.spi.ManagementAgent; 082 import org.apache.camel.spi.ManagementAware; 083 import org.apache.camel.spi.ManagementNameStrategy; 084 import org.apache.camel.spi.ManagementObjectStrategy; 085 import org.apache.camel.spi.ManagementStrategy; 086 import org.apache.camel.spi.RouteContext; 087 import org.apache.camel.spi.StreamCachingStrategy; 088 import org.apache.camel.spi.TypeConverterRegistry; 089 import org.apache.camel.spi.UnitOfWork; 090 import org.apache.camel.support.ServiceSupport; 091 import org.apache.camel.support.TimerListenerManager; 092 import org.apache.camel.util.KeyValueHolder; 093 import org.apache.camel.util.ObjectHelper; 094 import org.slf4j.Logger; 095 import org.slf4j.LoggerFactory; 096 097 /** 098 * Default JMX managed lifecycle strategy that registered objects using the configured 099 * {@link org.apache.camel.spi.ManagementStrategy}. 100 * 101 * @see org.apache.camel.spi.ManagementStrategy 102 * @version 103 */ 104 @SuppressWarnings("deprecation") 105 public class DefaultManagementLifecycleStrategy extends ServiceSupport implements LifecycleStrategy, CamelContextAware { 106 107 private static final Logger LOG = LoggerFactory.getLogger(DefaultManagementLifecycleStrategy.class); 108 // the wrapped processors is for performance counters, which are in use for the created routes 109 // when a route is removed, we should remove the associated processors from this map 110 private final Map<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> wrappedProcessors = 111 new HashMap<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>>(); 112 private final List<PreRegisterService> preServices = new ArrayList<PreRegisterService>(); 113 private final TimerListenerManager timerListenerManager = new TimerListenerManager(); 114 private final TimerListenerManagerStartupListener timerManagerStartupListener = new TimerListenerManagerStartupListener(); 115 private volatile CamelContext camelContext; 116 private volatile ManagedCamelContext camelContextMBean; 117 private volatile boolean initialized; 118 private final Set<String> knowRouteIds = new HashSet<String>(); 119 private final Map<Tracer, ManagedTracer> managedTracers = new HashMap<Tracer, ManagedTracer>(); 120 private final Map<BacklogTracer, ManagedBacklogTracer> managedBacklogTracers = new HashMap<BacklogTracer, ManagedBacklogTracer>(); 121 private final Map<BacklogDebugger, ManagedBacklogDebugger> managedBacklogDebuggers = new HashMap<BacklogDebugger, ManagedBacklogDebugger>(); 122 private final Map<ThreadPoolExecutor, Object> managedThreadPools = new HashMap<ThreadPoolExecutor, Object>(); 123 124 public DefaultManagementLifecycleStrategy() { 125 } 126 127 public DefaultManagementLifecycleStrategy(CamelContext camelContext) { 128 this.camelContext = camelContext; 129 } 130 131 public CamelContext getCamelContext() { 132 return camelContext; 133 } 134 135 public void setCamelContext(CamelContext camelContext) { 136 this.camelContext = camelContext; 137 } 138 139 public void onContextStart(CamelContext context) throws VetoCamelContextStartException { 140 Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context); 141 142 String name = context.getName(); 143 String managementName = context.getManagementNameStrategy().getName(); 144 145 try { 146 boolean done = false; 147 while (!done) { 148 ObjectName on = getManagementStrategy().getManagementNamingStrategy().getObjectNameForCamelContext(managementName, name); 149 boolean exists = getManagementStrategy().isManaged(mc, on); 150 if (!exists) { 151 done = true; 152 } else { 153 // okay there exists already a CamelContext with this name, we can try to fix it by finding a free name 154 boolean fixed = false; 155 // if we use the default name strategy we can find a free name to use 156 String newName = findFreeName(mc, context.getManagementNameStrategy(), name); 157 if (newName != null) { 158 // use this as the fixed name 159 fixed = true; 160 done = true; 161 managementName = newName; 162 } 163 // we could not fix it so veto starting camel 164 if (!fixed) { 165 throw new VetoCamelContextStartException("CamelContext (" + context.getName() + ") with ObjectName[" + on + "] is already registered." 166 + " Make sure to use unique names on CamelContext when using multiple CamelContexts in the same MBeanServer.", context); 167 } else { 168 LOG.warn("This CamelContext(" + context.getName() + ") will be registered using the name: " + managementName 169 + " due to clash with an existing name already registered in MBeanServer."); 170 } 171 } 172 } 173 } catch (VetoCamelContextStartException e) { 174 // rethrow veto 175 throw e; 176 } catch (Exception e) { 177 // must rethrow to allow CamelContext fallback to non JMX agent to allow 178 // Camel to continue to run 179 throw ObjectHelper.wrapRuntimeCamelException(e); 180 } 181 182 // set the name we are going to use 183 if (context instanceof DefaultCamelContext) { 184 ((DefaultCamelContext) context).setManagementName(managementName); 185 } 186 187 try { 188 manageObject(mc); 189 } catch (Exception e) { 190 // must rethrow to allow CamelContext fallback to non JMX agent to allow 191 // Camel to continue to run 192 throw ObjectHelper.wrapRuntimeCamelException(e); 193 } 194 195 // yes we made it and are initialized 196 initialized = true; 197 198 if (mc instanceof ManagedCamelContext) { 199 camelContextMBean = (ManagedCamelContext) mc; 200 } 201 202 // register any pre registered now that we are initialized 203 enlistPreRegisteredServices(); 204 } 205 206 private String findFreeName(Object mc, ManagementNameStrategy strategy, String name) throws MalformedObjectNameException { 207 // we cannot find a free name for fixed named strategies 208 if (strategy.isFixedName()) { 209 return null; 210 } 211 212 // okay try to find a free name 213 boolean done = false; 214 String newName = null; 215 while (!done) { 216 // compute the next name 217 newName = strategy.getNextName(); 218 ObjectName on = getManagementStrategy().getManagementNamingStrategy().getObjectNameForCamelContext(newName, name); 219 done = !getManagementStrategy().isManaged(mc, on); 220 if (LOG.isTraceEnabled()) { 221 LOG.trace("Using name: {} in ObjectName[{}] exists? {}", new Object[]{name, on, done}); 222 } 223 } 224 return newName; 225 } 226 227 /** 228 * After {@link CamelContext} has been enlisted in JMX using {@link #onContextStart(org.apache.camel.CamelContext)} 229 * then we can enlist any pre registered services as well, as we had to wait for {@link CamelContext} to be 230 * enlisted first. 231 * <p/> 232 * A component/endpoint/service etc. can be pre registered when using dependency injection and annotations such as 233 * {@link org.apache.camel.Produce}, {@link org.apache.camel.EndpointInject}. Therefore we need to capture those 234 * registrations up front, and then afterwards enlist in JMX when {@link CamelContext} is being started. 235 */ 236 private void enlistPreRegisteredServices() { 237 if (preServices.isEmpty()) { 238 return; 239 } 240 241 LOG.debug("Registering {} pre registered services", preServices.size()); 242 for (PreRegisterService pre : preServices) { 243 if (pre.getComponent() != null) { 244 onComponentAdd(pre.getName(), pre.getComponent()); 245 } else if (pre.getEndpoint() != null) { 246 onEndpointAdd(pre.getEndpoint()); 247 } else if (pre.getService() != null) { 248 onServiceAdd(pre.getCamelContext(), pre.getService(), pre.getRoute()); 249 } 250 } 251 252 // we are done so clear the list 253 preServices.clear(); 254 } 255 256 public void onContextStop(CamelContext context) { 257 // the agent hasn't been started 258 if (!initialized) { 259 return; 260 } 261 try { 262 Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context); 263 // the context could have been removed already 264 if (getManagementStrategy().isManaged(mc, null)) { 265 unmanageObject(mc); 266 } 267 } catch (Exception e) { 268 LOG.warn("Could not unregister CamelContext MBean", e); 269 } 270 271 camelContextMBean = null; 272 } 273 274 public void onComponentAdd(String name, Component component) { 275 // always register components as there are only a few of those 276 if (!initialized) { 277 // pre register so we can register later when we have been initialized 278 PreRegisterService pre = new PreRegisterService(); 279 pre.onComponentAdd(name, component); 280 preServices.add(pre); 281 return; 282 } 283 try { 284 Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name); 285 manageObject(mc); 286 } catch (Exception e) { 287 LOG.warn("Could not register Component MBean", e); 288 } 289 } 290 291 public void onComponentRemove(String name, Component component) { 292 // the agent hasn't been started 293 if (!initialized) { 294 return; 295 } 296 try { 297 Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name); 298 unmanageObject(mc); 299 } catch (Exception e) { 300 LOG.warn("Could not unregister Component MBean", e); 301 } 302 } 303 304 /** 305 * If the endpoint is an instance of ManagedResource then register it with the 306 * mbean server, if it is not then wrap the endpoint in a {@link ManagedEndpoint} and 307 * register that with the mbean server. 308 * 309 * @param endpoint the Endpoint attempted to be added 310 */ 311 public void onEndpointAdd(Endpoint endpoint) { 312 if (!initialized) { 313 // pre register so we can register later when we have been initialized 314 PreRegisterService pre = new PreRegisterService(); 315 pre.onEndpointAdd(endpoint); 316 preServices.add(pre); 317 return; 318 } 319 320 if (!shouldRegister(endpoint, null)) { 321 // avoid registering if not needed 322 return; 323 } 324 325 try { 326 Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint); 327 if (me == null) { 328 // endpoint should not be managed 329 return; 330 } 331 manageObject(me); 332 } catch (Exception e) { 333 LOG.warn("Could not register Endpoint MBean for endpoint: " + endpoint + ". This exception will be ignored.", e); 334 } 335 } 336 337 public void onEndpointRemove(Endpoint endpoint) { 338 // the agent hasn't been started 339 if (!initialized) { 340 return; 341 } 342 343 try { 344 Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint); 345 unmanageObject(me); 346 } catch (Exception e) { 347 LOG.warn("Could not unregister Endpoint MBean for endpoint: " + endpoint + ". This exception will be ignored.", e); 348 } 349 } 350 351 public void onServiceAdd(CamelContext context, Service service, Route route) { 352 if (!initialized) { 353 // pre register so we can register later when we have been initialized 354 PreRegisterService pre = new PreRegisterService(); 355 pre.onServiceAdd(context, service, route); 356 preServices.add(pre); 357 return; 358 } 359 360 // services can by any kind of misc type but also processors 361 // so we have special logic when its a processor 362 363 if (!shouldRegister(service, route)) { 364 // avoid registering if not needed 365 return; 366 } 367 368 Object managedObject = getManagedObjectForService(context, service, route); 369 if (managedObject == null) { 370 // service should not be managed 371 return; 372 } 373 374 // skip already managed services, for example if a route has been restarted 375 if (getManagementStrategy().isManaged(managedObject, null)) { 376 LOG.trace("The service is already managed: {}", service); 377 return; 378 } 379 380 try { 381 manageObject(managedObject); 382 } catch (Exception e) { 383 LOG.warn("Could not register service: " + service + " as Service MBean.", e); 384 } 385 } 386 387 public void onServiceRemove(CamelContext context, Service service, Route route) { 388 // the agent hasn't been started 389 if (!initialized) { 390 return; 391 } 392 393 Object managedObject = getManagedObjectForService(context, service, route); 394 if (managedObject != null) { 395 try { 396 unmanageObject(managedObject); 397 } catch (Exception e) { 398 LOG.warn("Could not unregister service: " + service + " as Service MBean.", e); 399 } 400 } 401 } 402 403 @SuppressWarnings("unchecked") 404 private Object getManagedObjectForService(CamelContext context, Service service, Route route) { 405 // skip channel, UoW and dont double wrap instrumentation 406 if (service instanceof Channel || service instanceof UnitOfWork || service instanceof InstrumentationProcessor) { 407 return null; 408 } 409 410 Object answer = null; 411 412 if (service instanceof ManagementAware) { 413 return ((ManagementAware<Service>) service).getManagedObject(service); 414 } else if (service instanceof Tracer) { 415 // special for tracer 416 Tracer tracer = (Tracer) service; 417 ManagedTracer mt = managedTracers.get(tracer); 418 if (mt == null) { 419 mt = new ManagedTracer(context, tracer); 420 mt.init(getManagementStrategy()); 421 managedTracers.put(tracer, mt); 422 } 423 return mt; 424 } else if (service instanceof BacklogTracer) { 425 // special for backlog tracer 426 BacklogTracer backlogTracer = (BacklogTracer) service; 427 ManagedBacklogTracer mt = managedBacklogTracers.get(backlogTracer); 428 if (mt == null) { 429 mt = new ManagedBacklogTracer(context, backlogTracer); 430 mt.init(getManagementStrategy()); 431 managedBacklogTracers.put(backlogTracer, mt); 432 } 433 return mt; 434 } else if (service instanceof BacklogDebugger) { 435 // special for backlog debugger 436 BacklogDebugger backlogDebugger = (BacklogDebugger) service; 437 ManagedBacklogDebugger md = managedBacklogDebuggers.get(backlogDebugger); 438 if (md == null) { 439 md = new ManagedBacklogDebugger(context, backlogDebugger); 440 md.init(getManagementStrategy()); 441 managedBacklogDebuggers.put(backlogDebugger, md); 442 } 443 return md; 444 } else if (service instanceof EventNotifier) { 445 answer = getManagementObjectStrategy().getManagedObjectForEventNotifier(context, (EventNotifier) service); 446 } else if (service instanceof Producer) { 447 answer = getManagementObjectStrategy().getManagedObjectForProducer(context, (Producer) service); 448 } else if (service instanceof Consumer) { 449 answer = getManagementObjectStrategy().getManagedObjectForConsumer(context, (Consumer) service); 450 } else if (service instanceof Processor) { 451 // special for processors as we need to do some extra work 452 return getManagedObjectForProcessor(context, (Processor) service, route); 453 } else if (service instanceof ThrottlingInflightRoutePolicy) { 454 answer = new ManagedThrottlingInflightRoutePolicy(context, (ThrottlingInflightRoutePolicy) service); 455 } else if (service instanceof ConsumerCache) { 456 answer = new ManagedConsumerCache(context, (ConsumerCache) service); 457 } else if (service instanceof ProducerCache) { 458 answer = new ManagedProducerCache(context, (ProducerCache) service); 459 } else if (service instanceof EndpointRegistry) { 460 answer = new ManagedEndpointRegistry(context, (EndpointRegistry) service); 461 } else if (service instanceof TypeConverterRegistry) { 462 answer = new ManagedTypeConverterRegistry(context, (TypeConverterRegistry) service); 463 } else if (service instanceof StreamCachingStrategy) { 464 answer = new ManagedStreamCachingStrategy(context, (StreamCachingStrategy) service); 465 } else if (service != null) { 466 // fallback as generic service 467 answer = getManagementObjectStrategy().getManagedObjectForService(context, service); 468 } 469 470 if (answer != null && answer instanceof ManagedService) { 471 ManagedService ms = (ManagedService) answer; 472 ms.setRoute(route); 473 ms.init(getManagementStrategy()); 474 } 475 476 return answer; 477 } 478 479 private Object getManagedObjectForProcessor(CamelContext context, Processor processor, Route route) { 480 // a bit of magic here as the processors we want to manage have already been registered 481 // in the wrapped processors map when Camel have instrumented the route on route initialization 482 // so the idea is now to only manage the processors from the map 483 KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = wrappedProcessors.get(processor); 484 if (holder == null) { 485 // skip as its not an well known processor we want to manage anyway, such as Channel/UnitOfWork/Pipeline etc. 486 return null; 487 } 488 489 // get the managed object as it can be a specialized type such as a Delayer/Throttler etc. 490 Object managedObject = getManagementObjectStrategy().getManagedObjectForProcessor(context, processor, holder.getKey(), route); 491 // only manage if we have a name for it as otherwise we do not want to manage it anyway 492 if (managedObject != null) { 493 // is it a performance counter then we need to set our counter 494 if (managedObject instanceof PerformanceCounter) { 495 InstrumentationProcessor counter = holder.getValue(); 496 if (counter != null) { 497 // change counter to us 498 counter.setCounter(managedObject); 499 } 500 } 501 } 502 503 return managedObject; 504 } 505 506 public void onRoutesAdd(Collection<Route> routes) { 507 for (Route route : routes) { 508 509 // if we are starting CamelContext or either of the two options has been 510 // enabled, then enlist the route as a known route 511 if (getCamelContext().getStatus().isStarting() 512 || getManagementStrategy().getManagementAgent().getRegisterAlways() 513 || getManagementStrategy().getManagementAgent().getRegisterNewRoutes()) { 514 // register as known route id 515 knowRouteIds.add(route.getId()); 516 } 517 518 if (!shouldRegister(route, route)) { 519 // avoid registering if not needed, skip to next route 520 continue; 521 } 522 523 Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route); 524 525 // skip already managed routes, for example if the route has been restarted 526 if (getManagementStrategy().isManaged(mr, null)) { 527 LOG.trace("The route is already managed: {}", route); 528 continue; 529 } 530 531 // get the wrapped instrumentation processor from this route 532 // and set me as the counter 533 if (route instanceof EventDrivenConsumerRoute) { 534 EventDrivenConsumerRoute edcr = (EventDrivenConsumerRoute) route; 535 Processor processor = edcr.getProcessor(); 536 if (processor instanceof CamelInternalProcessor && mr instanceof ManagedRoute) { 537 CamelInternalProcessor internal = (CamelInternalProcessor) processor; 538 ManagedRoute routeMBean = (ManagedRoute) mr; 539 540 CamelInternalProcessor.InstrumentationAdvice task = internal.getAdvice(CamelInternalProcessor.InstrumentationAdvice.class); 541 if (task != null) { 542 // we need to wrap the counter with the camel context so we get stats updated on the context as well 543 if (camelContextMBean != null) { 544 CompositePerformanceCounter wrapper = new CompositePerformanceCounter(routeMBean, camelContextMBean); 545 task.setCounter(wrapper); 546 } else { 547 task.setCounter(routeMBean); 548 } 549 } 550 } 551 } 552 553 try { 554 manageObject(mr); 555 } catch (JMException e) { 556 LOG.warn("Could not register Route MBean", e); 557 } catch (Exception e) { 558 LOG.warn("Could not create Route MBean", e); 559 } 560 } 561 } 562 563 public void onRoutesRemove(Collection<Route> routes) { 564 // the agent hasn't been started 565 if (!initialized) { 566 return; 567 } 568 569 for (Route route : routes) { 570 Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route); 571 572 // skip unmanaged routes 573 if (!getManagementStrategy().isManaged(mr, null)) { 574 LOG.trace("The route is not managed: {}", route); 575 continue; 576 } 577 578 try { 579 unmanageObject(mr); 580 } catch (Exception e) { 581 LOG.warn("Could not unregister Route MBean", e); 582 } 583 584 // remove from known routes ids, as the route has been removed 585 knowRouteIds.remove(route.getId()); 586 } 587 588 // after the routes has been removed, we should clear the wrapped processors as we no longer need them 589 // as they were just a provisional map used during creation of routes 590 removeWrappedProcessorsForRoutes(routes); 591 } 592 593 public void onErrorHandlerAdd(RouteContext routeContext, Processor errorHandler, ErrorHandlerFactory errorHandlerBuilder) { 594 if (!shouldRegister(errorHandler, null)) { 595 // avoid registering if not needed 596 return; 597 } 598 599 Object me = getManagementObjectStrategy().getManagedObjectForErrorHandler(camelContext, routeContext, errorHandler, errorHandlerBuilder); 600 601 // skip already managed services, for example if a route has been restarted 602 if (getManagementStrategy().isManaged(me, null)) { 603 LOG.trace("The error handler builder is already managed: {}", errorHandlerBuilder); 604 return; 605 } 606 607 try { 608 manageObject(me); 609 } catch (Exception e) { 610 LOG.warn("Could not register error handler builder: " + errorHandlerBuilder + " as ErrorHandler MBean.", e); 611 } 612 } 613 614 public void onErrorHandlerRemove(RouteContext routeContext, Processor errorHandler, ErrorHandlerFactory errorHandlerBuilder) { 615 if (!initialized) { 616 return; 617 } 618 619 Object me = getManagementObjectStrategy().getManagedObjectForErrorHandler(camelContext, routeContext, errorHandler, errorHandlerBuilder); 620 if (me != null) { 621 try { 622 unmanageObject(me); 623 } catch (Exception e) { 624 LOG.warn("Could not unregister error handler: " + me + " as ErrorHandler MBean.", e); 625 } 626 } 627 } 628 629 public void onThreadPoolAdd(CamelContext camelContext, ThreadPoolExecutor threadPool, String id, 630 String sourceId, String routeId, String threadPoolProfileId) { 631 632 if (!shouldRegister(threadPool, null)) { 633 // avoid registering if not needed 634 return; 635 } 636 637 Object mtp = getManagementObjectStrategy().getManagedObjectForThreadPool(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId); 638 639 // skip already managed services, for example if a route has been restarted 640 if (getManagementStrategy().isManaged(mtp, null)) { 641 LOG.trace("The thread pool is already managed: {}", threadPool); 642 return; 643 } 644 645 try { 646 manageObject(mtp); 647 // store a reference so we can unmanage from JMX when the thread pool is removed 648 // we need to keep track here, as we cannot re-construct the thread pool ObjectName when removing the thread pool 649 managedThreadPools.put(threadPool, mtp); 650 } catch (Exception e) { 651 LOG.warn("Could not register thread pool: " + threadPool + " as ThreadPool MBean.", e); 652 } 653 } 654 655 public void onThreadPoolRemove(CamelContext camelContext, ThreadPoolExecutor threadPool) { 656 if (!initialized) { 657 return; 658 } 659 660 // lookup the thread pool and remove it from JMX 661 Object mtp = managedThreadPools.remove(threadPool); 662 if (mtp != null) { 663 // skip unmanaged routes 664 if (!getManagementStrategy().isManaged(mtp, null)) { 665 LOG.trace("The thread pool is not managed: {}", threadPool); 666 return; 667 } 668 669 try { 670 unmanageObject(mtp); 671 } catch (Exception e) { 672 LOG.warn("Could not unregister ThreadPool MBean", e); 673 } 674 } 675 } 676 677 public void onRouteContextCreate(RouteContext routeContext) { 678 if (!initialized) { 679 return; 680 } 681 682 // Create a map (ProcessorType -> PerformanceCounter) 683 // to be passed to InstrumentationInterceptStrategy. 684 Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters = 685 new HashMap<ProcessorDefinition<?>, PerformanceCounter>(); 686 687 // Each processor in a route will have its own performance counter. 688 // These performance counter will be embedded to InstrumentationProcessor 689 // and wrap the appropriate processor by InstrumentationInterceptStrategy. 690 RouteDefinition route = routeContext.getRoute(); 691 692 // register performance counters for all processors and its children 693 for (ProcessorDefinition<?> processor : route.getOutputs()) { 694 registerPerformanceCounters(routeContext, processor, registeredCounters); 695 } 696 697 // set this managed intercept strategy that executes the JMX instrumentation for performance metrics 698 // so our registered counters can be used for fine grained performance instrumentation 699 routeContext.setManagedInterceptStrategy(new InstrumentationInterceptStrategy(registeredCounters, wrappedProcessors)); 700 } 701 702 /** 703 * Removes the wrapped processors for the given routes, as they are no longer in use. 704 * <p/> 705 * This is needed to avoid accumulating memory, if a lot of routes is being added and removed. 706 * 707 * @param routes the routes 708 */ 709 private void removeWrappedProcessorsForRoutes(Collection<Route> routes) { 710 // loop the routes, and remove the route associated wrapped processors, as they are no longer in use 711 for (Route route : routes) { 712 String id = route.getId(); 713 714 Iterator<KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> it = wrappedProcessors.values().iterator(); 715 while (it.hasNext()) { 716 KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = it.next(); 717 RouteDefinition def = ProcessorDefinitionHelper.getRoute(holder.getKey()); 718 if (def != null && id.equals(def.getId())) { 719 it.remove(); 720 } 721 } 722 } 723 724 } 725 726 private void registerPerformanceCounters(RouteContext routeContext, ProcessorDefinition<?> processor, 727 Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters) { 728 729 // traverse children if any exists 730 List<ProcessorDefinition<?>> children = processor.getOutputs(); 731 for (ProcessorDefinition<?> child : children) { 732 registerPerformanceCounters(routeContext, child, registeredCounters); 733 } 734 735 // skip processors that should not be registered 736 if (!registerProcessor(processor)) { 737 return; 738 } 739 740 // okay this is a processor we would like to manage so create the 741 // a delegate performance counter that acts as the placeholder in the interceptor 742 // that then delegates to the real mbean which we register later in the onServiceAdd method 743 DelegatePerformanceCounter pc = new DelegatePerformanceCounter(); 744 // set statistics enabled depending on the option 745 boolean enabled = camelContext.getManagementStrategy().getStatisticsLevel() == ManagementStatisticsLevel.All; 746 pc.setStatisticsEnabled(enabled); 747 748 // and add it as a a registered counter that will be used lazy when Camel 749 // does the instrumentation of the route and adds the InstrumentationProcessor 750 // that does the actual performance metrics gatherings at runtime 751 registeredCounters.put(processor, pc); 752 } 753 754 /** 755 * Should the given processor be registered. 756 */ 757 protected boolean registerProcessor(ProcessorDefinition<?> processor) { 758 // skip on exception 759 if (processor instanceof OnExceptionDefinition) { 760 return false; 761 } 762 // skip on completion 763 if (processor instanceof OnCompletionDefinition) { 764 return false; 765 } 766 // skip intercept 767 if (processor instanceof InterceptDefinition) { 768 return false; 769 } 770 // skip aop 771 if (processor instanceof AOPDefinition) { 772 return false; 773 } 774 // skip policy 775 if (processor instanceof PolicyDefinition) { 776 return false; 777 } 778 779 // only if custom id assigned 780 if (getManagementStrategy().isOnlyManageProcessorWithCustomId()) { 781 return processor.hasCustomIdAssigned(); 782 } 783 784 // use customer filter 785 return getManagementStrategy().manageProcessor(processor); 786 } 787 788 private ManagementStrategy getManagementStrategy() { 789 ObjectHelper.notNull(camelContext, "CamelContext"); 790 return camelContext.getManagementStrategy(); 791 } 792 793 private ManagementObjectStrategy getManagementObjectStrategy() { 794 ObjectHelper.notNull(camelContext, "CamelContext"); 795 return camelContext.getManagementStrategy().getManagementObjectStrategy(); 796 } 797 798 /** 799 * Strategy for managing the object 800 * 801 * @param me the managed object 802 * @throws Exception is thrown if error registering the object for management 803 */ 804 protected void manageObject(Object me) throws Exception { 805 getManagementStrategy().manageObject(me); 806 if (timerListenerManager != null && me instanceof TimerListener) { 807 TimerListener timer = (TimerListener) me; 808 timerListenerManager.addTimerListener(timer); 809 } 810 } 811 812 /** 813 * Un-manages the object. 814 * 815 * @param me the managed object 816 * @throws Exception is thrown if error unregistering the managed object 817 */ 818 protected void unmanageObject(Object me) throws Exception { 819 if (timerListenerManager != null && me instanceof TimerListener) { 820 TimerListener timer = (TimerListener) me; 821 timerListenerManager.removeTimerListener(timer); 822 } 823 getManagementStrategy().unmanageObject(me); 824 } 825 826 /** 827 * Whether or not to register the mbean. 828 * <p/> 829 * The {@link ManagementAgent} has options which controls when to register. 830 * This allows us to only register mbeans accordingly. For example by default any 831 * dynamic endpoints is not registered. This avoids to register excessive mbeans, which 832 * most often is not desired. 833 * 834 * @param service the object to register 835 * @param route an optional route the mbean is associated with, can be <tt>null</tt> 836 * @return <tt>true</tt> to register, <tt>false</tt> to skip registering 837 */ 838 protected boolean shouldRegister(Object service, Route route) { 839 // the agent hasn't been started 840 if (!initialized) { 841 return false; 842 } 843 844 LOG.trace("Checking whether to register {} from route: {}", service, route); 845 846 ManagementAgent agent = getManagementStrategy().getManagementAgent(); 847 if (agent == null) { 848 // do not register if no agent 849 return false; 850 } 851 852 // always register if we are starting CamelContext 853 if (getCamelContext().getStatus().isStarting()) { 854 return true; 855 } 856 857 // register if always is enabled 858 if (agent.getRegisterAlways()) { 859 return true; 860 } 861 862 // is it a known route then always accept 863 if (route != null && knowRouteIds.contains(route.getId())) { 864 return true; 865 } 866 867 // only register if we are starting a new route, and current thread is in starting routes mode 868 if (agent.getRegisterNewRoutes()) { 869 // no specific route, then fallback to see if this thread is starting routes 870 // which is kept as state on the camel context 871 return getCamelContext().isStartingRoutes(); 872 } 873 874 return false; 875 } 876 877 @Override 878 protected void doStart() throws Exception { 879 ObjectHelper.notNull(camelContext, "CamelContext"); 880 881 // defer starting the timer manager until CamelContext has been fully started 882 camelContext.addStartupListener(timerManagerStartupListener); 883 } 884 885 private final class TimerListenerManagerStartupListener implements StartupListener { 886 887 @Override 888 public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception { 889 // we are disabled either if configured explicit, or if level is off 890 boolean disabled = !camelContext.getManagementStrategy().isLoadStatisticsEnabled() 891 || camelContext.getManagementStrategy().getStatisticsLevel() == ManagementStatisticsLevel.Off; 892 893 LOG.debug("Load performance statistics {}", disabled ? "disabled" : "enabled"); 894 if (!disabled) { 895 // must use 1 sec interval as the load statistics is based on 1 sec calculations 896 timerListenerManager.setInterval(1000); 897 // we have to defer enlisting timer lister manager as a service until CamelContext has been started 898 getCamelContext().addService(timerListenerManager); 899 } 900 } 901 } 902 903 @Override 904 protected void doStop() throws Exception { 905 initialized = false; 906 knowRouteIds.clear(); 907 preServices.clear(); 908 wrappedProcessors.clear(); 909 managedTracers.clear(); 910 managedBacklogTracers.clear(); 911 managedBacklogDebuggers.clear(); 912 managedThreadPools.clear(); 913 } 914 915 /** 916 * Class which holds any pre registration details. 917 * 918 * @see org.apache.camel.management.DefaultManagementLifecycleStrategy#enlistPreRegisteredServices() 919 */ 920 private static final class PreRegisterService { 921 922 private String name; 923 private Component component; 924 private Endpoint endpoint; 925 private CamelContext camelContext; 926 private Service service; 927 private Route route; 928 929 public void onComponentAdd(String name, Component component) { 930 this.name = name; 931 this.component = component; 932 } 933 934 public void onEndpointAdd(Endpoint endpoint) { 935 this.endpoint = endpoint; 936 } 937 938 public void onServiceAdd(CamelContext camelContext, Service service, Route route) { 939 this.camelContext = camelContext; 940 this.service = service; 941 this.route = route; 942 } 943 944 public String getName() { 945 return name; 946 } 947 948 public Component getComponent() { 949 return component; 950 } 951 952 public Endpoint getEndpoint() { 953 return endpoint; 954 } 955 956 public CamelContext getCamelContext() { 957 return camelContext; 958 } 959 960 public Service getService() { 961 return service; 962 } 963 964 public Route getRoute() { 965 return route; 966 } 967 } 968 969 } 970