001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.util;
020
021import com.google.common.collect.Maps;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.oozie.action.hadoop.PasswordMasker;
024import org.apache.oozie.service.ConfigurationService;
025import org.apache.oozie.service.Services;
026
027import java.util.ArrayList;
028import java.util.Collection;
029import java.util.Collections;
030import java.util.HashMap;
031import java.util.LinkedHashMap;
032import java.util.LinkedHashSet;
033import java.util.List;
034import java.util.Map;
035import java.util.Set;
036import java.util.concurrent.ConcurrentHashMap;
037import java.util.concurrent.ScheduledExecutorService;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.atomic.AtomicLong;
040import java.util.concurrent.locks.Lock;
041import java.util.concurrent.locks.ReentrantLock;
042
043/**
044 * Instrumentation framework that supports Timers, Counters, Variables and Sampler instrumentation elements. <p> All
045 * instrumentation elements have a group and a name.
046 */
047public class Instrumentation {
    // Executor used by addSampler() to run samplers periodically; stays null until setScheduler() is called.
    private ScheduledExecutorService scheduler;
    // One lock per element kind; each is taken while groups/elements of that kind are being created.
    private Lock counterLock;
    private Lock timerLock;
    private Lock variableLock;
    private Lock samplerLock;
    // Aggregate view: element kind ("variables", "samplers", "counters", "timers") -> group -> name -> element.
    private Map<String, Map<String, Map<String, Object>>> all;
    // Per-kind registries: group -> name -> element.
    private Map<String, Map<String, Element<Long>>> counters;
    private Map<String, Map<String, Element<Timer>>> timers;
    private Map<String, Map<String, Element<Variable>>> variables;
    private Map<String, Map<String, Element<Double>>> samplers;
058
059    /**
060     * Instrumentation constructor.
061     */
062    @SuppressWarnings("unchecked")
063    public Instrumentation() {
064        counterLock = new ReentrantLock();
065        timerLock = new ReentrantLock();
066        variableLock = new ReentrantLock();
067        samplerLock = new ReentrantLock();
068        all = new LinkedHashMap<String, Map<String, Map<String, Object>>>();
069        counters = new ConcurrentHashMap<String, Map<String, Element<Long>>>();
070        timers = new ConcurrentHashMap<String, Map<String, Element<Timer>>>();
071        variables = new ConcurrentHashMap<String, Map<String, Element<Variable>>>();
072        samplers = new ConcurrentHashMap<String, Map<String, Element<Double>>>();
073        all.put("variables", (Map<String, Map<String, Object>>) (Object) variables);
074        all.put("samplers", (Map<String, Map<String, Object>>) (Object) samplers);
075        all.put("counters", (Map<String, Map<String, Object>>) (Object) counters);
076        all.put("timers", (Map<String, Map<String, Object>>) (Object) timers);
077    }
078
079    /**
080     * Set the scheduler instance to handle the samplers.
081     *
082     * @param scheduler scheduler instance.
083     */
084    public void setScheduler(ScheduledExecutorService scheduler) {
085        this.scheduler = scheduler;
086    }
087
088    /**
089     * Cron is a stopwatch that can be started/stopped several times. <p> This class is not thread safe, it does not
090     * need to be. <p> It keeps track of the total time (first start to last stop) and the running time (total time
091     * minus the stopped intervals). <p> Once a Cron is complete it must be added to the corresponding group/name in a
092     * Instrumentation instance.
093     */
094    public static class Cron {
095        private long start;
096        private long end;
097        private long lapStart;
098        private long own;
099        private long total;
100        private boolean running;
101
102        /**
103         * Creates new Cron, stopped, in zero.
104         */
105        public Cron() {
106            running = false;
107        }
108
109        /**
110         * Start the cron. It cannot be already started.
111         */
112        public void start() {
113            if (!running) {
114                if (lapStart == 0) {
115                    lapStart = System.currentTimeMillis();
116                    if (start == 0) {
117                        start = lapStart;
118                        end = start;
119                    }
120                }
121                running = true;
122            }
123        }
124
125        /**
126         * Stops the cron. It cannot be already stopped.
127         */
128        public void stop() {
129            if (running) {
130                end = System.currentTimeMillis();
131                if (start == 0) {
132                    start = end;
133                }
134                total = end - start;
135                if (lapStart > 0) {
136                    own += end - lapStart;
137                    lapStart = 0;
138                }
139                running = false;
140            }
141        }
142
143        /**
144         * Return the start time of the cron. It must be stopped.
145         *
146         * @return the start time of the cron.
147         */
148        public long getStart() {
149            if (running) {
150                throw new IllegalStateException("Timer running");
151            }
152            return start;
153        }
154
155        /**
156         * Return the end time of the cron.  It must be stopped.
157         *
158         * @return the end time of the cron.
159         */
160        public long getEnd() {
161            if (running) {
162                throw new IllegalStateException("Timer running");
163            }
164            return end;
165        }
166
167        /**
168         * Return the total time of the cron. It must be stopped.
169         *
170         * @return the total time of the cron.
171         */
172        public long getTotal() {
173            if (running) {
174                throw new IllegalStateException("Timer running");
175            }
176            return total;
177        }
178
179        /**
180         * Return the own time of the cron. It must be stopped.
181         *
182         * @return the own time of the cron.
183         */
184        public long getOwn() {
185            if (running) {
186                throw new IllegalStateException("Timer running");
187            }
188            return own;
189        }
190
191    }
192
193    /**
194     * Gives access to a snapshot of an Instrumentation element (Counter, Timer). <p> Instrumentation element snapshots
195     * are returned by the {@link Instrumentation#getCounters()} and {@link Instrumentation#getTimers()} ()} methods.
196     */
197    public interface Element<T> {
198
199        /**
200         * Return the snapshot value of the Intrumentation element.
201         *
202         * @return the snapshot value of the Intrumentation element.
203         */
204        T getValue();
205    }
206
207    /**
208     * Counter Instrumentation element.
209     */
210    private static class Counter extends AtomicLong implements Element<Long> {
211
212        /**
213         * Return the counter snapshot.
214         *
215         * @return the counter snapshot.
216         */
217        public Long getValue() {
218            return get();
219        }
220
221        /**
222         * Return the String representation of the counter value.
223         *
224         * @return the String representation of the counter value.
225         */
226        public String toString() {
227            return Long.toString(get());
228        }
229
230    }
231
232    /**
233     * Timer Instrumentation element.
234     */
235    public static class Timer implements Element<Timer> {
236        Lock lock = new ReentrantLock();
237        private long ownTime;
238        private long totalTime;
239        private long ticks;
240        private long ownSquareTime;
241        private long totalSquareTime;
242        private long ownMinTime;
243        private long ownMaxTime;
244        private long totalMinTime;
245        private long totalMaxTime;
246
247        /**
248         * Timer constructor. <p> It is project private for test purposes.
249         */
250        Timer() {
251        }
252
253        /**
254         * Return the String representation of the timer value.
255         *
256         * @return the String representation of the timer value.
257         */
258        public String toString() {
259            return XLog.format("ticks[{0}] totalAvg[{1}] ownAvg[{2}]", ticks, getTotalAvg(), getOwnAvg());
260        }
261
262        /**
263         * Return the timer snapshot.
264         *
265         * @return the timer snapshot.
266         */
267        public Timer getValue() {
268            try {
269                lock.lock();
270                Timer timer = new Timer();
271                timer.ownTime = ownTime;
272                timer.totalTime = totalTime;
273                timer.ticks = ticks;
274                timer.ownSquareTime = ownSquareTime;
275                timer.totalSquareTime = totalSquareTime;
276                timer.ownMinTime = ownMinTime;
277                timer.ownMaxTime = ownMaxTime;
278                timer.totalMinTime = totalMinTime;
279                timer.totalMaxTime = totalMaxTime;
280                return timer;
281            }
282            finally {
283                lock.unlock();
284            }
285        }
286
287        /**
288         * Add a cron to a timer. <p> It is project private for test purposes.
289         *
290         * @param cron Cron to add.
291         */
292        void addCron(Cron cron) {
293            try {
294                lock.lock();
295                long own = cron.getOwn();
296                long total = cron.getTotal();
297                ownTime += own;
298                totalTime += total;
299                ticks++;
300                ownSquareTime += own * own;
301                totalSquareTime += total * total;
302                if (ticks == 1) {
303                    ownMinTime = own;
304                    ownMaxTime = own;
305                    totalMinTime = total;
306                    totalMaxTime = total;
307                }
308                else {
309                    ownMinTime = Math.min(ownMinTime, own);
310                    ownMaxTime = Math.max(ownMaxTime, own);
311                    totalMinTime = Math.min(totalMinTime, total);
312                    totalMaxTime = Math.max(totalMaxTime, total);
313                }
314            }
315            finally {
316                lock.unlock();
317            }
318        }
319
320        /**
321         * Return the own accumulated computing time by the timer.
322         *
323         * @return own accumulated computing time by the timer.
324         */
325        public long getOwn() {
326            return ownTime;
327        }
328
329        /**
330         * Return the total accumulated computing time by the timer.
331         *
332         * @return total accumulated computing time by the timer.
333         */
334        public long getTotal() {
335            return totalTime;
336        }
337
338        /**
339         * Return the number of times a cron was added to the timer.
340         *
341         * @return the number of times a cron was added to the timer.
342         */
343        public long getTicks() {
344            return ticks;
345        }
346
347        /**
348         * Return the sum of the square own times. <p> It can be used to calculate the standard deviation.
349         *
350         * @return the sum of the square own timer.
351         */
352        public long getOwnSquareSum() {
353            return ownSquareTime;
354        }
355
356        /**
357         * Return the sum of the square total times. <p> It can be used to calculate the standard deviation.
358         *
359         * @return the sum of the square own timer.
360         */
361        public long getTotalSquareSum() {
362            return totalSquareTime;
363        }
364
365        /**
366         * Returns the own minimum time.
367         *
368         * @return the own minimum time.
369         */
370        public long getOwnMin() {
371            return ownMinTime;
372        }
373
374        /**
375         * Returns the own maximum time.
376         *
377         * @return the own maximum time.
378         */
379        public long getOwnMax() {
380            return ownMaxTime;
381        }
382
383        /**
384         * Returns the total minimum time.
385         *
386         * @return the total minimum time.
387         */
388        public long getTotalMin() {
389            return totalMinTime;
390        }
391
392        /**
393         * Returns the total maximum time.
394         *
395         * @return the total maximum time.
396         */
397        public long getTotalMax() {
398            return totalMaxTime;
399        }
400
401        /**
402         * Returns the own average time.
403         *
404         * @return the own average time.
405         */
406        public long getOwnAvg() {
407            return (ticks != 0) ? ownTime / ticks : 0;
408        }
409
410        /**
411         * Returns the total average time.
412         *
413         * @return the total average time.
414         */
415        public long getTotalAvg() {
416            return (ticks != 0) ? totalTime / ticks : 0;
417        }
418
419        /**
420         * Returns the total time standard deviation.
421         *
422         * @return the total time standard deviation.
423         */
424        public double getTotalStdDev() {
425            return evalStdDev(ticks, totalTime, totalSquareTime);
426        }
427
428        /**
429         * Returns the own time standard deviation.
430         *
431         * @return the own time standard deviation.
432         */
433        public double getOwnStdDev() {
434            return evalStdDev(ticks, ownTime, ownSquareTime);
435        }
436
437        private double evalStdDev(long n, long sn, long ssn) {
438            return (n < 2) ? -1 : Math.sqrt((n * ssn - sn * sn) / (n * (n - 1)));
439        }
440
441    }
442
443    /**
444     * Add a cron to an instrumentation timer. The timer is created if it does not exists. <p> This method is thread
445     * safe.
446     *
447     * @param group timer group.
448     * @param name timer name.
449     * @param cron cron to add to the timer.
450     */
451    public void addCron(String group, String name, Cron cron) {
452        Map<String, Element<Timer>> map = timers.get(group);
453        if (map == null) {
454            try {
455                timerLock.lock();
456                map = timers.get(group);
457                if (map == null) {
458                    map = new HashMap<String, Element<Timer>>();
459                    timers.put(group, map);
460                }
461            }
462            finally {
463                timerLock.unlock();
464            }
465        }
466        Timer timer = (Timer) map.get(name);
467        if (timer == null) {
468            try {
469                timerLock.lock();
470                timer = (Timer) map.get(name);
471                if (timer == null) {
472                    timer = new Timer();
473                    map.put(name, timer);
474                }
475            }
476            finally {
477                timerLock.unlock();
478            }
479        }
480        timer.addCron(cron);
481    }
482
483    /**
484     * Increment an instrumentation counter. The counter is created if it does not exists. <p> This method is thread
485     * safe.
486     *
487     * @param group counter group.
488     * @param name counter name.
489     * @param count increment to add to the counter.
490     */
491    public void incr(String group, String name, long count) {
492        Map<String, Element<Long>> map = counters.get(group);
493        if (map == null) {
494            try {
495                counterLock.lock();
496                map = counters.get(group);
497                if (map == null) {
498                    map = new HashMap<String, Element<Long>>();
499                    counters.put(group, map);
500                }
501            }
502            finally {
503                counterLock.unlock();
504            }
505        }
506        Counter counter = (Counter) map.get(name);
507        if (counter == null) {
508            try {
509                counterLock.lock();
510                counter = (Counter) map.get(name);
511                if (counter == null) {
512                    counter = new Counter();
513                    map.put(name, counter);
514                }
515            }
516            finally {
517                counterLock.unlock();
518            }
519        }
520        counter.addAndGet(count);
521    }
522
523    /**
524     * Decrement an instrumentation counter. The counter is created if it does not exists. <p> This method is thread
525     * safe.
526     *
527     * @param group counter group.
528     * @param name counter name.
529     * @param count decrement to add to the counter.
530     */
531    public void decr(final String group, final String name, final long count) {
532        incr(group, name, -count);
533    }
534
535    /**
536     * Interface for instrumentation variables. <p> For example a the database service could expose the number of
537     * currently active connections.
538     */
539    public interface Variable<T> extends Element<T> {
540    }
541
542    /**
543     * Add an instrumentation variable. The variable must not exist. <p> This method is thread safe.
544     *
545     * @param group counter group.
546     * @param name counter name.
547     * @param variable variable to add.
548     */
549    @SuppressWarnings("unchecked")
550    public void addVariable(String group, String name, Variable variable) {
551        Map<String, Element<Variable>> map = variables.get(group);
552        if (map == null) {
553            try {
554                variableLock.lock();
555                map = variables.get(group);
556                if (map == null) {
557                    map = new HashMap<String, Element<Variable>>();
558                    variables.put(group, map);
559                }
560            }
561            finally {
562                variableLock.unlock();
563            }
564        }
565        if (map.containsKey(name)) {
566            throw new RuntimeException(XLog.format("Variable group=[{0}] name=[{1}] already defined", group, name));
567        }
568        map.put(name, variable);
569    }
570
571    /**
572     * Return the JVM system properties.
573     *
574     * @return JVM system properties.
575     */
576    public Map<String, String> getJavaSystemProperties() {
577        Map<String, String> unmasked = Maps.fromProperties(System.getProperties());
578        return new PasswordMasker().mask(unmasked);
579    }
580
581    /**
582     * Return the OS environment used to start Oozie.
583     *
584     * @return the OS environment used to start Oozie.
585     */
586    public Map<String, String> getOSEnv() {
587        Map<String, String> unmasked = System.getenv();
588        return new PasswordMasker().mask(unmasked);
589    }
590
591    /**
592     * Return the current system configuration as a Map&lt;String,String&gt;.
593     *
594     * @return the current system configuration as a Map&lt;String,String&gt;.
595     */
596    public Map<String, String> getConfiguration() {
597        final Configuration maskedConf = Services.get().get(ConfigurationService.class).getMaskedConfiguration();
598
599        return new Map<String, String>() {
600            public int size() {
601                return maskedConf.size();
602            }
603
604            public boolean isEmpty() {
605                return maskedConf.size() == 0;
606            }
607
608            public boolean containsKey(Object o) {
609                return maskedConf.get((String) o) != null;
610            }
611
612            public boolean containsValue(Object o) {
613                throw new UnsupportedOperationException();
614            }
615
616            public String get(Object o) {
617                return maskedConf.get((String) o);
618            }
619
620            public String put(String s, String s1) {
621                throw new UnsupportedOperationException();
622            }
623
624            public String remove(Object o) {
625                throw new UnsupportedOperationException();
626            }
627
628            public void putAll(Map<? extends String, ? extends String> map) {
629                throw new UnsupportedOperationException();
630            }
631
632            public void clear() {
633                throw new UnsupportedOperationException();
634            }
635
636            public Set<String> keySet() {
637                Set<String> set = new LinkedHashSet<String>();
638                for (Entry<String, String> entry : maskedConf) {
639                    set.add(entry.getKey());
640                }
641                return set;
642            }
643
644            public Collection<String> values() {
645                Set<String> set = new LinkedHashSet<String>();
646                for (Entry<String, String> entry : maskedConf) {
647                    set.add(entry.getValue());
648                }
649                return set;
650            }
651
652            public Set<Entry<String, String>> entrySet() {
653                Set<Entry<String, String>> set = new LinkedHashSet<Entry<String, String>>();
654                for (Entry<String, String> entry : maskedConf) {
655                    set.add(entry);
656                }
657                return set;
658            }
659        };
660    }
661
662    /**
663     * Return all the counters. <p> This method is thread safe. <p> The counters are live. The counter value is a
664     * snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked.
665     *
666     * @return all counters.
667     */
668    public Map<String, Map<String, Element<Long>>> getCounters() {
669        return counters;
670    }
671
672    /**
673     * Return all the timers. <p> This method is thread safe. <p> The timers are live. Once a timer is obtained, all
674     * its values are consistent (they are snapshot at the time the {@link Instrumentation.Element#getValue()} is
675     * invoked.
676     *
677     * @return all counters.
678     */
679    public Map<String, Map<String, Element<Timer>>> getTimers() {
680        return timers;
681    }
682
683    /**
684     * Return all the variables. <p> This method is thread safe. <p> The variables are live. The variable value is a
685     * snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked.
686     *
687     * @return all counters.
688     */
689    public Map<String, Map<String, Element<Variable>>> getVariables() {
690        return variables;
691    }
692
693    /**
694     * Return a map containing all variables, counters and timers.
695     *
696     * @return a map containing all variables, counters and timers.
697     */
698    public Map<String, Map<String, Map<String, Object>>> getAll() {
699        return all;
700    }
701
702    /**
703     * Return the string representation of the instrumentation.
704     *
705     * @return the string representation of the instrumentation.
706     */
707    public String toString() {
708        String E = System.getProperty("line.separator");
709        StringBuilder sb = new StringBuilder(4096);
710        for (String element : all.keySet()) {
711            sb.append(element).append(':').append(E);
712            List<String> groups = new ArrayList<String>(all.get(element).keySet());
713            Collections.sort(groups);
714            for (String group : groups) {
715                sb.append("  ").append(group).append(':').append(E);
716                List<String> names = new ArrayList<String>(all.get(element).get(group).keySet());
717                Collections.sort(names);
718                for (String name : names) {
719                    sb.append("    ").append(name).append(": ").append(((Element) all.get(element).
720                            get(group).get(name)).getValue()).append(E);
721                }
722            }
723        }
724        return sb.toString();
725    }
726
727    private static class Sampler implements Element<Double>, Runnable {
728        private Lock lock = new ReentrantLock();
729        private int samplingInterval;
730        private Variable<Long> variable;
731        private long[] values;
732        private int current;
733        private long valuesSum;
734        private double rate;
735
736        public Sampler(int samplingPeriod, int samplingInterval, Variable<Long> variable) {
737            this.samplingInterval = samplingInterval;
738            this.variable = variable;
739            values = new long[samplingPeriod / samplingInterval];
740            valuesSum = 0;
741            current = -1;
742        }
743
744        public int getSamplingInterval() {
745            return samplingInterval;
746        }
747
748        public void run() {
749            try {
750                lock.lock();
751                long newValue = variable.getValue();
752                if (current == -1) {
753                    valuesSum = newValue;
754                    current = 0;
755                    values[current] = newValue;
756                }
757                else {
758                    current = (current + 1) % values.length;
759                    valuesSum = valuesSum - values[current] + newValue;
760                    values[current] = newValue;
761                }
762                rate = ((double) valuesSum) / values.length;
763            }
764            finally {
765                lock.unlock();
766            }
767        }
768
769        public Double getValue() {
770            return rate;
771        }
772    }
773
774    /**
775     * Add a sampling variable. <p> This method is thread safe.
776     *
777     * @param group timer group.
778     * @param name timer name.
779     * @param period sampling period to compute rate.
780     * @param interval sampling frequency, how often the variable is probed.
781     * @param variable variable to sample.
782     */
783    public void addSampler(String group, String name, int period, int interval, Variable<Long> variable) {
784        if (scheduler == null) {
785            throw new IllegalStateException("scheduler not set, cannot sample");
786        }
787        try {
788            samplerLock.lock();
789            Map<String, Element<Double>> map = samplers.get(group);
790            if (map == null) {
791                map = samplers.get(group);
792                if (map == null) {
793                    map = new HashMap<String, Element<Double>>();
794                    samplers.put(group, map);
795                }
796            }
797            if (map.containsKey(name)) {
798                throw new RuntimeException(XLog.format("Sampler group=[{0}] name=[{1}] already defined", group, name));
799            }
800            else {
801                Sampler sampler = new Sampler(period, interval, variable);
802                map.put(name, sampler);
803                scheduler.scheduleAtFixedRate(sampler, 0, sampler.getSamplingInterval(), TimeUnit.SECONDS);
804            }
805        }
806        finally {
807            samplerLock.unlock();
808        }
809    }
810
811    /**
812     * Return all the samplers. <p> This method is thread safe. <p> The samplers are live. The sampler value is a
813     * snapshot at the time the {@link Instrumentation.Element#getValue()} is invoked.
814     *
815     * @return all counters.
816     */
817    public Map<String, Map<String, Element<Double>>> getSamplers() {
818        return samplers;
819    }
820
821    public void stop() {
822
823    }
824
825}