1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.archive;
20
21
22 import java.text.SimpleDateFormat;
23 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
24 import org.apache.hadoop.chukwa.ChunkImpl;
25 import org.apache.hadoop.chukwa.extraction.engine.RecordUtil;
26 import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
27 import org.apache.hadoop.fs.FileSystem;
28 import org.apache.hadoop.mapred.JobConf;
29 import org.apache.hadoop.mapred.RecordWriter;
30 import org.apache.hadoop.util.Progressable;
31 import org.apache.log4j.Logger;
32
33 public class ChukwaArchiveDataTypeOutputFormat extends
34 MultipleSequenceFileOutputFormat<ChukwaArchiveKey, ChunkImpl> {
35
36 static final String GROUP_BY_CLUSTER_OPTION_NAME = "archive.groupByClusterName";
37 static Logger log = Logger.getLogger(ChukwaArchiveDataTypeOutputFormat.class);
38 SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
39 boolean useClusterID;
40
41 public RecordWriter<ChukwaArchiveKey,ChunkImpl> getRecordWriter(FileSystem fs,
42 JobConf job, String name, Progressable arg3)
43 throws java.io.IOException{
44
45 log.info(GROUP_BY_CLUSTER_OPTION_NAME + " is " + job.get(GROUP_BY_CLUSTER_OPTION_NAME));
46 useClusterID = "true".equals(job.get(GROUP_BY_CLUSTER_OPTION_NAME));
47
48 return super.getRecordWriter(fs, job, name, arg3);
49 }
50
51 @Override
52 protected String generateFileNameForKeyValue(ChukwaArchiveKey key,
53 ChunkImpl chunk, String name) {
54
55 if (log.isDebugEnabled()) {
56 log.debug("ChukwaArchiveOutputFormat.fileName: "
57 + sdf.format(key.getTimePartition()));
58 }
59
60 if(useClusterID) {
61 String clusterID = RecordUtil.getClusterName(chunk);
62 return clusterID + "/" + chunk.getDataType() + "_" + sdf.format(key.getTimePartition())
63 + ".arc";
64 } else
65 return chunk.getDataType() + "_" + sdf.format(key.getTimePartition())
66 + ".arc";
67 }
68 }