001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action.hadoop; 019 020 import org.apache.pig.Main; 021 import org.apache.pig.PigRunner; 022 import org.apache.pig.tools.pigstats.JobStats; 023 import org.apache.pig.tools.pigstats.PigStats; 024 import org.apache.hadoop.conf.Configuration; 025 import org.apache.hadoop.fs.Path; 026 027 import java.io.BufferedWriter; 028 import java.io.FileNotFoundException; 029 import java.io.FileWriter; 030 import java.io.OutputStream; 031 import java.io.FileOutputStream; 032 import java.io.BufferedReader; 033 import java.io.FileReader; 034 import java.io.File; 035 import java.io.IOException; 036 import java.util.Arrays; 037 import java.util.HashSet; 038 import java.util.Map; 039 import java.util.List; 040 import java.util.ArrayList; 041 import java.util.Properties; 042 import java.util.Set; 043 import java.net.URL; 044 import java.util.regex.Pattern; 045 046 public class PigMain extends LauncherMain { 047 private static final Set<String> DISALLOWED_PIG_OPTIONS = new HashSet<String>(); 048 public static final String ACTION_PREFIX = "oozie.action."; 049 public static final String EXTERNAL_CHILD_IDS = ACTION_PREFIX + "externalChildIDs.properties"; 050 public static final String EXTERNAL_ACTION_STATS = ACTION_PREFIX + "stats.properties"; 051 public static final String EXTERNAL_STATS_WRITE = ACTION_PREFIX + "external.stats.write"; 052 public static final int STRING_BUFFER_SIZE = 100; 053 054 private static final Pattern[] PIG_JOB_IDS_PATTERNS = { 055 Pattern.compile("HadoopJobId: (job_\\S*)") 056 }; 057 058 static { 059 DISALLOWED_PIG_OPTIONS.add("-4"); 060 DISALLOWED_PIG_OPTIONS.add("-log4jconf"); 061 DISALLOWED_PIG_OPTIONS.add("-e"); 062 DISALLOWED_PIG_OPTIONS.add("-execute"); 063 DISALLOWED_PIG_OPTIONS.add("-f"); 064 DISALLOWED_PIG_OPTIONS.add("-file"); 065 DISALLOWED_PIG_OPTIONS.add("-l"); 066 DISALLOWED_PIG_OPTIONS.add("-logfile"); 067 DISALLOWED_PIG_OPTIONS.add("-r"); 068 DISALLOWED_PIG_OPTIONS.add("-dryrun"); 069 DISALLOWED_PIG_OPTIONS.add("-x"); 070 DISALLOWED_PIG_OPTIONS.add("-exectype"); 071 DISALLOWED_PIG_OPTIONS.add("-P"); 072 DISALLOWED_PIG_OPTIONS.add("-propertyFile"); 073 } 074 075 public static void main(String[] args) throws Exception { 076 run(PigMain.class, args); 077 } 078 079 @Override 080 protected void run(String[] args) throws Exception { 081 System.out.println(); 082 System.out.println("Oozie Pig action configuration"); 083 System.out.println("================================================================="); 084 085 // loading action conf prepared by Oozie 086 Configuration actionConf = new Configuration(false); 087 088 String actionXml = System.getProperty("oozie.action.conf.xml"); 089 090 if (actionXml == null) { 091 throw new RuntimeException("Missing Java System Property [oozie.action.conf.xml]"); 092 } 093 if (!new File(actionXml).exists()) { 094 throw new RuntimeException("Action Configuration XML file [" + actionXml + "] does not exist"); 095 } 096 097 actionConf.addResource(new Path("file:///", actionXml)); 098 099 Properties pigProperties = new Properties(); 100 for (Map.Entry<String, String> entry : actionConf) { 101 pigProperties.setProperty(entry.getKey(), entry.getValue()); 102 } 103 104 // propagate delegation related props from launcher job to Pig job 105 if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) { 106 pigProperties.setProperty("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION")); 107 System.out.println("------------------------"); 108 System.out.println("Setting env property for mapreduce.job.credentials.binary to:" 109 + System.getenv("HADOOP_TOKEN_FILE_LOCATION")); 110 System.out.println("------------------------"); 111 System.setProperty("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION")); 112 } 113 else { 114 System.out.println("Non-kerberoes execution"); 115 } 116 117 OutputStream os = new FileOutputStream("pig.properties"); 118 pigProperties.store(os, ""); 119 os.close(); 120 121 logMasking("pig.properties:", Arrays.asList("password"), pigProperties.entrySet()); 122 123 List<String> arguments = new ArrayList<String>(); 124 String script = actionConf.get("oozie.pig.script"); 125 126 if (script == null) { 127 throw new RuntimeException("Action Configuration does not have [oozie.pig.script] property"); 128 } 129 130 if (!new File(script).exists()) { 131 throw new RuntimeException("Error: Pig script file [" + script + "] does not exist"); 132 } 133 134 System.out.println("Pig script [" + script + "] content: "); 135 System.out.println("------------------------"); 136 BufferedReader br = new BufferedReader(new FileReader(script)); 137 String line = br.readLine(); 138 while (line != null) { 139 System.out.println(line); 140 line = br.readLine(); 141 } 142 br.close(); 143 System.out.println("------------------------"); 144 System.out.println(); 145 146 arguments.add("-file"); 147 arguments.add(script); 148 String[] params = MapReduceMain.getStrings(actionConf, "oozie.pig.params"); 149 for (String param : params) { 150 arguments.add("-param"); 151 arguments.add(param); 152 } 153 154 String hadoopJobId = System.getProperty("oozie.launcher.job.id"); 155 if (hadoopJobId == null) { 156 throw new RuntimeException("Launcher Hadoop Job ID system property not set"); 157 } 158 159 String logFile = new File("pig-oozie-" + hadoopJobId + ".log").getAbsolutePath(); 160 161 URL log4jFile = Thread.currentThread().getContextClassLoader().getResource("log4j.properties"); 162 if (log4jFile != null) { 163 164 String pigLogLevel = actionConf.get("oozie.pig.log.level", "INFO"); 165 166 // append required PIG properties to the default hadoop log4j file 167 Properties hadoopProps = new Properties(); 168 hadoopProps.load(log4jFile.openStream()); 169 hadoopProps.setProperty("log4j.rootLogger", pigLogLevel + ", A, B"); 170 hadoopProps.setProperty("log4j.logger.org.apache.pig", pigLogLevel + ", A, B"); 171 hadoopProps.setProperty("log4j.appender.A", "org.apache.log4j.ConsoleAppender"); 172 hadoopProps.setProperty("log4j.appender.A.layout", "org.apache.log4j.PatternLayout"); 173 hadoopProps.setProperty("log4j.appender.A.layout.ConversionPattern", "%d [%t] %-5p %c %x - %m%n"); 174 hadoopProps.setProperty("log4j.appender.B", "org.apache.log4j.FileAppender"); 175 hadoopProps.setProperty("log4j.appender.B.file", logFile); 176 hadoopProps.setProperty("log4j.appender.B.layout", "org.apache.log4j.PatternLayout"); 177 hadoopProps.setProperty("log4j.appender.B.layout.ConversionPattern", "%d [%t] %-5p %c %x - %m%n"); 178 179 String localProps = new File("piglog4j.properties").getAbsolutePath(); 180 OutputStream os1 = new FileOutputStream(localProps); 181 hadoopProps.store(os1, ""); 182 os1.close(); 183 184 arguments.add("-log4jconf"); 185 arguments.add(localProps); 186 187 // print out current directory 188 File localDir = new File(localProps).getParentFile(); 189 System.out.println("Current (local) dir = " + localDir.getAbsolutePath()); 190 } 191 else { 192 System.out.println("log4jfile is null"); 193 } 194 195 String pigLog = "pig-" + hadoopJobId + ".log"; 196 arguments.add("-logfile"); 197 arguments.add(pigLog); 198 199 String[] pigArgs = MapReduceMain.getStrings(actionConf, "oozie.pig.args"); 200 for (String pigArg : pigArgs) { 201 if (DISALLOWED_PIG_OPTIONS.contains(pigArg)) { 202 throw new RuntimeException("Error: Pig argument " + pigArg + " is not supported"); 203 } 204 arguments.add(pigArg); 205 } 206 207 System.out.println("Pig command arguments :"); 208 for (String arg : arguments) { 209 System.out.println(" " + arg); 210 } 211 212 System.out.println("================================================================="); 213 System.out.println(); 214 System.out.println(">>> Invoking Pig command line now >>>"); 215 System.out.println(); 216 System.out.flush(); 217 218 System.out.println(); 219 runPigJob(new String[] { "-version" }, null, true, false); 220 System.out.println(); 221 System.out.flush(); 222 boolean hasStats = Boolean.parseBoolean(actionConf.get(EXTERNAL_STATS_WRITE)); 223 runPigJob(arguments.toArray(new String[arguments.size()]), pigLog, false, hasStats); 224 225 System.out.println(); 226 System.out.println("<<< Invocation of Pig command completed <<<"); 227 System.out.println(); 228 229 // For embedded python or for version of pig lower than 0.8, pig stats are not supported. 230 // So retrieving hadoop Ids here 231 File file = new File(System.getProperty(EXTERNAL_CHILD_IDS)); 232 if (!file.exists()) { 233 Properties props = getHadoopJobIds(logFile, PIG_JOB_IDS_PATTERNS); 234 writeExternalData(props.getProperty(HADOOP_JOBS), file); 235 System.out.println(" Hadoop Job IDs executed by Pig: " + props.getProperty(HADOOP_JOBS)); 236 System.out.println(); 237 } 238 } 239 240 241 242 private void handleError(String pigLog) throws Exception { 243 System.err.println(); 244 System.err.println("Pig logfile dump:"); 245 System.err.println(); 246 try { 247 BufferedReader reader = new BufferedReader(new FileReader(pigLog)); 248 String line = reader.readLine(); 249 while (line != null) { 250 System.err.println(line); 251 line = reader.readLine(); 252 } 253 reader.close(); 254 } 255 catch (FileNotFoundException e) { 256 System.err.println("pig log file: " + pigLog + " not found."); 257 } 258 } 259 260 /** 261 * Runs the pig script using PigRunner API if version 0.8 or above. Embedded 262 * pig within python is also supported. 263 * 264 * @param args pig command line arguments 265 * @param pigLog pig log file 266 * @param resetSecurityManager specify if need to reset security manager 267 * @param retrieveStats specify if stats are to be retrieved 268 * @throws Exception 269 */ 270 protected void runPigJob(String[] args, String pigLog, boolean resetSecurityManager, boolean retrieveStats) throws Exception { 271 // running as from the command line 272 boolean pigRunnerExists = true; 273 Class klass; 274 try { 275 klass = Class.forName("org.apache.pig.PigRunner"); 276 } 277 catch (ClassNotFoundException ex) { 278 pigRunnerExists = false; 279 } 280 281 if (pigRunnerExists) { 282 System.out.println("Run pig script using PigRunner.run() for Pig version 0.8+"); 283 PigStats stats = PigRunner.run(args, null); 284 // isSuccessful is the API from 0.9 supported by both PigStats and 285 // EmbeddedPigStats 286 if (!stats.isSuccessful()) { 287 if (pigLog != null) { 288 handleError(pigLog); 289 } 290 throw new LauncherMainException(PigRunner.ReturnCode.FAILURE); 291 } 292 else { 293 // If pig command is ran with just the "version" option, then 294 // return 295 if (resetSecurityManager) { 296 return; 297 } 298 String jobIds = getHadoopJobIds(stats); 299 if (jobIds != null) { 300 System.out.println(" Hadoop Job IDs executed by Pig: " + jobIds); 301 File f = new File(System.getProperty(EXTERNAL_CHILD_IDS)); 302 writeExternalData(jobIds, f); 303 } 304 // Retrieve stats only if user has specified in workflow 305 // configuration 306 if (retrieveStats) { 307 ActionStats pigStats; 308 String JSONString; 309 try { 310 pigStats = new OoziePigStats(stats); 311 JSONString = pigStats.toJSON(); 312 } catch (UnsupportedOperationException uoe) { 313 throw new UnsupportedOperationException( 314 "Pig stats are not supported for this type of operation", uoe); 315 } 316 File f = new File(System.getProperty(EXTERNAL_ACTION_STATS)); 317 writeExternalData(JSONString, f); 318 } 319 } 320 } 321 else { 322 try { 323 System.out.println("Run pig script using Main.main() for Pig version before 0.8"); 324 Main.main(args); 325 } 326 catch (SecurityException ex) { 327 if (resetSecurityManager) { 328 LauncherSecurityManager.reset(); 329 } 330 else { 331 if (LauncherSecurityManager.getExitInvoked()) { 332 if (LauncherSecurityManager.getExitCode() != 0) { 333 if (pigLog != null) { 334 handleError(pigLog); 335 } 336 throw ex; 337 } 338 } 339 } 340 } 341 } 342 } 343 344 // write external data(stats, hadoopIds) to the file which will be read by the LauncherMapper 345 private static void writeExternalData(String data, File f) throws IOException { 346 BufferedWriter out = null; 347 try { 348 out = new BufferedWriter(new FileWriter(f)); 349 out.write(data); 350 } 351 finally { 352 if (out != null) { 353 out.close(); 354 } 355 } 356 } 357 358 public static void setPigScript(Configuration conf, String script, String[] params, String[] args) { 359 conf.set("oozie.pig.script", script); 360 MapReduceMain.setStrings(conf, "oozie.pig.params", params); 361 MapReduceMain.setStrings(conf, "oozie.pig.args", args); 362 } 363 364 /** 365 * Get Hadoop Ids through PigStats API 366 * 367 * @param pigStats stats object obtained through PigStats API 368 * @return comma-separated String 369 */ 370 protected String getHadoopJobIds(PigStats pigStats) { 371 StringBuilder sb = new StringBuilder(STRING_BUFFER_SIZE); 372 String separator = ","; 373 // Collect Hadoop Ids through JobGraph API of Pig and store them as 374 // comma separated string 375 try { 376 PigStats.JobGraph jobGraph = pigStats.getJobGraph(); 377 for (JobStats jobStats : jobGraph) { 378 String hadoopJobId = jobStats.getJobId(); 379 if (sb.length() > 0) { 380 sb.append(separator); 381 } 382 sb.append(hadoopJobId); 383 } 384 } 385 // Return null if Pig API's are not supported 386 catch (UnsupportedOperationException uoe) { 387 return null; 388 } 389 return sb.toString(); 390 } 391 392 }