001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.management.mbean; 018 019 import java.util.ArrayList; 020 import java.util.Collections; 021 import java.util.Comparator; 022 import java.util.HashMap; 023 import java.util.List; 024 import java.util.Map; 025 import java.util.Set; 026 import java.util.concurrent.TimeUnit; 027 import javax.management.AttributeValueExp; 028 import javax.management.MBeanServer; 029 import javax.management.MBeanServerInvocationHandler; 030 import javax.management.ObjectName; 031 import javax.management.Query; 032 import javax.management.QueryExp; 033 import javax.management.StringValueExp; 034 035 import org.apache.camel.CamelContext; 036 import org.apache.camel.ManagementStatisticsLevel; 037 import org.apache.camel.Route; 038 import org.apache.camel.ServiceStatus; 039 import org.apache.camel.TimerListener; 040 import org.apache.camel.api.management.ManagedResource; 041 import org.apache.camel.api.management.mbean.ManagedProcessorMBean; 042 import org.apache.camel.api.management.mbean.ManagedRouteMBean; 043 import org.apache.camel.model.ModelCamelContext; 044 import org.apache.camel.model.ModelHelper; 045 import org.apache.camel.model.RouteDefinition; 046 import org.apache.camel.spi.RoutePolicy; 047 import org.apache.camel.util.ObjectHelper; 048 049 @ManagedResource(description = "Managed Route") 050 public class ManagedRoute extends ManagedPerformanceCounter implements TimerListener, ManagedRouteMBean { 051 public static final String VALUE_UNKNOWN = "Unknown"; 052 protected final Route route; 053 protected final String description; 054 protected final ModelCamelContext context; 055 private final LoadTriplet load = new LoadTriplet(); 056 057 public ManagedRoute(ModelCamelContext context, Route route) { 058 this.route = route; 059 this.context = context; 060 this.description = route.toString(); 061 boolean enabled = context.getManagementStrategy().getStatisticsLevel() != ManagementStatisticsLevel.Off; 062 setStatisticsEnabled(enabled); 063 } 064 065 public Route getRoute() { 066 return route; 067 } 068 069 public CamelContext getContext() { 070 return context; 071 } 072 073 public String getRouteId() { 074 String id = route.getId(); 075 if (id == null) { 076 id = VALUE_UNKNOWN; 077 } 078 return id; 079 } 080 081 public String getDescription() { 082 return description; 083 } 084 085 @Override 086 public String getEndpointUri() { 087 if (route.getEndpoint() != null) { 088 return route.getEndpoint().getEndpointUri(); 089 } 090 return VALUE_UNKNOWN; 091 } 092 093 public String getState() { 094 // must use String type to be sure remote JMX can read the attribute without requiring Camel classes. 095 ServiceStatus status = context.getRouteStatus(route.getId()); 096 // if no status exists then its stopped 097 if (status == null) { 098 status = ServiceStatus.Stopped; 099 } 100 return status.name(); 101 } 102 103 public Integer getInflightExchanges() { 104 return context.getInflightRepository().size(route.getId()); 105 } 106 107 public String getCamelId() { 108 return context.getName(); 109 } 110 111 public String getCamelManagementName() { 112 return context.getManagementName(); 113 } 114 115 public Boolean getTracing() { 116 return route.getRouteContext().isTracing(); 117 } 118 119 public void setTracing(Boolean tracing) { 120 route.getRouteContext().setTracing(tracing); 121 } 122 123 public Boolean getMessageHistory() { 124 return route.getRouteContext().isMessageHistory(); 125 } 126 127 public String getRoutePolicyList() { 128 List<RoutePolicy> policyList = route.getRouteContext().getRoutePolicyList(); 129 130 if (policyList == null || policyList.isEmpty()) { 131 // return an empty string to have it displayed nicely in JMX consoles 132 return ""; 133 } 134 135 StringBuilder sb = new StringBuilder(); 136 for (int i = 0; i < policyList.size(); i++) { 137 RoutePolicy policy = policyList.get(i); 138 sb.append(policy.getClass().getSimpleName()); 139 sb.append("(").append(ObjectHelper.getIdentityHashCode(policy)).append(")"); 140 if (i < policyList.size() - 1) { 141 sb.append(", "); 142 } 143 } 144 return sb.toString(); 145 } 146 147 public String getLoad01() { 148 double load1 = load.getLoad1(); 149 if (Double.isNaN(load1)) { 150 // empty string if load statistics is disabled 151 return ""; 152 } else { 153 return String.format("%.2f", load1); 154 } 155 } 156 157 public String getLoad05() { 158 double load5 = load.getLoad5(); 159 if (Double.isNaN(load5)) { 160 // empty string if load statistics is disabled 161 return ""; 162 } else { 163 return String.format("%.2f", load5); 164 } 165 } 166 167 public String getLoad15() { 168 double load15 = load.getLoad15(); 169 if (Double.isNaN(load15)) { 170 // empty string if load statistics is disabled 171 return ""; 172 } else { 173 return String.format("%.2f", load15); 174 } 175 } 176 177 @Override 178 public void onTimer() { 179 load.update(getInflightExchanges()); 180 } 181 182 public void start() throws Exception { 183 if (!context.getStatus().isStarted()) { 184 throw new IllegalArgumentException("CamelContext is not started"); 185 } 186 context.startRoute(getRouteId()); 187 } 188 189 public void stop() throws Exception { 190 if (!context.getStatus().isStarted()) { 191 throw new IllegalArgumentException("CamelContext is not started"); 192 } 193 context.stopRoute(getRouteId()); 194 } 195 196 public void stop(long timeout) throws Exception { 197 if (!context.getStatus().isStarted()) { 198 throw new IllegalArgumentException("CamelContext is not started"); 199 } 200 context.stopRoute(getRouteId(), timeout, TimeUnit.SECONDS); 201 } 202 203 public boolean stop(Long timeout, Boolean abortAfterTimeout) throws Exception { 204 if (!context.getStatus().isStarted()) { 205 throw new IllegalArgumentException("CamelContext is not started"); 206 } 207 return context.stopRoute(getRouteId(), timeout, TimeUnit.SECONDS, abortAfterTimeout); 208 } 209 210 public void shutdown() throws Exception { 211 if (!context.getStatus().isStarted()) { 212 throw new IllegalArgumentException("CamelContext is not started"); 213 } 214 String routeId = getRouteId(); 215 context.stopRoute(routeId); 216 context.removeRoute(routeId); 217 } 218 219 public void shutdown(long timeout) throws Exception { 220 if (!context.getStatus().isStarted()) { 221 throw new IllegalArgumentException("CamelContext is not started"); 222 } 223 String routeId = getRouteId(); 224 context.stopRoute(routeId, timeout, TimeUnit.SECONDS); 225 context.removeRoute(routeId); 226 } 227 228 public boolean remove() throws Exception { 229 if (!context.getStatus().isStarted()) { 230 throw new IllegalArgumentException("CamelContext is not started"); 231 } 232 return context.removeRoute(getRouteId()); 233 } 234 235 public String dumpRouteAsXml() throws Exception { 236 String id = route.getId(); 237 RouteDefinition def = context.getRouteDefinition(id); 238 if (def != null) { 239 return ModelHelper.dumpModelAsXml(def); 240 } 241 return null; 242 } 243 244 public void updateRouteFromXml(String xml) throws Exception { 245 // convert to model from xml 246 RouteDefinition def = ModelHelper.createModelFromXml(xml, RouteDefinition.class); 247 if (def == null) { 248 return; 249 } 250 251 // if the xml does not contain the route-id then we fix this by adding the actual route id 252 // this may be needed if the route-id was auto-generated, as the intend is to update this route 253 // and not add a new route, adding a new route, use the MBean operation on ManagedCamelContext instead. 254 if (ObjectHelper.isEmpty(def.getId())) { 255 def.setId(getRouteId()); 256 } else if (!def.getId().equals(getRouteId())) { 257 throw new IllegalArgumentException("Cannot update route from XML as routeIds does not match. routeId: " 258 + getRouteId() + ", routeId from XML: " + def.getId()); 259 } 260 261 // add will remove existing route first 262 context.addRouteDefinition(def); 263 } 264 265 public String dumpRouteStatsAsXml(boolean fullStats, boolean includeProcessors) throws Exception { 266 // in this logic we need to calculate the accumulated processing time for the processor in the route 267 // and hence why the logic is a bit more complicated to do this, as we need to calculate that from 268 // the bottom -> top of the route but this information is valuable for profiling routes 269 StringBuilder sb = new StringBuilder(); 270 271 // need to calculate this value first, as we need that value for the route stat 272 Long processorAccumulatedTime = 0L; 273 274 // gather all the processors for this route, which requires JMX 275 if (includeProcessors) { 276 sb.append(" <processorStats>\n"); 277 MBeanServer server = getContext().getManagementStrategy().getManagementAgent().getMBeanServer(); 278 if (server != null) { 279 // get all the processor mbeans and sort them accordingly to their index 280 String prefix = getContext().getManagementStrategy().getManagementAgent().getIncludeHostName() ? "*/" : ""; 281 ObjectName query = ObjectName.getInstance("org.apache.camel:context=" + prefix + getContext().getManagementName() + ",type=processors,*"); 282 Set<ObjectName> names = server.queryNames(query, null); 283 List<ManagedProcessorMBean> mps = new ArrayList<ManagedProcessorMBean>(); 284 for (ObjectName on : names) { 285 ManagedProcessorMBean processor = MBeanServerInvocationHandler.newProxyInstance(server, on, ManagedProcessorMBean.class, true); 286 287 // the processor must belong to this route 288 if (getRouteId().equals(processor.getRouteId())) { 289 mps.add(processor); 290 } 291 } 292 Collections.sort(mps, new OrderProcessorMBeans()); 293 294 // walk the processors in reverse order, and calculate the accumulated total time 295 Map<String, Long> accumulatedTimes = new HashMap<String, Long>(); 296 Collections.reverse(mps); 297 for (ManagedProcessorMBean processor : mps) { 298 processorAccumulatedTime += processor.getTotalProcessingTime(); 299 accumulatedTimes.put(processor.getProcessorId(), processorAccumulatedTime); 300 } 301 // and reverse back again 302 Collections.reverse(mps); 303 304 // and now add the sorted list of processors to the xml output 305 for (ManagedProcessorMBean processor : mps) { 306 sb.append(" <processorStat").append(String.format(" id=\"%s\" index=\"%s\"", processor.getProcessorId(), processor.getIndex())); 307 // do we have an accumulated time then append that 308 Long accTime = accumulatedTimes.get(processor.getProcessorId()); 309 if (accTime != null) { 310 sb.append(" accumulatedProcessingTime=\"").append(accTime).append("\""); 311 } 312 // use substring as we only want the attributes 313 sb.append(" ").append(processor.dumpStatsAsXml(fullStats).substring(7)).append("\n"); 314 } 315 } 316 sb.append(" </processorStats>\n"); 317 } 318 319 // route self time is route total - processor accumulated total) 320 long routeSelfTime = getTotalProcessingTime() - processorAccumulatedTime; 321 if (routeSelfTime < 0) { 322 // ensure we don't calculate that as negative 323 routeSelfTime = 0; 324 } 325 326 StringBuilder answer = new StringBuilder(); 327 answer.append("<routeStat").append(String.format(" id=\"%s\"", route.getId())); 328 // use substring as we only want the attributes 329 String stat = dumpStatsAsXml(fullStats); 330 answer.append(" selfProcessingTime=\"").append(routeSelfTime).append("\""); 331 answer.append(" ").append(stat.substring(7, stat.length() - 2)).append(">\n"); 332 333 if (includeProcessors) { 334 answer.append(sb); 335 } 336 337 answer.append("</routeStat>"); 338 return answer.toString(); 339 } 340 341 public void reset(boolean includeProcessors) throws Exception { 342 reset(); 343 344 // and now reset all processors for this route 345 if (includeProcessors) { 346 MBeanServer server = getContext().getManagementStrategy().getManagementAgent().getMBeanServer(); 347 if (server != null) { 348 // get all the processor mbeans and sort them accordingly to their index 349 String prefix = getContext().getManagementStrategy().getManagementAgent().getIncludeHostName() ? "*/" : ""; 350 ObjectName query = ObjectName.getInstance("org.apache.camel:context=" + prefix + getContext().getManagementName() + ",type=processors,*"); 351 QueryExp queryExp = Query.match(new AttributeValueExp("RouteId"), new StringValueExp(getRouteId())); 352 Set<ObjectName> names = server.queryNames(query, queryExp); 353 for (ObjectName name : names) { 354 server.invoke(name, "reset", null, null); 355 } 356 } 357 } 358 } 359 360 public String createRouteStaticEndpointJson() { 361 return getContext().createRouteStaticEndpointJson(getRouteId()); 362 } 363 364 @Override 365 public boolean equals(Object o) { 366 return this == o || (o != null && getClass() == o.getClass() && route.equals(((ManagedRoute)o).route)); 367 } 368 369 @Override 370 public int hashCode() { 371 return route.hashCode(); 372 } 373 374 /** 375 * Used for sorting the processor mbeans accordingly to their index. 376 */ 377 private static final class OrderProcessorMBeans implements Comparator<ManagedProcessorMBean> { 378 379 @Override 380 public int compare(ManagedProcessorMBean o1, ManagedProcessorMBean o2) { 381 return o1.getIndex().compareTo(o2.getIndex()); 382 } 383 } 384 385 }