1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux.processor.reducer;
20
21
22 import java.util.HashMap;
23 import org.apache.log4j.Logger;
24
25 public class ReduceProcessorFactory {
26 static Logger log = Logger.getLogger(ReduceProcessorFactory.class);
27
28
29
30
31
32
33
34
35
36
37
38
39 private static HashMap<String, ReduceProcessor> processors = new HashMap<String, ReduceProcessor>();
40
41 private ReduceProcessorFactory() {
42 }
43
44 public static ReduceProcessor getProcessor(String reduceType, String defaultProcessor)
45 throws UnknownReduceTypeException {
46 String path = "org.apache.hadoop.chukwa.extraction.demux.processor.reducer."
47 + reduceType;
48 if (processors.containsKey(reduceType)) {
49 return processors.get(reduceType);
50 } else {
51 ReduceProcessor processor = null;
52 try {
53 processor = (ReduceProcessor) Class.forName(path).getConstructor()
54 .newInstance();
55 } catch (ClassNotFoundException e) {
56
57
58
59 if(defaultProcessor != null) {
60 processor = getProcessor(defaultProcessor, null);
61 }
62 else {
63 processor = getProcessor("IdentityReducer", null);
64 }
65 register(reduceType, processor);
66 return processor;
67 } catch (Exception e) {
68 throw new UnknownReduceTypeException("error constructing processor", e);
69 }
70
71
72
73 register(reduceType, processor);
74 return processor;
75 }
76 }
77
78
79
80
81 public static synchronized void register(String reduceType,
82 ReduceProcessor processor) {
83 log.info("register " + processor.getClass().getName()
84 + " for this recordType :" + reduceType);
85 if (processors.containsKey(reduceType)) {
86 throw new DuplicateReduceProcessorException(
87 "Duplicate processor for recordType:" + reduceType);
88 }
89 ReduceProcessorFactory.processors.put(reduceType, processor);
90 }
91
92 }