View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.archive;
20  
21  
22  import java.text.SimpleDateFormat;
23  import org.apache.hadoop.chukwa.ChukwaArchiveKey;
24  import org.apache.hadoop.chukwa.ChunkImpl;
25  import org.apache.hadoop.chukwa.extraction.engine.RecordUtil;
26  import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
27  import org.apache.hadoop.fs.FileSystem;
28  import org.apache.hadoop.mapred.JobConf;
29  import org.apache.hadoop.mapred.RecordWriter;
30  import org.apache.hadoop.util.Progressable;
31  import org.apache.log4j.Logger;
32  
33  public class ChukwaArchiveDataTypeOutputFormat extends
34      MultipleSequenceFileOutputFormat<ChukwaArchiveKey, ChunkImpl> {
35    
36    static final String GROUP_BY_CLUSTER_OPTION_NAME = "archive.groupByClusterName";
37    static Logger log = Logger.getLogger(ChukwaArchiveDataTypeOutputFormat.class);
38    SimpleDateFormat sdf = new SimpleDateFormat("yyyy_MM_dd");
39    boolean useClusterID;
40    
41    public RecordWriter<ChukwaArchiveKey,ChunkImpl>  getRecordWriter(FileSystem fs,
42        JobConf job, String name, Progressable arg3) 
43    throws java.io.IOException{
44  
45      log.info(GROUP_BY_CLUSTER_OPTION_NAME + " is " + job.get(GROUP_BY_CLUSTER_OPTION_NAME));
46      useClusterID = "true".equals(job.get(GROUP_BY_CLUSTER_OPTION_NAME));
47  
48      return super.getRecordWriter(fs, job, name, arg3);
49    }
50    
51    @Override
52    protected String generateFileNameForKeyValue(ChukwaArchiveKey key,
53        ChunkImpl chunk, String name) {
54  
55      if (log.isDebugEnabled()) {
56        log.debug("ChukwaArchiveOutputFormat.fileName: "
57            + sdf.format(key.getTimePartition()));
58      }
59  
60      if(useClusterID) {
61        String clusterID = RecordUtil.getClusterName(chunk);
62        return clusterID + "/" + chunk.getDataType() + "_" + sdf.format(key.getTimePartition())
63        + ".arc";
64      } else
65        return chunk.getDataType() + "_" + sdf.format(key.getTimePartition())
66          + ".arc";
67    }
68  }