1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.archive;
20
21
22 import java.text.SimpleDateFormat;
23 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
24 import org.apache.hadoop.chukwa.ChunkImpl;
25 import org.apache.hadoop.chukwa.extraction.engine.RecordUtil;
26 import org.apache.hadoop.mapred.JobConf;
27 import org.apache.hadoop.mapred.Partitioner;
28
29 public class ChukwaArchiveDataTypePartitioner<K, V> implements
30 Partitioner<ChukwaArchiveKey, ChunkImpl> {
31 SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
32
33 boolean useClusterID = false;
34 public void configure(JobConf conf) {
35 useClusterID = "true".equals(conf.get(ChukwaArchiveDataTypeOutputFormat.
36 GROUP_BY_CLUSTER_OPTION_NAME));
37 }
38
39 public int getPartition(ChukwaArchiveKey key, ChunkImpl chunk,
40 int numReduceTasks) {
41
42 if(useClusterID) {
43 String clusterID = RecordUtil.getClusterName(chunk);
44 return ((chunk.getDataType() + "_" + clusterID + "_" + sdf.format(key.getTimePartition()))
45 .hashCode() & Integer.MAX_VALUE)
46 % numReduceTasks;
47 } else {
48 return ((chunk.getDataType() + "_" + sdf.format(key.getTimePartition()))
49 .hashCode() & Integer.MAX_VALUE)
50 % numReduceTasks;
51 }
52 }
53
54 }