View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20  
21  
22  import java.util.Calendar;
23  import org.apache.hadoop.chukwa.Chunk;
24  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
25  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
26  import org.apache.hadoop.chukwa.extraction.engine.Record;
27  import org.apache.hadoop.chukwa.util.ExceptionUtil;
28  import org.apache.hadoop.io.DataOutputBuffer;
29  import org.apache.hadoop.mapred.OutputCollector;
30  import org.apache.hadoop.mapred.Reporter;
31  import org.apache.log4j.Logger;
32  
33  public class ChunkSaver {
34    static Logger log = Logger.getLogger(ChunkSaver.class);
35  
36    public static ChukwaRecord saveChunk(Chunk chunk, Throwable throwable,
37        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
38      try {
39        reporter.incrCounter("DemuxError", "count", 1);
40        reporter.incrCounter("DemuxError", chunk.getDataType() + "Count", 1);
41  
42        ChukwaRecord record = new ChukwaRecord();
43        long ts = System.currentTimeMillis();
44        Calendar calendar = Calendar.getInstance();
45        calendar.setTimeInMillis(ts);
46        calendar.set(Calendar.MINUTE, 0);
47        calendar.set(Calendar.SECOND, 0);
48        calendar.set(Calendar.MILLISECOND, 0);
49        ChukwaRecordKey key = new ChukwaRecordKey();
50        key.setKey("" + calendar.getTimeInMillis() + "/" + chunk.getDataType()
51            + "/" + chunk.getSource() + "/" + ts);
52        key.setReduceType(chunk.getDataType() + "InError");
53  
54        record.setTime(ts);
55  
56        record.add(Record.tagsField, chunk.getTags());
57        record.add(Record.sourceField, chunk.getSource());
58        record.add(Record.applicationField, chunk.getStreamName());
59  
60        DataOutputBuffer ob = new DataOutputBuffer(chunk
61            .getSerializedSizeEstimate());
62        chunk.write(ob);
63        record.add(Record.chunkDataField, new String(ob.getData()));
64        record.add(Record.chunkExceptionField, ExceptionUtil
65            .getStackTrace(throwable));
66        output.collect(key, record);
67  
68        return record;
69      } catch (Throwable e) {
70        e.printStackTrace();
71        try {
72          log.warn("Unable to save a chunk: tags: " + chunk.getTags()
73              + " - source:" + chunk.getSource() + " - dataType: "
74              + chunk.getDataType() + " - Stream: " + chunk.getStreamName()
75              + " - SeqId: " + chunk.getSeqID() + " - Data: "
76              + new String(chunk.getData()));
77        } catch (Throwable e1) {
78          e.printStackTrace();
79        }
80      }
81      return null;
82    }
83  
84  }