001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.management;
018    
019    import java.util.ArrayList;
020    import java.util.Collection;
021    import java.util.HashMap;
022    import java.util.HashSet;
023    import java.util.Iterator;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Set;
027    import java.util.concurrent.ThreadPoolExecutor;
028    import javax.management.JMException;
029    import javax.management.MalformedObjectNameException;
030    import javax.management.ObjectName;
031    
032    import org.apache.camel.CamelContext;
033    import org.apache.camel.CamelContextAware;
034    import org.apache.camel.Channel;
035    import org.apache.camel.Component;
036    import org.apache.camel.Consumer;
037    import org.apache.camel.Endpoint;
038    import org.apache.camel.ErrorHandlerFactory;
039    import org.apache.camel.ManagementStatisticsLevel;
040    import org.apache.camel.Processor;
041    import org.apache.camel.Producer;
042    import org.apache.camel.Route;
043    import org.apache.camel.Service;
044    import org.apache.camel.StartupListener;
045    import org.apache.camel.TimerListener;
046    import org.apache.camel.VetoCamelContextStartException;
047    import org.apache.camel.api.management.PerformanceCounter;
048    import org.apache.camel.impl.ConsumerCache;
049    import org.apache.camel.impl.DefaultCamelContext;
050    import org.apache.camel.impl.EndpointRegistry;
051    import org.apache.camel.impl.EventDrivenConsumerRoute;
052    import org.apache.camel.impl.ProducerCache;
053    import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
054    import org.apache.camel.management.mbean.ManagedBacklogDebugger;
055    import org.apache.camel.management.mbean.ManagedBacklogTracer;
056    import org.apache.camel.management.mbean.ManagedCamelContext;
057    import org.apache.camel.management.mbean.ManagedConsumerCache;
058    import org.apache.camel.management.mbean.ManagedEndpoint;
059    import org.apache.camel.management.mbean.ManagedEndpointRegistry;
060    import org.apache.camel.management.mbean.ManagedProducerCache;
061    import org.apache.camel.management.mbean.ManagedRoute;
062    import org.apache.camel.management.mbean.ManagedService;
063    import org.apache.camel.management.mbean.ManagedStreamCachingStrategy;
064    import org.apache.camel.management.mbean.ManagedThrottlingInflightRoutePolicy;
065    import org.apache.camel.management.mbean.ManagedTracer;
066    import org.apache.camel.management.mbean.ManagedTypeConverterRegistry;
067    import org.apache.camel.model.AOPDefinition;
068    import org.apache.camel.model.InterceptDefinition;
069    import org.apache.camel.model.OnCompletionDefinition;
070    import org.apache.camel.model.OnExceptionDefinition;
071    import org.apache.camel.model.PolicyDefinition;
072    import org.apache.camel.model.ProcessorDefinition;
073    import org.apache.camel.model.ProcessorDefinitionHelper;
074    import org.apache.camel.model.RouteDefinition;
075    import org.apache.camel.processor.CamelInternalProcessor;
076    import org.apache.camel.processor.interceptor.BacklogDebugger;
077    import org.apache.camel.processor.interceptor.BacklogTracer;
078    import org.apache.camel.processor.interceptor.Tracer;
079    import org.apache.camel.spi.EventNotifier;
080    import org.apache.camel.spi.LifecycleStrategy;
081    import org.apache.camel.spi.ManagementAgent;
082    import org.apache.camel.spi.ManagementAware;
083    import org.apache.camel.spi.ManagementNameStrategy;
084    import org.apache.camel.spi.ManagementObjectStrategy;
085    import org.apache.camel.spi.ManagementStrategy;
086    import org.apache.camel.spi.RouteContext;
087    import org.apache.camel.spi.StreamCachingStrategy;
088    import org.apache.camel.spi.TypeConverterRegistry;
089    import org.apache.camel.spi.UnitOfWork;
090    import org.apache.camel.support.ServiceSupport;
091    import org.apache.camel.support.TimerListenerManager;
092    import org.apache.camel.util.KeyValueHolder;
093    import org.apache.camel.util.ObjectHelper;
094    import org.slf4j.Logger;
095    import org.slf4j.LoggerFactory;
096    
097    /**
 * Default JMX managed lifecycle strategy that registers objects using the configured
099     * {@link org.apache.camel.spi.ManagementStrategy}.
100     *
101     * @see org.apache.camel.spi.ManagementStrategy
102     * @version 
103     */
104    @SuppressWarnings("deprecation")
105    public class DefaultManagementLifecycleStrategy extends ServiceSupport implements LifecycleStrategy, CamelContextAware {
106    
107        private static final Logger LOG = LoggerFactory.getLogger(DefaultManagementLifecycleStrategy.class);
108        // the wrapped processors is for performance counters, which are in use for the created routes
109        // when a route is removed, we should remove the associated processors from this map
110        private final Map<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> wrappedProcessors =
111                new HashMap<Processor, KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>>();
112        private final List<PreRegisterService> preServices = new ArrayList<PreRegisterService>();
113        private final TimerListenerManager timerListenerManager = new TimerListenerManager();
114        private final TimerListenerManagerStartupListener timerManagerStartupListener = new TimerListenerManagerStartupListener();
115        private volatile CamelContext camelContext;
116        private volatile ManagedCamelContext camelContextMBean;
117        private volatile boolean initialized;
118        private final Set<String> knowRouteIds = new HashSet<String>();
119        private final Map<Tracer, ManagedTracer> managedTracers = new HashMap<Tracer, ManagedTracer>();
120        private final Map<BacklogTracer, ManagedBacklogTracer> managedBacklogTracers = new HashMap<BacklogTracer, ManagedBacklogTracer>();
121        private final Map<BacklogDebugger, ManagedBacklogDebugger> managedBacklogDebuggers = new HashMap<BacklogDebugger, ManagedBacklogDebugger>();
122        private final Map<ThreadPoolExecutor, Object> managedThreadPools = new HashMap<ThreadPoolExecutor, Object>();
123    
124        public DefaultManagementLifecycleStrategy() {
125        }
126    
127        public DefaultManagementLifecycleStrategy(CamelContext camelContext) {
128            this.camelContext = camelContext;
129        }
130    
131        public CamelContext getCamelContext() {
132            return camelContext;
133        }
134    
135        public void setCamelContext(CamelContext camelContext) {
136            this.camelContext = camelContext;
137        }
138    
139        public void onContextStart(CamelContext context) throws VetoCamelContextStartException {
140            Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context);
141    
142            String name = context.getName();
143            String managementName = context.getManagementNameStrategy().getName();
144    
145            try {
146                boolean done = false;
147                while (!done) {
148                    ObjectName on = getManagementStrategy().getManagementNamingStrategy().getObjectNameForCamelContext(managementName, name);
149                    boolean exists = getManagementStrategy().isManaged(mc, on);
150                    if (!exists) {
151                        done = true;
152                    } else {
153                        // okay there exists already a CamelContext with this name, we can try to fix it by finding a free name
154                        boolean fixed = false;
155                        // if we use the default name strategy we can find a free name to use
156                        String newName = findFreeName(mc, context.getManagementNameStrategy(), name);
157                        if (newName != null) {
158                            // use this as the fixed name
159                            fixed = true;
160                            done = true;
161                            managementName = newName;
162                        }
163                        // we could not fix it so veto starting camel
164                        if (!fixed) {
165                            throw new VetoCamelContextStartException("CamelContext (" + context.getName() + ") with ObjectName[" + on + "] is already registered."
166                                + " Make sure to use unique names on CamelContext when using multiple CamelContexts in the same MBeanServer.", context);
167                        } else {
168                            LOG.warn("This CamelContext(" + context.getName() + ") will be registered using the name: " + managementName
169                                + " due to clash with an existing name already registered in MBeanServer.");
170                        }
171                    }
172                }
173            } catch (VetoCamelContextStartException e) {
174                // rethrow veto
175                throw e;
176            } catch (Exception e) {
177                // must rethrow to allow CamelContext fallback to non JMX agent to allow
178                // Camel to continue to run
179                throw ObjectHelper.wrapRuntimeCamelException(e);
180            }
181    
182            // set the name we are going to use
183            if (context instanceof DefaultCamelContext) {
184                ((DefaultCamelContext) context).setManagementName(managementName);
185            }
186    
187            try {
188                manageObject(mc);
189            } catch (Exception e) {
190                // must rethrow to allow CamelContext fallback to non JMX agent to allow
191                // Camel to continue to run
192                throw ObjectHelper.wrapRuntimeCamelException(e);
193            }
194    
195            // yes we made it and are initialized
196            initialized = true;
197    
198            if (mc instanceof ManagedCamelContext) {
199                camelContextMBean = (ManagedCamelContext) mc;
200            }
201    
202            // register any pre registered now that we are initialized
203            enlistPreRegisteredServices();
204        }
205    
206        private String findFreeName(Object mc, ManagementNameStrategy strategy, String name) throws MalformedObjectNameException {
207            // we cannot find a free name for fixed named strategies
208            if (strategy.isFixedName()) {
209                return null;
210            }
211    
212            // okay try to find a free name
213            boolean done = false;
214            String newName = null;
215            while (!done) {
216                // compute the next name
217                newName = strategy.getNextName();
218                ObjectName on = getManagementStrategy().getManagementNamingStrategy().getObjectNameForCamelContext(newName, name);
219                done = !getManagementStrategy().isManaged(mc, on);
220                if (LOG.isTraceEnabled()) {
221                    LOG.trace("Using name: {} in ObjectName[{}] exists? {}", new Object[]{name, on, done});
222                }
223            }
224            return newName;
225        }
226    
227        /**
228         * After {@link CamelContext} has been enlisted in JMX using {@link #onContextStart(org.apache.camel.CamelContext)}
229         * then we can enlist any pre registered services as well, as we had to wait for {@link CamelContext} to be
230         * enlisted first.
231         * <p/>
232         * A component/endpoint/service etc. can be pre registered when using dependency injection and annotations such as
233         * {@link org.apache.camel.Produce}, {@link org.apache.camel.EndpointInject}. Therefore we need to capture those
234         * registrations up front, and then afterwards enlist in JMX when {@link CamelContext} is being started.
235         */
236        private void enlistPreRegisteredServices() {
237            if (preServices.isEmpty()) {
238                return;
239            }
240    
241            LOG.debug("Registering {} pre registered services", preServices.size());
242            for (PreRegisterService pre : preServices) {
243                if (pre.getComponent() != null) {
244                    onComponentAdd(pre.getName(), pre.getComponent());
245                } else if (pre.getEndpoint() != null) {
246                    onEndpointAdd(pre.getEndpoint());
247                } else if (pre.getService() != null) {
248                    onServiceAdd(pre.getCamelContext(), pre.getService(), pre.getRoute());
249                }
250            }
251    
252            // we are done so clear the list
253            preServices.clear();
254        }
255    
256        public void onContextStop(CamelContext context) {
257            // the agent hasn't been started
258            if (!initialized) {
259                return;
260            }
261            try {
262                Object mc = getManagementObjectStrategy().getManagedObjectForCamelContext(context);
263                // the context could have been removed already
264                if (getManagementStrategy().isManaged(mc, null)) {
265                    unmanageObject(mc);
266                }
267            } catch (Exception e) {
268                LOG.warn("Could not unregister CamelContext MBean", e);
269            }
270    
271            camelContextMBean = null;
272        }
273    
274        public void onComponentAdd(String name, Component component) {
275            // always register components as there are only a few of those
276            if (!initialized) {
277                // pre register so we can register later when we have been initialized
278                PreRegisterService pre = new PreRegisterService();
279                pre.onComponentAdd(name, component);
280                preServices.add(pre);
281                return;
282            }
283            try {
284                Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name);
285                manageObject(mc);
286            } catch (Exception e) {
287                LOG.warn("Could not register Component MBean", e);
288            }
289        }
290    
291        public void onComponentRemove(String name, Component component) {
292            // the agent hasn't been started
293            if (!initialized) {
294                return;
295            }
296            try {
297                Object mc = getManagementObjectStrategy().getManagedObjectForComponent(camelContext, component, name);
298                unmanageObject(mc);
299            } catch (Exception e) {
300                LOG.warn("Could not unregister Component MBean", e);
301            }
302        }
303    
304        /**
305         * If the endpoint is an instance of ManagedResource then register it with the
306         * mbean server, if it is not then wrap the endpoint in a {@link ManagedEndpoint} and
307         * register that with the mbean server.
308         *
309         * @param endpoint the Endpoint attempted to be added
310         */
311        public void onEndpointAdd(Endpoint endpoint) {
312            if (!initialized) {
313                // pre register so we can register later when we have been initialized
314                PreRegisterService pre = new PreRegisterService();
315                pre.onEndpointAdd(endpoint);
316                preServices.add(pre);
317                return;
318            }
319    
320            if (!shouldRegister(endpoint, null)) {
321                // avoid registering if not needed
322                return;
323            }
324    
325            try {
326                Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint);
327                if (me == null) {
328                    // endpoint should not be managed
329                    return;
330                }
331                manageObject(me);
332            } catch (Exception e) {
333                LOG.warn("Could not register Endpoint MBean for endpoint: " + endpoint + ". This exception will be ignored.", e);
334            }
335        }
336    
337        public void onEndpointRemove(Endpoint endpoint) {
338            // the agent hasn't been started
339            if (!initialized) {
340                return;
341            }
342    
343            try {
344                Object me = getManagementObjectStrategy().getManagedObjectForEndpoint(camelContext, endpoint);
345                unmanageObject(me);
346            } catch (Exception e) {
347                LOG.warn("Could not unregister Endpoint MBean for endpoint: " + endpoint + ". This exception will be ignored.", e);
348            }
349        }
350    
351        public void onServiceAdd(CamelContext context, Service service, Route route) {
352            if (!initialized) {
353                // pre register so we can register later when we have been initialized
354                PreRegisterService pre = new PreRegisterService();
355                pre.onServiceAdd(context, service, route);
356                preServices.add(pre);
357                return;
358            }
359    
360            // services can by any kind of misc type but also processors
361            // so we have special logic when its a processor
362    
363            if (!shouldRegister(service, route)) {
364                // avoid registering if not needed
365                return;
366            }
367    
368            Object managedObject = getManagedObjectForService(context, service, route);
369            if (managedObject == null) {
370                // service should not be managed
371                return;
372            }
373    
374            // skip already managed services, for example if a route has been restarted
375            if (getManagementStrategy().isManaged(managedObject, null)) {
376                LOG.trace("The service is already managed: {}", service);
377                return;
378            }
379    
380            try {
381                manageObject(managedObject);
382            } catch (Exception e) {
383                LOG.warn("Could not register service: " + service + " as Service MBean.", e);
384            }
385        }
386    
387        public void onServiceRemove(CamelContext context, Service service, Route route) {
388            // the agent hasn't been started
389            if (!initialized) {
390                return;
391            }
392    
393            Object managedObject = getManagedObjectForService(context, service, route);
394            if (managedObject != null) {
395                try {
396                    unmanageObject(managedObject);
397                } catch (Exception e) {
398                    LOG.warn("Could not unregister service: " + service + " as Service MBean.", e);
399                }
400            }
401        }
402    
403        @SuppressWarnings("unchecked")
404        private Object getManagedObjectForService(CamelContext context, Service service, Route route) {
405            // skip channel, UoW and dont double wrap instrumentation
406            if (service instanceof Channel || service instanceof UnitOfWork || service instanceof InstrumentationProcessor) {
407                return null;
408            }
409    
410            Object answer = null;
411    
412            if (service instanceof ManagementAware) {
413                return ((ManagementAware<Service>) service).getManagedObject(service);
414            } else if (service instanceof Tracer) {
415                // special for tracer
416                Tracer tracer = (Tracer) service;
417                ManagedTracer mt = managedTracers.get(tracer);
418                if (mt == null) {
419                    mt = new ManagedTracer(context, tracer);
420                    mt.init(getManagementStrategy());
421                    managedTracers.put(tracer, mt);
422                }
423                return mt;
424            } else if (service instanceof BacklogTracer) {
425                // special for backlog tracer
426                BacklogTracer backlogTracer = (BacklogTracer) service;
427                ManagedBacklogTracer mt = managedBacklogTracers.get(backlogTracer);
428                if (mt == null) {
429                    mt = new ManagedBacklogTracer(context, backlogTracer);
430                    mt.init(getManagementStrategy());
431                    managedBacklogTracers.put(backlogTracer, mt);
432                }
433                return mt;
434            } else if (service instanceof BacklogDebugger) {
435                // special for backlog debugger
436                BacklogDebugger backlogDebugger = (BacklogDebugger) service;
437                ManagedBacklogDebugger md = managedBacklogDebuggers.get(backlogDebugger);
438                if (md == null) {
439                    md = new ManagedBacklogDebugger(context, backlogDebugger);
440                    md.init(getManagementStrategy());
441                    managedBacklogDebuggers.put(backlogDebugger, md);
442                }
443                return md;
444            } else if (service instanceof EventNotifier) {
445                answer = getManagementObjectStrategy().getManagedObjectForEventNotifier(context, (EventNotifier) service);
446            } else if (service instanceof Producer) {
447                answer = getManagementObjectStrategy().getManagedObjectForProducer(context, (Producer) service);
448            } else if (service instanceof Consumer) {
449                answer = getManagementObjectStrategy().getManagedObjectForConsumer(context, (Consumer) service);
450            } else if (service instanceof Processor) {
451                // special for processors as we need to do some extra work
452                return getManagedObjectForProcessor(context, (Processor) service, route);
453            } else if (service instanceof ThrottlingInflightRoutePolicy) {
454                answer = new ManagedThrottlingInflightRoutePolicy(context, (ThrottlingInflightRoutePolicy) service);
455            } else if (service instanceof ConsumerCache) {
456                answer = new ManagedConsumerCache(context, (ConsumerCache) service);
457            } else if (service instanceof ProducerCache) {
458                answer = new ManagedProducerCache(context, (ProducerCache) service);
459            } else if (service instanceof EndpointRegistry) {
460                answer = new ManagedEndpointRegistry(context, (EndpointRegistry) service);
461            } else if (service instanceof TypeConverterRegistry) {
462                answer = new ManagedTypeConverterRegistry(context, (TypeConverterRegistry) service);
463            } else if (service instanceof StreamCachingStrategy) {
464                answer = new ManagedStreamCachingStrategy(context, (StreamCachingStrategy) service);
465            } else if (service != null) {
466                // fallback as generic service
467                answer = getManagementObjectStrategy().getManagedObjectForService(context, service);
468            }
469    
470            if (answer != null && answer instanceof ManagedService) {
471                ManagedService ms = (ManagedService) answer;
472                ms.setRoute(route);
473                ms.init(getManagementStrategy());
474            }
475    
476            return answer;
477        }
478    
479        private Object getManagedObjectForProcessor(CamelContext context, Processor processor, Route route) {
480            // a bit of magic here as the processors we want to manage have already been registered
481            // in the wrapped processors map when Camel have instrumented the route on route initialization
482            // so the idea is now to only manage the processors from the map
483            KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = wrappedProcessors.get(processor);
484            if (holder == null) {
485                // skip as its not an well known processor we want to manage anyway, such as Channel/UnitOfWork/Pipeline etc.
486                return null;
487            }
488    
489            // get the managed object as it can be a specialized type such as a Delayer/Throttler etc.
490            Object managedObject = getManagementObjectStrategy().getManagedObjectForProcessor(context, processor, holder.getKey(), route);
491            // only manage if we have a name for it as otherwise we do not want to manage it anyway
492            if (managedObject != null) {
493                // is it a performance counter then we need to set our counter
494                if (managedObject instanceof PerformanceCounter) {
495                    InstrumentationProcessor counter = holder.getValue();
496                    if (counter != null) {
497                        // change counter to us
498                        counter.setCounter(managedObject);
499                    }
500                }
501            }
502    
503            return managedObject;
504        }
505    
506        public void onRoutesAdd(Collection<Route> routes) {
507            for (Route route : routes) {
508    
509                // if we are starting CamelContext or either of the two options has been
510                // enabled, then enlist the route as a known route
511                if (getCamelContext().getStatus().isStarting()
512                    || getManagementStrategy().getManagementAgent().getRegisterAlways()
513                    || getManagementStrategy().getManagementAgent().getRegisterNewRoutes()) {
514                    // register as known route id
515                    knowRouteIds.add(route.getId());
516                }
517    
518                if (!shouldRegister(route, route)) {
519                    // avoid registering if not needed, skip to next route
520                    continue;
521                }
522    
523                Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route);
524    
525                // skip already managed routes, for example if the route has been restarted
526                if (getManagementStrategy().isManaged(mr, null)) {
527                    LOG.trace("The route is already managed: {}", route);
528                    continue;
529                }
530    
531                // get the wrapped instrumentation processor from this route
532                // and set me as the counter
533                if (route instanceof EventDrivenConsumerRoute) {
534                    EventDrivenConsumerRoute edcr = (EventDrivenConsumerRoute) route;
535                    Processor processor = edcr.getProcessor();
536                    if (processor instanceof CamelInternalProcessor && mr instanceof ManagedRoute) {
537                        CamelInternalProcessor internal = (CamelInternalProcessor) processor;
538                        ManagedRoute routeMBean = (ManagedRoute) mr;
539    
540                        CamelInternalProcessor.InstrumentationAdvice task = internal.getAdvice(CamelInternalProcessor.InstrumentationAdvice.class);
541                        if (task != null) {
542                            // we need to wrap the counter with the camel context so we get stats updated on the context as well
543                            if (camelContextMBean != null) {
544                                CompositePerformanceCounter wrapper = new CompositePerformanceCounter(routeMBean, camelContextMBean);
545                                task.setCounter(wrapper);
546                            } else {
547                                task.setCounter(routeMBean);
548                            }
549                        }
550                    }
551                }
552    
553                try {
554                    manageObject(mr);
555                } catch (JMException e) {
556                    LOG.warn("Could not register Route MBean", e);
557                } catch (Exception e) {
558                    LOG.warn("Could not create Route MBean", e);
559                }
560            }
561        }
562    
563        public void onRoutesRemove(Collection<Route> routes) {
564            // the agent hasn't been started
565            if (!initialized) {
566                return;
567            }
568    
569            for (Route route : routes) {
570                Object mr = getManagementObjectStrategy().getManagedObjectForRoute(camelContext, route);
571    
572                // skip unmanaged routes
573                if (!getManagementStrategy().isManaged(mr, null)) {
574                    LOG.trace("The route is not managed: {}", route);
575                    continue;
576                }
577    
578                try {
579                    unmanageObject(mr);
580                } catch (Exception e) {
581                    LOG.warn("Could not unregister Route MBean", e);
582                }
583    
584                // remove from known routes ids, as the route has been removed
585                knowRouteIds.remove(route.getId());
586            }
587    
588            // after the routes has been removed, we should clear the wrapped processors as we no longer need them
589            // as they were just a provisional map used during creation of routes
590            removeWrappedProcessorsForRoutes(routes);
591        }
592    
593        public void onErrorHandlerAdd(RouteContext routeContext, Processor errorHandler, ErrorHandlerFactory errorHandlerBuilder) {
594            if (!shouldRegister(errorHandler, null)) {
595                // avoid registering if not needed
596                return;
597            }
598    
599            Object me = getManagementObjectStrategy().getManagedObjectForErrorHandler(camelContext, routeContext, errorHandler, errorHandlerBuilder);
600    
601            // skip already managed services, for example if a route has been restarted
602            if (getManagementStrategy().isManaged(me, null)) {
603                LOG.trace("The error handler builder is already managed: {}", errorHandlerBuilder);
604                return;
605            }
606    
607            try {
608                manageObject(me);
609            } catch (Exception e) {
610                LOG.warn("Could not register error handler builder: " + errorHandlerBuilder + " as ErrorHandler MBean.", e);
611            }
612        }
613    
614        public void onErrorHandlerRemove(RouteContext routeContext, Processor errorHandler, ErrorHandlerFactory errorHandlerBuilder) {
615            if (!initialized) {
616                return;
617            }
618    
619            Object me = getManagementObjectStrategy().getManagedObjectForErrorHandler(camelContext, routeContext, errorHandler, errorHandlerBuilder);
620            if (me != null) {
621                try {
622                    unmanageObject(me);
623                } catch (Exception e) {
624                    LOG.warn("Could not unregister error handler: " + me + " as ErrorHandler MBean.", e);
625                }
626            }
627        }
628    
629        public void onThreadPoolAdd(CamelContext camelContext, ThreadPoolExecutor threadPool, String id,
630                                    String sourceId, String routeId, String threadPoolProfileId) {
631    
632            if (!shouldRegister(threadPool, null)) {
633                // avoid registering if not needed
634                return;
635            }
636    
637            Object mtp = getManagementObjectStrategy().getManagedObjectForThreadPool(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId);
638    
639            // skip already managed services, for example if a route has been restarted
640            if (getManagementStrategy().isManaged(mtp, null)) {
641                LOG.trace("The thread pool is already managed: {}", threadPool);
642                return;
643            }
644    
645            try {
646                manageObject(mtp);
647                // store a reference so we can unmanage from JMX when the thread pool is removed
648                // we need to keep track here, as we cannot re-construct the thread pool ObjectName when removing the thread pool
649                managedThreadPools.put(threadPool, mtp);
650            } catch (Exception e) {
651                LOG.warn("Could not register thread pool: " + threadPool + " as ThreadPool MBean.", e);
652            }
653        }
654    
655        public void onThreadPoolRemove(CamelContext camelContext, ThreadPoolExecutor threadPool) {
656            if (!initialized) {
657                return;
658            }
659    
660            // lookup the thread pool and remove it from JMX
661            Object mtp = managedThreadPools.remove(threadPool);
662            if (mtp != null) {
663                // skip unmanaged routes
664                if (!getManagementStrategy().isManaged(mtp, null)) {
665                    LOG.trace("The thread pool is not managed: {}", threadPool);
666                    return;
667                }
668    
669                try {
670                    unmanageObject(mtp);
671                } catch (Exception e) {
672                    LOG.warn("Could not unregister ThreadPool MBean", e);
673                }
674            }
675        }
676    
677        public void onRouteContextCreate(RouteContext routeContext) {
678            if (!initialized) {
679                return;
680            }
681    
682            // Create a map (ProcessorType -> PerformanceCounter)
683            // to be passed to InstrumentationInterceptStrategy.
684            Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters =
685                    new HashMap<ProcessorDefinition<?>, PerformanceCounter>();
686    
687            // Each processor in a route will have its own performance counter.
688            // These performance counter will be embedded to InstrumentationProcessor
689            // and wrap the appropriate processor by InstrumentationInterceptStrategy.
690            RouteDefinition route = routeContext.getRoute();
691    
692            // register performance counters for all processors and its children
693            for (ProcessorDefinition<?> processor : route.getOutputs()) {
694                registerPerformanceCounters(routeContext, processor, registeredCounters);
695            }
696    
697            // set this managed intercept strategy that executes the JMX instrumentation for performance metrics
698            // so our registered counters can be used for fine grained performance instrumentation
699            routeContext.setManagedInterceptStrategy(new InstrumentationInterceptStrategy(registeredCounters, wrappedProcessors));
700        }
701    
702        /**
703         * Removes the wrapped processors for the given routes, as they are no longer in use.
704         * <p/>
705         * This is needed to avoid accumulating memory, if a lot of routes is being added and removed.
706         *
707         * @param routes the routes
708         */
709        private void removeWrappedProcessorsForRoutes(Collection<Route> routes) {
710            // loop the routes, and remove the route associated wrapped processors, as they are no longer in use
711            for (Route route : routes) {
712                String id = route.getId();
713    
714                Iterator<KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor>> it = wrappedProcessors.values().iterator();
715                while (it.hasNext()) {
716                    KeyValueHolder<ProcessorDefinition<?>, InstrumentationProcessor> holder = it.next();
717                    RouteDefinition def = ProcessorDefinitionHelper.getRoute(holder.getKey());
718                    if (def != null && id.equals(def.getId())) {
719                        it.remove();
720                    }
721                }
722            }
723            
724        }
725    
726        private void registerPerformanceCounters(RouteContext routeContext, ProcessorDefinition<?> processor,
727                                                 Map<ProcessorDefinition<?>, PerformanceCounter> registeredCounters) {
728    
729            // traverse children if any exists
730            List<ProcessorDefinition<?>> children = processor.getOutputs();
731            for (ProcessorDefinition<?> child : children) {
732                registerPerformanceCounters(routeContext, child, registeredCounters);
733            }
734    
735            // skip processors that should not be registered
736            if (!registerProcessor(processor)) {
737                return;
738            }
739    
740            // okay this is a processor we would like to manage so create the
741            // a delegate performance counter that acts as the placeholder in the interceptor
742            // that then delegates to the real mbean which we register later in the onServiceAdd method
743            DelegatePerformanceCounter pc = new DelegatePerformanceCounter();
744            // set statistics enabled depending on the option
745            boolean enabled = camelContext.getManagementStrategy().getStatisticsLevel() == ManagementStatisticsLevel.All;
746            pc.setStatisticsEnabled(enabled);
747    
748            // and add it as a a registered counter that will be used lazy when Camel
749            // does the instrumentation of the route and adds the InstrumentationProcessor
750            // that does the actual performance metrics gatherings at runtime
751            registeredCounters.put(processor, pc);
752        }
753    
754        /**
755         * Should the given processor be registered.
756         */
757        protected boolean registerProcessor(ProcessorDefinition<?> processor) {
758            // skip on exception
759            if (processor instanceof OnExceptionDefinition) {
760                return false;
761            }
762            // skip on completion
763            if (processor instanceof OnCompletionDefinition) {
764                return false;
765            }
766            // skip intercept
767            if (processor instanceof InterceptDefinition) {
768                return false;
769            }
770            // skip aop
771            if (processor instanceof AOPDefinition) {
772                return false;
773            }
774            // skip policy
775            if (processor instanceof PolicyDefinition) {
776                return false;
777            }
778    
779            // only if custom id assigned
780            if (getManagementStrategy().isOnlyManageProcessorWithCustomId()) {
781                return processor.hasCustomIdAssigned();
782            }
783    
784            // use customer filter
785            return getManagementStrategy().manageProcessor(processor);
786        }
787    
788        private ManagementStrategy getManagementStrategy() {
789            ObjectHelper.notNull(camelContext, "CamelContext");
790            return camelContext.getManagementStrategy();
791        }
792    
793        private ManagementObjectStrategy getManagementObjectStrategy() {
794            ObjectHelper.notNull(camelContext, "CamelContext");
795            return camelContext.getManagementStrategy().getManagementObjectStrategy();
796        }
797    
798        /**
799         * Strategy for managing the object
800         *
801         * @param me the managed object
802         * @throws Exception is thrown if error registering the object for management
803         */
804        protected void manageObject(Object me) throws Exception {
805            getManagementStrategy().manageObject(me);
806            if (timerListenerManager != null && me instanceof TimerListener) {
807                TimerListener timer = (TimerListener) me;
808                timerListenerManager.addTimerListener(timer);
809            }
810        }
811    
812        /**
813         * Un-manages the object.
814         *
815         * @param me the managed object
816         * @throws Exception is thrown if error unregistering the managed object
817         */
818        protected void unmanageObject(Object me) throws Exception {
819            if (timerListenerManager != null && me instanceof TimerListener) {
820                TimerListener timer = (TimerListener) me;
821                timerListenerManager.removeTimerListener(timer);
822            }
823            getManagementStrategy().unmanageObject(me);
824        }
825    
826        /**
827         * Whether or not to register the mbean.
828         * <p/>
829         * The {@link ManagementAgent} has options which controls when to register.
830         * This allows us to only register mbeans accordingly. For example by default any
831         * dynamic endpoints is not registered. This avoids to register excessive mbeans, which
832         * most often is not desired.
833         *
834         * @param service the object to register
835         * @param route   an optional route the mbean is associated with, can be <tt>null</tt>
836         * @return <tt>true</tt> to register, <tt>false</tt> to skip registering
837         */
838        protected boolean shouldRegister(Object service, Route route) {
839            // the agent hasn't been started
840            if (!initialized) {
841                return false;
842            }
843    
844            LOG.trace("Checking whether to register {} from route: {}", service, route);
845    
846            ManagementAgent agent = getManagementStrategy().getManagementAgent();
847            if (agent == null) {
848                // do not register if no agent
849                return false;
850            }
851    
852            // always register if we are starting CamelContext
853            if (getCamelContext().getStatus().isStarting()) {
854                return true;
855            }
856    
857            // register if always is enabled
858            if (agent.getRegisterAlways()) {
859                return true;
860            }
861    
862            // is it a known route then always accept
863            if (route != null && knowRouteIds.contains(route.getId())) {
864                return true;
865            }
866    
867            // only register if we are starting a new route, and current thread is in starting routes mode
868            if (agent.getRegisterNewRoutes()) {
869                // no specific route, then fallback to see if this thread is starting routes
870                // which is kept as state on the camel context
871                return getCamelContext().isStartingRoutes();
872            }
873    
874            return false;
875        }
876    
877        @Override
878        protected void doStart() throws Exception {
879            ObjectHelper.notNull(camelContext, "CamelContext");
880    
881            // defer starting the timer manager until CamelContext has been fully started
882            camelContext.addStartupListener(timerManagerStartupListener);
883        }
884    
885        private final class TimerListenerManagerStartupListener implements StartupListener {
886    
887            @Override
888            public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception {
889                // we are disabled either if configured explicit, or if level is off
890                boolean disabled = !camelContext.getManagementStrategy().isLoadStatisticsEnabled()
891                        || camelContext.getManagementStrategy().getStatisticsLevel() == ManagementStatisticsLevel.Off;
892    
893                LOG.debug("Load performance statistics {}", disabled ? "disabled" : "enabled");
894                if (!disabled) {
895                    // must use 1 sec interval as the load statistics is based on 1 sec calculations
896                    timerListenerManager.setInterval(1000);
897                    // we have to defer enlisting timer lister manager as a service until CamelContext has been started
898                    getCamelContext().addService(timerListenerManager);
899                }
900            }
901        }
902    
903        @Override
904        protected void doStop() throws Exception {
905            initialized = false;
906            knowRouteIds.clear();
907            preServices.clear();
908            wrappedProcessors.clear();
909            managedTracers.clear();
910            managedBacklogTracers.clear();
911            managedBacklogDebuggers.clear();
912            managedThreadPools.clear();
913        }
914    
915        /**
916         * Class which holds any pre registration details.
917         *
918         * @see org.apache.camel.management.DefaultManagementLifecycleStrategy#enlistPreRegisteredServices()
919         */
920        private static final class PreRegisterService {
921    
922            private String name;
923            private Component component;
924            private Endpoint endpoint;
925            private CamelContext camelContext;
926            private Service service;
927            private Route route;
928    
929            public void onComponentAdd(String name, Component component) {
930                this.name = name;
931                this.component = component;
932            }
933    
934            public void onEndpointAdd(Endpoint endpoint) {
935                this.endpoint = endpoint;
936            }
937    
938            public void onServiceAdd(CamelContext camelContext, Service service, Route route) {
939                this.camelContext = camelContext;
940                this.service = service;
941                this.route = route;
942            }
943    
944            public String getName() {
945                return name;
946            }
947    
948            public Component getComponent() {
949                return component;
950            }
951    
952            public Endpoint getEndpoint() {
953                return endpoint;
954            }
955    
956            public CamelContext getCamelContext() {
957                return camelContext;
958            }
959    
960            public Service getService() {
961                return service;
962            }
963    
964            public Route getRoute() {
965                return route;
966            }
967        }
968    
969    }
970