1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.Map.Entry;
24 import java.util.regex.Matcher;
25 import java.util.regex.Pattern;
26
27 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
28 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
29 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
30 import org.apache.hadoop.mapred.OutputCollector;
31 import org.apache.hadoop.mapred.Reporter;
32 import org.apache.log4j.Logger;
33
34 @Table(name="SystemMetrics",columnFamily="Ps")
35 public class Ps extends AbstractProcessor {
36 static Logger log = Logger.getLogger(Ps.class);
37 public static final String reduceType = "Ps";
38
39 @Override
40 protected void parse(String recordEntry,
41 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
42 throws Throwable {
43 LogEntry log = new LogEntry(recordEntry);
44 PsOutput ps = new PsOutput(log.getBody());
45 for (HashMap<String, String> processInfo : ps.getProcessList()) {
46 key = new ChukwaRecordKey();
47 ChukwaRecord record = new ChukwaRecord();
48 this.buildGenericRecord(record, null, log.getDate().getTime(), reduceType);
49 for (Entry<String, String> entry : processInfo.entrySet()) {
50 record.add(entry.getKey(), entry.getValue());
51 }
52 output.collect(key, record);
53 }
54 }
55
56 public static class PsOutput {
57
58
59 private ArrayList<HashMap<String, String>> recordList = new ArrayList<HashMap<String, String>>();
60
61 public PsOutput(String psCmdOutput) throws InvalidPsRecord {
62 if (psCmdOutput == null || psCmdOutput.length() == 0)
63 return;
64
65 String[] lines = psCmdOutput.split("[\n\r]+");
66
67
68 if (lines.length < 2)
69 return;
70
71
72 ArrayList<String> header = new ArrayList<String>();
73 Matcher matcher = Pattern.compile("[^ ^\t]+").matcher(lines[0]);
74 while (matcher.find()) {
75 header.add(matcher.group(0));
76 }
77 if (!header.get(header.size() - 1).equals("CMD")) {
78 throw new InvalidPsRecord("CMD must be the last column");
79 }
80
81
82 boolean foundInitCmd = false;
83 for (int line = 1; line < lines.length; line++) {
84 HashMap<String, String> record = new HashMap<String, String>();
85 recordList.add(record);
86
87 matcher = Pattern.compile("[^ ^\t]+").matcher(lines[line]);
88 for (int index = 0; index < header.size(); index++) {
89 String key = header.get(index);
90 matcher.find();
91 if (!key.equals("CMD")) {
92 String value = matcher.group(0);
93
94
95
96
97
98 if (key.equals("STARTED")) {
99 char c = value.charAt(0);
100 if (c < '0' || c > '9') {
101 matcher.find();
102 value += matcher.group(0);
103 }
104 }
105 record.put(key, value);
106 } else {
107
108
109 String value = lines[line].substring(matcher.start());
110 record.put(key, value);
111 if (!foundInitCmd)
112 foundInitCmd = value.startsWith("init");
113 break;
114 }
115 }
116 }
117 if (!foundInitCmd)
118 throw new InvalidPsRecord("Did not find 'init' cmd");
119 }
120
121 public ArrayList<HashMap<String, String>> getProcessList() {
122 return recordList;
123 }
124 }
125
126 public static class InvalidPsRecord extends Exception {
127 private static final long serialVersionUID = 1L;
128
129 public InvalidPsRecord() {
130 }
131
132 public InvalidPsRecord(String arg0) {
133 super(arg0);
134 }
135
136 public InvalidPsRecord(Throwable arg0) {
137 super(arg0);
138 }
139
140 public InvalidPsRecord(String arg0, Throwable arg1) {
141 super(arg0, arg1);
142 }
143 }
144 }