001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019 package org.apache.hadoop.lib.service.instrumentation; 020 021 import org.apache.hadoop.classification.InterfaceAudience; 022 import org.apache.hadoop.lib.server.BaseService; 023 import org.apache.hadoop.lib.server.ServiceException; 024 import org.apache.hadoop.lib.service.Instrumentation; 025 import org.apache.hadoop.lib.service.Scheduler; 026 import org.apache.hadoop.util.Time; 027 import org.json.simple.JSONAware; 028 import org.json.simple.JSONObject; 029 import org.json.simple.JSONStreamAware; 030 031 import java.io.IOException; 032 import java.io.Writer; 033 import java.util.ArrayList; 034 import java.util.LinkedHashMap; 035 import java.util.List; 036 import java.util.Map; 037 import java.util.concurrent.ConcurrentHashMap; 038 import java.util.concurrent.TimeUnit; 039 import java.util.concurrent.atomic.AtomicLong; 040 import java.util.concurrent.locks.Lock; 041 import java.util.concurrent.locks.ReentrantLock; 042 043 @InterfaceAudience.Private 044 public class InstrumentationService extends BaseService implements Instrumentation { 045 public static final String PREFIX = "instrumentation"; 046 public static final String CONF_TIMERS_SIZE = "timers.size"; 047 048 private int timersSize; 049 private Lock counterLock; 050 private Lock timerLock; 051 private Lock variableLock; 052 private Lock samplerLock; 053 private Map<String, Map<String, AtomicLong>> counters; 054 private Map<String, Map<String, Timer>> timers; 055 private Map<String, Map<String, VariableHolder>> variables; 056 private Map<String, Map<String, Sampler>> samplers; 057 private List<Sampler> samplersList; 058 private Map<String, Map<String, ?>> all; 059 060 public InstrumentationService() { 061 super(PREFIX); 062 } 063 064 @Override 065 @SuppressWarnings("unchecked") 066 public void init() throws ServiceException { 067 timersSize = getServiceConfig().getInt(CONF_TIMERS_SIZE, 10); 068 counterLock = new ReentrantLock(); 069 timerLock = new ReentrantLock(); 070 variableLock = new ReentrantLock(); 071 samplerLock = new ReentrantLock(); 072 Map<String, VariableHolder> jvmVariables = new ConcurrentHashMap<String, VariableHolder>(); 073 counters = new ConcurrentHashMap<String, Map<String, AtomicLong>>(); 074 timers = new ConcurrentHashMap<String, Map<String, Timer>>(); 075 variables = new ConcurrentHashMap<String, Map<String, VariableHolder>>(); 076 samplers = new ConcurrentHashMap<String, Map<String, Sampler>>(); 077 samplersList = new ArrayList<Sampler>(); 078 all = new LinkedHashMap<String, Map<String, ?>>(); 079 all.put("os-env", System.getenv()); 080 all.put("sys-props", (Map<String, ?>) (Map) System.getProperties()); 081 all.put("jvm", jvmVariables); 082 all.put("counters", (Map) counters); 083 all.put("timers", (Map) timers); 084 all.put("variables", (Map) variables); 085 all.put("samplers", (Map) samplers); 086 087 jvmVariables.put("free.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() { 088 @Override 089 public Long getValue() { 090 return Runtime.getRuntime().freeMemory(); 091 } 092 })); 093 jvmVariables.put("max.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() { 094 @Override 095 public Long getValue() { 096 return Runtime.getRuntime().maxMemory(); 097 } 098 })); 099 jvmVariables.put("total.memory", new VariableHolder<Long>(new Instrumentation.Variable<Long>() { 100 @Override 101 public Long getValue() { 102 return Runtime.getRuntime().totalMemory(); 103 } 104 })); 105 } 106 107 @Override 108 public void postInit() throws ServiceException { 109 Scheduler scheduler = getServer().get(Scheduler.class); 110 if (scheduler != null) { 111 scheduler.schedule(new SamplersRunnable(), 0, 1, TimeUnit.SECONDS); 112 } 113 } 114 115 @Override 116 public Class getInterface() { 117 return Instrumentation.class; 118 } 119 120 @SuppressWarnings("unchecked") 121 private <T> T getToAdd(String group, String name, Class<T> klass, Lock lock, Map<String, Map<String, T>> map) { 122 boolean locked = false; 123 try { 124 Map<String, T> groupMap = map.get(group); 125 if (groupMap == null) { 126 lock.lock(); 127 locked = true; 128 groupMap = map.get(group); 129 if (groupMap == null) { 130 groupMap = new ConcurrentHashMap<String, T>(); 131 map.put(group, groupMap); 132 } 133 } 134 T element = groupMap.get(name); 135 if (element == null) { 136 if (!locked) { 137 lock.lock(); 138 locked = true; 139 } 140 element = groupMap.get(name); 141 if (element == null) { 142 try { 143 if (klass == Timer.class) { 144 element = (T) new Timer(timersSize); 145 } else { 146 element = klass.newInstance(); 147 } 148 } catch (Exception ex) { 149 throw new RuntimeException(ex); 150 } 151 groupMap.put(name, element); 152 } 153 } 154 return element; 155 } finally { 156 if (locked) { 157 lock.unlock(); 158 } 159 } 160 } 161 162 static class Cron implements Instrumentation.Cron { 163 long start; 164 long lapStart; 165 long own; 166 long total; 167 168 @Override 169 public Cron start() { 170 if (total != 0) { 171 throw new IllegalStateException("Cron already used"); 172 } 173 if (start == 0) { 174 start = Time.now(); 175 lapStart = start; 176 } else if (lapStart == 0) { 177 lapStart = Time.now(); 178 } 179 return this; 180 } 181 182 @Override 183 public Cron stop() { 184 if (total != 0) { 185 throw new IllegalStateException("Cron already used"); 186 } 187 if (lapStart > 0) { 188 own += Time.now() - lapStart; 189 lapStart = 0; 190 } 191 return this; 192 } 193 194 void end() { 195 stop(); 196 total = Time.now() - start; 197 } 198 199 } 200 201 static class Timer implements JSONAware, JSONStreamAware { 202 static final int LAST_TOTAL = 0; 203 static final int LAST_OWN = 1; 204 static final int AVG_TOTAL = 2; 205 static final int AVG_OWN = 3; 206 207 Lock lock = new ReentrantLock(); 208 private long[] own; 209 private long[] total; 210 private int last; 211 private boolean full; 212 private int size; 213 214 public Timer(int size) { 215 this.size = size; 216 own = new long[size]; 217 total = new long[size]; 218 for (int i = 0; i < size; i++) { 219 own[i] = -1; 220 total[i] = -1; 221 } 222 last = -1; 223 } 224 225 long[] getValues() { 226 lock.lock(); 227 try { 228 long[] values = new long[4]; 229 values[LAST_TOTAL] = total[last]; 230 values[LAST_OWN] = own[last]; 231 int limit = (full) ? size : (last + 1); 232 for (int i = 0; i < limit; i++) { 233 values[AVG_TOTAL] += total[i]; 234 values[AVG_OWN] += own[i]; 235 } 236 values[AVG_TOTAL] = values[AVG_TOTAL] / limit; 237 values[AVG_OWN] = values[AVG_OWN] / limit; 238 return values; 239 } finally { 240 lock.unlock(); 241 } 242 } 243 244 void addCron(Cron cron) { 245 cron.end(); 246 lock.lock(); 247 try { 248 last = (last + 1) % size; 249 full = full || last == (size - 1); 250 total[last] = cron.total; 251 own[last] = cron.own; 252 } finally { 253 lock.unlock(); 254 } 255 } 256 257 @SuppressWarnings("unchecked") 258 private JSONObject getJSON() { 259 long[] values = getValues(); 260 JSONObject json = new JSONObject(); 261 json.put("lastTotal", values[0]); 262 json.put("lastOwn", values[1]); 263 json.put("avgTotal", values[2]); 264 json.put("avgOwn", values[3]); 265 return json; 266 } 267 268 @Override 269 public String toJSONString() { 270 return getJSON().toJSONString(); 271 } 272 273 @Override 274 public void writeJSONString(Writer out) throws IOException { 275 getJSON().writeJSONString(out); 276 } 277 278 } 279 280 @Override 281 public Cron createCron() { 282 return new Cron(); 283 } 284 285 @Override 286 public void incr(String group, String name, long count) { 287 AtomicLong counter = getToAdd(group, name, AtomicLong.class, counterLock, counters); 288 counter.addAndGet(count); 289 } 290 291 @Override 292 public void addCron(String group, String name, Instrumentation.Cron cron) { 293 Timer timer = getToAdd(group, name, Timer.class, timerLock, timers); 294 timer.addCron((Cron) cron); 295 } 296 297 static class VariableHolder<E> implements JSONAware, JSONStreamAware { 298 Variable<E> var; 299 300 public VariableHolder() { 301 } 302 303 public VariableHolder(Variable<E> var) { 304 this.var = var; 305 } 306 307 @SuppressWarnings("unchecked") 308 private JSONObject getJSON() { 309 JSONObject json = new JSONObject(); 310 json.put("value", var.getValue()); 311 return json; 312 } 313 314 @Override 315 public String toJSONString() { 316 return getJSON().toJSONString(); 317 } 318 319 @Override 320 public void writeJSONString(Writer out) throws IOException { 321 out.write(toJSONString()); 322 } 323 324 } 325 326 @Override 327 public void addVariable(String group, String name, Variable<?> variable) { 328 VariableHolder holder = getToAdd(group, name, VariableHolder.class, variableLock, variables); 329 holder.var = variable; 330 } 331 332 static class Sampler implements JSONAware, JSONStreamAware { 333 Variable<Long> variable; 334 long[] values; 335 private AtomicLong sum; 336 private int last; 337 private boolean full; 338 339 void init(int size, Variable<Long> variable) { 340 this.variable = variable; 341 values = new long[size]; 342 sum = new AtomicLong(); 343 last = 0; 344 } 345 346 void sample() { 347 int index = last; 348 long valueGoingOut = values[last]; 349 full = full || last == (values.length - 1); 350 last = (last + 1) % values.length; 351 values[index] = variable.getValue(); 352 sum.addAndGet(-valueGoingOut + values[index]); 353 } 354 355 double getRate() { 356 return ((double) sum.get()) / ((full) ? values.length : ((last == 0) ? 1 : last)); 357 } 358 359 @SuppressWarnings("unchecked") 360 private JSONObject getJSON() { 361 JSONObject json = new JSONObject(); 362 json.put("sampler", getRate()); 363 json.put("size", (full) ? values.length : last); 364 return json; 365 } 366 367 @Override 368 public String toJSONString() { 369 return getJSON().toJSONString(); 370 } 371 372 @Override 373 public void writeJSONString(Writer out) throws IOException { 374 out.write(toJSONString()); 375 } 376 } 377 378 @Override 379 public void addSampler(String group, String name, int samplingSize, Variable<Long> variable) { 380 Sampler sampler = getToAdd(group, name, Sampler.class, samplerLock, samplers); 381 samplerLock.lock(); 382 try { 383 sampler.init(samplingSize, variable); 384 samplersList.add(sampler); 385 } finally { 386 samplerLock.unlock(); 387 } 388 } 389 390 class SamplersRunnable implements Runnable { 391 392 @Override 393 public void run() { 394 samplerLock.lock(); 395 try { 396 for (Sampler sampler : samplersList) { 397 sampler.sample(); 398 } 399 } finally { 400 samplerLock.unlock(); 401 } 402 } 403 } 404 405 @Override 406 public Map<String, Map<String, ?>> getSnapshot() { 407 return all; 408 } 409 410 411 }