001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.management;
018    
019    import java.util.EventObject;
020    
021    import org.apache.camel.CamelContext;
022    import org.apache.camel.CamelContextAware;
023    import org.apache.camel.Endpoint;
024    import org.apache.camel.Exchange;
025    import org.apache.camel.Producer;
026    import org.apache.camel.support.EventNotifierSupport;
027    import org.apache.camel.util.ObjectHelper;
028    import org.apache.camel.util.ServiceHelper;
029    import org.apache.camel.util.URISupport;
030    
031    /**
032     * A {@link org.apache.camel.spi.EventNotifier} which publishes the {@link EventObject} to some
033     * {@link org.apache.camel.Endpoint}.
034     * <p/>
035     * This notifier is only enabled when {@link CamelContext} is started. This avoids problems when
036     * sending notifications during start/shutdown of {@link CamelContext} which causes problems by
037     * sending those events to Camel routes by this notifier.
038     *
039     * @version 
040     */
041    public class PublishEventNotifier extends EventNotifierSupport implements CamelContextAware {
042    
043        private CamelContext camelContext;
044        private Endpoint endpoint;
045        private String endpointUri;
046        private Producer producer;
047    
048        public void notify(EventObject event) throws Exception {
049            // only notify when we are started
050            if (!isStarted()) {
051                log.debug("Cannot publish event as notifier is not started: {}", event);
052                return;
053            }
054    
055            // only notify when camel context is running
056            if (!camelContext.getStatus().isStarted()) {
057                log.debug("Cannot publish event as CamelContext is not started: {}", event);
058                return;
059            }
060    
061            Exchange exchange = producer.createExchange();
062            exchange.getIn().setBody(event);
063    
064            // make sure we don't send out events for this as well
065            // mark exchange as being published to event, to prevent creating new events
066            // for this as well (causing a endless flood of events)
067            exchange.setProperty(Exchange.NOTIFY_EVENT, Boolean.TRUE);
068            try {
069                producer.process(exchange);
070            } finally {
071                // and remove it when its done
072                exchange.removeProperty(Exchange.NOTIFY_EVENT);
073            }
074        }
075    
076        public boolean isEnabled(EventObject event) {
077            return true;
078        }
079    
080        public CamelContext getCamelContext() {
081            return camelContext;
082        }
083    
084        public void setCamelContext(CamelContext camelContext) {
085            this.camelContext = camelContext;
086        }
087    
088        public Endpoint getEndpoint() {
089            return endpoint;
090        }
091    
092        public void setEndpoint(Endpoint endpoint) {
093            this.endpoint = endpoint;
094        }
095    
096        public String getEndpointUri() {
097            return endpointUri;
098        }
099    
100        public void setEndpointUri(String endpointUri) {
101            this.endpointUri = endpointUri;
102        }
103    
104        @Override
105        protected void doStart() throws Exception {
106            ObjectHelper.notNull(camelContext, "camelContext", this);
107            if (endpoint == null && endpointUri == null) {
108                throw new IllegalArgumentException("Either endpoint or endpointUri must be configured");
109            }
110    
111            if (endpoint == null) {
112                endpoint = camelContext.getEndpoint(endpointUri);
113            }
114    
115            producer = endpoint.createProducer();
116            ServiceHelper.startService(producer);
117        }
118    
119        @Override
120        protected void doStop() throws Exception {
121            ServiceHelper.stopService(producer);
122        }
123    
124        @Override
125        public String toString() {
126            return "PublishEventNotifier[" + (endpoint != null ? endpoint : URISupport.sanitizeUri(endpointUri)) + "]";
127        }
128    
129    }