1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.datacollection.writer;
20
21
22 import java.util.List;
23 import org.apache.hadoop.chukwa.Chunk;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.log4j.Logger;
26
27
28
29
30
31
32
33 public class PipelineStageWriter implements ChukwaWriter {
34 Logger log = Logger.getLogger(PipelineStageWriter.class);
35
36 ChukwaWriter writer;
37
38 @Override
39 public CommitStatus add(List<Chunk> chunks) throws WriterException {
40 return writer.add(chunks);
41 }
42
43 @Override
44 public void close() throws WriterException {
45 writer.close();
46 }
47
48 @Override
49 public void init(Configuration conf) throws WriterException {
50 if (conf.get("chukwaCollector.pipeline") != null) {
51 String pipeline = conf.get("chukwaCollector.pipeline");
52 try {
53 String[] classes = pipeline.split(",");
54 log.info("using pipelined writers, pipe length is " + classes.length);
55 PipelineableWriter lastWriter = null;
56 if (classes.length > 1) {
57 lastWriter = (PipelineableWriter) conf.getClassByName(classes[0])
58 .newInstance();
59 lastWriter.init(conf);
60 writer = lastWriter;
61 }
62
63 for (int i = 1; i < classes.length - 1; ++i) {
64 Class<?> stageClass = conf.getClassByName(classes[i]);
65 Object st = stageClass.newInstance();
66 if (!(st instanceof PipelineableWriter))
67 log.error("class " + classes[i]
68 + " in processing pipeline isn't a pipeline stage");
69
70 PipelineableWriter stage = (PipelineableWriter) stageClass
71 .newInstance();
72 stage.init(conf);
73
74
75
76 lastWriter.setNextStage(stage);
77 lastWriter = stage;
78 }
79 Class<?> stageClass = conf.getClassByName(classes[classes.length - 1]);
80 Object st = stageClass.newInstance();
81
82 if (!(st instanceof ChukwaWriter)) {
83 log.error("class " + classes[classes.length - 1]
84 + " at end of processing pipeline isn't a ChukwaWriter");
85 throw new WriterException("bad pipeline");
86 } else {
87 ((ChukwaWriter)st).init(conf);
88 if (lastWriter != null)
89 lastWriter.setNextStage((ChukwaWriter) st);
90 else
91 writer = (ChukwaWriter) st;
92 }
93 return;
94 } catch (Exception e) {
95
96 log.error("failed to set up pipeline, defaulting to SeqFileWriter", e);
97
98 throw new WriterException("bad pipeline");
99 }
100 } else {
101 throw new WriterException("must set chukwaCollector.pipeline");
102 }
103 }
104
105 }