View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux;
20  
21  
22  import java.io.File;
23  import java.io.IOException;
24  import java.text.SimpleDateFormat;
25  import java.util.ArrayList;
26  import java.util.Date;
27  import java.util.Iterator;
28  import java.util.List;
29  import org.apache.hadoop.chukwa.ChukwaArchiveKey;
30  import org.apache.hadoop.chukwa.ChunkImpl;
31  import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
32  import org.apache.hadoop.chukwa.extraction.demux.processor.ChukwaOutputCollector;
33  import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
34  import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessorFactory;
35  import org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ReduceProcessorFactory;
36  import org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ReduceProcessor;
37  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
38  import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
39  import org.apache.hadoop.chukwa.util.ExceptionUtil;
40  import org.apache.hadoop.conf.Configuration;
41  import org.apache.hadoop.conf.Configured;
42  import org.apache.hadoop.filecache.DistributedCache;
43  import org.apache.hadoop.fs.FileStatus;
44  import org.apache.hadoop.fs.FileSystem;
45  import org.apache.hadoop.fs.Path;
46  import org.apache.hadoop.mapred.FileInputFormat;
47  import org.apache.hadoop.mapred.FileOutputFormat;
48  import org.apache.hadoop.mapred.JobClient;
49  import org.apache.hadoop.mapred.JobConf;
50  import org.apache.hadoop.mapred.JobPriority;
51  import org.apache.hadoop.mapred.MapReduceBase;
52  import org.apache.hadoop.mapred.Mapper;
53  import org.apache.hadoop.mapred.OutputCollector;
54  import org.apache.hadoop.mapred.Reducer;
55  import org.apache.hadoop.mapred.Reporter;
56  import org.apache.hadoop.mapred.SequenceFileInputFormat;
57  import org.apache.hadoop.util.Tool;
58  import org.apache.hadoop.util.ToolRunner;
59  import org.apache.log4j.Logger;
60  
61  public class Demux extends Configured implements Tool {
62    static Logger log = Logger.getLogger(Demux.class);
63    static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd_HH_mm");
64    public static JobConf jobConf = null;
65  
66    public static class MapClass extends MapReduceBase implements
67        Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> {
68  
69      @Override
70      public void configure(JobConf jobConf) {
71        super.configure(jobConf);
72        Demux.jobConf= jobConf;
73      }
74  
75      public void map(ChukwaArchiveKey key, ChunkImpl chunk,
76          OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
77          throws IOException {
78  
79        ChukwaOutputCollector chukwaOutputCollector = new ChukwaOutputCollector(
80            "DemuxMapOutput", output, reporter);
81        try {
82          long duration = System.currentTimeMillis();
83          if (log.isDebugEnabled()) {
84            log.debug("Entry: [" + chunk.getData() + "] EventType: ["
85                + chunk.getDataType() + "]");
86          }
87  
88          String defaultProcessor = Demux.jobConf.get(
89              "chukwa.demux.mapper.default.processor",
90              "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
91  
92          String processorClass = Demux.jobConf.get(chunk.getDataType(),
93                  defaultProcessor);
94  
95          if (!processorClass.equalsIgnoreCase("Drop")) {
96            reporter.incrCounter("DemuxMapInput", "total chunks", 1);
97            reporter.incrCounter("DemuxMapInput",
98                chunk.getDataType() + " chunks", 1);
99  
100           MapProcessor processor = MapProcessorFactory
101               .getProcessor(processorClass);
102           processor.process(key, chunk, chukwaOutputCollector, reporter);
103           if (log.isDebugEnabled()) {
104             duration = System.currentTimeMillis() - duration;
105             log.debug("Demux:Map dataType:" + chunk.getDataType()
106                 + " duration:" + duration + " processor:" + processorClass
107                 + " recordCount:" + chunk.getRecordOffsets().length);
108           }
109 
110         } else {
111           log.info("action:Demux, dataType:" + chunk.getDataType()
112               + " duration:0 processor:Drop recordCount:"
113               + chunk.getRecordOffsets().length);
114         }
115 
116       } catch (Exception e) {
117         log.warn("Exception in Demux:MAP", e);
118         e.printStackTrace();
119       }
120     }
121   }
122 
123   public static class ReduceClass extends MapReduceBase implements
124       Reducer<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, ChukwaRecord> {
125 
126     public void configure(JobConf jobConf) {
127       super.configure(jobConf);
128       Demux.jobConf = jobConf;
129     }
130 
131     public void reduce(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
132         OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
133         throws IOException {
134       ChukwaOutputCollector chukwaOutputCollector = new ChukwaOutputCollector(
135           "DemuxReduceOutput", output, reporter);
136       try {
137         long duration = System.currentTimeMillis();
138         reporter.incrCounter("DemuxReduceInput", "total distinct keys", 1);
139         reporter.incrCounter("DemuxReduceInput", key.getReduceType()
140             + " total distinct keys", 1);
141 
142         String defaultProcessor = Demux.jobConf.get(
143             "chukwa.demux.reducer.default.processor", null);
144         ReduceProcessor processor = ReduceProcessorFactory.getProcessor(
145             key.getReduceType(), defaultProcessor);
146 
147         processor.process(key, values, chukwaOutputCollector, reporter);
148 
149         if (log.isDebugEnabled()) {
150           duration = System.currentTimeMillis() - duration;
151           log.debug("Demux:Reduce, dataType:" + key.getReduceType()
152               + " duration:" + duration);
153         }
154 
155       } catch (Exception e) {
156         log.warn("Exception in Demux:Reduce", e);
157         e.printStackTrace();
158       }
159     }
160   }
161 
162   static int printUsage() {
163     System.out.println("Demux [-m <maps>] [-r <reduces>] <input> <output>");
164     ToolRunner.printGenericCommandUsage(System.out);
165     return -1;
166   }
167 
168   public static void addParsers(Configuration conf) {
169     String parserPath = conf.get("chukwa.data.dir")+File.separator+"demux";
170     try {
171       FileSystem fs = FileSystem.get(new Configuration());
172       FileStatus[] fstatus = fs.listStatus(new Path(parserPath));
173       if(fstatus!=null) {
174         String hdfsUrlPrefix = conf.get("fs.default.name");
175 
176         for(FileStatus parser : fstatus) {
177           Path jarPath = new Path(parser.getPath().toString().replace(hdfsUrlPrefix, ""));
178           log.debug("Adding parser JAR path " + jarPath);
179           DistributedCache.addFileToClassPath(jarPath, conf);
180         }
181       }
182     } catch (IOException e) {
183       log.error(ExceptionUtil.getStackTrace(e));
184     }
185   }
186   
187   public int run(String[] args) throws Exception {
188     JobConf conf = new JobConf(new ChukwaConfiguration(), Demux.class);
189     
190 
191     conf.setJobName("Chukwa-Demux_" + day.format(new Date()));
192     conf.setInputFormat(SequenceFileInputFormat.class);
193     conf.setMapperClass(Demux.MapClass.class);
194     conf.setPartitionerClass(ChukwaRecordPartitioner.class);
195     conf.setReducerClass(Demux.ReduceClass.class);
196 
197     conf.setOutputKeyClass(ChukwaRecordKey.class);
198     conf.setOutputValueClass(ChukwaRecord.class);
199     conf.setOutputFormat(ChukwaRecordOutputFormat.class);
200     conf.setJobPriority(JobPriority.VERY_HIGH);
201     addParsers(conf);
202     
203     List<String> other_args = new ArrayList<String>();
204     for (int i = 0; i < args.length; ++i) {
205       try {
206         if ("-m".equals(args[i])) {
207           conf.setNumMapTasks(Integer.parseInt(args[++i]));
208         } else if ("-r".equals(args[i])) {
209           conf.setNumReduceTasks(Integer.parseInt(args[++i]));
210         } else {
211           other_args.add(args[i]);
212         }
213       } catch (NumberFormatException except) {
214         System.out.println("ERROR: Integer expected instead of " + args[i]);
215         return printUsage();
216       } catch (ArrayIndexOutOfBoundsException except) {
217         System.out.println("ERROR: Required parameter missing from "
218             + args[i - 1]);
219         return printUsage();
220       }
221     }
222     // Make sure there are exactly 2 parameters left.
223     if (other_args.size() != 2) {
224       System.out.println("ERROR: Wrong number of parameters: "
225           + other_args.size() + " instead of 2.");
226       return printUsage();
227     }
228 
229     FileInputFormat.setInputPaths(conf, other_args.get(0));
230     FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
231 
232     JobClient.runJob(conf);
233     return 0;
234   }
235 
236   public static void main(String[] args) throws Exception {
237     int res = ToolRunner.run(new Configuration(), new Demux(), args);
238     System.exit(res);
239   }
240 
241 }