View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  /**
20   * Demux parser for system metrics data collected through
21   * org.apache.hadoop.chukwa.datacollection.adaptor.sigar.SystemMetrics.
22   */
23  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
24  
25  import java.util.Calendar;
26  import java.util.Iterator;
27  import java.util.TimeZone;
28  
29  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
30  import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
31  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
32  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
33  import org.apache.hadoop.mapred.OutputCollector;
34  import org.apache.hadoop.mapred.Reporter;
35  import org.json.simple.JSONArray;
36  import org.json.simple.JSONObject;
37  import org.json.simple.JSONValue;
38  
39  @Tables(annotations={
40      @Table(name="SystemMetrics",columnFamily="cpu"),
41      @Table(name="SystemMetrics",columnFamily="system"),
42      @Table(name="SystemMetrics",columnFamily="memory"),
43      @Table(name="SystemMetrics",columnFamily="network"),
44      @Table(name="SystemMetrics",columnFamily="disk")
45      })
46  public class SystemMetrics extends AbstractProcessor {
47  
48    @Override
49    protected void parse(String recordEntry,
50        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
51        throws Throwable {
52      JSONObject json = (JSONObject) JSONValue.parse(recordEntry);
53      long timestamp = ((Long)json.get("timestamp")).longValue();
54      ChukwaRecord record = new ChukwaRecord();
55      Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
56      cal.setTimeInMillis(timestamp);
57      cal.set(Calendar.SECOND, 0);
58      cal.set(Calendar.MILLISECOND, 0);
59      JSONArray cpuList = (JSONArray) json.get("cpu");
60      double combined = 0.0;
61      double user = 0.0;
62      double sys = 0.0;
63      double idle = 0.0;
64      for(int i = 0; i< cpuList.size(); i++) {
65        JSONObject cpu = (JSONObject) cpuList.get(i);
66        Iterator<String> keys = cpu.keySet().iterator();
67        combined = combined + Double.parseDouble(cpu.get("combined").toString());
68        user = user + Double.parseDouble(cpu.get("user").toString());
69        sys = sys + Double.parseDouble(cpu.get("sys").toString());
70        idle = idle + Double.parseDouble(cpu.get("idle").toString());
71        while(keys.hasNext()) {
72          String key = keys.next();
73          record.add(key + "." + i, cpu.get(key).toString());
74        }
75      }
76      combined = combined / cpuList.size();
77      user = user / cpuList.size();
78      sys = sys / cpuList.size();
79      idle = idle / cpuList.size();
80      record.add("combined", Double.toString(combined));
81      record.add("user", Double.toString(user));
82      record.add("idle", Double.toString(idle));    
83      record.add("sys", Double.toString(sys));
84      buildGenericRecord(record, null, cal.getTimeInMillis(), "cpu");
85      output.collect(key, record);    
86  
87      record = new ChukwaRecord();
88      record.add("Uptime", json.get("uptime").toString());
89      JSONArray loadavg = (JSONArray) json.get("loadavg");
90      record.add("LoadAverage.1", loadavg.get(0).toString());
91      record.add("LoadAverage.5", loadavg.get(1).toString());
92      record.add("LoadAverage.15", loadavg.get(2).toString());
93      buildGenericRecord(record, null, cal.getTimeInMillis(), "system");
94      output.collect(key, record);    
95  
96      record = new ChukwaRecord();
97      JSONObject memory = (JSONObject) json.get("memory");
98      Iterator<String> memKeys = memory.keySet().iterator();
99      while(memKeys.hasNext()) {
100       String key = memKeys.next();
101       record.add(key, memory.get(key).toString());
102     }
103     buildGenericRecord(record, null, cal.getTimeInMillis(), "memory");
104     output.collect(key, record);    
105     
106     double rxBytes = 0;
107     double rxDropped = 0;
108     double rxErrors = 0;
109     double rxPackets = 0;
110     double txBytes = 0;
111     double txCollisions = 0;
112     double txErrors = 0;
113     double txPackets = 0;
114     record = new ChukwaRecord();
115     JSONArray netList = (JSONArray) json.get("network");
116     for(int i = 0;i < netList.size(); i++) {
117       JSONObject netIf = (JSONObject) netList.get(i);
118       Iterator<String> keys = netIf.keySet().iterator();
119       while(keys.hasNext()) {
120         String key = keys.next();
121         record.add(key + "." + i, netIf.get(key).toString());
122         if(i!=0) {
123           if(key.equals("RxBytes")) {
124             rxBytes = rxBytes + (Long) netIf.get(key);
125           } else if(key.equals("RxDropped")) {
126             rxDropped = rxDropped + (Long) netIf.get(key);
127           } else if(key.equals("RxErrors")) {          
128             rxErrors = rxErrors + (Long) netIf.get(key);
129           } else if(key.equals("RxPackets")) {
130             rxPackets = rxPackets + (Long) netIf.get(key);
131           } else if(key.equals("TxBytes")) {
132             txBytes = txBytes + (Long) netIf.get(key);
133           } else if(key.equals("TxCollisions")) {
134             txCollisions = txCollisions + (Long) netIf.get(key);
135           } else if(key.equals("TxErrors")) {
136             txErrors = txErrors + (Long) netIf.get(key);
137           } else if(key.equals("TxPackets")) {
138             txPackets = txPackets + (Long) netIf.get(key);
139           }
140         }
141       }
142     }
143     buildGenericRecord(record, null, cal.getTimeInMillis(), "network");
144     record.add("RxBytes", Double.toString(rxBytes));
145     record.add("RxDropped", Double.toString(rxDropped));
146     record.add("RxErrors", Double.toString(rxErrors));
147     record.add("RxPackets", Double.toString(rxPackets));
148     record.add("TxBytes", Double.toString(txBytes));
149     record.add("TxCollisions", Double.toString(txCollisions));
150     record.add("TxErrors", Double.toString(txErrors));
151     record.add("TxPackets", Double.toString(txPackets));
152     output.collect(key, record);    
153     
154     double readBytes = 0;
155     double reads = 0;
156     double writeBytes = 0;
157     double writes = 0;
158     record = new ChukwaRecord();
159     JSONArray diskList = (JSONArray) json.get("disk");
160     for(int i = 0;i < diskList.size(); i++) {
161       JSONObject disk = (JSONObject) diskList.get(i);
162       Iterator<String> keys = disk.keySet().iterator();
163       while(keys.hasNext()) {
164         String key = keys.next();
165         record.add(key + "." + i, disk.get(key).toString());
166         if(key.equals("ReadBytes")) {
167           readBytes = readBytes + (Long) disk.get("ReadBytes");
168         } else if(key.equals("Reads")) {
169           reads = reads + (Long) disk.get("Reads");
170         } else if(key.equals("WriteBytes")) {
171           writeBytes = writeBytes + (Long) disk.get("WriteBytes");
172         } else if(key.equals("Writes")) {
173           writes = writes + (Long) disk.get("Writes");
174         }
175       }
176     }
177     record.add("ReadBytes", Double.toString(readBytes));
178     record.add("Reads", Double.toString(reads));
179     record.add("WriteBytes", Double.toString(writeBytes));
180     record.add("Writes", Double.toString(writes));    
181     buildGenericRecord(record, null, cal.getTimeInMillis(), "disk");
182     output.collect(key, record);
183     
184     record = new ChukwaRecord();
185     record.add("cluster", chunk.getTag("cluster"));
186     buildGenericRecord(record, null, cal.getTimeInMillis(), "tags");
187     output.collect(key, record);
188   }
189 
190 }