001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.wf; 020 021import java.io.IOException; 022import java.io.StringReader; 023import java.net.URI; 024import java.net.URISyntaxException; 025import java.util.Date; 026import java.util.HashMap; 027import java.util.Map; 028import java.util.Properties; 029import java.util.Set; 030 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.fs.FileSystem; 033import org.apache.hadoop.fs.Path; 034import org.apache.oozie.DagELFunctions; 035import org.apache.oozie.ErrorCode; 036import org.apache.oozie.WorkflowActionBean; 037import org.apache.oozie.WorkflowJobBean; 038import org.apache.oozie.action.ActionExecutor; 039import org.apache.oozie.client.Job; 040import org.apache.oozie.client.WorkflowAction; 041import org.apache.oozie.client.WorkflowJob; 042import org.apache.oozie.client.rest.JsonTags; 043import org.apache.oozie.command.CommandException; 044import org.apache.oozie.service.CallbackService; 045import org.apache.oozie.service.ConfigurationService; 046import org.apache.oozie.service.ELService; 047import org.apache.oozie.service.HadoopAccessorException; 048import 
org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.JobUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.lite.LiteWorkflowApp;
import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
import org.apache.oozie.workflow.lite.NodeDef;

/**
 * Base class for Action execution commands. Provides common functionality to handle different types of errors while
 * attempting to start or end an action: transient errors (retried with a delay), non-transient errors (job
 * suspended), action errors, action failures (job failed) and user-configured retries.
 */
public abstract class ActionXCommand<T> extends WorkflowXCommand<T> {
    private static final String INSTRUMENTATION_GROUP = "action.executors";
    public static final String RETRY = "retry.";

    /** Separator between the action id and the workflow run number in a recovery id. */
    protected static final String RECOVERY_ID_SEPARATOR = "@";

    public ActionXCommand(String name, String type, int priority) {
        super(name, type, priority);
    }

    /**
     * Takes care of transient failures. If the action has not yet used up the executor's maximum retry count, sets
     * the action status to {@code status}, marks it pending, increments its retry count and re-queues this command
     * after the delay computed by the executor's retry policy. Otherwise returns false.
     *
     * @param context the execution context.
     * @param executor the executor instance being used.
     * @param status the status to be set for the action.
     * @return true if the action is scheduled for another retry. false if the number of retries has exceeded the
     *         maximum number of configured retries.
     * @throws CommandException thrown if unable to handle transient
     */
    protected boolean handleTransient(ActionExecutor.Context context, ActionExecutor executor,
            WorkflowAction.Status status) throws CommandException {
        LOG.debug("Attempting to retry");
        ActionExecutorContext aContext = (ActionExecutorContext) context;
        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
        incrActionErrorCounter(action.getType(), "transient", 1);

        int actionRetryCount = action.getRetries();
        if (actionRetryCount >= executor.getMaxRetries()) {
            LOG.warn("Exceeded max retry count [{0}]. Suspending Job", executor.getMaxRetries());
            return false;
        }
        else {
            action.setStatus(status);
            action.setPending();
            action.incRetries();
            // The delay is computed from the retry count *before* it was incremented above.
            long retryDelayMillis = getRetryDelay(actionRetryCount, executor.getRetryInterval(), executor.getRetryPolicy());
            action.setPendingAge(new Date(System.currentTimeMillis() + retryDelayMillis));
            LOG.info("Next Retry, Attempt Number [{0}] in [{1}] milliseconds", actionRetryCount + 1, retryDelayMillis);
            this.resetUsed();
            queueCommandForTransientFailure(retryDelayMillis);
            return true;
        }
    }

    /**
     * Re-queues this command after a transient failure. Subclasses may override to change how the retry is queued.
     *
     * @param retryDelayMillis delay before the command is executed again, in milliseconds.
     */
    protected void queueCommandForTransientFailure(long retryDelayMillis){
        queue(this, retryDelayMillis);
    }

    /**
     * Takes care of non transient failures. The job is suspended, and the state of the action is changed to *MANUAL and
     * set pending flag of action to false
     *
     * @param context the execution context.
     * @param executor the executor instance being used.
     * @param status the status to be set for the action.
     * @throws CommandException thrown if unable to suspend job
     */
    protected void handleNonTransient(ActionExecutor.Context context, ActionExecutor executor,
            WorkflowAction.Status status) throws CommandException {
        ActionExecutorContext aContext = (ActionExecutorContext) context;
        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
        incrActionErrorCounter(action.getType(), "nontransient", 1);
        WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
        String id = workflow.getId();
        action.setStatus(status);
        action.resetPendingOnly();
        LOG.warn("Suspending Workflow Job id=" + id);
        try {
            SuspendXCommand.suspendJob(Services.get().get(JPAService.class), workflow, id, action.getId(), null);
        }
        catch (Exception e) {
            // Pass the exception itself as a trailing argument as well, so it can be kept as the cause instead of
            // only its message (presumably XException's varargs convention -- confirm).
            throw new CommandException(ErrorCode.E0727, id, e.getMessage(), e);
        }
        finally {
            // Runs whether or not the suspend succeeded.
            updateParentIfNecessary(workflow, 3);
        }
    }

    /**
     * Takes care of errors. <p> If the action is eligible for a user retry, that retry is scheduled and nothing else
     * is done. Otherwise: <p> For errors while attempting to start the action, the action's execution data is set and
     * an {@link ActionEndXCommand} is queued. <p> For errors while attempting to end the action, the action's end
     * data is set with an {@code ERROR} signal value.
     *
     * @param context the execution context.
     * @param executor the executor instance being used.
     * @param message the external status recorded on the action when the error happened while starting it.
     * @param isStart whether the error was generated while starting or ending an action.
     * @param status the status to be set for the action.
     * @throws CommandException thrown if unable to handle action error
     */
    protected void handleError(ActionExecutor.Context context, ActionExecutor executor, String message,
            boolean isStart, WorkflowAction.Status status) throws CommandException {
        LOG.warn("Setting Action Status to [{0}]", status);
        ActionExecutorContext aContext = (ActionExecutorContext) context;
        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
        if (!handleUserRetry(context, action)) {
            incrActionErrorCounter(action.getType(), "error", 1);
            action.setPending();
            if (isStart) {
                action.setExecutionData(message, null);
                queue(new ActionEndXCommand(action.getId(), action.getType()));
            }
            else {
                action.setEndData(status, WorkflowAction.Status.ERROR.toString());
            }
        }
    }

    /**
     * Fails the job because of the action held by the given context.
     *
     * @param context the execution context.
     * @throws CommandException thrown if unable to fail job
     */
    public void failJob(ActionExecutor.Context context) throws CommandException {
        ActionExecutorContext aContext = (ActionExecutorContext) context;
        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
        failJob(context, action);
    }

    /**
     * Fails the job because of a failed action, unless the action is eligible for a user retry (in which case the
     * retry is scheduled instead). On failure the workflow instance and job are marked FAILED, the action is marked
     * FAILED and no longer pending, and a notification and a {@link KillXCommand} are queued.
     *
     * @param context the execution context.
     * @param action the action that caused the workflow to fail
     * @throws CommandException thrown if unable to fail job
     */
    public void failJob(ActionExecutor.Context context, WorkflowActionBean action) throws CommandException {
        WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
        if (!handleUserRetry(context, action)) {
            incrActionErrorCounter(action.getType(), "failed", 1);
            LOG.warn("Failing Job due to failed action [{0}]", action.getName());
            try {
                workflow.getWorkflowInstance().fail(action.getName());
                // NOTE(review): the instance is fetched a second time here. If getWorkflowInstance() returns a fresh
                // copy on each call, only the FAILED status below (not the effects of fail()) is written back --
                // confirm this is intended.
                WorkflowInstance wfInstance = workflow.getWorkflowInstance();
                ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED);
                workflow.setWorkflowInstance(wfInstance);
                workflow.setStatus(WorkflowJob.Status.FAILED);
                action.setStatus(WorkflowAction.Status.FAILED);
                action.resetPending();
                queue(new WorkflowNotificationXCommand(workflow, action));
                queue(new KillXCommand(workflow.getId()));
                InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER_NAME, 1, getInstrumentation());
            }
            catch (WorkflowException ex) {
                throw new CommandException(ex);
            }
        }
    }

    /**
     * Executes a retry for the action if it is eligible for a user retry. An action is eligible when its error code
     * is in the configured user-retry error codes (or that set contains the "all" marker) and its user retry count
     * is below its user retry maximum.
     *
     * @param context the execution context; the retry end-time variable is recorded through it.
     * @param action the Workflow action bean
     * @return true if user-retry has to be handled for this action
     * @throws CommandException thrown if unable to fail job
     */
    public boolean handleUserRetry(ActionExecutor.Context context, WorkflowActionBean action) throws CommandException {
        WorkflowJobBean wfJob = (WorkflowJobBean) context.getWorkflow();
        String errorCode = action.getErrorCode();
        Set<String> allowedRetryCode = LiteWorkflowStoreService.getUserRetryErrorCode();

        if ((allowedRetryCode.contains(LiteWorkflowStoreService.USER_ERROR_CODE_ALL) || allowedRetryCode.contains(errorCode))
                && action.getUserRetryCount() < action.getUserRetryMax()) {
            LOG.info("Preparing retry this action [{0}], errorCode [{1}], userRetryCount [{2}], "
                    + "userRetryMax [{3}], userRetryInterval [{4}]", action.getId(), errorCode, action
                    .getUserRetryCount(), action.getUserRetryMax(), action.getUserRetryInterval());
            ActionExecutor.RETRYPOLICY retryPolicy = getUserRetryPolicy(action, wfJob);
            // getRetryDelay() multiplies its interval by 1000 to get milliseconds; the "* 60" suggests the user retry
            // interval is expressed in minutes (TODO confirm).
            long interval = getRetryDelay(action.getUserRetryCount(), action.getUserRetryInterval() * 60, retryPolicy);
            action.setStatus(WorkflowAction.Status.USER_RETRY);
            context.setVar(JobUtils.getRetryKey(action, JsonTags.WORKFLOW_ACTION_END_TIME), String.valueOf(new Date().getTime()));
            action.incrmentUserRetryCount();
            action.setPending();
            queue(new ActionStartXCommand(action.getId(), action.getType()), interval);
            return true;
        }
        return false;
    }

    /*
     * In case of action error increment the error count for instrumentation
     */
    private void incrActionErrorCounter(String type, String error, int count) {
        getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#ex." + error, count);
    }

    /**
     * Increments the action counter in the instrumentation log, indicating how many times the action was executed
     * since the Oozie server started.
     *
     * @param type the action type.
     * @param count the amount to add.
     */
    protected void incrActionCounter(String type, int count) {
        getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#" + getName(), count);
    }

    /**
     * Adds a cron for the instrumentation time of the given action type to the action executors instrumentation
     * group.
     *
     * @param type the action type.
     * @param cron the cron to add.
     */
    protected void addActionCron(String type, Instrumentation.Cron cron) {
        getInstrumentation().addCron(INSTRUMENTATION_GROUP, type + "#" + getName(), cron);
    }

    /*
     * Returns the next retry delay in milliseconds, based on the retry policy algorithm. retryInterval is multiplied
     * by 1000, so it is expected in seconds. EXPONENTIAL doubles the delay for each previous retry; PERIODIC keeps it
     * constant. NOTE(review): the EXPONENTIAL product is not overflow-checked for very large retry counts.
     */
    private long getRetryDelay(int retryCount, long retryInterval, ActionExecutor.RETRYPOLICY retryPolicy) {
        switch (retryPolicy) {
            case EXPONENTIAL:
                long retryTime = ((long) Math.pow(2, retryCount) * retryInterval * 1000L);
                return retryTime;
            case PERIODIC:
                return retryInterval * 1000L;
            default:
                throw new UnsupportedOperationException("Retry policy not supported");
        }
    }

    /**
     * Workflow action executor context. Wraps a workflow job and one of its actions, and tracks whether start,
     * execution and end data have been set during the executor invocation.
     */
    public static class ActionExecutorContext implements ActionExecutor.Context {
        protected final WorkflowJobBean workflow;
        private Configuration protoConf;
        protected final WorkflowActionBean action;
        private final boolean isRetry;
        private final boolean isUserRetry;
        private boolean started;
        private boolean ended;
        private boolean executed;
        private boolean shouldEndWF;
        private Job.Status jobStatus;

        /**
         * Constructs the ActionExecutorContext, setting the private members and parsing the workflow's proto
         * action configuration (if any).
         */
        public ActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry,
                boolean isUserRetry) {
            this.workflow = workflow;
            this.action = action;
            this.isRetry = isRetry;
            this.isUserRetry = isUserRetry;
            if (null != workflow.getProtoActionConf()) {
                try {
                    protoConf = new XConfiguration(new StringReader(workflow.getProtoActionConf()));
                }
                catch (IOException ex) {
                    // Reading from an in-memory StringReader is not expected to fail.
                    throw new RuntimeException("It should not happen", ex);
                }
            }
        }

        public ActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action) {
            this(workflow, action, false, false);
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getCallbackUrl(java.lang.String)
         */
        public String getCallbackUrl(String externalStatusVar) {
            return Services.get().get(CallbackService.class).createCallBackUrl(action.getId(), externalStatusVar);
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getProtoActionConf()
         */
        public Configuration getProtoActionConf() {
            return protoConf;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getWorkflow()
         */
        public WorkflowJob getWorkflow() {
            return workflow;
        }

        /**
         * Returns the workflow action of the given action context
         *
         * @return the workflow action of the given action context
         */
        public WorkflowAction getAction() {
            return action;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getELEvaluator()
         */
        public ELEvaluator getELEvaluator() {
            ELEvaluator evaluator = Services.get().get(ELService.class).createEvaluator("workflow");
            DagELFunctions.configureEvaluator(evaluator, workflow, action);
            return evaluator;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setVar(java.lang.String, java.lang.String)
         */
        public void setVar(String name, String value) {
            setVarToWorkflow(name, value);
        }

        /**
         * Sets an action-scoped variable on the workflow instance and writes the instance back to the job. This is
         * not thread safe; don't use it if the workflow job is shared among multiple action commands.
         *
         * @param name variable name, prefixed here with the action name and the node variable separator.
         * @param value variable value.
         */
        public void setVarToWorkflow(String name, String value) {
            name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
            WorkflowInstance wfInstance = workflow.getWorkflowInstance();
            wfInstance.setVar(name, value);
            workflow.setWorkflowInstance(wfInstance);
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getVar(java.lang.String)
         */
        public String getVar(String name) {
            name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
            return workflow.getWorkflowInstance().getVar(name);
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setStartData(java.lang.String, java.lang.String, java.lang.String)
         */
        public void setStartData(String externalId, String trackerUri, String consoleUrl) {
            setVar(JobUtils.getRetryKey(action, JsonTags.WORKFLOW_ACTION_CONSOLE_URL), consoleUrl);
            action.setStartData(externalId, trackerUri, consoleUrl);
            started = true;
        }

        /**
         * Sets the start time of the action to the current time.
         */
        public void setStartTime() {
            Date now = new Date();
            action.setStartTime(now);
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionData(java.lang.String, java.util.Properties)
         */
        public void setExecutionData(String externalStatus, Properties actionData) {
            action.setExecutionData(externalStatus, actionData);
            executed = true;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionStats(java.lang.String)
         */
        public void setExecutionStats(String jsonStats) {
            action.setExecutionStats(jsonStats);
            executed = true;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setExternalChildIDs(java.lang.String)
         */
        public void setExternalChildIDs(String externalChildIDs) {
            setVar(JobUtils.getRetryKey(action, JsonTags.WORKFLOW_ACTION_EXTERNAL_CHILD_IDS), externalChildIDs);
            action.setExternalChildIDs(externalChildIDs);
            executed = true;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setEndData(org.apache.oozie.client.WorkflowAction.Status, java.lang.String)
         */
        public void setEndData(WorkflowAction.Status status, String signalValue) {
            action.setEndData(status, signalValue);
            ended = true;
        }

        /*
         * (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#isRetry()
         */
        public boolean isRetry() {
            return isRetry;
        }

        /**
         * Return if the executor invocation is a user retry or not.
         *
         * @return if the executor invocation is a user retry or not.
         */
        public boolean isUserRetry() {
            return isUserRetry;
        }

        /**
         * Returns whether setStartData has been called or not.
         *
         * @return true if start completion info has been set.
         */
        public boolean isStarted() {
            return started;
        }

        /**
         * Returns whether setExecutionData (or another execution setter) has been called or not.
         *
         * @return true if execution completion info has been set, otherwise false.
         */
        public boolean isExecuted() {
            return executed;
        }

        /**
         * Returns whether setEndData has been called or not.
         *
         * @return true if end completion info has been set.
         */
        public boolean isEnded() {
            return ended;
        }

        public void setExternalStatus(String externalStatus) {
            action.setExternalStatus(externalStatus);
        }

        /**
         * Builds the recovery id as {@code <actionId>@<workflowRun>}.
         */
        @Override
        public String getRecoveryId() {
            return action.getId() + RECOVERY_ID_SEPARATOR + workflow.getRun();
        }

        /* (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getActionDir()
         */
        public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException {
            String name = getWorkflow().getId() + "/" + action.getName() + "--" + action.getType();
            FileSystem fs = getAppFileSystem();
            String actionDirPath = Services.get().getSystemId() + "/" + name;
            Path fqActionDir = new Path(fs.getHomeDirectory(), actionDirPath);
            return fqActionDir;
        }

        /* (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#getAppFileSystem()
         */
        public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException {
            // Read the workflow once; a local copy no longer shadows the 'workflow' field.
            WorkflowJob wfJob = getWorkflow();
            URI uri = new URI(wfJob.getAppPath());
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            Configuration fsConf = has.createConfiguration(uri.getAuthority());
            return has.createFileSystem(wfJob.getUser(), uri, fsConf);
        }

        /* (non-Javadoc)
         * @see org.apache.oozie.action.ActionExecutor.Context#setErrorInfo(java.lang.String, java.lang.String)
         */
        @Override
        public void setErrorInfo(String str, String exMsg) {
            action.setErrorInfo(str, exMsg);
        }

        public boolean isShouldEndWF() {
            return shouldEndWF;
        }

        public void setShouldEndWF(boolean shouldEndWF) {
            this.shouldEndWF = shouldEndWF;
        }

        public Job.Status getJobStatus() {
            return jobStatus;
        }

        public void setJobStatus(Job.Status jobStatus) {
            this.jobStatus = jobStatus;
        }
    }

    /**
     * Action executor context for forked actions. Variables are kept in a local map instead of being written to the
     * shared workflow instance; a null value removes the variable.
     */
    public static class ForkedActionExecutorContext extends ActionExecutorContext {
        private Map<String, String> contextVariableMap = new HashMap<String, String>();

        public ForkedActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry,
                boolean isUserRetry) {
            super(workflow, action, isRetry, isUserRetry);
        }

        @Override
        public void setVar(String name, String value) {
            if (value == null) {
                contextVariableMap.remove(name);
            }
            else {
                contextVariableMap.put(name, value);
            }
        }

        @Override
        public String getVar(String name) {
            return contextVariableMap.get(name);
        }

        /**
         * Returns the live (mutable) map of variables set through this context.
         */
        public Map<String, String> getContextMap() {
            return contextVariableMap;
        }
    }

    /*
     * Returns the user retry policy. The action node's policy is used if valid, otherwise the system configuration
     * value if valid, otherwise the default. Policy strings are trimmed and upper-cased (locale-independently)
     * before both validation and lookup, so a value that passes validation can always be resolved.
     */
    private ActionExecutor.RETRYPOLICY getUserRetryPolicy(WorkflowActionBean wfAction, WorkflowJobBean wfJob) {
        WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
        LiteWorkflowApp wfApp = (LiteWorkflowApp) wfInstance.getApp();
        NodeDef nodeDef = wfApp.getNode(wfAction.getName());
        if (nodeDef == null) {
            return ActionExecutor.RETRYPOLICY.valueOf(LiteWorkflowStoreService.DEFAULT_USER_RETRY_POLICY);
        }
        String userRetryPolicy = normalizeRetryPolicy(nodeDef.getUserRetryPolicy());
        String userRetryPolicyInSysConfig = normalizeRetryPolicy(
                ConfigurationService.get(LiteWorkflowStoreService.CONF_USER_RETRY_POLICY));
        if (isValidRetryPolicy(userRetryPolicy)) {
            return ActionExecutor.RETRYPOLICY.valueOf(userRetryPolicy);
        }
        else if (isValidRetryPolicy(userRetryPolicyInSysConfig)) {
            return ActionExecutor.RETRYPOLICY.valueOf(userRetryPolicyInSysConfig);
        }
        else {
            return ActionExecutor.RETRYPOLICY.valueOf(LiteWorkflowStoreService.DEFAULT_USER_RETRY_POLICY);
        }
    }

    /*
     * Trims and upper-cases a policy name using Locale.ROOT (the default locale could, e.g. in Turkish, turn 'i'
     * into a dotted capital I). Returns null for null input.
     */
    private static String normalizeRetryPolicy(String policy) {
        return policy == null ? null : policy.trim().toUpperCase(java.util.Locale.ROOT);
    }

    /*
     * Returns true if policy is valid, otherwise false. A null policy is invalid.
     */
    private static boolean isValidRetryPolicy(String policy) {
        if (policy == null) {
            return false;
        }
        try {
            ActionExecutor.RETRYPOLICY.valueOf(normalizeRetryPolicy(policy));
        }
        catch (IllegalArgumentException e) {
            // Invalid Policy
            return false;
        }
        return true;
    }

}