1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
24
25 import java.util.Calendar;
26 import java.util.Iterator;
27 import java.util.TimeZone;
28
29 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
30 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
31 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
32 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
33 import org.apache.hadoop.mapred.OutputCollector;
34 import org.apache.hadoop.mapred.Reporter;
35 import org.json.simple.JSONArray;
36 import org.json.simple.JSONObject;
37 import org.json.simple.JSONValue;
38
39 @Tables(annotations={
40 @Table(name="SystemMetrics",columnFamily="cpu"),
41 @Table(name="SystemMetrics",columnFamily="system"),
42 @Table(name="SystemMetrics",columnFamily="memory"),
43 @Table(name="SystemMetrics",columnFamily="network"),
44 @Table(name="SystemMetrics",columnFamily="disk")
45 })
46 public class SystemMetrics extends AbstractProcessor {
47
48 @Override
49 protected void parse(String recordEntry,
50 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
51 throws Throwable {
52 JSONObject json = (JSONObject) JSONValue.parse(recordEntry);
53 long timestamp = ((Long)json.get("timestamp")).longValue();
54 ChukwaRecord record = new ChukwaRecord();
55 Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
56 cal.setTimeInMillis(timestamp);
57 cal.set(Calendar.SECOND, 0);
58 cal.set(Calendar.MILLISECOND, 0);
59 JSONArray cpuList = (JSONArray) json.get("cpu");
60 double combined = 0.0;
61 double user = 0.0;
62 double sys = 0.0;
63 double idle = 0.0;
64 for(int i = 0; i< cpuList.size(); i++) {
65 JSONObject cpu = (JSONObject) cpuList.get(i);
66 Iterator<String> keys = cpu.keySet().iterator();
67 combined = combined + Double.parseDouble(cpu.get("combined").toString());
68 user = user + Double.parseDouble(cpu.get("user").toString());
69 sys = sys + Double.parseDouble(cpu.get("sys").toString());
70 idle = idle + Double.parseDouble(cpu.get("idle").toString());
71 while(keys.hasNext()) {
72 String key = keys.next();
73 record.add(key + "." + i, cpu.get(key).toString());
74 }
75 }
76 combined = combined / cpuList.size();
77 user = user / cpuList.size();
78 sys = sys / cpuList.size();
79 idle = idle / cpuList.size();
80 record.add("combined", Double.toString(combined));
81 record.add("user", Double.toString(user));
82 record.add("idle", Double.toString(idle));
83 record.add("sys", Double.toString(sys));
84 buildGenericRecord(record, null, cal.getTimeInMillis(), "cpu");
85 output.collect(key, record);
86
87 record = new ChukwaRecord();
88 record.add("Uptime", json.get("uptime").toString());
89 JSONArray loadavg = (JSONArray) json.get("loadavg");
90 record.add("LoadAverage.1", loadavg.get(0).toString());
91 record.add("LoadAverage.5", loadavg.get(1).toString());
92 record.add("LoadAverage.15", loadavg.get(2).toString());
93 buildGenericRecord(record, null, cal.getTimeInMillis(), "system");
94 output.collect(key, record);
95
96 record = new ChukwaRecord();
97 JSONObject memory = (JSONObject) json.get("memory");
98 Iterator<String> memKeys = memory.keySet().iterator();
99 while(memKeys.hasNext()) {
100 String key = memKeys.next();
101 record.add(key, memory.get(key).toString());
102 }
103 buildGenericRecord(record, null, cal.getTimeInMillis(), "memory");
104 output.collect(key, record);
105
106 double rxBytes = 0;
107 double rxDropped = 0;
108 double rxErrors = 0;
109 double rxPackets = 0;
110 double txBytes = 0;
111 double txCollisions = 0;
112 double txErrors = 0;
113 double txPackets = 0;
114 record = new ChukwaRecord();
115 JSONArray netList = (JSONArray) json.get("network");
116 for(int i = 0;i < netList.size(); i++) {
117 JSONObject netIf = (JSONObject) netList.get(i);
118 Iterator<String> keys = netIf.keySet().iterator();
119 while(keys.hasNext()) {
120 String key = keys.next();
121 record.add(key + "." + i, netIf.get(key).toString());
122 if(i!=0) {
123 if(key.equals("RxBytes")) {
124 rxBytes = rxBytes + (Long) netIf.get(key);
125 } else if(key.equals("RxDropped")) {
126 rxDropped = rxDropped + (Long) netIf.get(key);
127 } else if(key.equals("RxErrors")) {
128 rxErrors = rxErrors + (Long) netIf.get(key);
129 } else if(key.equals("RxPackets")) {
130 rxPackets = rxPackets + (Long) netIf.get(key);
131 } else if(key.equals("TxBytes")) {
132 txBytes = txBytes + (Long) netIf.get(key);
133 } else if(key.equals("TxCollisions")) {
134 txCollisions = txCollisions + (Long) netIf.get(key);
135 } else if(key.equals("TxErrors")) {
136 txErrors = txErrors + (Long) netIf.get(key);
137 } else if(key.equals("TxPackets")) {
138 txPackets = txPackets + (Long) netIf.get(key);
139 }
140 }
141 }
142 }
143 buildGenericRecord(record, null, cal.getTimeInMillis(), "network");
144 record.add("RxBytes", Double.toString(rxBytes));
145 record.add("RxDropped", Double.toString(rxDropped));
146 record.add("RxErrors", Double.toString(rxErrors));
147 record.add("RxPackets", Double.toString(rxPackets));
148 record.add("TxBytes", Double.toString(txBytes));
149 record.add("TxCollisions", Double.toString(txCollisions));
150 record.add("TxErrors", Double.toString(txErrors));
151 record.add("TxPackets", Double.toString(txPackets));
152 output.collect(key, record);
153
154 double readBytes = 0;
155 double reads = 0;
156 double writeBytes = 0;
157 double writes = 0;
158 record = new ChukwaRecord();
159 JSONArray diskList = (JSONArray) json.get("disk");
160 for(int i = 0;i < diskList.size(); i++) {
161 JSONObject disk = (JSONObject) diskList.get(i);
162 Iterator<String> keys = disk.keySet().iterator();
163 while(keys.hasNext()) {
164 String key = keys.next();
165 record.add(key + "." + i, disk.get(key).toString());
166 if(key.equals("ReadBytes")) {
167 readBytes = readBytes + (Long) disk.get("ReadBytes");
168 } else if(key.equals("Reads")) {
169 reads = reads + (Long) disk.get("Reads");
170 } else if(key.equals("WriteBytes")) {
171 writeBytes = writeBytes + (Long) disk.get("WriteBytes");
172 } else if(key.equals("Writes")) {
173 writes = writes + (Long) disk.get("Writes");
174 }
175 }
176 }
177 record.add("ReadBytes", Double.toString(readBytes));
178 record.add("Reads", Double.toString(reads));
179 record.add("WriteBytes", Double.toString(writeBytes));
180 record.add("Writes", Double.toString(writes));
181 buildGenericRecord(record, null, cal.getTimeInMillis(), "disk");
182 output.collect(key, record);
183
184 record = new ChukwaRecord();
185 record.add("cluster", chunk.getTag("cluster"));
186 buildGenericRecord(record, null, cal.getTimeInMillis(), "tags");
187 output.collect(key, record);
188 }
189
190 }