001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.action; 020 021import org.apache.commons.lang.StringUtils; 022import org.apache.hadoop.fs.FileSystem; 023import org.apache.hadoop.fs.Path; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.oozie.client.WorkflowAction; 026import org.apache.oozie.client.WorkflowJob; 027import org.apache.oozie.service.ConfigurationService; 028import org.apache.oozie.util.ELEvaluator; 029import org.apache.oozie.util.ParamChecker; 030import org.apache.oozie.util.XLog; 031import org.apache.oozie.service.HadoopAccessorException; 032import org.apache.oozie.service.Services; 033 034import java.io.ByteArrayOutputStream; 035import java.io.IOException; 036import java.io.PrintStream; 037import java.net.URISyntaxException; 038import java.util.HashMap; 039import java.util.Map; 040import java.util.Properties; 041import java.util.LinkedHashMap; 042 043/** 044 * Base action executor class. <p/> All the action executors should extend this class. 045 */ 046public abstract class ActionExecutor { 047 048 /** 049 * Configuration prefix for action executor (sub-classes) properties. 050 */ 051 public static final String CONF_PREFIX = "oozie.action."; 052 053 public static final String MAX_RETRIES = CONF_PREFIX + "retries.max"; 054 055 public static final String ACTION_RETRY_INTERVAL = CONF_PREFIX + "retry.interval"; 056 057 public static final String ACTION_RETRY_POLICY = CONF_PREFIX + "retry.policy"; 058 059 /** 060 * Error code used by {@link #convertException} when there is not register error information for an exception. 061 */ 062 public static final String ERROR_OTHER = "OTHER"; 063 064 public boolean requiresNNJT = false; 065 066 public static enum RETRYPOLICY { 067 EXPONENTIAL, PERIODIC 068 } 069 070 private static class ErrorInfo { 071 ActionExecutorException.ErrorType errorType; 072 String errorCode; 073 Class<?> errorClass; 074 075 private ErrorInfo(ActionExecutorException.ErrorType errorType, String errorCode, Class<?> errorClass) { 076 this.errorType = errorType; 077 this.errorCode = errorCode; 078 this.errorClass = errorClass; 079 } 080 } 081 082 private static boolean initMode = false; 083 private static Map<String, Map<String, ErrorInfo>> ERROR_INFOS = new HashMap<String, Map<String, ErrorInfo>>(); 084 085 /** 086 * Context information passed to the ActionExecutor methods. 087 */ 088 public interface Context { 089 090 /** 091 * Create the callback URL for the action. 092 * 093 * @param externalStatusVar variable for the caller to inject the external status. 094 * @return the callback URL. 095 */ 096 public String getCallbackUrl(String externalStatusVar); 097 098 /** 099 * Return a proto configuration for actions with auth properties already set. 100 * 101 * @return a proto configuration for actions with auth properties already set. 102 */ 103 public Configuration getProtoActionConf(); 104 105 /** 106 * Return the workflow job. 107 * 108 * @return the workflow job. 109 */ 110 public WorkflowJob getWorkflow(); 111 112 /** 113 * Return an ELEvaluator with the context injected. 114 * 115 * @return configured ELEvaluator. 116 */ 117 public ELEvaluator getELEvaluator(); 118 119 /** 120 * Set a workflow action variable. <p/> Convenience method that prefixes the variable name with the action name 121 * plus a '.'. 122 * 123 * @param name variable name. 124 * @param value variable value, <code>null</code> removes the variable. 125 */ 126 public void setVar(String name, String value); 127 128 /** 129 * Get a workflow action variable. <p/> Convenience method that prefixes the variable name with the action name 130 * plus a '.'. 131 * 132 * @param name variable name. 133 * @return the variable value, <code>null</code> if not set. 134 */ 135 public String getVar(String name); 136 137 /** 138 * Set the action tracking information for an successfully started action. 139 * 140 * @param externalId the action external ID. 141 * @param trackerUri the action tracker URI. 142 * @param consoleUrl the action console URL. 143 */ 144 void setStartData(String externalId, String trackerUri, String consoleUrl); 145 146 /** 147 * Set the action execution completion information for an action. The action status is set to {@link 148 * org.apache.oozie.client.WorkflowAction.Status#DONE} 149 * 150 * @param externalStatus the action external end status. 151 * @param actionData the action data on completion, <code>null</code> if none. 152 */ 153 void setExecutionData(String externalStatus, Properties actionData); 154 155 /** 156 * Set execution statistics information for a particular action. The action status is set to {@link 157 * org.apache.oozie.client.WorkflowAction.Status#DONE} 158 * 159 * @param jsonStats the JSON string representation of the stats. 160 */ 161 void setExecutionStats(String jsonStats); 162 163 /** 164 * Set external child IDs for a particular action (Eg: pig). The action status is set to {@link 165 * org.apache.oozie.client.WorkflowAction.Status#DONE} 166 * 167 * @param externalChildIDs the external child IDs as a comma-delimited string. 168 */ 169 void setExternalChildIDs(String externalChildIDs); 170 171 /** 172 * Set the action end completion information for a completed action. 173 * 174 * @param status the action end status, it can be {@link org.apache.oozie.client.WorkflowAction.Status#OK} or 175 * {@link org.apache.oozie.client.WorkflowAction.Status#ERROR}. 176 * @param signalValue the action external end status. 177 */ 178 void setEndData(WorkflowAction.Status status, String signalValue); 179 180 /** 181 * Return if the executor invocation is a retry or not. 182 * 183 * @return if the executor invocation is a retry or not. 184 */ 185 boolean isRetry(); 186 187 /** 188 * Sets the external status for the action in context. 189 * 190 * @param externalStatus the external status. 191 */ 192 void setExternalStatus(String externalStatus); 193 194 /** 195 * Get the Action Recovery ID. 196 * 197 * @return recovery ID. 198 */ 199 String getRecoveryId(); 200 201 /* 202 * @return the path that will be used to store action specific data 203 * @throws IOException @throws URISyntaxException @throws HadoopAccessorException 204 */ 205 public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException; 206 207 /** 208 * @return filesystem handle for the application deployment fs. 209 * @throws IOException 210 * @throws URISyntaxException 211 * @throws HadoopAccessorException 212 */ 213 public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException; 214 215 public void setErrorInfo(String str, String exMsg); 216 } 217 218 219 /** 220 * Define the default inteval in seconds between retries. 221 */ 222 public static final long RETRY_INTERVAL = 60; 223 224 private String type; 225 private int maxRetries; 226 private long retryInterval; 227 private RETRYPOLICY retryPolicy; 228 229 /** 230 * Create an action executor with default retry parameters. 231 * 232 * @param type action executor type. 233 */ 234 protected ActionExecutor(String type) { 235 this(type, RETRY_INTERVAL); 236 } 237 238 /** 239 * Create an action executor. 240 * 241 * @param type action executor type. 242 * @param defaultRetryInterval retry interval, in seconds. 243 */ 244 protected ActionExecutor(String type, long defaultRetryInterval) { 245 this.type = ParamChecker.notEmpty(type, "type"); 246 this.maxRetries = ConfigurationService.getInt(MAX_RETRIES); 247 int retryInterval = ConfigurationService.getInt(ACTION_RETRY_INTERVAL); 248 this.retryInterval = retryInterval > 0 ? retryInterval : defaultRetryInterval; 249 this.retryPolicy = getRetryPolicyFromConf(); 250 } 251 252 private RETRYPOLICY getRetryPolicyFromConf() { 253 String retryPolicy = ConfigurationService.get(ACTION_RETRY_POLICY); 254 if (StringUtils.isBlank(retryPolicy)) { 255 return RETRYPOLICY.PERIODIC; 256 } else { 257 try { 258 return RETRYPOLICY.valueOf(retryPolicy.toUpperCase().trim()); 259 } catch (IllegalArgumentException e) { 260 return RETRYPOLICY.PERIODIC; 261 } 262 } 263 } 264 265 /** 266 * Clear all init settings for all action types. 267 */ 268 public static void resetInitInfo() { 269 if (!initMode) { 270 throw new IllegalStateException("Error, action type info locked"); 271 } 272 ERROR_INFOS.clear(); 273 } 274 275 /** 276 * Enable action type initialization. 277 */ 278 public static void enableInit() { 279 initMode = true; 280 } 281 282 /** 283 * Disable action type initialization. 284 */ 285 public static void disableInit() { 286 initMode = false; 287 } 288 289 /** 290 * Invoked once at system initialization time. <p/> It can be used to register error information for the expected 291 * exceptions. Exceptions should be register from subclasses to superclasses to ensure proper detection, same thing 292 * that it is done in a normal catch. <p/> This method should invoke the {@link #registerError} method to register 293 * all its possible errors. <p/> Subclasses overriding must invoke super. 294 */ 295 public void initActionType() { 296 XLog.getLog(getClass()).trace(" Init Action Type : [{0}]", getType()); 297 ERROR_INFOS.put(getType(), new LinkedHashMap<String, ErrorInfo>()); 298 } 299 300 /** 301 * Return the system ID, this ID is defined in Oozie configuration. 302 * 303 * @return the system ID. 304 */ 305 public String getOozieSystemId() { 306 return Services.get().getSystemId(); 307 } 308 309 /** 310 * Return the runtime directory of the Oozie instance. <p/> The directory is created under TMP and it is always a 311 * new directory per system initialization. 312 * 313 * @return the runtime directory of the Oozie instance. 314 */ 315 public String getOozieRuntimeDir() { 316 return Services.get().getRuntimeDir(); 317 } 318 319 /** 320 * Return Oozie configuration. <p/> This is useful for actions that need access to configuration properties. 321 * 322 * @return Oozie configuration. 323 */ 324 public Configuration getOozieConf() { 325 return Services.get().getConf(); 326 } 327 328 /** 329 * Register error handling information for an exception. 330 * 331 * @param exClass excpetion class name (to work in case of a particular exception not being in the classpath, needed 332 * to be able to handle multiple version of Hadoop or other JARs used by executors with the same codebase). 333 * @param errorType error type for the exception. 334 * @param errorCode error code for the exception. 335 */ 336 protected void registerError(String exClass, ActionExecutorException.ErrorType errorType, String errorCode) { 337 if (!initMode) { 338 throw new IllegalStateException("Error, action type info locked"); 339 } 340 try { 341 Class errorClass = Thread.currentThread().getContextClassLoader().loadClass(exClass); 342 Map<String, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType()); 343 executorErrorInfo.put(exClass, new ErrorInfo(errorType, errorCode, errorClass)); 344 } 345 catch (ClassNotFoundException cnfe) { 346 XLog.getLog(getClass()).warn( 347 "Exception [{0}] not in classpath, ActionExecutor [{1}] will handle it as ERROR", exClass, 348 getType()); 349 } 350 catch (java.lang.NoClassDefFoundError err) { 351 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 352 err.printStackTrace(new PrintStream(baos)); 353 XLog.getLog(getClass()).warn(baos.toString()); 354 } 355 } 356 357 /** 358 * Return the action executor type. 359 * 360 * @return the action executor type. 361 */ 362 public String getType() { 363 return type; 364 } 365 366 /** 367 * Return the maximum number of retries for the action executor. 368 * 369 * @return the maximum number of retries for the action executor. 370 */ 371 public int getMaxRetries() { 372 return maxRetries; 373 } 374 375 /** 376 * Set the maximum number of retries for the action executor. 377 * 378 * @param maxRetries the maximum number of retries. 379 */ 380 public void setMaxRetries(int maxRetries) { 381 this.maxRetries = maxRetries; 382 } 383 384 /** 385 * Return the retry policy for the action executor. 386 * 387 * @return the retry policy for the action executor. 388 */ 389 public RETRYPOLICY getRetryPolicy() { 390 return retryPolicy; 391 } 392 393 /** 394 * Sets the retry policy for the action executor. 395 * 396 * @param retryPolicy retry policy for the action executor. 397 */ 398 public void setRetryPolicy(RETRYPOLICY retryPolicy) { 399 this.retryPolicy = retryPolicy; 400 } 401 402 /** 403 * Return the retry interval for the action executor in seconds. 404 * 405 * @return the retry interval for the action executor in seconds. 406 */ 407 public long getRetryInterval() { 408 return retryInterval; 409 } 410 411 /** 412 * Sets the retry interval for the action executor. 413 * 414 * @param retryInterval retry interval in seconds. 415 */ 416 public void setRetryInterval(long retryInterval) { 417 this.retryInterval = retryInterval; 418 } 419 420 /** 421 * Utility method to handle exceptions in the {@link #start}, {@link #end}, {@link #kill} and {@link #check} methods 422 * <p/> It uses the error registry to convert exceptions to {@link ActionExecutorException}s. 423 * 424 * @param ex exception to convert. 425 * @return ActionExecutorException converted exception. 426 */ 427 @SuppressWarnings({"ThrowableInstanceNeverThrown"}) 428 protected ActionExecutorException convertException(Exception ex) { 429 if (ex instanceof ActionExecutorException) { 430 return (ActionExecutorException) ex; 431 } 432 433 ActionExecutorException aee = null; 434 // Check the cause of the exception first 435 if (ex.getCause() != null) { 436 aee = convertExceptionHelper(ex.getCause()); 437 } 438 // If the cause isn't registered or doesn't exist, check the exception itself 439 if (aee == null) { 440 aee = convertExceptionHelper(ex); 441 // If the cause isn't registered either, then just create a new ActionExecutorException 442 if (aee == null) { 443 String exClass = ex.getClass().getName(); 444 String errorCode = exClass.substring(exClass.lastIndexOf(".") + 1); 445 aee = new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, errorCode, "{0}", ex.getMessage(), ex); 446 } 447 } 448 return aee; 449 } 450 451 private ActionExecutorException convertExceptionHelper(Throwable ex) { 452 Map<String, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType()); 453 // Check if we have registered ex 454 ErrorInfo classErrorInfo = executorErrorInfo.get(ex.getClass().getName()); 455 if (classErrorInfo != null) { 456 return new ActionExecutorException(classErrorInfo.errorType, classErrorInfo.errorCode, "{0}", ex.getMessage(), ex); 457 } 458 // Else, check if a parent class of ex is registered 459 else { 460 for (ErrorInfo errorInfo : executorErrorInfo.values()) { 461 if (errorInfo.errorClass.isInstance(ex)) { 462 return new ActionExecutorException(errorInfo.errorType, errorInfo.errorCode, "{0}", ex.getMessage(), ex); 463 } 464 } 465 } 466 return null; 467 } 468 469 /** 470 * Convenience method that return the signal for an Action based on the action status. 471 * 472 * @param status action status. 473 * @return the action signal. 474 */ 475 protected String getActionSignal(WorkflowAction.Status status) { 476 switch (status) { 477 case OK: 478 return "OK"; 479 case ERROR: 480 case KILLED: 481 return "ERROR"; 482 default: 483 throw new IllegalArgumentException("Action status for signal can only be OK or ERROR"); 484 } 485 } 486 487 /** 488 * Return the path that will be used to store action specific data 489 * 490 * @param jobId Worfklow ID 491 * @param action Action 492 * @param key An Identifier 493 * @param temp temp directory flag 494 * @return A string that has the path 495 */ 496 protected String getActionDirPath(String jobId, WorkflowAction action, String key, boolean temp) { 497 String name = jobId + "/" + action.getName() + "--" + key; 498 if (temp) { 499 name += ".temp"; 500 } 501 return getOozieSystemId() + "/" + name; 502 } 503 504 /** 505 * Return the path that will be used to store action specific data. 506 * 507 * @param jobId Workflow ID 508 * @param action Action 509 * @param key An identifier 510 * @param temp Temp directory flag 511 * @return Path to the directory 512 */ 513 public Path getActionDir(String jobId, WorkflowAction action, String key, boolean temp) { 514 return new Path(getActionDirPath(jobId, action, key, temp)); 515 } 516 517 /** 518 * Start an action. <p/> The {@link Context#setStartData} method must be called within this method. <p/> If the 519 * action has completed, the {@link Context#setExecutionData} method must be called within this method. 520 * 521 * @param context executor context. 522 * @param action the action to start. 523 * @throws ActionExecutorException thrown if the action could not start. 524 */ 525 public abstract void start(Context context, WorkflowAction action) throws ActionExecutorException; 526 527 /** 528 * End an action after it has executed. <p/> The {@link Context#setEndData} method must be called within this 529 * method. 530 * 531 * @param context executor context. 532 * @param action the action to end. 533 * @throws ActionExecutorException thrown if the action could not end. 534 */ 535 public abstract void end(Context context, WorkflowAction action) throws ActionExecutorException; 536 537 /** 538 * Check if an action has completed. This method must be implemented by Async Action Executors. <p/> If the action 539 * has completed, the {@link Context#setExecutionData} method must be called within this method. <p/> If the action 540 * has not completed, the {@link Context#setExternalStatus} method must be called within this method. 541 * 542 * @param context executor context. 543 * @param action the action to end. 544 * @throws ActionExecutorException thrown if the action could not be checked. 545 */ 546 public abstract void check(Context context, WorkflowAction action) throws ActionExecutorException; 547 548 /** 549 * Kill an action. <p/> The {@link Context#setEndData} method must be called within this method. 550 * 551 * @param context executor context. 552 * @param action the action to kill. 553 * @throws ActionExecutorException thrown if the action could not be killed. 554 */ 555 public abstract void kill(Context context, WorkflowAction action) throws ActionExecutorException; 556 557 /** 558 * Return if the external status indicates that the action has completed. 559 * 560 * @param externalStatus external status to check. 561 * @return if the external status indicates that the action has completed. 562 */ 563 public abstract boolean isCompleted(String externalStatus); 564 565}