001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.support;
018    
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.StaticService;
import org.apache.camel.TimerListener;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
032    
033    /**
034     * A {@link TimerListener} manager which triggers the
035     * {@link org.apache.camel.TimerListener} listeners once every second.
036     * <p/>
037     * Also ensure when adding and remove listeners, that they are correctly removed to avoid
038     * leaking memory.
039     * <p/>
040     * From Camel 2.13 onwards the {@link TimerListenerManager} is only enabled if
041     * {@link org.apache.camel.spi.ManagementStrategy#isLoadStatisticsEnabled()} is enabled.
042     *
043     * @see TimerListener
044     */
045    public class TimerListenerManager extends ServiceSupport implements Runnable, CamelContextAware, StaticService {
046    
047        private static final Logger LOG = LoggerFactory.getLogger(TimerListenerManager.class);
048        private final Set<TimerListener> listeners = new LinkedHashSet<TimerListener>();
049        private CamelContext camelContext;
050        private ScheduledExecutorService executorService;
051        private volatile ScheduledFuture<?> task;
052        private long interval = 1000L;
053    
054        public TimerListenerManager() {
055        }
056    
057        @Override
058        public void setCamelContext(CamelContext camelContext) {
059            this.camelContext = camelContext;
060        }
061    
062        @Override
063        public CamelContext getCamelContext() {
064            return camelContext;
065        }
066    
067        /**
068         * Gets the interval in millis.
069         * <p/>
070         * The default interval is 1000 millis.
071         *
072         * @return interval in millis.
073         */
074        public long getInterval() {
075            return interval;
076        }
077    
078        /**
079         * Sets the interval in millis.
080         *
081         * @param interval interval in millis.
082         */
083        public void setInterval(long interval) {
084            this.interval = interval;
085        }
086    
087        @Override
088        public void run() {
089            LOG.trace("Running scheduled TimerListener task");
090    
091            if (!isRunAllowed()) {
092                LOG.debug("TimerListener task cannot run as its not allowed");
093                return;
094            }
095    
096            for (TimerListener listener : listeners) {
097                try {
098                    LOG.trace("Invoking onTimer on {}", listener);
099                    listener.onTimer();
100                } catch (Throwable e) {
101                    // ignore
102                    LOG.debug("Error occurred during onTimer for TimerListener: " + listener + ". This exception will be ignored.", e);
103                }
104            }
105        }
106    
107        /**
108         * Adds the listener.
109         * <p/>
110         * It may be important to implement {@link #equals(Object)} and {@link #hashCode()} for the listener
111         * to ensure that we can remove the same listener again, when invoking remove.
112         * 
113         * @param listener listener
114         */
115        public void addTimerListener(TimerListener listener) {
116            listeners.add(listener);
117            LOG.debug("Added TimerListener: {}", listener);
118        }
119    
120        /**
121         * Removes the listener.
122         * <p/>
123         * It may be important to implement {@link #equals(Object)} and {@link #hashCode()} for the listener
124         * to ensure that we can remove the same listener again, when invoking remove.
125         *
126         * @param listener listener.
127         */
128        public void removeTimerListener(TimerListener listener) {
129            listeners.remove(listener);
130            LOG.debug("Removed TimerListener: {}", listener);
131        }
132    
133        @Override
134        protected void doStart() throws Exception {
135            ObjectHelper.notNull(camelContext, "camelContext", this);
136    
137            // create scheduled thread pool to trigger the task to run every interval
138            executorService = camelContext.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "ManagementLoadTask");
139            task = executorService.scheduleAtFixedRate(this, interval, interval, TimeUnit.MILLISECONDS);
140            LOG.debug("Started scheduled TimerListener task to run with interval {} ms", interval);
141        }
142    
143        @Override
144        protected void doStop() throws Exception {
145            // executor service will be shutdown by CamelContext
146            if (task != null) {
147                task.cancel(true);
148                task = null;
149            }
150        }
151    
152        @Override
153        protected void doShutdown() throws Exception {
154            super.doShutdown();
155            // shutdown thread pool when we are shutting down
156            camelContext.getExecutorServiceManager().shutdownNow(executorService);
157            executorService = null;
158            listeners.clear();
159        }
160    }
161