001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.controlbus;
018    
019    import javax.management.MBeanServer;
020    import javax.management.ObjectName;
021    
022    import org.apache.camel.AsyncCallback;
023    import org.apache.camel.CamelContext;
024    import org.apache.camel.Endpoint;
025    import org.apache.camel.Exchange;
026    import org.apache.camel.Expression;
027    import org.apache.camel.Route;
028    import org.apache.camel.ServiceStatus;
029    import org.apache.camel.impl.DefaultAsyncProducer;
030    import org.apache.camel.spi.Language;
031    import org.apache.camel.util.CamelLogger;
032    import org.apache.camel.util.ExchangeHelper;
033    
034    /**
035     * The control bus producer.
036     */
037    public class ControlBusProducer extends DefaultAsyncProducer {
038    
039        private final CamelLogger logger;
040    
041        public ControlBusProducer(Endpoint endpoint, CamelLogger logger) {
042            super(endpoint);
043            this.logger = logger;
044        }
045    
046        @Override
047        public ControlBusEndpoint getEndpoint() {
048            return (ControlBusEndpoint) super.getEndpoint();
049        }
050    
051        @Override
052        public boolean process(Exchange exchange, AsyncCallback callback) {
053            if (getEndpoint().getLanguage() != null) {
054                try {
055                    processByLanguage(exchange, getEndpoint().getLanguage());
056                } catch (Exception e) {
057                    exchange.setException(e);
058                }
059            } else if (getEndpoint().getAction() != null) {
060                try {
061                    processByAction(exchange);
062                } catch (Exception e) {
063                    exchange.setException(e);
064                }
065            }
066    
067            callback.done(true);
068            return true;
069        }
070    
071        protected void processByLanguage(Exchange exchange, Language language) throws Exception {
072            LanguageTask task = new LanguageTask(exchange, language);
073            if (getEndpoint().isAsync()) {
074                getEndpoint().getComponent().getExecutorService().submit(task);
075            } else {
076                task.run();
077            }
078        }
079    
080        protected void processByAction(Exchange exchange) throws Exception {
081            ActionTask task = new ActionTask(exchange);
082            if (getEndpoint().isAsync()) {
083                getEndpoint().getComponent().getExecutorService().submit(task);
084            } else {
085                task.run();
086            }
087        }
088    
089        /**
090         * Tasks to run when processing by language.
091         */
092        private final class LanguageTask implements Runnable {
093    
094            private final Exchange exchange;
095            private final Language language;
096    
097            private LanguageTask(Exchange exchange, Language language) {
098                this.exchange = exchange;
099                this.language = language;
100            }
101    
102            @Override
103            public void run() {
104                String task = null;
105                Object result = null;
106    
107                try {
108                    // create dummy exchange
109                    Exchange dummy = ExchangeHelper.createCopy(exchange, true);
110    
111                    task = dummy.getIn().getMandatoryBody(String.class);
112                    if (task != null) {
113                        Expression exp = language.createExpression(task);
114                        result = exp.evaluate(dummy, Object.class);
115                    }
116    
117                    if (result != null && !getEndpoint().isAsync()) {
118                        // can only set result on exchange if sync
119                        exchange.getIn().setBody(result);
120                    }
121    
122                    if (task != null) {
123                        logger.log("ControlBus task done [" + task + "] with result -> " + (result != null ? result : "void"));
124                    }
125                } catch (Exception e) {
126                    logger.log("Error executing ControlBus task [" + task + "]. This exception will be ignored.", e);
127                }
128            }
129        }
130    
131        /**
132         * Tasks to run when processing by route action.
133         */
134        private final class ActionTask implements Runnable {
135    
136            private final Exchange exchange;
137    
138            private ActionTask(Exchange exchange) {
139                this.exchange = exchange;
140            }
141    
142            @Override
143            public void run() {
144                String action = getEndpoint().getAction();
145                String id = getEndpoint().getRouteId();
146    
147                Object result = null;
148                String task = action + " route " + id;
149    
150                try {
151                    if ("start".equals(action)) {
152                        getEndpoint().getCamelContext().startRoute(id);
153                    } else if ("stop".equals(action)) {
154                        getEndpoint().getCamelContext().stopRoute(id);
155                    } else if ("suspend".equals(action)) {
156                        getEndpoint().getCamelContext().suspendRoute(id);
157                    } else if ("resume".equals(action)) {
158                        getEndpoint().getCamelContext().resumeRoute(id);
159                    } else if ("status".equals(action)) {
160                        ServiceStatus status = getEndpoint().getCamelContext().getRouteStatus(id);
161                        if (status != null) {
162                            result = status.name();
163                        }
164                    } else if ("stats".equals(action)) {
165    
166                        // camel context or per route
167                        String name = getEndpoint().getCamelContext().getManagementName();
168                        if (name == null) {
169                            result = "JMX is disabled, cannot get stats";
170                        } else {
171                            ObjectName on;
172                            String operation;
173                            if (id == null) {
174                                CamelContext camelContext = getEndpoint().getCamelContext();
175                                on = getEndpoint().getCamelContext().getManagementStrategy().getManagementNamingStrategy().getObjectNameForCamelContext(camelContext);
176                                operation = "dumpRoutesStatsAsXml";
177                            } else {
178                                Route route = getEndpoint().getCamelContext().getRoute(id);
179                                on = getEndpoint().getCamelContext().getManagementStrategy().getManagementNamingStrategy().getObjectNameForRoute(route);
180                                operation = "dumpRouteStatsAsXml";
181                            }
182                            if (on != null) {
183                                MBeanServer server = getEndpoint().getCamelContext().getManagementStrategy().getManagementAgent().getMBeanServer();
184                                result = server.invoke(on, operation, new Object[]{true, true}, new String[]{"boolean", "boolean"});
185                            } else {
186                                result = "Cannot lookup route with id " + id;
187                            }
188                        }
189                    }
190    
191                    if (result != null && !getEndpoint().isAsync()) {
192                        // can only set result on exchange if sync
193                        exchange.getIn().setBody(result);
194                    }
195    
196                    logger.log("ControlBus task done [" + task + "] with result -> " + (result != null ? result : "void"));
197                } catch (Exception e) {
198                    logger.log("Error executing ControlBus task [" + task + "]. This exception will be ignored.", e);
199                }
200            }
201        }
202    
203    }