1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux;
20
21 import java.io.IOException;
22 import java.net.URI;
23 import java.net.URISyntaxException;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.List;
28
29 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
30 import org.apache.hadoop.chukwa.dataloader.DataLoaderFactory;
31 import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
32 import org.apache.hadoop.chukwa.util.DaemonWatcher;
33 import org.apache.hadoop.chukwa.util.ExceptionUtil;
34 import org.apache.hadoop.chukwa.datatrigger.TriggerAction;
35 import org.apache.hadoop.chukwa.datatrigger.TriggerEvent;
36 import org.apache.hadoop.fs.FileStatus;
37 import org.apache.hadoop.fs.FileSystem;
38 import org.apache.hadoop.fs.Path;
39 import org.apache.hadoop.fs.PathFilter;
40 import org.apache.log4j.Logger;
41
42 public class PostProcessorManager implements CHUKWA_CONSTANT{
43 static Logger log = Logger.getLogger(PostProcessorManager.class);
44
45 protected static HashMap<String, String> dataSources = new HashMap<String, String>();
46 public static int errorCount = 0;
47
48 protected int ERROR_SLEEP_TIME = 60;
49 protected ChukwaConfiguration conf = null;
50 protected FileSystem fs = null;
51 protected volatile boolean isRunning = true;
52
53 private static final int DEFAULT_MAX_ERROR_COUNT = 4;
54
55 final private static PathFilter POST_PROCESS_DEMUX_DIR_FILTER = new PathFilter() {
56 public boolean accept(Path file) {
57 return ( file.getName().startsWith("demuxOutputDir") || file.getName().startsWith("pigOutputDir"));
58 }
59 };
60
61
62 public PostProcessorManager() throws Exception {
63 this.conf = new ChukwaConfiguration();
64 init();
65 }
66
67 public PostProcessorManager(ChukwaConfiguration conf) throws Exception {
68 this.conf = conf;
69 init();
70 }
71
72 protected void init() throws IOException, URISyntaxException {
73 String fsName = conf.get(HDFS_DEFAULT_NAME_FIELD);
74 fs = FileSystem.get(new URI(fsName), conf);
75 }
76
77 public static void main(String[] args) throws Exception {
78
79 DaemonWatcher.createInstance("PostProcessorManager");
80
81
82
83 PostProcessorManager postProcessorManager = new PostProcessorManager();
84 postProcessorManager.start();
85 }
86
87 public void shutdown() {
88 this.isRunning = false;
89 }
90
91 public void start() {
92
93 String chukwaRootDir = conf.get(CHUKWA_ROOT_DIR_FIELD, "/chukwa/");
94 if ( ! chukwaRootDir.endsWith("/") ) {
95 chukwaRootDir += "/";
96 }
97 log.info("chukwaRootDir:" + chukwaRootDir);
98
99 String postProcessDir = conf.get(CHUKWA_POST_PROCESS_DIR_FIELD, chukwaRootDir +DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME);
100 if ( ! postProcessDir.endsWith("/") ) {
101 postProcessDir += "/";
102 }
103
104 String chukwaRootReposDir = conf.get(CHUKWA_ROOT_REPOS_DIR_FIELD, chukwaRootDir +DEFAULT_REPOS_DIR_NAME);
105 if ( ! chukwaRootReposDir.endsWith("/") ) {
106 chukwaRootReposDir += "/";
107 }
108
109 String chukwaPostProcessInErrorDir = conf.get(CHUKWA_POSTPROCESS_IN_ERROR_DIR_FIELD, chukwaRootDir +DEFAULT_POSTPROCESS_IN_ERROR_DIR_NAME);
110 if ( ! chukwaPostProcessInErrorDir.endsWith("/") ) {
111 chukwaPostProcessInErrorDir += "/";
112 }
113
114 int maxPermittedErrorCount = conf.getInt(CHUKWA_POSTPROCESS_MAX_ERROR_COUNT_FIELD,
115 DEFAULT_MAX_ERROR_COUNT);
116
117
118 dataSources = new HashMap<String, String>();
119 Path postProcessDirectory = new Path(postProcessDir);
120 while (isRunning) {
121
122 if (maxPermittedErrorCount != -1 && errorCount >= maxPermittedErrorCount) {
123 log.warn("==================\nToo many errors (" + errorCount +
124 "), Bail out!\n==================");
125 DaemonWatcher.bailout(-1);
126 }
127
128 try {
129 FileStatus[] demuxOutputDirs = fs.listStatus(postProcessDirectory,POST_PROCESS_DEMUX_DIR_FILTER);
130 List<String> directories = new ArrayList<String>();
131 for (FileStatus fstatus : demuxOutputDirs) {
132 directories.add(fstatus.getPath().getName());
133 }
134
135 if (demuxOutputDirs.length == 0) {
136 try { Thread.sleep(10*1000);} catch(InterruptedException e) {
137 continue;
138 }
139
140 Collections.sort(directories);
141
142 String directoryToBeProcessed = null;
143 long start = 0;
144
145 for(String directory : directories) {
146 directoryToBeProcessed = postProcessDirectory + "/"+ directory;
147
148 log.info("PostProcess Start, directory:" + directoryToBeProcessed);
149 start = System.currentTimeMillis();
150
151 try {
152 if ( processDataLoaders(directoryToBeProcessed) == true) {
153 Path[] destFiles = movetoMainRepository(
154 directoryToBeProcessed,chukwaRootReposDir);
155 if (destFiles != null && destFiles.length > 0) {
156 deleteDirectory(directoryToBeProcessed);
157 log.info("PostProcess Stop, directory:" + directoryToBeProcessed);
158 log.info("processDemuxOutput Duration:" + (System.currentTimeMillis() - start));
159 processPostMoveTriggers(destFiles);
160 continue;
161 }
162 } else {
163 log.warn("Error in processDemuxOutput for :" + directoryToBeProcessed + ". Will try again.");
164 if (errorCount > 3)
165 moveToInErrorDirectory(directoryToBeProcessed,directory,chukwaPostProcessInErrorDir);
166 else
167 errorCount++;
168 continue;
169
170 }
171
172
173 log.warn("Error in processDemuxOutput for :" + directoryToBeProcessed);
174 moveToInErrorDirectory(directoryToBeProcessed,directory,chukwaPostProcessInErrorDir);
175 } catch (Throwable e) {
176 log.warn("Error in processDemuxOutput:" ,e);
177 }
178 }
179
180 } catch (Throwable e) {
181 errorCount ++;
182 log.warn(e);
183 try { Thread.sleep(ERROR_SLEEP_TIME * 1000); }
184 catch (InterruptedException e1) {
185 }
186 }
187 }
188
189 public boolean processDataLoaders(String directory) throws IOException {
190 long start = System.currentTimeMillis();
191 try {
192 String[] classes = conf.get(POST_DEMUX_DATA_LOADER,"org.apache.hadoop.chukwa.dataloader.MetricDataLoaderPool,org.apache.hadoop.chukwa.dataloader.FSMDataLoader").split(",");
193 for(String dataLoaderName : classes) {
194 Class<? extends DataLoaderFactory> dl = (Class<? extends DataLoaderFactory>) Class.forName(dataLoaderName);
195 java.lang.reflect.Constructor<? extends DataLoaderFactory> c =
196 dl.getConstructor();
197 DataLoaderFactory dataloader = c.newInstance();
198
199
200
201 log.info(dataLoaderName+" processing: "+directory);
202 StringBuilder dirSearch = new StringBuilder();
203 dirSearch.append(directory);
204 dirSearch.append("/*/*/*.evt");
205 Path demuxDir = new Path(dirSearch.toString());
206 FileStatus[] events = fs.globStatus(demuxDir);
207 dataloader.load(conf, fs, events);
208 }
209 } catch(Exception e) {
210 log.error(ExceptionUtil.getStackTrace(e));
211 return false;
212 }
213 log.info("loadData Duration:" + (System.currentTimeMillis() - start));
214 return true;
215 }
216
217 public boolean processPostMoveTriggers(Path[] files) throws IOException {
218 long start = System.currentTimeMillis();
219 try {
220 String actions = conf.get(POST_DEMUX_SUCCESS_ACTION, null);
221 if (actions == null || actions.trim().length() == 0) {
222 return true;
223 }
224 log.info("PostProcess firing postMoveTriggers");
225
226 String[] classes = actions.trim().split(",");
227 for(String actionName : classes) {
228 Class<? extends TriggerAction> actionClass =
229 (Class<? extends TriggerAction>) Class.forName(actionName);
230 java.lang.reflect.Constructor<? extends TriggerAction> c =
231 actionClass.getConstructor();
232 TriggerAction action = c.newInstance();
233
234 log.info(actionName + " handling " + files.length + " events");
235
236
237 FileStatus[] events = fs.listStatus(files);
238 action.execute(conf, fs, events, TriggerEvent.POST_DEMUX_SUCCESS);
239 }
240 } catch(Exception e) {
241 log.error(ExceptionUtil.getStackTrace(e));
242 return false;
243 }
244 log.info("postMoveTriggers Duration:" + (System.currentTimeMillis() - start));
245 return true;
246 }
247
248 public Path[] movetoMainRepository(String sourceDirectory,String repoRootDirectory) throws Exception {
249 long start = System.currentTimeMillis();
250 Path[] destFiles = MoveToRepository.doMove(new Path(sourceDirectory),repoRootDirectory);
251 log.info("movetoMainRepository Duration:" + (System.currentTimeMillis() - start));
252 return destFiles;
253 }
254
255 public boolean moveToInErrorDirectory(String sourceDirectory,String dirName,String inErrorDirectory) throws Exception {
256 Path inErrorDir = new Path(inErrorDirectory);
257 if (!fs.exists(inErrorDir)) {
258 fs.mkdirs(inErrorDir);
259 }
260
261 if (inErrorDirectory.endsWith("/")) {
262 inErrorDirectory += "/";
263 }
264 String finalInErrorDirectory = inErrorDirectory + dirName + "_" + System.currentTimeMillis();
265 fs.rename(new Path(sourceDirectory), new Path(finalInErrorDirectory));
266 log.warn("Error in postProcess :" + sourceDirectory + " has been moved to:" + finalInErrorDirectory);
267 return true;
268 }
269
270 public boolean deleteDirectory(String directory) throws IOException {
271 return fs.delete(new Path(directory), true);
272 }
273
274 }