View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
20  
21  import java.io.IOException;
22  import java.util.Iterator;
23  
24  import org.apache.hadoop.chukwa.extraction.engine.Record;
25  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
26  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
27  import org.apache.hadoop.mapred.OutputCollector;
28  import org.apache.hadoop.mapred.Reporter;
29  import org.apache.log4j.Logger;
30  
31  public class ClientTrace implements ReduceProcessor {
32  
33    static final Logger log = Logger.getLogger(ClientTrace.class);
34  
35    @Override
36    public String getDataType() {
37      return this.getClass().getName();
38    }
39  
40    @Override
41    public void process(ChukwaRecordKey key,
42              Iterator<ChukwaRecord> values,
43              OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
44              Reporter reporter) {
45      try {
46        long bytes = 0L;
47        ChukwaRecord rec = null;
48        while (values.hasNext()) {
49          /* aggregate bytes for current key */
50          rec = values.next();
51          bytes += Long.valueOf(rec.getValue("bytes"));
52          
53          /* output raw values to different data type for uses which
54           * require detailed per-operation data */
55          ChukwaRecordKey detailed_key = new ChukwaRecordKey();
56          String [] k = key.getKey().split("/");
57          String full_timestamp = null;
58          full_timestamp = rec.getValue("actual_time");
59          detailed_key.setReduceType("ClientTraceDetailed");
60          detailed_key.setKey(k[0]+"/"+k[1]+"_"+k[2]+"/"+full_timestamp);
61          output.collect(detailed_key, rec);
62        }
63        if (null == rec) {
64          return;
65        }
66        ChukwaRecord emit = new ChukwaRecord();
67        emit.add(Record.tagsField, rec.getValue(Record.tagsField));
68        emit.add(Record.sourceField, "undefined"); // TODO
69        emit.add(Record.applicationField, rec.getValue(Record.applicationField));
70  
71        String[] k = key.getKey().split("/");
72        emit.add(k[1] + "_" + k[2], String.valueOf(bytes));
73        emit.setTime(Long.valueOf(k[3]));
74        output.collect(key, emit);
75  
76      } catch (IOException e) {
77        log.warn("Unable to collect output in SystemMetricsReduceProcessor [" + key + "]", e);
78      }
79    }
80  }