View Javadoc

1   /*
2    * AbstractMetricsContext.java
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.metrics.spi;
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.HashMap;
27  import java.util.HashSet;
28  import java.util.Iterator;
29  import java.util.Map;
30  import java.util.Set;
31  import java.util.Timer;
32  import java.util.TimerTask;
33  import java.util.TreeMap;
34  import java.util.Map.Entry;
35  
36  import org.apache.hadoop.metrics.ContextFactory;
37  import org.apache.hadoop.metrics.MetricsContext;
38  import org.apache.hadoop.metrics.MetricsException;
39  import org.apache.hadoop.metrics.MetricsRecord;
40  import org.apache.hadoop.metrics.Updater;
41  
42  /**
43   * The main class of the Service Provider Interface.  This class should be
44   * extended in order to integrate the Metrics API with a specific metrics
45   * client library. <p/>
46   *
47   * This class implements the internal table of metric data, and the timer
48   * on which data is to be sent to the metrics system.  Subclasses must
49   * override the abstract <code>emitRecord</code> method in order to transmit
50   * the data. <p/>
51   */
52  public abstract class AbstractMetricsContext implements MetricsContext {
53      
54    private int period = MetricsContext.DEFAULT_PERIOD;
55    private Timer timer = null;
56    private boolean computeRate = true;    
57    private Set<Updater> updaters = new HashSet<Updater>(1);
58    private volatile boolean isMonitoring = false;
59      
60    private ContextFactory factory = null;
61    private String contextName = null;
62      
63    static class TagMap extends TreeMap<String,Object> {
64      private static final long serialVersionUID = 3546309335061952993L;
65      TagMap() {
66        super();
67      }
68      TagMap(TagMap orig) {
69        super(orig);
70      }
71      /**
72       * Returns true if this tagmap contains every tag in other.
73       */
74      public boolean containsAll(TagMap other) {
75        for (Map.Entry<String,Object> entry : other.entrySet()) {
76          Object value = get(entry.getKey());
77          if (value == null || !value.equals(entry.getValue())) {
78            // either key does not exist here, or the value is different
79            return false;
80          }
81        }
82        return true;
83      }
84    }
85    
86    static class MetricMap extends TreeMap<String,Number> {
87      private static final long serialVersionUID = -7495051861141631609L;
88    }
89              
90    static class RecordMap extends HashMap<TagMap,MetricMap> {
91      private static final long serialVersionUID = 259835619700264611L;
92    }
93      
94    private Map<String,RecordMap> bufferedData = new HashMap<String,RecordMap>();
95      
96  
97    /**
98     * Creates a new instance of AbstractMetricsContext
99     */
100   protected AbstractMetricsContext() {
101   }
102     
103   /**
104    * Initializes the context.
105    */
106   public void init(String contextName, ContextFactory factory) 
107   {
108     this.contextName = contextName;
109     this.factory = factory;
110   }
111     
112   /**
113    * Convenience method for subclasses to access factory attributes.
114    */
115   protected String getAttribute(String attributeName) {
116     String factoryAttribute = contextName + "." + attributeName;
117     return (String) factory.getAttribute(factoryAttribute);  
118   }
119     
120   /**
121    * Returns an attribute-value map derived from the factory attributes
122    * by finding all factory attributes that begin with 
123    * <i>contextName</i>.<i>tableName</i>.  The returned map consists of
124    * those attributes with the contextName and tableName stripped off.
125    */
126   protected Map<String,String> getAttributeTable(String tableName) {
127     String prefix = contextName + "." + tableName + ".";
128     Map<String,String> result = new HashMap<String,String>();
129     for (String attributeName : factory.getAttributeNames()) {
130       if (attributeName.startsWith(prefix)) {
131         String name = attributeName.substring(prefix.length());
132         String value = (String) factory.getAttribute(attributeName);
133         result.put(name, value);
134       }
135     }
136     return result;
137   }
138     
139   /**
140    * Returns the context name.
141    */
142   public String getContextName() {
143     return contextName;
144   }
145     
146   /**
147    * Returns the factory by which this context was created.
148    */
149   public ContextFactory getContextFactory() {
150     return factory;
151   }
152     
153   /**
154    * Starts or restarts monitoring, the emitting of metrics records.
155    */
156   public synchronized void startMonitoring()
157     throws IOException {
158     if (!isMonitoring) {
159       startTimer();
160       isMonitoring = true;
161     }
162   }
163     
164   /**
165    * Stops monitoring.  This does not free buffered data. 
166    * @see #close()
167    */
168   public synchronized void stopMonitoring() {
169     if (isMonitoring) {
170       stopTimer();
171       isMonitoring = false;
172     }
173   }
174     
175   /**
176    * Returns true if monitoring is currently in progress.
177    */
178   public boolean isMonitoring() {
179     return isMonitoring;
180   }
181     
182   /**
183    * Stops monitoring and frees buffered data, returning this
184    * object to its initial state.  
185    */
186   public synchronized void close() {
187     stopMonitoring();
188     clearUpdaters();
189   } 
190     
191   /**
192    * Creates a new AbstractMetricsRecord instance with the given <code>recordName</code>.
193    * Throws an exception if the metrics implementation is configured with a fixed
194    * set of record names and <code>recordName</code> is not in that set.
195    * 
196    * @param recordName the name of the record
197    * @throws MetricsException if recordName conflicts with configuration data
198    */
199   public final synchronized MetricsRecord createRecord(String recordName) {
200     if (bufferedData.get(recordName) == null) {
201       bufferedData.put(recordName, new RecordMap());
202     }
203     return newRecord(recordName);
204   }
205     
206   /**
207    * Subclasses should override this if they subclass MetricsRecordImpl.
208    * @param recordName the name of the record
209    * @return newly created instance of MetricsRecordImpl or subclass
210    */
211   protected MetricsRecord newRecord(String recordName) {
212     return new MetricsRecordImpl(recordName, this);
213   }
214     
215   /**
216    * Registers a callback to be called at time intervals determined by
217    * the configuration.
218    *
219    * @param updater object to be run periodically; it should update
220    * some metrics records 
221    */
222   public synchronized void registerUpdater(final Updater updater) {
223     if (!updaters.contains(updater)) {
224       updaters.add(updater);
225     }
226   }
227     
228   /**
229    * Removes a callback, if it exists.
230    *
231    * @param updater object to be removed from the callback list
232    */
233   public synchronized void unregisterUpdater(Updater updater) {
234     updaters.remove(updater);
235   }
236     
237   private synchronized void clearUpdaters() {
238     updaters.clear();
239   }
240     
241   /**
242    * Starts timer if it is not already started
243    */
244   private synchronized void startTimer() {
245     if (timer == null) {
246       timer = new Timer("Timer thread for monitoring " + getContextName(), 
247                         true);
248       TimerTask task = new TimerTask() {
249           public void run() {
250             try {
251               timerEvent();
252             }
253             catch (IOException ioe) {
254               ioe.printStackTrace();
255             }
256           }
257         };
258       long millis = period * 1000;
259       timer.scheduleAtFixedRate(task, millis, millis);
260     }
261   }
262     
263   /**
264    * Stops timer if it is running
265    */
266   private synchronized void stopTimer() {
267     if (timer != null) {
268       timer.cancel();
269       timer = null;
270     }
271   }
272     
273   /**
274    * Timer callback.
275    */
276   private void timerEvent() throws IOException {
277     if (isMonitoring) {
278       Collection<Updater> myUpdaters;
279       synchronized (this) {
280         myUpdaters = new ArrayList<Updater>(updaters);
281       }     
282       // Run all the registered updates without holding a lock
283       // on this context
284       for (Updater updater : myUpdaters) {
285         try {
286           updater.doUpdates(this);
287         }
288         catch (Throwable throwable) {
289           throwable.printStackTrace();
290         }
291       }
292       emitRecords();
293     }
294   }
295     
296   /**
297    *  Emits the records.
298    */
299   private synchronized void emitRecords() throws IOException {
300     for (String recordName : bufferedData.keySet()) {
301       RecordMap recordMap = bufferedData.get(recordName);
302       synchronized (recordMap) {
303         Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet ();
304         for (Entry<TagMap, MetricMap> entry : entrySet) {
305           OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue());
306           emitRecord(contextName, recordName, outRec);
307         }
308       }
309     }
310     flush();
311   }
312 
313   /**
314    * Sends a record to the metrics system.
315    */
316   protected abstract void emitRecord(String contextName, String recordName, 
317                                      OutputRecord outRec) throws IOException;
318     
319   /**
320    * Called each period after all records have been emitted, this method does nothing.
321    * Subclasses may override it in order to perform some kind of flush.
322    */
323   protected void flush() throws IOException {
324   }
325     
326   /**
327    * Called by MetricsRecordImpl.update().  Creates or updates a row in
328    * the internal table of metric data.
329    */
330   protected void update(MetricsRecordImpl record) {
331     
332     String recordName = record.getRecordName();
333     TagMap tagTable = record.getTagTable();
334     Map<String,MetricValue> metricUpdates = record.getMetricTable();
335         
336     RecordMap recordMap = getRecordMap(recordName);
337     synchronized (recordMap) {
338       MetricMap metricMap = recordMap.get(tagTable);
339       if (metricMap == null) {
340         metricMap = new MetricMap();
341         TagMap tagMap = new TagMap(tagTable); // clone tags
342         recordMap.put(tagMap, metricMap);
343       }
344 
345       Set<Entry<String, MetricValue>> entrySet = metricUpdates.entrySet();
346       for (Entry<String, MetricValue> entry : entrySet) {
347         String metricName = entry.getKey ();
348         MetricValue updateValue = entry.getValue ();
349         Number updateNumber = updateValue.getNumber();
350         Number currentNumber = metricMap.get(metricName);
351         if (currentNumber == null || updateValue.isAbsolute()) {
352           metricMap.put(metricName, updateNumber);
353         }
354         else {
355           Number newNumber = sum(updateNumber, currentNumber);
356           metricMap.put(metricName, newNumber);
357           metricMap.put(metricName+"_raw", updateNumber);
358           if (computeRate ) {
359               double rate = updateNumber.doubleValue() * 60.0 / period;
360               metricMap.put(metricName+"_rate", rate);
361             }
362           computeRate = true;
363         }
364       }
365     }
366   }
367     
368   private synchronized RecordMap getRecordMap(String recordName) {
369     return bufferedData.get(recordName);
370   }
371     
372   /**
373    * Adds two numbers, coercing the second to the type of the first.
374    *
375    */
376   private Number sum(Number a, Number b) {
377     if (a instanceof Integer) {
378       return Integer.valueOf(a.intValue() + b.intValue());
379     }
380     else if (a instanceof Float) {
381       return new Float(a.floatValue() + b.floatValue());
382     }
383     else if (a instanceof Short) {
384       return Short.valueOf((short)(a.shortValue() + b.shortValue()));
385     }
386     else if (a instanceof Byte) {
387       return Byte.valueOf((byte)(a.byteValue() + b.byteValue()));
388     }
389     else if (a instanceof Long) {
390       return Long.valueOf((a.longValue() + b.longValue()));
391     }
392     else {
393       // should never happen
394       throw new MetricsException("Invalid number type");
395     }
396             
397   }
398     
399   /**
400    * Called by MetricsRecordImpl.remove().  Removes all matching rows in
401    * the internal table of metric data.  A row matches if it has the same
402    * tag names and values as record, but it may also have additional
403    * tags.
404    */    
405   protected void remove(MetricsRecordImpl record) {
406     String recordName = record.getRecordName();
407     TagMap tagTable = record.getTagTable();
408         
409     RecordMap recordMap = getRecordMap(recordName);
410     synchronized (recordMap) {
411       Iterator<TagMap> it = recordMap.keySet().iterator();
412       while (it.hasNext()) {
413         TagMap rowTags = it.next();
414         if (rowTags.containsAll(tagTable)) {
415           it.remove();
416         }
417       }
418     }
419   }
420     
421   /**
422    * Returns the timer period.
423    */
424   public int getPeriod() {
425     return period;
426   }
427     
428   /**
429    * Sets the timer period
430    */
431   protected void setPeriod(int period) {
432     this.period = period;
433   }
434 }