1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20
21
22 import java.util.Calendar;
23 import org.apache.hadoop.chukwa.Chunk;
24 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
25 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
26 import org.apache.hadoop.chukwa.extraction.engine.Record;
27 import org.apache.hadoop.chukwa.util.ExceptionUtil;
28 import org.apache.hadoop.io.DataOutputBuffer;
29 import org.apache.hadoop.mapred.OutputCollector;
30 import org.apache.hadoop.mapred.Reporter;
31 import org.apache.log4j.Logger;
32
33 public class ChunkSaver {
34 static Logger log = Logger.getLogger(ChunkSaver.class);
35
36 public static ChukwaRecord saveChunk(Chunk chunk, Throwable throwable,
37 OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
38 try {
39 reporter.incrCounter("DemuxError", "count", 1);
40 reporter.incrCounter("DemuxError", chunk.getDataType() + "Count", 1);
41
42 ChukwaRecord record = new ChukwaRecord();
43 long ts = System.currentTimeMillis();
44 Calendar calendar = Calendar.getInstance();
45 calendar.setTimeInMillis(ts);
46 calendar.set(Calendar.MINUTE, 0);
47 calendar.set(Calendar.SECOND, 0);
48 calendar.set(Calendar.MILLISECOND, 0);
49 ChukwaRecordKey key = new ChukwaRecordKey();
50 key.setKey("" + calendar.getTimeInMillis() + "/" + chunk.getDataType()
51 + "/" + chunk.getSource() + "/" + ts);
52 key.setReduceType(chunk.getDataType() + "InError");
53
54 record.setTime(ts);
55
56 record.add(Record.tagsField, chunk.getTags());
57 record.add(Record.sourceField, chunk.getSource());
58 record.add(Record.applicationField, chunk.getStreamName());
59
60 DataOutputBuffer ob = new DataOutputBuffer(chunk
61 .getSerializedSizeEstimate());
62 chunk.write(ob);
63 record.add(Record.chunkDataField, new String(ob.getData()));
64 record.add(Record.chunkExceptionField, ExceptionUtil
65 .getStackTrace(throwable));
66 output.collect(key, record);
67
68 return record;
69 } catch (Throwable e) {
70 e.printStackTrace();
71 try {
72 log.warn("Unable to save a chunk: tags: " + chunk.getTags()
73 + " - source:" + chunk.getSource() + " - dataType: "
74 + chunk.getDataType() + " - Stream: " + chunk.getStreamName()
75 + " - SeqId: " + chunk.getSeqID() + " - Data: "
76 + new String(chunk.getData()));
77 } catch (Throwable e1) {
78 e.printStackTrace();
79 }
80 }
81 return null;
82 }
83
84 }