1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux;
20
21
22 import java.io.IOException;
23 import java.net.URI;
24 import java.text.SimpleDateFormat;
25 import java.util.ArrayList;
26 import java.util.Calendar;
27 import java.util.List;
28 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
29 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
30 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
31 import org.apache.hadoop.chukwa.util.DaemonWatcher;
32 import org.apache.hadoop.chukwa.util.ExceptionUtil;
33 import org.apache.hadoop.conf.Configured;
34 import org.apache.hadoop.fs.FileStatus;
35 import org.apache.hadoop.fs.FileSystem;
36 import org.apache.hadoop.fs.FileUtil;
37 import org.apache.hadoop.fs.Path;
38 import org.apache.hadoop.mapred.FileInputFormat;
39 import org.apache.hadoop.mapred.FileOutputFormat;
40 import org.apache.hadoop.mapred.JobClient;
41 import org.apache.hadoop.mapred.JobConf;
42 import org.apache.hadoop.mapred.JobPriority;
43 import org.apache.hadoop.mapred.SequenceFileInputFormat;
44 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
45 import org.apache.hadoop.mapred.lib.IdentityMapper;
46 import org.apache.hadoop.mapred.lib.IdentityReducer;
47 import org.apache.hadoop.util.Tool;
48 import org.apache.log4j.Logger;
49
50
51 public class DailyChukwaRecordRolling extends Configured implements Tool {
52 static Logger log = Logger.getLogger(DailyChukwaRecordRolling.class);
53
54 static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
55 static ChukwaConfiguration conf = null;
56 static FileSystem fs = null;
57 static final String HadoopLogDir = "_logs";
58 static final String hadoopTempDir = "_temporary";
59
60 static boolean rollInSequence = true;
61 static boolean deleteRawdata = false;
62
63 public static void usage() {
64 System.err
65 .println("usage: java org.apache.hadoop.chukwa.extraction.demux.DailyChukwaRecordRolling rollInSequence <True/False> deleteRawdata <True/False>");
66 System.exit(-1);
67 }
68
69 public static boolean hourlyRolling(String dailyStreamDirectory) {
70
71 Path pHour = null;
72 try {
73 log.info("Checking for HourlyRolling in " + dailyStreamDirectory);
74
75 for (int i=0;i<24;i++) {
76 pHour = new Path(dailyStreamDirectory + "/" + i);
77 if (! fs.exists(pHour)) {
78 log.info("HourlyData is missing for:" + pHour);
79 continue;
80 } else {
81 FileStatus[] files = fs.listStatus(pHour);
82 boolean containsHourly = false;
83 for(FileStatus file: files) {
84 log.info("Debug checking" + file.getPath());
85 if (file.getPath().getName().indexOf("_HourlyDone_") > 0) {
86 containsHourly = true;
87 break;
88 }
89 }
90 if (containsHourly == false) {
91 log.info("HourlyDone is missing for : " + pHour);
92 return false;
93 }
94 }
95 }
96 return true;
97 }catch(Exception e) {
98 e.printStackTrace();
99 return false;
100 }
101 }
102 public static void buildDailyFiles(String chukwaMainRepository,
103 String tempDir, String rollingFolder, int workingDay) throws IOException {
104
105
106 boolean alldone = true;
107
108 Path dayPath = new Path(rollingFolder + "/daily/" + workingDay);
109 FileStatus[] clustersFS = fs.listStatus(dayPath);
110 for (FileStatus clusterFs : clustersFS) {
111 String cluster = clusterFs.getPath().getName();
112
113 Path dataSourceClusterHourPaths = new Path(rollingFolder + "/daily/"
114 + workingDay + "/" + cluster);
115 FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
116 for (FileStatus dataSourceFS : dataSourcesFS) {
117 String dataSource = dataSourceFS.getPath().getName();
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204 public static void main(String[] args) throws Exception {
205
206 DaemonWatcher.createInstance("DailyChukwaRecordRolling");
207
208 conf = new ChukwaConfiguration();
209 String fsName = conf.get("writer.hdfs.filesystem");
210 fs = FileSystem.get(new URI(fsName), conf);
211
212
213 String rollingFolder = "/chukwa/rolling/";
214 String chukwaMainRepository = "/chukwa/repos/";
215 String tempDir = "/chukwa/temp/dailyRolling/";
216
217
218 if (args.length != 4) {
219 usage();
220 }
221
222 if (!args[0].equalsIgnoreCase("rollInSequence")) {
223 usage();
224 }
225
226 if (!args[2].equalsIgnoreCase("deleteRawdata")) {
227 usage();
228 }
229
230 if (args[1].equalsIgnoreCase("true")) {
231 rollInSequence = true;
232 } else {
233 rollInSequence = false;
234 }
235
236 if (args[3].equalsIgnoreCase("true")) {
237 deleteRawdata = true;
238 } else {
239 deleteRawdata = false;
240 }
241
242 log.info("rollInSequence: " + rollInSequence);
243 log.info("deleteRawdata: " + deleteRawdata);
244
245 Calendar calendar = Calendar.getInstance();
246 int currentDay = Integer.parseInt(sdf.format(calendar.getTime()));
247 int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
248 log.info("CurrentDay: " + currentDay);
249 log.info("currentHour" + currentHour);
250
251 Path rootFolder = new Path(rollingFolder + "/daily/");
252
253 FileStatus[] daysFS = fs.listStatus(rootFolder);
254 for (FileStatus dayFS : daysFS) {
255 try {
256 int workingDay = Integer.parseInt(dayFS.getPath().getName());
257 log.info("Daily working on :" + workingDay);
258 if (workingDay < currentDay) {
259
260 try {
261 buildDailyFiles(chukwaMainRepository, tempDir, rollingFolder,
262 workingDay);
263 } catch(Throwable e) {
264 e.printStackTrace();
265 log.warn("Daily rolling failed on :" + rollingFolder +"/" + workingDay ) ;
266 }
267
268 }
269 }
270
271 catch (NumberFormatException e) {
272 log.debug(ExceptionUtil.getStackTrace(e));
273 }
274
275 }
276 }
277
278 public int run(String[] args) throws Exception {
279 JobConf conf = new JobConf(new ChukwaConfiguration(), DailyChukwaRecordRolling.class);
280
281 conf.setJobName("DailyChukwa-Rolling");
282 conf.setInputFormat(SequenceFileInputFormat.class);
283
284 conf.setMapperClass(IdentityMapper.class);
285 conf.setReducerClass(IdentityReducer.class);
286
287 conf.setOutputKeyClass(ChukwaRecordKey.class);
288 conf.setOutputValueClass(ChukwaRecord.class);
289 conf.setOutputFormat(SequenceFileOutputFormat.class);
290
291 log.info("DailyChukwaRecordRolling input: " + args[0]);
292 log.info("DailyChukwaRecordRolling output: " + args[1]);
293
294 FileInputFormat.setInputPaths(conf, args[0]);
295 FileOutputFormat.setOutputPath(conf, new Path(args[1]));
296 conf.setJobPriority(JobPriority.LOW);
297 conf.setNumReduceTasks(1);
298 JobClient.runJob(conf);
299 return 0;
300 }
301
302 }