001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.text.NumberFormat;
020    import java.util.concurrent.ScheduledExecutorService;
021    import java.util.concurrent.TimeUnit;
022    import java.util.concurrent.atomic.AtomicInteger;
023    
024    import org.apache.camel.AsyncCallback;
025    import org.apache.camel.AsyncProcessor;
026    import org.apache.camel.CamelContext;
027    import org.apache.camel.Exchange;
028    import org.apache.camel.support.ServiceSupport;
029    import org.apache.camel.util.AsyncProcessorHelper;
030    import org.apache.camel.util.CamelLogger;
031    import org.apache.camel.util.ObjectHelper;
032    import org.slf4j.Logger;
033    import org.slf4j.LoggerFactory;
034    
035    /**
036     * A logger for logging message throughput.
037     *
038     * @version 
039     */
040    public class ThroughputLogger extends ServiceSupport implements AsyncProcessor {
041        private static final Logger LOG = LoggerFactory.getLogger(ThroughputLogger.class);
042    
043        private final AtomicInteger receivedCounter = new AtomicInteger();
044        private NumberFormat numberFormat = NumberFormat.getNumberInstance();
045        private long groupReceivedCount;
046        private boolean groupActiveOnly;
047        private Integer groupSize;
048        private long groupDelay = 1000;
049        private Long groupInterval;
050        private long startTime;
051        private long groupStartTime;
052        private String action = "Received";
053        private CamelContext camelContext;
054        private ScheduledExecutorService logSchedulerService;
055        private CamelLogger log;
056        private String lastLogMessage;
057        private double rate;
058        private double average;
059    
060        public ThroughputLogger(CamelLogger log) {
061            this.log = log;
062        }
063    
064        public ThroughputLogger(CamelLogger log, Integer groupSize) {
065            this(log);
066            setGroupSize(groupSize);
067        }
068    
069        public ThroughputLogger(CamelLogger log, CamelContext camelContext, Long groupInterval, Long groupDelay, Boolean groupActiveOnly) {
070            this(log);
071            this.camelContext = camelContext;
072            setGroupInterval(groupInterval);
073            setGroupActiveOnly(groupActiveOnly);
074            if (groupDelay != null) {
075                setGroupDelay(groupDelay);
076            }
077        }
078    
079        public void process(Exchange exchange) throws Exception {
080            AsyncProcessorHelper.process(this, exchange);
081        }
082    
083        public boolean process(Exchange exchange, AsyncCallback callback) {
084            if (startTime == 0) {
085                startTime = System.currentTimeMillis();
086            }
087            int receivedCount = receivedCounter.incrementAndGet();
088    
089            //only process if groupSize is set...otherwise we're in groupInterval mode
090            if (groupSize != null) {
091                if (receivedCount % groupSize == 0) {
092                    lastLogMessage = createLogMessage(exchange, receivedCount);
093                    log.log(lastLogMessage);
094                }
095            }
096    
097            callback.done(true);
098            return true;
099        }
100    
101        public Integer getGroupSize() {
102            return groupSize;
103        }
104    
105        public void setGroupSize(Integer groupSize) {
106            if (groupSize == null || groupSize <= 0) {
107                throw new IllegalArgumentException("groupSize must be positive, was: " + groupSize);
108            }
109            this.groupSize = groupSize;
110        }
111    
112        public Long getGroupInterval() {
113            return groupInterval;
114        }
115    
116        public void setGroupInterval(Long groupInterval) {
117            if (groupInterval == null || groupInterval <= 0) {
118                throw new IllegalArgumentException("groupInterval must be positive, was: " + groupInterval);
119            }
120            this.groupInterval = groupInterval;
121        }
122    
123        public long getGroupDelay() {
124            return groupDelay;
125        }
126    
127        public void setGroupDelay(long groupDelay) {
128            this.groupDelay = groupDelay;
129        }
130    
131        public boolean getGroupActiveOnly() {
132            return groupActiveOnly;
133        }
134    
135        private void setGroupActiveOnly(boolean groupActiveOnly) {
136            this.groupActiveOnly = groupActiveOnly;
137        }
138    
139        public NumberFormat getNumberFormat() {
140            return numberFormat;
141        }
142    
143        public void setNumberFormat(NumberFormat numberFormat) {
144            this.numberFormat = numberFormat;
145        }
146    
147        public String getAction() {
148            return action;
149        }
150    
151        public void setAction(String action) {
152            this.action = action;
153        }
154    
155        public void reset() {
156            startTime = 0;
157            receivedCounter.set(0);
158            groupStartTime = 0;
159            groupReceivedCount = 0;
160            average = 0.0d;
161            rate = 0.0d;
162            lastLogMessage = null;
163        }
164    
165        public double getRate() {
166            return rate;
167        }
168    
169        public double getAverage() {
170            return average;
171        }
172    
173        public int getReceivedCounter() {
174            return receivedCounter.get();
175        }
176    
177        public String getLastLogMessage() {
178            return lastLogMessage;
179        }
180    
181        @Override
182        public void doStart() throws Exception {
183            // if an interval was specified, create a background thread
184            if (groupInterval != null) {
185                ObjectHelper.notNull(camelContext, "CamelContext", this);
186    
187                logSchedulerService = camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "ThroughputLogger", 1);
188                Runnable scheduledLogTask = new ScheduledLogTask();
189                LOG.info("Scheduling throughput log to run every " + groupInterval + " millis.");
190                // must use fixed rate to have it trigger at every X interval
191                logSchedulerService.scheduleAtFixedRate(scheduledLogTask, groupDelay, groupInterval, TimeUnit.MILLISECONDS);
192            }
193        }
194    
195        @Override
196        public void doStop() throws Exception {
197            if (logSchedulerService != null) {
198                camelContext.getExecutorServiceManager().shutdown(logSchedulerService);
199                logSchedulerService = null;
200            }
201        }
202    
203        protected String createLogMessage(Exchange exchange, int receivedCount) {
204            long time = System.currentTimeMillis();
205            if (groupStartTime == 0) {
206                groupStartTime = startTime;
207            }
208    
209            rate = messagesPerSecond(groupSize, groupStartTime, time);
210            average = messagesPerSecond(receivedCount, startTime, time);
211    
212            long duration = time - groupStartTime;
213            groupStartTime = time;
214    
215            return getAction() + ": " + receivedCount + " messages so far. Last group took: " + duration
216                    + " millis which is: " + numberFormat.format(rate)
217                    + " messages per second. average: " + numberFormat.format(average);
218        }
219    
220        /**
221         * Background task that logs throughput stats.
222         */
223        private final class ScheduledLogTask implements Runnable {
224    
225            public void run() {
226                // only run if CamelContext has been fully started
227                if (!camelContext.getStatus().isStarted()) {
228                    LOG.trace("ThroughputLogger cannot start because CamelContext({}) has not been started yet", camelContext.getName());
229                    return;
230                }
231    
232                createGroupIntervalLogMessage();
233            }
234        }
235    
236        protected void createGroupIntervalLogMessage() {
237            
238            // this indicates that no messages have been received yet...don't log yet
239            if (startTime == 0) {
240                return;
241            }
242            
243            int receivedCount = receivedCounter.get();
244    
245            // if configured, hide log messages when no new messages have been received
246            if (groupActiveOnly && receivedCount == groupReceivedCount) {
247                return;
248            }
249    
250            long time = System.currentTimeMillis();
251            if (groupStartTime == 0) {
252                groupStartTime = startTime;
253            }
254    
255            long duration = time - groupStartTime;
256            long currentCount = receivedCount - groupReceivedCount;
257            rate = messagesPerSecond(currentCount, groupStartTime, time);
258            average = messagesPerSecond(receivedCount, startTime, time);
259    
260            groupStartTime = time;
261            groupReceivedCount = receivedCount;
262    
263            lastLogMessage = getAction() + ": " + currentCount + " new messages, with total " + receivedCount + " so far. Last group took: " + duration
264                    + " millis which is: " + numberFormat.format(rate)
265                    + " messages per second. average: " + numberFormat.format(average);
266            log.log(lastLogMessage);
267        }
268    
269        protected double messagesPerSecond(long messageCount, long startTime, long endTime) {
270            // timeOneMessage = elapsed / messageCount
271            // messagePerSend = 1000 / timeOneMessage
272            double rate = messageCount * 1000.0;
273            rate /= endTime - startTime;
274            return rate;
275        }
276    
277    }