1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.inputtools.log4j;
20
21 import org.json.simple.JSONObject;
22 import org.apache.log4j.Logger;
23 import org.apache.log4j.PatternLayout;
24 import org.apache.commons.configuration.SubsetConfiguration;
25 import org.apache.hadoop.metrics2.Metric;
26 import org.apache.hadoop.metrics2.MetricsRecord;
27 import org.apache.hadoop.metrics2.MetricsSink;
28 import org.apache.hadoop.metrics2.MetricsTag;
29
30 public class Log4jMetricsSink implements MetricsSink {
31
32 private static final String HOST_PROPERTY = "host";
33 private static final String PORT_PROPERTY = "port";
34 private static final String TIMESTAMP = "timestamp";
35 private static String CONTEXT = "context";
36 private static final String CONTEXT_NAME = "contextName";
37 private static final String RECORD_NAME = "recordName";
38 protected String context = "HadoopMetrics";
39 protected String host = "localhost";
40 protected int port = 9095;
41 protected Logger out = null;
42
43 @Override
44 public void init(SubsetConfiguration conf) {
45 String host = conf.getString(HOST_PROPERTY);
46 if (host != null) {
47 this.host = host;
48 }
49 String port = conf.getString(PORT_PROPERTY);
50 if (port != null) {
51 this.port = Integer.parseInt(port);
52 }
53 String context = conf.getString(CONTEXT);
54 if (context != null) {
55 this.context = context;
56 }
57
58 PatternLayout layout = new PatternLayout("%d{ISO8601} %p %c: %m%n");
59
60 org.apache.log4j.net.SocketAppender appender = new org.apache.log4j.net.SocketAppender(this.host, this.port);
61
62 appender.setName("chukwa.metrics." + this.context);
63 appender.setLayout(layout);
64
65 Logger logger = Logger.getLogger("chukwa.metrics." + this.context);
66 logger.setAdditivity(false);
67 logger.addAppender(appender);
68 appender.activateOptions();
69 out = logger;
70 }
71
72 @Override
73 @SuppressWarnings("unchecked")
74 public void putMetrics(MetricsRecord record) {
75 JSONObject json = new JSONObject();
76 json.put(TIMESTAMP, Long.valueOf(record.timestamp()));
77 json.put(CONTEXT_NAME, record.context());
78 json.put(RECORD_NAME, record.name());
79 for (MetricsTag tag : record.tags()) {
80 json.put(tag.name(), tag.value());
81 }
82 for (Metric metric : record.metrics()) {
83 json.put(metric.name(), metric.value());
84 }
85 out.info(json);
86 }
87
88 @Override
89 public void flush() {
90 }
91 }