1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.chukwa.extraction.demux;
20
21
22 import java.io.IOException;
23 import java.net.URI;
24 import java.text.SimpleDateFormat;
25 import java.util.ArrayList;
26 import java.util.Calendar;
27 import java.util.List;
28 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
29 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
30 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
31 import org.apache.hadoop.chukwa.util.DaemonWatcher;
32 import org.apache.hadoop.conf.Configured;
33 import org.apache.hadoop.fs.FileStatus;
34 import org.apache.hadoop.fs.FileSystem;
35 import org.apache.hadoop.fs.FileUtil;
36 import org.apache.hadoop.fs.Path;
37 import org.apache.hadoop.mapred.FileInputFormat;
38 import org.apache.hadoop.mapred.FileOutputFormat;
39 import org.apache.hadoop.mapred.JobClient;
40 import org.apache.hadoop.mapred.JobConf;
41 import org.apache.hadoop.mapred.JobPriority;
42 import org.apache.hadoop.mapred.SequenceFileInputFormat;
43 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
44 import org.apache.hadoop.mapred.lib.IdentityMapper;
45 import org.apache.hadoop.mapred.lib.IdentityReducer;
46 import org.apache.hadoop.util.Tool;
47 import org.apache.log4j.Logger;
48
49
50 public class HourlyChukwaRecordRolling extends Configured implements Tool {
51 static Logger log = Logger.getLogger(HourlyChukwaRecordRolling.class);
52
53 static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
54 static ChukwaConfiguration conf = null;
55 static FileSystem fs = null;
56 static final String HadoopLogDir = "_logs";
57 static final String hadoopTempDir = "_temporary";
58
59 static boolean rollInSequence = true;
60 static boolean deleteRawdata = false;
61
62 public static void usage() {
63 System.err
64 .println("usage: java org.apache.hadoop.chukwa.extraction.demux.HourlyChukwaRecordRolling rollInSequence <True/False> deleteRawdata <True/False>");
65 System.exit(-1);
66 }
67
68 public static void buildHourlyFiles(String chukwaMainRepository,
69 String tempDir, String rollingFolder, int workingDay, int workingHour)
70 throws IOException {
71
72 Path hourPath = new Path(rollingFolder + "/hourly/" + workingDay + "/"
73 + workingHour);
74 FileStatus[] clustersFS = fs.listStatus(hourPath);
75 for (FileStatus clusterFs : clustersFS) {
76 String cluster = clusterFs.getPath().getName();
77
78 Path dataSourceClusterHourPaths = new Path(rollingFolder + "/hourly/"
79 + workingDay + "/" + workingHour + "/" + cluster);
80 FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
81
82 for (FileStatus dataSourceFS : dataSourcesFS) {
83 String dataSource = dataSourceFS.getPath().getName();
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154 public static void main(String[] args) throws Exception {
155 DaemonWatcher.createInstance("HourlyChukwaRecordRolling");
156
157 conf = new ChukwaConfiguration();
158 String fsName = conf.get("writer.hdfs.filesystem");
159 fs = FileSystem.get(new URI(fsName), conf);
160
161
162 String rollingFolder = "/chukwa/rolling/";
163 String chukwaMainRepository = "/chukwa/repos/";
164 String tempDir = "/chukwa/temp/hourlyRolling/";
165
166
167 if (args.length != 4) {
168 usage();
169 }
170
171 if (!args[0].equalsIgnoreCase("rollInSequence")) {
172 usage();
173 }
174
175 if (!args[2].equalsIgnoreCase("deleteRawdata")) {
176 usage();
177 }
178
179 if (args[1].equalsIgnoreCase("true")) {
180 rollInSequence = true;
181 } else {
182 rollInSequence = false;
183 }
184
185 if (args[3].equalsIgnoreCase("true")) {
186 deleteRawdata = true;
187 } else {
188 deleteRawdata = false;
189 }
190
191 Calendar calendar = Calendar.getInstance();
192 int currentDay = Integer.parseInt(sdf.format(calendar.getTime()));
193 int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
194 log.info("CurrentDay: " + currentDay);
195 log.info("currentHour" + currentHour);
196
197 Path rootFolder = new Path(rollingFolder + "/hourly/");
198
199 FileStatus[] daysFS = fs.listStatus(rootFolder);
200 for (FileStatus dayFS : daysFS) {
201 try {
202 log.info("dayFs:" + dayFS.getPath().getName());
203 int workingDay = Integer.parseInt(dayFS.getPath().getName());
204
205 Path hourlySrc = new Path(rollingFolder + "/hourly/" + workingDay);
206 FileStatus[] hoursFS = fs.listStatus(hourlySrc);
207 for (FileStatus hourFS : hoursFS) {
208 String workinhHourStr = hourFS.getPath().getName();
209 int workingHour = Integer.parseInt(workinhHourStr);
210 if ((workingDay < currentDay) ||
211 ((workingDay == currentDay) && (workingHour < currentHour))
212
213
214
215
216 ) {
217
218 try {
219 buildHourlyFiles(chukwaMainRepository, tempDir, rollingFolder,
220 workingDay, workingHour);
221 } catch(Throwable e) {
222 e.printStackTrace();
223 log.warn("Hourly rolling failed on :" + rollingFolder +"/" + workingDay +"/" + workingHour ) ;
224 }
225
226 }
227
228 }
229 }
230
231 catch (NumberFormatException e) {
232 log.warn("Exception in hourlyRolling:", e);
233 }
234
235 }
236 }
237
238 public int run(String[] args) throws Exception {
239 JobConf conf = new JobConf(new ChukwaConfiguration(), HourlyChukwaRecordRolling.class);
240
241 conf.setJobName("HourlyChukwa-Rolling");
242 conf.setInputFormat(SequenceFileInputFormat.class);
243
244 conf.setMapperClass(IdentityMapper.class);
245 conf.setReducerClass(IdentityReducer.class);
246
247 conf.setOutputKeyClass(ChukwaRecordKey.class);
248 conf.setOutputValueClass(ChukwaRecord.class);
249 conf.setOutputFormat(SequenceFileOutputFormat.class);
250
251 log.info("HourlyChukwaRecordRolling input: " + args[0]);
252 log.info("HourlyChukwaRecordRolling output: " + args[1]);
253
254 FileInputFormat.setInputPaths(conf, args[0]);
255 FileOutputFormat.setOutputPath(conf, new Path(args[1]));
256 conf.setJobPriority(JobPriority.LOW);
257 conf.setNumReduceTasks(1);
258
259 JobClient.runJob(conf);
260 return 0;
261 }
262
263 }