View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
20  
21  
22  import java.util.HashMap;
23  import org.apache.log4j.Logger;
24  
25  public class ProcessorFactory {
26    static Logger log = Logger.getLogger(ProcessorFactory.class);
27  
28    // TODO
29    // add new mapper package at the end.
30    // We should have a more generic way to do this.
31    // Ex: read from config
32    // list of alias
33    // and
34    // alias -> processor class
35  
36    private static HashMap<String, ChunkProcessor> processors = new HashMap<String, ChunkProcessor>(); // registry
37  
38    private ProcessorFactory() {
39    }
40  
41    public static ChunkProcessor getProcessor(String recordType)
42        throws UnknownRecordTypeException {
43      String path = "org.apache.hadoop.chukwa.extraction.demux.processor.mapper"
44          + recordType;
45      if (processors.containsKey(recordType)) {
46        return processors.get(recordType);
47      } else {
48        ChunkProcessor processor = null;
49        try {
50          processor = (ChunkProcessor) Class.forName(path).getConstructor()
51              .newInstance();
52        } catch (ClassNotFoundException e) {
53          throw new UnknownRecordTypeException(
54              "Unknown recordType:" + recordType, e);
55        } catch (Exception e) {
56          throw new UnknownRecordTypeException("error constructing processor", e);
57        }
58  
59        // TODO using a ThreadSafe/reuse flag to actually decide if we want
60        // to reuse the same processor again and again
61        register(recordType, processor);
62        return processor;
63      }
64    }
65  
66    /**
67     * Register a specific parser for a {@link ChunkProcessor} implementation.
68     */
69    public static synchronized void register(String recordType,
70        ChunkProcessor processor) {
71      log.info("register " + processor.getClass().getName()
72          + " for this recordType :" + recordType);
73      if (processors.containsKey(recordType)) {
74        throw new DuplicateProcessorException(
75            "Duplicate processor for recordType:" + recordType);
76      }
77      ProcessorFactory.processors.put(recordType, processor);
78    }
79  
80  }