001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.controlbus; 018 019 import javax.management.MBeanServer; 020 import javax.management.ObjectName; 021 022 import org.apache.camel.AsyncCallback; 023 import org.apache.camel.CamelContext; 024 import org.apache.camel.Endpoint; 025 import org.apache.camel.Exchange; 026 import org.apache.camel.Expression; 027 import org.apache.camel.Route; 028 import org.apache.camel.ServiceStatus; 029 import org.apache.camel.impl.DefaultAsyncProducer; 030 import org.apache.camel.spi.Language; 031 import org.apache.camel.util.CamelLogger; 032 import org.apache.camel.util.ExchangeHelper; 033 034 /** 035 * The control bus producer. 036 */ 037 public class ControlBusProducer extends DefaultAsyncProducer { 038 039 private final CamelLogger logger; 040 041 public ControlBusProducer(Endpoint endpoint, CamelLogger logger) { 042 super(endpoint); 043 this.logger = logger; 044 } 045 046 @Override 047 public ControlBusEndpoint getEndpoint() { 048 return (ControlBusEndpoint) super.getEndpoint(); 049 } 050 051 @Override 052 public boolean process(Exchange exchange, AsyncCallback callback) { 053 if (getEndpoint().getLanguage() != null) { 054 try { 055 processByLanguage(exchange, getEndpoint().getLanguage()); 056 } catch (Exception e) { 057 exchange.setException(e); 058 } 059 } else if (getEndpoint().getAction() != null) { 060 try { 061 processByAction(exchange); 062 } catch (Exception e) { 063 exchange.setException(e); 064 } 065 } 066 067 callback.done(true); 068 return true; 069 } 070 071 protected void processByLanguage(Exchange exchange, Language language) throws Exception { 072 LanguageTask task = new LanguageTask(exchange, language); 073 if (getEndpoint().isAsync()) { 074 getEndpoint().getComponent().getExecutorService().submit(task); 075 } else { 076 task.run(); 077 } 078 } 079 080 protected void processByAction(Exchange exchange) throws Exception { 081 ActionTask task = new ActionTask(exchange); 082 if (getEndpoint().isAsync()) { 083 getEndpoint().getComponent().getExecutorService().submit(task); 084 } else { 085 task.run(); 086 } 087 } 088 089 /** 090 * Tasks to run when processing by language. 091 */ 092 private final class LanguageTask implements Runnable { 093 094 private final Exchange exchange; 095 private final Language language; 096 097 private LanguageTask(Exchange exchange, Language language) { 098 this.exchange = exchange; 099 this.language = language; 100 } 101 102 @Override 103 public void run() { 104 String task = null; 105 Object result = null; 106 107 try { 108 // create dummy exchange 109 Exchange dummy = ExchangeHelper.createCopy(exchange, true); 110 111 task = dummy.getIn().getMandatoryBody(String.class); 112 if (task != null) { 113 Expression exp = language.createExpression(task); 114 result = exp.evaluate(dummy, Object.class); 115 } 116 117 if (result != null && !getEndpoint().isAsync()) { 118 // can only set result on exchange if sync 119 exchange.getIn().setBody(result); 120 } 121 122 if (task != null) { 123 logger.log("ControlBus task done [" + task + "] with result -> " + (result != null ? result : "void")); 124 } 125 } catch (Exception e) { 126 logger.log("Error executing ControlBus task [" + task + "]. This exception will be ignored.", e); 127 } 128 } 129 } 130 131 /** 132 * Tasks to run when processing by route action. 133 */ 134 private final class ActionTask implements Runnable { 135 136 private final Exchange exchange; 137 138 private ActionTask(Exchange exchange) { 139 this.exchange = exchange; 140 } 141 142 @Override 143 public void run() { 144 String action = getEndpoint().getAction(); 145 String id = getEndpoint().getRouteId(); 146 147 Object result = null; 148 String task = action + " route " + id; 149 150 try { 151 if ("start".equals(action)) { 152 getEndpoint().getCamelContext().startRoute(id); 153 } else if ("stop".equals(action)) { 154 getEndpoint().getCamelContext().stopRoute(id); 155 } else if ("suspend".equals(action)) { 156 getEndpoint().getCamelContext().suspendRoute(id); 157 } else if ("resume".equals(action)) { 158 getEndpoint().getCamelContext().resumeRoute(id); 159 } else if ("status".equals(action)) { 160 ServiceStatus status = getEndpoint().getCamelContext().getRouteStatus(id); 161 if (status != null) { 162 result = status.name(); 163 } 164 } else if ("stats".equals(action)) { 165 166 // camel context or per route 167 String name = getEndpoint().getCamelContext().getManagementName(); 168 if (name == null) { 169 result = "JMX is disabled, cannot get stats"; 170 } else { 171 ObjectName on; 172 String operation; 173 if (id == null) { 174 CamelContext camelContext = getEndpoint().getCamelContext(); 175 on = getEndpoint().getCamelContext().getManagementStrategy().getManagementNamingStrategy().getObjectNameForCamelContext(camelContext); 176 operation = "dumpRoutesStatsAsXml"; 177 } else { 178 Route route = getEndpoint().getCamelContext().getRoute(id); 179 on = getEndpoint().getCamelContext().getManagementStrategy().getManagementNamingStrategy().getObjectNameForRoute(route); 180 operation = "dumpRouteStatsAsXml"; 181 } 182 if (on != null) { 183 MBeanServer server = getEndpoint().getCamelContext().getManagementStrategy().getManagementAgent().getMBeanServer(); 184 result = server.invoke(on, operation, new Object[]{true, true}, new String[]{"boolean", "boolean"}); 185 } else { 186 result = "Cannot lookup route with id " + id; 187 } 188 } 189 } 190 191 if (result != null && !getEndpoint().isAsync()) { 192 // can only set result on exchange if sync 193 exchange.getIn().setBody(result); 194 } 195 196 logger.log("ControlBus task done [" + task + "] with result -> " + (result != null ? result : "void")); 197 } catch (Exception e) { 198 logger.log("Error executing ControlBus task [" + task + "]. This exception will be ignored.", e); 199 } 200 } 201 } 202 203 }