001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *     http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package org.apache.hadoop.lib.service.instrumentation;
020    
021    import org.apache.hadoop.classification.InterfaceAudience;
022    import org.apache.hadoop.lib.server.BaseService;
023    import org.apache.hadoop.lib.server.ServiceException;
024    import org.apache.hadoop.lib.service.Instrumentation;
025    import org.apache.hadoop.lib.service.Scheduler;
026    import org.apache.hadoop.util.Time;
027    import org.json.simple.JSONAware;
028    import org.json.simple.JSONObject;
029    import org.json.simple.JSONStreamAware;
030    
031    import java.io.IOException;
032    import java.io.Writer;
033    import java.util.ArrayList;
034    import java.util.LinkedHashMap;
035    import java.util.List;
036    import java.util.Map;
037    import java.util.concurrent.ConcurrentHashMap;
038    import java.util.concurrent.TimeUnit;
039    import java.util.concurrent.atomic.AtomicLong;
040    import java.util.concurrent.locks.Lock;
041    import java.util.concurrent.locks.ReentrantLock;
042    
043    @InterfaceAudience.Private
044    public class InstrumentationService extends BaseService implements Instrumentation {
045      public static final String PREFIX = "instrumentation";
046      public static final String CONF_TIMERS_SIZE = "timers.size";
047    
048      private int timersSize;
049      private Lock counterLock;
050      private Lock timerLock;
051      private Lock variableLock;
052      private Lock samplerLock;
053      private Map<String, Map<String, AtomicLong>> counters;
054      private Map<String, Map<String, Timer>> timers;
055      private Map<String, Map<String, VariableHolder>> variables;
056      private Map<String, Map<String, Sampler>> samplers;
057      private List<Sampler> samplersList;
058      private Map<String, Map<String, ?>> all;
059    
060      public InstrumentationService() {
061        super(PREFIX);
062      }
063    
064      @Override
065      @SuppressWarnings("unchecked")
066      public void init() throws ServiceException {
067        timersSize = getServiceConfig().getInt(CONF_TIMERS_SIZE, 10);
068        counterLock = new ReentrantLock();
069        timerLock = new ReentrantLock();
070        variableLock = new ReentrantLock();
071        samplerLock = new ReentrantLock();
072        Map<String, VariableHolder> jvmVariables = new ConcurrentHashMap<String, VariableHolder>();
073        counters = new ConcurrentHashMap<String, Map<String, AtomicLong>>();
074        timers = new ConcurrentHashMap<String, Map<String, Timer>>();
075        variables = new ConcurrentHashMap<String, Map<String, VariableHolder>>();
076        samplers = new ConcurrentHashMap<String, Map<String, Sampler>>();
077        samplersList = new ArrayList<Sampler>();
078        all = new LinkedHashMap<String, Map<String, ?>>();
079        all.put("os-env", System.getenv());
080        all.put("sys-props", (Map<String, ?>) (Map) System.getProperties());
081        all.put("jvm", jvmVariables);
082        all.put("counters", (Map) counters);
083        all.put("timers", (Map) timers);
084        all.put("variables", (Map) variables);
085        all.put("samplers", (Map) samplers);
086    
087        jvmVariables.put("free.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() {
088          @Override
089          public Long getValue() {
090            return Runtime.getRuntime().freeMemory();
091          }
092        }));
093        jvmVariables.put("max.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() {
094          @Override
095          public Long getValue() {
096            return Runtime.getRuntime().maxMemory();
097          }
098        }));
099        jvmVariables.put("total.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() {
100          @Override
101          public Long getValue() {
102            return Runtime.getRuntime().totalMemory();
103          }
104        }));
105      }
106    
107      @Override
108      public void postInit() throws ServiceException {
109        Scheduler scheduler = getServer().get(Scheduler.class);
110        if (scheduler != null) {
111          scheduler.schedule(new SamplersRunnable(), 0, 1, TimeUnit.SECONDS);
112        }
113      }
114    
115      @Override
116      public Class getInterface() {
117        return Instrumentation.class;
118      }
119    
120      @SuppressWarnings("unchecked")
121      private <T> T getToAdd(String group, String name, Class<T> klass, Lock lock, Map<String, Map<String, T>> map) {
122        boolean locked = false;
123        try {
124          Map<String, T> groupMap = map.get(group);
125          if (groupMap == null) {
126            lock.lock();
127            locked = true;
128            groupMap = map.get(group);
129            if (groupMap == null) {
130              groupMap = new ConcurrentHashMap<String, T>();
131              map.put(group, groupMap);
132            }
133          }
134          T element = groupMap.get(name);
135          if (element == null) {
136            if (!locked) {
137              lock.lock();
138              locked = true;
139            }
140            element = groupMap.get(name);
141            if (element == null) {
142              try {
143                if (klass == Timer.class) {
144                  element = (T) new Timer(timersSize);
145                } else {
146                  element = klass.newInstance();
147                }
148              } catch (Exception ex) {
149                throw new RuntimeException(ex);
150              }
151              groupMap.put(name, element);
152            }
153          }
154          return element;
155        } finally {
156          if (locked) {
157            lock.unlock();
158          }
159        }
160      }
161    
162      static class Cron implements Instrumentation.Cron {
163        long start;
164        long lapStart;
165        long own;
166        long total;
167    
168        @Override
169        public Cron start() {
170          if (total != 0) {
171            throw new IllegalStateException("Cron already used");
172          }
173          if (start == 0) {
174            start = Time.now();
175            lapStart = start;
176          } else if (lapStart == 0) {
177            lapStart = Time.now();
178          }
179          return this;
180        }
181    
182        @Override
183        public Cron stop() {
184          if (total != 0) {
185            throw new IllegalStateException("Cron already used");
186          }
187          if (lapStart > 0) {
188            own += Time.now() - lapStart;
189            lapStart = 0;
190          }
191          return this;
192        }
193    
194        void end() {
195          stop();
196          total = Time.now() - start;
197        }
198    
199      }
200    
201      static class Timer implements JSONAware, JSONStreamAware {
202        static final int LAST_TOTAL = 0;
203        static final int LAST_OWN = 1;
204        static final int AVG_TOTAL = 2;
205        static final int AVG_OWN = 3;
206    
207        Lock lock = new ReentrantLock();
208        private long[] own;
209        private long[] total;
210        private int last;
211        private boolean full;
212        private int size;
213    
214        public Timer(int size) {
215          this.size = size;
216          own = new long[size];
217          total = new long[size];
218          for (int i = 0; i < size; i++) {
219            own[i] = -1;
220            total[i] = -1;
221          }
222          last = -1;
223        }
224    
225        long[] getValues() {
226          lock.lock();
227          try {
228            long[] values = new long[4];
229            values[LAST_TOTAL] = total[last];
230            values[LAST_OWN] = own[last];
231            int limit = (full) ? size : (last + 1);
232            for (int i = 0; i < limit; i++) {
233              values[AVG_TOTAL] += total[i];
234              values[AVG_OWN] += own[i];
235            }
236            values[AVG_TOTAL] = values[AVG_TOTAL] / limit;
237            values[AVG_OWN] = values[AVG_OWN] / limit;
238            return values;
239          } finally {
240            lock.unlock();
241          }
242        }
243    
244        void addCron(Cron cron) {
245          cron.end();
246          lock.lock();
247          try {
248            last = (last + 1) % size;
249            full = full || last == (size - 1);
250            total[last] = cron.total;
251            own[last] = cron.own;
252          } finally {
253            lock.unlock();
254          }
255        }
256    
257        @SuppressWarnings("unchecked")
258        private JSONObject getJSON() {
259          long[] values = getValues();
260          JSONObject json = new JSONObject();
261          json.put("lastTotal", values[0]);
262          json.put("lastOwn", values[1]);
263          json.put("avgTotal", values[2]);
264          json.put("avgOwn", values[3]);
265          return json;
266        }
267    
268        @Override
269        public String toJSONString() {
270          return getJSON().toJSONString();
271        }
272    
273        @Override
274        public void writeJSONString(Writer out) throws IOException {
275          getJSON().writeJSONString(out);
276        }
277    
278      }
279    
280      @Override
281      public Cron createCron() {
282        return new Cron();
283      }
284    
285      @Override
286      public void incr(String group, String name, long count) {
287        AtomicLong counter = getToAdd(group, name, AtomicLong.class, counterLock, counters);
288        counter.addAndGet(count);
289      }
290    
291      @Override
292      public void addCron(String group, String name, Instrumentation.Cron cron) {
293        Timer timer = getToAdd(group, name, Timer.class, timerLock, timers);
294        timer.addCron((Cron) cron);
295      }
296    
297      static class VariableHolder<E> implements JSONAware, JSONStreamAware {
298        Variable<E> var;
299    
300        public VariableHolder() {
301        }
302    
303        public VariableHolder(Variable<E> var) {
304          this.var = var;
305        }
306    
307        @SuppressWarnings("unchecked")
308        private JSONObject getJSON() {
309          JSONObject json = new JSONObject();
310          json.put("value", var.getValue());
311          return json;
312        }
313    
314        @Override
315        public String toJSONString() {
316          return getJSON().toJSONString();
317        }
318    
319        @Override
320        public void writeJSONString(Writer out) throws IOException {
321          out.write(toJSONString());
322        }
323    
324      }
325    
326      @Override
327      public void addVariable(String group, String name, Variable<?> variable) {
328        VariableHolder holder = getToAdd(group, name, VariableHolder.class, variableLock, variables);
329        holder.var = variable;
330      }
331    
332      static class Sampler implements JSONAware, JSONStreamAware {
333        Variable<Long> variable;
334        long[] values;
335        private AtomicLong sum;
336        private int last;
337        private boolean full;
338    
339        void init(int size, Variable<Long> variable) {
340          this.variable = variable;
341          values = new long[size];
342          sum = new AtomicLong();
343          last = 0;
344        }
345    
346        void sample() {
347          int index = last;
348          long valueGoingOut = values[last];
349          full = full || last == (values.length - 1);
350          last = (last + 1) % values.length;
351          values[index] = variable.getValue();
352          sum.addAndGet(-valueGoingOut + values[index]);
353        }
354    
355        double getRate() {
356          return ((double) sum.get()) / ((full) ? values.length : ((last == 0) ? 1 : last));
357        }
358    
359        @SuppressWarnings("unchecked")
360        private JSONObject getJSON() {
361          JSONObject json = new JSONObject();
362          json.put("sampler", getRate());
363          json.put("size", (full) ? values.length : last);
364          return json;
365        }
366    
367        @Override
368        public String toJSONString() {
369          return getJSON().toJSONString();
370        }
371    
372        @Override
373        public void writeJSONString(Writer out) throws IOException {
374          out.write(toJSONString());
375        }
376      }
377    
378      @Override
379      public void addSampler(String group, String name, int samplingSize, Variable<Long> variable) {
380        Sampler sampler = getToAdd(group, name, Sampler.class, samplerLock, samplers);
381        samplerLock.lock();
382        try {
383          sampler.init(samplingSize, variable);
384          samplersList.add(sampler);
385        } finally {
386          samplerLock.unlock();
387        }
388      }
389    
390      class SamplersRunnable implements Runnable {
391    
392        @Override
393        public void run() {
394          samplerLock.lock();
395          try {
396            for (Sampler sampler : samplersList) {
397              sampler.sample();
398            }
399          } finally {
400            samplerLock.unlock();
401          }
402        }
403      }
404    
405      @Override
406      public Map<String, Map<String, ?>> getSnapshot() {
407        return all;
408      }
409    
410    
411    }