1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.adaptor.sigar;
20
21 import java.util.HashMap;
22 import java.util.TimerTask;
23
24 import org.apache.hadoop.chukwa.ChunkImpl;
25 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
26 import org.apache.hadoop.chukwa.util.ExceptionUtil;
27 import org.apache.log4j.Logger;
28 import org.hyperic.sigar.CpuInfo;
29 import org.hyperic.sigar.CpuPerc;
30 import org.hyperic.sigar.FileSystem;
31 import org.hyperic.sigar.FileSystemUsage;
32 import org.hyperic.sigar.Mem;
33 import org.hyperic.sigar.NetInterfaceStat;
34 import org.hyperic.sigar.Sigar;
35 import org.hyperic.sigar.Uptime;
36 import org.json.simple.JSONArray;
37 import org.json.simple.JSONObject;
38
39
40
41
42 public class SigarRunner extends TimerTask {
43
44 private static Sigar sigar = new Sigar();
45 private static Logger log = Logger.getLogger(SigarRunner.class);
46 private ChunkReceiver receiver = null;
47 private long sendOffset = 0;
48 private SystemMetrics systemMetrics;
49 private HashMap<String, JSONObject> previousNetworkStats = new HashMap<String, JSONObject>();
50 private HashMap<String, JSONObject> previousDiskStats = new HashMap<String, JSONObject>();
51
52 public SigarRunner(ChunkReceiver dest, SystemMetrics systemMetrics) {
53 receiver = dest;
54 this.systemMetrics = systemMetrics;
55 }
56
57 @SuppressWarnings("unchecked")
58 @Override
59 public void run() {
60 boolean skip = false;
61 CpuInfo[] cpuinfo = null;
62 CpuPerc[] cpuPerc = null;
63 Mem mem = null;
64 FileSystem[] fs = null;
65 String[] netIf = null;
66 Uptime uptime = null;
67 double[] loadavg = null;
68 JSONObject json = new JSONObject();
69 try {
70
71 cpuinfo = sigar.getCpuInfoList();
72 cpuPerc = sigar.getCpuPercList();
73 JSONArray cpuList = new JSONArray();
74 for (int i = 0; i < cpuinfo.length; i++) {
75 JSONObject cpuMap = new JSONObject();
76 cpuMap.putAll(cpuinfo[i].toMap());
77 cpuMap.put("combined", cpuPerc[i].getCombined());
78 cpuMap.put("user", cpuPerc[i].getUser());
79 cpuMap.put("sys", cpuPerc[i].getSys());
80 cpuMap.put("idle", cpuPerc[i].getIdle());
81 cpuMap.put("wait", cpuPerc[i].getWait());
82 cpuMap.put("nice", cpuPerc[i].getNice());
83 cpuMap.put("irq", cpuPerc[i].getIrq());
84 cpuList.add(cpuMap);
85 }
86 sigar.getCpuPerc();
87 json.put("cpu", cpuList);
88
89
90 uptime = sigar.getUptime();
91 json.put("uptime", uptime.getUptime());
92
93
94 loadavg = sigar.getLoadAverage();
95 JSONArray load = new JSONArray();
96 load.add(loadavg[0]);
97 load.add(loadavg[1]);
98 load.add(loadavg[2]);
99 json.put("loadavg", load);
100
101
102 mem = sigar.getMem();
103 JSONObject memMap = new JSONObject();
104 memMap.putAll(mem.toMap());
105 json.put("memory", memMap);
106
107
108 netIf = sigar.getNetInterfaceList();
109 JSONArray netInterfaces = new JSONArray();
110 for (int i = 0; i < netIf.length; i++) {
111 NetInterfaceStat net = new NetInterfaceStat();
112 net = sigar.getNetInterfaceStat(netIf[i]);
113 JSONObject netMap = new JSONObject();
114 netMap.putAll(net.toMap());
115 if(previousNetworkStats.containsKey(netIf[i])) {
116 JSONObject deltaMap = previousNetworkStats.get(netIf[i]);
117 deltaMap.put("RxBytes", Long.parseLong(netMap.get("RxBytes").toString()) - Long.parseLong(deltaMap.get("RxBytes").toString()));
118 deltaMap.put("RxDropped", Long.parseLong(netMap.get("RxDropped").toString()) - Long.parseLong(deltaMap.get("RxDropped").toString()));
119 deltaMap.put("RxErrors", Long.parseLong(netMap.get("RxErrors").toString()) - Long.parseLong(deltaMap.get("RxErrors").toString()));
120 deltaMap.put("RxPackets", Long.parseLong(netMap.get("RxPackets").toString()) - Long.parseLong(deltaMap.get("RxPackets").toString()));
121 deltaMap.put("TxBytes", Long.parseLong(netMap.get("TxBytes").toString()) - Long.parseLong(deltaMap.get("TxBytes").toString()));
122 deltaMap.put("TxCollisions", Long.parseLong(netMap.get("TxCollisions").toString()) - Long.parseLong(deltaMap.get("TxCollisions").toString()));
123 deltaMap.put("TxErrors", Long.parseLong(netMap.get("TxErrors").toString()) - Long.parseLong(deltaMap.get("TxErrors").toString()));
124 deltaMap.put("TxPackets", Long.parseLong(netMap.get("TxPackets").toString()) - Long.parseLong(deltaMap.get("TxPackets").toString()));
125 netInterfaces.add(deltaMap);
126 skip = false;
127 } else {
128 netInterfaces.add(netMap);
129 skip = true;
130 }
131 previousNetworkStats.put(netIf[i], netMap);
132 }
133 json.put("network", netInterfaces);
134
135
136 fs = sigar.getFileSystemList();
137 JSONArray fsList = new JSONArray();
138 for (int i = 0; i < fs.length; i++) {
139 FileSystemUsage usage = sigar.getFileSystemUsage(fs[i].getDirName());
140 JSONObject fsMap = new JSONObject();
141 fsMap.putAll(fs[i].toMap());
142 fsMap.put("ReadBytes", usage.getDiskReadBytes());
143 fsMap.put("Reads", usage.getDiskReads());
144 fsMap.put("WriteBytes", usage.getDiskWriteBytes());
145 fsMap.put("Writes", usage.getDiskWrites());
146 if(previousDiskStats.containsKey(fs[i].getDevName())) {
147 JSONObject deltaMap = previousDiskStats.get(fs[i].getDevName());
148 deltaMap.put("ReadBytes", usage.getDiskReadBytes() - (Long) deltaMap.get("ReadBytes"));
149 deltaMap.put("Reads", usage.getDiskReads() - (Long) deltaMap.get("Reads"));
150 deltaMap.put("WriteBytes", usage.getDiskWriteBytes() - (Long) deltaMap.get("WriteBytes"));
151 deltaMap.put("Writes", usage.getDiskWrites() - (Long) deltaMap.get("Writes"));
152 deltaMap.putAll(fs[i].toMap());
153 fsList.add(deltaMap);
154 skip = false;
155 } else {
156 fsList.add(fsMap);
157 skip = true;
158 }
159 previousDiskStats.put(fs[i].getDevName(), fsMap);
160 }
161 json.put("disk", fsList);
162 json.put("timestamp", System.currentTimeMillis());
163 byte[] data = json.toString().getBytes();
164 sendOffset += data.length;
165 ChunkImpl c = new ChunkImpl("SystemMetrics", "Sigar", sendOffset, data, systemMetrics);
166 if(!skip) {
167 receiver.add(c);
168 }
169 } catch (Exception se) {
170 log.error(ExceptionUtil.getStackTrace(se));
171 }
172 }
173
174 }