001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.management;
018    
019    import java.util.concurrent.ThreadPoolExecutor;
020    
021    import org.apache.camel.CamelContext;
022    import org.apache.camel.Component;
023    import org.apache.camel.Consumer;
024    import org.apache.camel.DelegateProcessor;
025    import org.apache.camel.Endpoint;
026    import org.apache.camel.ErrorHandlerFactory;
027    import org.apache.camel.Processor;
028    import org.apache.camel.Producer;
029    import org.apache.camel.Route;
030    import org.apache.camel.Service;
031    import org.apache.camel.component.bean.BeanProcessor;
032    import org.apache.camel.component.log.LogEndpoint;
033    import org.apache.camel.impl.ScheduledPollConsumer;
034    import org.apache.camel.management.mbean.ManagedBeanProcessor;
035    import org.apache.camel.management.mbean.ManagedBrowsableEndpoint;
036    import org.apache.camel.management.mbean.ManagedCamelContext;
037    import org.apache.camel.management.mbean.ManagedComponent;
038    import org.apache.camel.management.mbean.ManagedConsumer;
039    import org.apache.camel.management.mbean.ManagedDelayer;
040    import org.apache.camel.management.mbean.ManagedEndpoint;
041    import org.apache.camel.management.mbean.ManagedErrorHandler;
042    import org.apache.camel.management.mbean.ManagedEventNotifier;
043    import org.apache.camel.management.mbean.ManagedIdempotentConsumer;
044    import org.apache.camel.management.mbean.ManagedProcessor;
045    import org.apache.camel.management.mbean.ManagedProducer;
046    import org.apache.camel.management.mbean.ManagedRoute;
047    import org.apache.camel.management.mbean.ManagedScheduledPollConsumer;
048    import org.apache.camel.management.mbean.ManagedSendProcessor;
049    import org.apache.camel.management.mbean.ManagedService;
050    import org.apache.camel.management.mbean.ManagedSuspendableRoute;
051    import org.apache.camel.management.mbean.ManagedThreadPool;
052    import org.apache.camel.management.mbean.ManagedThrottler;
053    import org.apache.camel.management.mbean.ManagedThroughputLogger;
054    import org.apache.camel.model.ModelCamelContext;
055    import org.apache.camel.model.ProcessorDefinition;
056    import org.apache.camel.processor.Delayer;
057    import org.apache.camel.processor.ErrorHandler;
058    import org.apache.camel.processor.SendProcessor;
059    import org.apache.camel.processor.Throttler;
060    import org.apache.camel.processor.ThroughputLogger;
061    import org.apache.camel.processor.idempotent.IdempotentConsumer;
062    import org.apache.camel.spi.BrowsableEndpoint;
063    import org.apache.camel.spi.EventNotifier;
064    import org.apache.camel.spi.ManagementObjectStrategy;
065    import org.apache.camel.spi.RouteContext;
066    
067    /**
068     *
069     */
070    public class DefaultManagementObjectStrategy implements ManagementObjectStrategy {
071    
072        public Object getManagedObjectForCamelContext(CamelContext context) {
073            ManagedCamelContext mc = new ManagedCamelContext((ModelCamelContext)context);
074            mc.init(context.getManagementStrategy());
075            return mc;
076        }
077    
078        @SuppressWarnings({"deprecation", "unchecked"})
079        public Object getManagedObjectForComponent(CamelContext context, Component component, String name) {
080            if (component instanceof org.apache.camel.spi.ManagementAware) {
081                return ((org.apache.camel.spi.ManagementAware<Component>) component).getManagedObject(component);
082            } else {
083                ManagedComponent mc = new ManagedComponent(name, component);
084                mc.init(context.getManagementStrategy());
085                return mc;
086            }
087        }
088    
089        @SuppressWarnings({"deprecation", "unchecked"})
090        public Object getManagedObjectForEndpoint(CamelContext context, Endpoint endpoint) {
091            // we only want to manage singleton endpoints
092            if (!endpoint.isSingleton()) {
093                return null;
094            }
095    
096            if (endpoint instanceof org.apache.camel.spi.ManagementAware) {
097                return ((org.apache.camel.spi.ManagementAware<Endpoint>) endpoint).getManagedObject(endpoint);
098            } else if (endpoint instanceof BrowsableEndpoint) {
099                ManagedBrowsableEndpoint me = new ManagedBrowsableEndpoint((BrowsableEndpoint) endpoint);
100                me.init(context.getManagementStrategy());
101                return me;
102            } else {
103                ManagedEndpoint me = new ManagedEndpoint(endpoint);
104                me.init(context.getManagementStrategy());
105                return me;
106            }
107        }
108    
109        public Object getManagedObjectForErrorHandler(CamelContext context, RouteContext routeContext,
110                                                      Processor errorHandler, ErrorHandlerFactory errorHandlerBuilder) {
111            ManagedErrorHandler me = new ManagedErrorHandler(routeContext, errorHandler, errorHandlerBuilder);
112            me.init(context.getManagementStrategy());
113            return me;
114        }
115    
116        public Object getManagedObjectForRoute(CamelContext context, Route route) {
117            ManagedRoute mr;
118            if (route.supportsSuspension()) {
119                mr = new ManagedSuspendableRoute((ModelCamelContext)context, route);
120            } else {
121                mr = new ManagedRoute((ModelCamelContext)context, route);
122            }
123            mr.init(context.getManagementStrategy());
124            return mr;
125        }
126    
127        public Object getManagedObjectForThreadPool(CamelContext context, ThreadPoolExecutor threadPool,
128                                                    String id, String sourceId, String routeId, String threadPoolProfileId) {
129            ManagedThreadPool mtp = new ManagedThreadPool(context, threadPool, id, sourceId, routeId, threadPoolProfileId);
130            mtp.init(context.getManagementStrategy());
131            return mtp;
132        }
133    
134        public Object getManagedObjectForEventNotifier(CamelContext context, EventNotifier eventNotifier) {
135            ManagedEventNotifier men = new ManagedEventNotifier(context, eventNotifier);
136            men.init(context.getManagementStrategy());
137            return men;
138        }
139    
140        public Object getManagedObjectForConsumer(CamelContext context, Consumer consumer) {
141            ManagedConsumer mc;
142            if (consumer instanceof ScheduledPollConsumer) {
143                mc = new ManagedScheduledPollConsumer(context, (ScheduledPollConsumer) consumer);
144            } else {
145                mc = new ManagedConsumer(context, consumer);
146            }
147            mc.init(context.getManagementStrategy());
148            return mc;
149        }
150    
151        public Object getManagedObjectForProducer(CamelContext context, Producer producer) {
152            ManagedProducer mp = new ManagedProducer(context, producer);
153            mp.init(context.getManagementStrategy());
154            return mp;
155        }
156    
157        public Object getManagedObjectForService(CamelContext context, Service service) {
158            ManagedService mc = new ManagedService(context, service);
159            mc.init(context.getManagementStrategy());
160            return mc;
161        }
162    
163        @SuppressWarnings({"deprecation", "unchecked"})
164        public Object getManagedObjectForProcessor(CamelContext context, Processor processor,
165                                                   ProcessorDefinition<?> definition, Route route) {
166            ManagedProcessor answer = null;
167    
168            // unwrap delegates as we want the real target processor
169            Processor target = processor;
170            while (target != null) {
171    
172                // skip error handlers
173                if (target instanceof ErrorHandler) {
174                    return false;
175                }
176    
177                // look for specialized processor which we should prefer to use
178                if (target instanceof Delayer) {
179                    answer = new ManagedDelayer(context, (Delayer) target, definition);
180                } else if (target instanceof Throttler) {
181                    answer = new ManagedThrottler(context, (Throttler) target, definition);
182                } else if (target instanceof SendProcessor) {
183                    SendProcessor sp = (SendProcessor) target;
184                    // special for sending to throughput logger
185                    if (sp.getDestination() instanceof LogEndpoint) {
186                        LogEndpoint le = (LogEndpoint) sp.getDestination();
187                        if (le.getLogger() instanceof ThroughputLogger) {
188                            ThroughputLogger tl = (ThroughputLogger) le.getLogger();
189                            answer = new ManagedThroughputLogger(context, tl, definition);
190                        }
191                    }
192                    // regular send processor
193                    if (answer == null) {
194                        answer = new ManagedSendProcessor(context, (SendProcessor) target, definition);
195                    }
196                } else if (target instanceof BeanProcessor) {
197                    answer = new ManagedBeanProcessor(context, (BeanProcessor) target, definition);
198                } else if (target instanceof IdempotentConsumer) {
199                    answer = new ManagedIdempotentConsumer(context, (IdempotentConsumer) target, definition);
200                } else if (target instanceof org.apache.camel.spi.ManagementAware) {
201                    return ((org.apache.camel.spi.ManagementAware<Processor>) target).getManagedObject(processor);
202                }
203    
204                if (answer != null) {
205                    // break out as we found an answer
206                    break;
207                }
208    
209                // no answer yet, so unwrap any delegates and try again
210                if (target instanceof DelegateProcessor) {
211                    target = ((DelegateProcessor) target).getProcessor();
212                } else {
213                    // no delegate so we dont have any target to try next
214                    break;
215                }
216            }
217    
218            if (answer == null) {
219                // fallback to a generic processor
220                answer = new ManagedProcessor(context, target, definition);
221            }
222    
223            answer.setRoute(route);
224            answer.init(context.getManagementStrategy());
225            return answer;
226        }
227    
228    }