View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
20  
21  
22  import java.io.IOException;
23  import java.util.Iterator;
24  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
25  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
26  import org.apache.hadoop.chukwa.extraction.engine.Record;
27  import org.apache.hadoop.mapred.OutputCollector;
28  import org.apache.hadoop.mapred.Reporter;
29  import org.apache.log4j.Logger;
30  
31  public class JobLogHistoryReduceProcessor implements ReduceProcessor {
32    static Logger log = Logger.getLogger(JobLogHistoryReduceProcessor.class);
33  
34    @Override
35    public String getDataType() {
36      return this.getClass().getName();
37    }
38  
39    @Override
40    public void process(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
41        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
42      try {
43        String action = key.getKey();
44        int count = 0;
45  
46        ChukwaRecord record = null;
47        while (values.hasNext()) {
48          record = values.next();
49          if (record.containsField("START_TIME")) {
50            count++;
51          } else {
52            count--;
53          }
54        }
55        ChukwaRecordKey newKey = new ChukwaRecordKey();
56        newKey.setKey("" + record.getTime());
57        newKey.setReduceType("MSSRGraph");
58        ChukwaRecord newRecord = new ChukwaRecord();
59        newRecord.add(Record.tagsField, record.getValue(Record.tagsField));
60        newRecord.setTime(record.getTime());
61        newRecord.add("count", "" + count);
62        newRecord.add("JOBID", record.getValue("JOBID"));
63        if (action.indexOf("JobLogHist/Map/") >= 0) {
64          newRecord.add("type", "MAP");
65        } else if (action.indexOf("JobLogHist/SHUFFLE/") >= 0) {
66          newRecord.add("type", "SHUFFLE");
67        } else if (action.indexOf("JobLogHist/SORT/") >= 0) {
68          newRecord.add("type", "SORT");
69        } else if (action.indexOf("JobLogHist/REDUCE/") >= 0) {
70          newRecord.add("type", "REDUCE");
71        }
72  
73        output.collect(newKey, newRecord);
74      } catch (IOException e) {
75        log.warn("Unable to collect output in JobLogHistoryReduceProcessor ["
76            + key + "]", e);
77        e.printStackTrace();
78      }
79  
80    }
81  
82  }