1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.metrics.spi;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.HashMap;
27 import java.util.HashSet;
28 import java.util.Iterator;
29 import java.util.Map;
30 import java.util.Set;
31 import java.util.Timer;
32 import java.util.TimerTask;
33 import java.util.TreeMap;
34 import java.util.Map.Entry;
35
36 import org.apache.hadoop.metrics.ContextFactory;
37 import org.apache.hadoop.metrics.MetricsContext;
38 import org.apache.hadoop.metrics.MetricsException;
39 import org.apache.hadoop.metrics.MetricsRecord;
40 import org.apache.hadoop.metrics.Updater;
41
42
43
44
45
46
47
48
49
50
51
52 public abstract class AbstractMetricsContext implements MetricsContext {
53
54 private int period = MetricsContext.DEFAULT_PERIOD;
55 private Timer timer = null;
56 private boolean computeRate = true;
57 private Set<Updater> updaters = new HashSet<Updater>(1);
58 private volatile boolean isMonitoring = false;
59
60 private ContextFactory factory = null;
61 private String contextName = null;
62
63 static class TagMap extends TreeMap<String,Object> {
64 private static final long serialVersionUID = 3546309335061952993L;
65 TagMap() {
66 super();
67 }
68 TagMap(TagMap orig) {
69 super(orig);
70 }
71
72
73
74 public boolean containsAll(TagMap other) {
75 for (Map.Entry<String,Object> entry : other.entrySet()) {
76 Object value = get(entry.getKey());
77 if (value == null || !value.equals(entry.getValue())) {
78
79 return false;
80 }
81 }
82 return true;
83 }
84 }
85
86 static class MetricMap extends TreeMap<String,Number> {
87 private static final long serialVersionUID = -7495051861141631609L;
88 }
89
90 static class RecordMap extends HashMap<TagMap,MetricMap> {
91 private static final long serialVersionUID = 259835619700264611L;
92 }
93
94 private Map<String,RecordMap> bufferedData = new HashMap<String,RecordMap>();
95
96
97
98
99
/**
 * Creates a new context. The context name and factory remain null until
 * init() is called.
 */
protected AbstractMetricsContext() {
  super();
}
102
103
104
105
106 public void init(String contextName, ContextFactory factory)
107 {
108 this.contextName = contextName;
109 this.factory = factory;
110 }
111
112
113
114
115 protected String getAttribute(String attributeName) {
116 String factoryAttribute = contextName + "." + attributeName;
117 return (String) factory.getAttribute(factoryAttribute);
118 }
119
120
121
122
123
124
125
126 protected Map<String,String> getAttributeTable(String tableName) {
127 String prefix = contextName + "." + tableName + ".";
128 Map<String,String> result = new HashMap<String,String>();
129 for (String attributeName : factory.getAttributeNames()) {
130 if (attributeName.startsWith(prefix)) {
131 String name = attributeName.substring(prefix.length());
132 String value = (String) factory.getAttribute(attributeName);
133 result.put(name, value);
134 }
135 }
136 return result;
137 }
138
139
140
141
142 public String getContextName() {
143 return contextName;
144 }
145
146
147
148
149 public ContextFactory getContextFactory() {
150 return factory;
151 }
152
153
154
155
156 public synchronized void startMonitoring()
157 throws IOException {
158 if (!isMonitoring) {
159 startTimer();
160 isMonitoring = true;
161 }
162 }
163
164
165
166
167
168 public synchronized void stopMonitoring() {
169 if (isMonitoring) {
170 stopTimer();
171 isMonitoring = false;
172 }
173 }
174
175
176
177
178 public boolean isMonitoring() {
179 return isMonitoring;
180 }
181
182
183
184
185
186 public synchronized void close() {
187 stopMonitoring();
188 clearUpdaters();
189 }
190
191
192
193
194
195
196
197
198
199 public final synchronized MetricsRecord createRecord(String recordName) {
200 if (bufferedData.get(recordName) == null) {
201 bufferedData.put(recordName, new RecordMap());
202 }
203 return newRecord(recordName);
204 }
205
206
207
208
209
210
211 protected MetricsRecord newRecord(String recordName) {
212 return new MetricsRecordImpl(recordName, this);
213 }
214
215
216
217
218
219
220
221
222 public synchronized void registerUpdater(final Updater updater) {
223 if (!updaters.contains(updater)) {
224 updaters.add(updater);
225 }
226 }
227
228
229
230
231
232
233 public synchronized void unregisterUpdater(Updater updater) {
234 updaters.remove(updater);
235 }
236
237 private synchronized void clearUpdaters() {
238 updaters.clear();
239 }
240
241
242
243
244 private synchronized void startTimer() {
245 if (timer == null) {
246 timer = new Timer("Timer thread for monitoring " + getContextName(),
247 true);
248 TimerTask task = new TimerTask() {
249 public void run() {
250 try {
251 timerEvent();
252 }
253 catch (IOException ioe) {
254 ioe.printStackTrace();
255 }
256 }
257 };
258 long millis = period * 1000;
259 timer.scheduleAtFixedRate(task, millis, millis);
260 }
261 }
262
263
264
265
266 private synchronized void stopTimer() {
267 if (timer != null) {
268 timer.cancel();
269 timer = null;
270 }
271 }
272
273
274
275
276 private void timerEvent() throws IOException {
277 if (isMonitoring) {
278 Collection<Updater> myUpdaters;
279 synchronized (this) {
280 myUpdaters = new ArrayList<Updater>(updaters);
281 }
282
283
284 for (Updater updater : myUpdaters) {
285 try {
286 updater.doUpdates(this);
287 }
288 catch (Throwable throwable) {
289 throwable.printStackTrace();
290 }
291 }
292 emitRecords();
293 }
294 }
295
296
297
298
299 private synchronized void emitRecords() throws IOException {
300 for (String recordName : bufferedData.keySet()) {
301 RecordMap recordMap = bufferedData.get(recordName);
302 synchronized (recordMap) {
303 Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet ();
304 for (Entry<TagMap, MetricMap> entry : entrySet) {
305 OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue());
306 emitRecord(contextName, recordName, outRec);
307 }
308 }
309 }
310 flush();
311 }
312
313
314
315
316 protected abstract void emitRecord(String contextName, String recordName,
317 OutputRecord outRec) throws IOException;
318
319
320
321
322
323 protected void flush() throws IOException {
324 }
325
326
327
328
329
330 protected void update(MetricsRecordImpl record) {
331
332 String recordName = record.getRecordName();
333 TagMap tagTable = record.getTagTable();
334 Map<String,MetricValue> metricUpdates = record.getMetricTable();
335
336 RecordMap recordMap = getRecordMap(recordName);
337 synchronized (recordMap) {
338 MetricMap metricMap = recordMap.get(tagTable);
339 if (metricMap == null) {
340 metricMap = new MetricMap();
341 TagMap tagMap = new TagMap(tagTable);
342 recordMap.put(tagMap, metricMap);
343 }
344
345 Set<Entry<String, MetricValue>> entrySet = metricUpdates.entrySet();
346 for (Entry<String, MetricValue> entry : entrySet) {
347 String metricName = entry.getKey ();
348 MetricValue updateValue = entry.getValue ();
349 Number updateNumber = updateValue.getNumber();
350 Number currentNumber = metricMap.get(metricName);
351 if (currentNumber == null || updateValue.isAbsolute()) {
352 metricMap.put(metricName, updateNumber);
353 }
354 else {
355 Number newNumber = sum(updateNumber, currentNumber);
356 metricMap.put(metricName, newNumber);
357 metricMap.put(metricName+"_raw", updateNumber);
358 if (computeRate ) {
359 double rate = updateNumber.doubleValue() * 60.0 / period;
360 metricMap.put(metricName+"_rate", rate);
361 }
362 computeRate = true;
363 }
364 }
365 }
366 }
367
368 private synchronized RecordMap getRecordMap(String recordName) {
369 return bufferedData.get(recordName);
370 }
371
372
373
374
375
376 private Number sum(Number a, Number b) {
377 if (a instanceof Integer) {
378 return Integer.valueOf(a.intValue() + b.intValue());
379 }
380 else if (a instanceof Float) {
381 return new Float(a.floatValue() + b.floatValue());
382 }
383 else if (a instanceof Short) {
384 return Short.valueOf((short)(a.shortValue() + b.shortValue()));
385 }
386 else if (a instanceof Byte) {
387 return Byte.valueOf((byte)(a.byteValue() + b.byteValue()));
388 }
389 else if (a instanceof Long) {
390 return Long.valueOf((a.longValue() + b.longValue()));
391 }
392 else {
393
394 throw new MetricsException("Invalid number type");
395 }
396
397 }
398
399
400
401
402
403
404
405 protected void remove(MetricsRecordImpl record) {
406 String recordName = record.getRecordName();
407 TagMap tagTable = record.getTagTable();
408
409 RecordMap recordMap = getRecordMap(recordName);
410 synchronized (recordMap) {
411 Iterator<TagMap> it = recordMap.keySet().iterator();
412 while (it.hasNext()) {
413 TagMap rowTags = it.next();
414 if (rowTags.containsAll(tagTable)) {
415 it.remove();
416 }
417 }
418 }
419 }
420
421
422
423
424 public int getPeriod() {
425 return period;
426 }
427
428
429
430
431 protected void setPeriod(int period) {
432 this.period = period;
433 }
434 }