001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.management;
018    
019    import java.io.IOException;
020    import java.lang.management.ManagementFactory;
021    import java.net.InetAddress;
022    import java.net.UnknownHostException;
023    import java.rmi.NoSuchObjectException;
024    import java.rmi.RemoteException;
025    import java.rmi.registry.LocateRegistry;
026    import java.rmi.registry.Registry;
027    import java.rmi.server.UnicastRemoteObject;
028    import java.util.List;
029    import java.util.concurrent.ConcurrentHashMap;
030    import java.util.concurrent.ConcurrentMap;
031    import javax.management.JMException;
032    import javax.management.MBeanServer;
033    import javax.management.MBeanServerFactory;
034    import javax.management.NotCompliantMBeanException;
035    import javax.management.ObjectInstance;
036    import javax.management.ObjectName;
037    import javax.management.remote.JMXConnectorServer;
038    import javax.management.remote.JMXConnectorServerFactory;
039    import javax.management.remote.JMXServiceURL;
040    
041    import org.apache.camel.CamelContext;
042    import org.apache.camel.CamelContextAware;
043    import org.apache.camel.spi.ManagementAgent;
044    import org.apache.camel.spi.ManagementMBeanAssembler;
045    import org.apache.camel.support.ServiceSupport;
046    import org.apache.camel.util.ObjectHelper;
047    import org.slf4j.Logger;
048    import org.slf4j.LoggerFactory;
049    
050    /**
051     * Default implementation of the Camel JMX service agent
052     */
053    public class DefaultManagementAgent extends ServiceSupport implements ManagementAgent, CamelContextAware {
054    
055        public static final String DEFAULT_DOMAIN = "org.apache.camel";
056        public static final String DEFAULT_HOST = "localhost";
057        public static final int DEFAULT_REGISTRY_PORT = 1099;
058        public static final int DEFAULT_CONNECTION_PORT = -1;
059        public static final String DEFAULT_SERVICE_URL_PATH = "/jmxrmi/camel";
060        private static final Logger LOG = LoggerFactory.getLogger(DefaultManagementAgent.class);
061    
062        private CamelContext camelContext;
063        private MBeanServer server;
064        // need a name -> actual name mapping as some servers changes the names (such as WebSphere)
065        private final ConcurrentMap<ObjectName, ObjectName> mbeansRegistered = new ConcurrentHashMap<ObjectName, ObjectName>();
066        private JMXConnectorServer cs;
067        private Registry registry;
068    
069        private Integer registryPort;
070        private Integer connectorPort;
071        private String mBeanServerDefaultDomain;
072        private String mBeanObjectDomainName;
073        private String serviceUrlPath;
074        private Boolean usePlatformMBeanServer = true;
075        private Boolean createConnector;
076        private Boolean onlyRegisterProcessorWithCustomId;
077        private Boolean registerAlways;
078        private Boolean registerNewRoutes = true;
079        private Boolean mask;
080        private Boolean includeHostName;
081    
082        public DefaultManagementAgent() {
083        }
084    
085        public DefaultManagementAgent(CamelContext camelContext) {
086            this.camelContext = camelContext;
087        }
088    
089        protected void finalizeSettings() {
090            if (registryPort == null) {
091                registryPort = Integer.getInteger(JmxSystemPropertyKeys.REGISTRY_PORT, DEFAULT_REGISTRY_PORT);
092            }
093            if (connectorPort == null) {
094                connectorPort = Integer.getInteger(JmxSystemPropertyKeys.CONNECTOR_PORT, DEFAULT_CONNECTION_PORT);
095            }
096            if (mBeanServerDefaultDomain == null) {
097                mBeanServerDefaultDomain = System.getProperty(JmxSystemPropertyKeys.DOMAIN, DEFAULT_DOMAIN);
098            }
099            if (mBeanObjectDomainName == null) {
100                mBeanObjectDomainName = System.getProperty(JmxSystemPropertyKeys.MBEAN_DOMAIN, DEFAULT_DOMAIN);
101            }
102            if (serviceUrlPath == null) {
103                serviceUrlPath = System.getProperty(JmxSystemPropertyKeys.SERVICE_URL_PATH, DEFAULT_SERVICE_URL_PATH);
104            }
105            if (createConnector == null) {
106                createConnector = Boolean.getBoolean(JmxSystemPropertyKeys.CREATE_CONNECTOR);
107            }
108            if (onlyRegisterProcessorWithCustomId == null) {
109                onlyRegisterProcessorWithCustomId = Boolean.getBoolean(JmxSystemPropertyKeys.ONLY_REGISTER_PROCESSOR_WITH_CUSTOM_ID);
110            }
111            // "Use platform mbean server" is true by default
112            if (System.getProperty(JmxSystemPropertyKeys.USE_PLATFORM_MBS) != null) {
113                usePlatformMBeanServer = Boolean.getBoolean(JmxSystemPropertyKeys.USE_PLATFORM_MBS);
114            }
115            if (System.getProperty(JmxSystemPropertyKeys.REGISTER_ALWAYS) != null) {
116                registerAlways = Boolean.getBoolean(JmxSystemPropertyKeys.REGISTER_ALWAYS);
117            }
118            if (System.getProperty(JmxSystemPropertyKeys.REGISTER_NEW_ROUTES) != null) {
119                registerNewRoutes = Boolean.getBoolean(JmxSystemPropertyKeys.REGISTER_NEW_ROUTES);
120            }
121            if (System.getProperty(JmxSystemPropertyKeys.MASK) != null) {
122                mask = Boolean.getBoolean(JmxSystemPropertyKeys.MASK);
123            }
124            if (System.getProperty(JmxSystemPropertyKeys.INCLUDE_HOST_NAME) != null) {
125                includeHostName = Boolean.getBoolean(JmxSystemPropertyKeys.INCLUDE_HOST_NAME);
126            }
127            if (System.getProperty(JmxSystemPropertyKeys.CREATE_CONNECTOR) != null) {
128                createConnector = Boolean.getBoolean(JmxSystemPropertyKeys.CREATE_CONNECTOR);
129            }
130        }
131    
132        public void setRegistryPort(Integer port) {
133            registryPort = port;
134        }
135    
136        public Integer getRegistryPort() {
137            return registryPort;
138        }
139    
140        public void setConnectorPort(Integer port) {
141            connectorPort = port;
142        }
143    
144        public Integer getConnectorPort() {
145            return connectorPort;
146        }
147    
148        public void setMBeanServerDefaultDomain(String domain) {
149            mBeanServerDefaultDomain = domain;
150        }
151    
152        public String getMBeanServerDefaultDomain() {
153            return mBeanServerDefaultDomain;
154        }
155    
156        public void setMBeanObjectDomainName(String domainName) {
157            mBeanObjectDomainName = domainName;
158        }
159    
160        public String getMBeanObjectDomainName() {
161            return mBeanObjectDomainName;
162        }
163    
164        public void setServiceUrlPath(String url) {
165            serviceUrlPath = url;
166        }
167    
168        public String getServiceUrlPath() {
169            return serviceUrlPath;
170        }
171    
172        public void setCreateConnector(Boolean flag) {
173            createConnector = flag;
174        }
175    
176        public Boolean getCreateConnector() {
177            return createConnector;
178        }
179    
180        public void setUsePlatformMBeanServer(Boolean flag) {
181            usePlatformMBeanServer = flag;
182        }
183    
184        public Boolean getUsePlatformMBeanServer() {
185            return usePlatformMBeanServer;
186        }
187    
188        public Boolean getOnlyRegisterProcessorWithCustomId() {
189            return onlyRegisterProcessorWithCustomId;
190        }
191    
192        public void setOnlyRegisterProcessorWithCustomId(Boolean onlyRegisterProcessorWithCustomId) {
193            this.onlyRegisterProcessorWithCustomId = onlyRegisterProcessorWithCustomId;
194        }
195    
196        public void setMBeanServer(MBeanServer mbeanServer) {
197            server = mbeanServer;
198        }
199    
200        public MBeanServer getMBeanServer() {
201            return server;
202        }
203    
204        public Boolean getRegisterAlways() {
205            return registerAlways != null && registerAlways;
206        }
207    
208        public void setRegisterAlways(Boolean registerAlways) {
209            this.registerAlways = registerAlways;
210        }
211    
212        public Boolean getRegisterNewRoutes() {
213            return registerNewRoutes != null && registerNewRoutes;
214        }
215    
216        public void setRegisterNewRoutes(Boolean registerNewRoutes) {
217            this.registerNewRoutes = registerNewRoutes;
218        }
219    
220        public Boolean getMask() {
221            return mask != null && mask;
222        }
223    
224        public void setMask(Boolean mask) {
225            this.mask = mask;
226        }
227    
228        public Boolean getIncludeHostName() {
229            return includeHostName != null && includeHostName;
230        }
231    
232        public void setIncludeHostName(Boolean includeHostName) {
233            this.includeHostName = includeHostName;
234        }
235    
236        public CamelContext getCamelContext() {
237            return camelContext;
238        }
239    
240        public void setCamelContext(CamelContext camelContext) {
241            this.camelContext = camelContext;
242        }
243    
244        public void register(Object obj, ObjectName name) throws JMException {
245            register(obj, name, false);
246        }
247    
248        public void register(Object obj, ObjectName name, boolean forceRegistration) throws JMException {
249            try {
250                registerMBeanWithServer(obj, name, forceRegistration);
251            } catch (NotCompliantMBeanException e) {
252                // If this is not a "normal" MBean, then try to deploy it using JMX annotations
253                ManagementMBeanAssembler assembler = camelContext.getManagementMBeanAssembler();
254                ObjectHelper.notNull(assembler, "ManagementMBeanAssembler", camelContext);
255                Object mbean = assembler.assemble(server, obj, name);
256                if (mbean != null) {
257                    // and register the mbean
258                    registerMBeanWithServer(mbean, name, forceRegistration);
259                }
260            }
261        }
262    
263        public void unregister(ObjectName name) throws JMException {
264            if (isRegistered(name)) {
265                ObjectName on = mbeansRegistered.remove(name);
266                server.unregisterMBean(on);
267                LOG.debug("Unregistered MBean with ObjectName: {}", name);
268            } else {
269                mbeansRegistered.remove(name);
270            }
271        }
272    
273        public boolean isRegistered(ObjectName name) {
274            ObjectName on = mbeansRegistered.get(name);
275            return (on != null && server.isRegistered(on))
276                    || server.isRegistered(name);
277        }
278    
279        protected void doStart() throws Exception {
280            ObjectHelper.notNull(camelContext, "CamelContext");
281    
282            // create mbean server if is has not be injected.
283            if (server == null) {
284                finalizeSettings();
285                createMBeanServer();
286            }
287    
288            LOG.debug("Starting JMX agent on server: {}", getMBeanServer());
289        }
290    
291        protected void doStop() throws Exception {
292            // close JMX Connector, if it was created
293            if (cs != null) {
294                try {
295                    cs.stop();
296                    LOG.debug("Stopped JMX Connector");
297                } catch (IOException e) {
298                    LOG.debug("Error occurred during stopping JMXConnectorService: "
299                            + cs + ". This exception will be ignored.");
300                }
301                cs = null;
302            }
303    
304            // Unexport JMX RMI registry, if it was created
305            if (registry != null) {
306                try {
307                    UnicastRemoteObject.unexportObject(registry, true);
308                    LOG.debug("Unexported JMX RMI Registry");
309                } catch (NoSuchObjectException e) {
310                    LOG.debug("Error occurred while unexporting JMX RMI registry. This exception will be ignored.");
311                }
312            }
313    
314            if (mbeansRegistered.isEmpty()) {
315                return;
316            }
317    
318            // Using the array to hold the busMBeans to avoid the CurrentModificationException
319            ObjectName[] mBeans = mbeansRegistered.keySet().toArray(new ObjectName[mbeansRegistered.size()]);
320            int caught = 0;
321            for (ObjectName name : mBeans) {
322                try {
323                    unregister(name);
324                } catch (Exception e) {
325                    LOG.info("Exception unregistering MBean with name " + name, e);
326                    caught++;
327                }
328            }
329            if (caught > 0) {
330                LOG.warn("A number of " + caught
331                         + " exceptions caught while unregistering MBeans during stop operation."
332                         + " See INFO log for details.");
333            }
334        }
335    
336        private void registerMBeanWithServer(Object obj, ObjectName name, boolean forceRegistration)
337            throws JMException {
338    
339            // have we already registered the bean, there can be shared instances in the camel routes
340            boolean exists = isRegistered(name);
341            if (exists) {
342                if (forceRegistration) {
343                    LOG.info("ForceRegistration enabled, unregistering existing MBean with ObjectName: {}", name);
344                    server.unregisterMBean(name);
345                } else {
346                    // okay ignore we do not want to force it and it could be a shared instance
347                    LOG.debug("MBean already registered with ObjectName: {}", name);
348                }
349            }
350    
351            // register bean if by force or not exists
352            ObjectInstance instance = null;
353            if (forceRegistration || !exists) {
354                LOG.trace("Registering MBean with ObjectName: {}", name);
355                instance = server.registerMBean(obj, name);
356            }
357    
358            // need to use the name returned from the server as some JEE servers may modify the name
359            if (instance != null) {
360                ObjectName registeredName = instance.getObjectName();
361                LOG.debug("Registered MBean with ObjectName: {}", registeredName);
362                mbeansRegistered.put(name, registeredName);
363            }
364        }
365    
366        protected void createMBeanServer() {
367            String hostName;
368            boolean canAccessSystemProps = true;
369            try {
370                // we'll do it this way mostly to determine if we should lookup the hostName
371                SecurityManager sm = System.getSecurityManager();
372                if (sm != null) {
373                    sm.checkPropertiesAccess();
374                }
375            } catch (SecurityException se) {
376                canAccessSystemProps = false;
377            }
378    
379            if (canAccessSystemProps) {
380                try {
381                    hostName = InetAddress.getLocalHost().getHostName();
382                } catch (UnknownHostException uhe) {
383                    LOG.info("Cannot determine localhost name. Using default: " + DEFAULT_REGISTRY_PORT, uhe);
384                    hostName = DEFAULT_HOST;
385                }
386            } else {
387                hostName = DEFAULT_HOST;
388            }
389    
390            server = findOrCreateMBeanServer();
391    
392            try {
393                // Create the connector if we need
394                if (createConnector) {
395                    createJmxConnector(hostName);
396                }
397            } catch (IOException ioe) {
398                LOG.warn("Could not create and start JMX connector.", ioe);
399            }
400        }
401        
402        protected MBeanServer findOrCreateMBeanServer() {
403    
404            // return platform mbean server if the option is specified.
405            if (usePlatformMBeanServer) {
406                return ManagementFactory.getPlatformMBeanServer();
407            }
408    
409            // look for the first mbean server that has match default domain name
410            List<MBeanServer> servers = MBeanServerFactory.findMBeanServer(null);
411    
412            for (MBeanServer server : servers) {
413                LOG.debug("Found MBeanServer with default domain {}", server.getDefaultDomain());
414    
415                if (mBeanServerDefaultDomain.equals(server.getDefaultDomain())) {
416                    return server;
417                }
418            }
419    
420            // create a mbean server with the given default domain name
421            return MBeanServerFactory.createMBeanServer(mBeanServerDefaultDomain);
422        }
423    
424        protected void createJmxConnector(String host) throws IOException {
425            ObjectHelper.notEmpty(serviceUrlPath, "serviceUrlPath");
426            ObjectHelper.notNull(registryPort, "registryPort");
427    
428            try {
429                registry = LocateRegistry.createRegistry(registryPort);
430                LOG.debug("Created JMXConnector RMI registry on port {}", registryPort);
431            } catch (RemoteException ex) {
432                // The registry may had been created, we could get the registry instead
433            }
434    
435            // must start with leading slash
436            String path = serviceUrlPath.startsWith("/") ? serviceUrlPath : "/" + serviceUrlPath;
437            // Create an RMI connector and start it
438            final JMXServiceURL url;
439            if (connectorPort > 0) {
440                url = new JMXServiceURL("service:jmx:rmi://" + host + ":" + connectorPort + "/jndi/rmi://" + host
441                                        + ":" + registryPort + path);
442            } else {
443                url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + registryPort + path);
444            }
445    
446            cs = JMXConnectorServerFactory.newJMXConnectorServer(url, null, server);
447    
448            // use async thread for starting the JMX Connector
449            // (no need to use a thread pool or enlist in JMX as this thread is terminated when the JMX connector has been started)
450            String threadName = camelContext.getExecutorServiceManager().resolveThreadName("JMXConnector: " + url);
451            Thread thread = getCamelContext().getExecutorServiceManager().newThread(threadName, new Runnable() {
452                public void run() {
453                    try {
454                        LOG.debug("Staring JMX Connector thread to listen at: {}", url);
455                        cs.start();
456                        LOG.info("JMX Connector thread started and listening at: {}", url);
457                    } catch (IOException ioe) {
458                        LOG.warn("Could not start JMXConnector thread at: " + url + ". JMX Connector not in use.", ioe);
459                    }
460                }
461            });
462            thread.start();
463        }
464    
465    }