001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.wf; 020 021import java.io.IOException; 022import java.io.StringReader; 023import java.net.URI; 024import java.net.URISyntaxException; 025import java.util.Date; 026import java.util.Properties; 027import java.util.Set; 028 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.oozie.DagELFunctions; 033import org.apache.oozie.ErrorCode; 034import org.apache.oozie.WorkflowActionBean; 035import org.apache.oozie.WorkflowJobBean; 036import org.apache.oozie.action.ActionExecutor; 037import org.apache.oozie.client.WorkflowAction; 038import org.apache.oozie.client.WorkflowJob; 039import org.apache.oozie.command.CommandException; 040import org.apache.oozie.service.CallbackService; 041import org.apache.oozie.service.ELService; 042import org.apache.oozie.service.HadoopAccessorException; 043import org.apache.oozie.service.HadoopAccessorService; 044import org.apache.oozie.service.JPAService; 045import org.apache.oozie.service.LiteWorkflowStoreService; 046import org.apache.oozie.service.Services; 047import org.apache.oozie.util.ELEvaluator; 048import org.apache.oozie.util.InstrumentUtils; 049import org.apache.oozie.util.Instrumentation; 050import org.apache.oozie.util.XConfiguration; 051import org.apache.oozie.workflow.WorkflowException; 052import org.apache.oozie.workflow.WorkflowInstance; 053import org.apache.oozie.workflow.lite.LiteWorkflowInstance; 054 055/** 056 * Base class for Action execution commands. Provides common functionality to handle different types of errors while 057 * attempting to start or end an action. 058 */ 059public abstract class ActionXCommand<T> extends WorkflowXCommand<Void> { 060 private static final String INSTRUMENTATION_GROUP = "action.executors"; 061 062 protected static final String RECOVERY_ID_SEPARATOR = "@"; 063 064 public ActionXCommand(String name, String type, int priority) { 065 super(name, type, priority); 066 } 067 068 /** 069 * Takes care of Transient failures. Sets the action status to retry and increments the retry count if not enough 070 * attempts have been made. Otherwise returns false. 071 * 072 * @param context the execution context. 073 * @param executor the executor instance being used. 074 * @param status the status to be set for the action. 075 * @return true if the action is scheduled for another retry. false if the number of retries has exceeded the 076 * maximum number of configured retries. 077 * @throws CommandException thrown if unable to handle transient 078 */ 079 protected boolean handleTransient(ActionExecutor.Context context, ActionExecutor executor, 080 WorkflowAction.Status status) throws CommandException { 081 LOG.debug("Attempting to retry"); 082 ActionExecutorContext aContext = (ActionExecutorContext) context; 083 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction(); 084 incrActionErrorCounter(action.getType(), "transient", 1); 085 086 int actionRetryCount = action.getRetries(); 087 if (actionRetryCount >= executor.getMaxRetries()) { 088 LOG.warn("Exceeded max retry count [{0}]. Suspending Job", executor.getMaxRetries()); 089 return false; 090 } 091 else { 092 action.setStatus(status); 093 action.setPending(); 094 action.incRetries(); 095 long retryDelayMillis = getRetryDelay(actionRetryCount, executor.getRetryInterval(), executor.getRetryPolicy()); 096 action.setPendingAge(new Date(System.currentTimeMillis() + retryDelayMillis)); 097 LOG.info("Next Retry, Attempt Number [{0}] in [{1}] milliseconds", actionRetryCount + 1, retryDelayMillis); 098 this.resetUsed(); 099 queue(this, retryDelayMillis); 100 return true; 101 } 102 } 103 104 /** 105 * Takes care of non transient failures. The job is suspended, and the state of the action is changed to *MANUAL and 106 * set pending flag of action to false 107 * 108 * @param context the execution context. 109 * @param executor the executor instance being used. 110 * @param status the status to be set for the action. 111 * @throws CommandException thrown if unable to suspend job 112 */ 113 protected void handleNonTransient(ActionExecutor.Context context, ActionExecutor executor, 114 WorkflowAction.Status status) throws CommandException { 115 ActionExecutorContext aContext = (ActionExecutorContext) context; 116 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction(); 117 incrActionErrorCounter(action.getType(), "nontransient", 1); 118 WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow(); 119 String id = workflow.getId(); 120 action.setStatus(status); 121 action.resetPendingOnly(); 122 LOG.warn("Suspending Workflow Job id=" + id); 123 try { 124 SuspendXCommand.suspendJob(Services.get().get(JPAService.class), workflow, id, action.getId(), null); 125 } 126 catch (Exception e) { 127 throw new CommandException(ErrorCode.E0727, id, e.getMessage()); 128 } 129 finally { 130 updateParentIfNecessary(workflow, 3); 131 } 132 } 133 134 /** 135 * Takes care of errors. </p> For errors while attempting to start the action, the job state is updated and an 136 * {@link ActionEndCommand} is queued. </p> For errors while attempting to end the action, the job state is updated. 137 * </p> 138 * 139 * @param context the execution context. 140 * @param executor the executor instance being used. 141 * @param message 142 * @param isStart whether the error was generated while starting or ending an action. 143 * @param status the status to be set for the action. 144 * @throws CommandException thrown if unable to handle action error 145 */ 146 protected void handleError(ActionExecutor.Context context, ActionExecutor executor, String message, 147 boolean isStart, WorkflowAction.Status status) throws CommandException { 148 LOG.warn("Setting Action Status to [{0}]", status); 149 ActionExecutorContext aContext = (ActionExecutorContext) context; 150 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction(); 151 152 if (!handleUserRetry(action)) { 153 incrActionErrorCounter(action.getType(), "error", 1); 154 action.setPending(); 155 if (isStart) { 156 action.setExecutionData(message, null); 157 queue(new ActionEndXCommand(action.getId(), action.getType())); 158 } 159 else { 160 action.setEndData(status, WorkflowAction.Status.ERROR.toString()); 161 } 162 } 163 } 164 165 /** 166 * Fail the job due to failed action 167 * 168 * @param context the execution context. 169 * @throws CommandException thrown if unable to fail job 170 */ 171 public void failJob(ActionExecutor.Context context) throws CommandException { 172 ActionExecutorContext aContext = (ActionExecutorContext) context; 173 WorkflowActionBean action = (WorkflowActionBean) aContext.getAction(); 174 failJob(context, action); 175 } 176 177 /** 178 * Fail the job due to failed action 179 * 180 * @param context the execution context. 181 * @param action the action that caused the workflow to fail 182 * @throws CommandException thrown if unable to fail job 183 */ 184 public void failJob(ActionExecutor.Context context, WorkflowActionBean action) throws CommandException { 185 WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow(); 186 if (!handleUserRetry(action)) { 187 incrActionErrorCounter(action.getType(), "failed", 1); 188 LOG.warn("Failing Job due to failed action [{0}]", action.getName()); 189 try { 190 workflow.getWorkflowInstance().fail(action.getName()); 191 WorkflowInstance wfInstance = workflow.getWorkflowInstance(); 192 ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED); 193 workflow.setWorkflowInstance(wfInstance); 194 workflow.setStatus(WorkflowJob.Status.FAILED); 195 action.setStatus(WorkflowAction.Status.FAILED); 196 action.resetPending(); 197 queue(new WorkflowNotificationXCommand(workflow, action)); 198 queue(new KillXCommand(workflow.getId())); 199 InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER_NAME, 1, getInstrumentation()); 200 } 201 catch (WorkflowException ex) { 202 throw new CommandException(ex); 203 } 204 } 205 } 206 207 /** 208 * Execute retry for action if this action is eligible for user-retry 209 * 210 * @param context the execution context. 211 * @return true if user-retry has to be handled for this action 212 * @throws CommandException thrown if unable to fail job 213 */ 214 public boolean handleUserRetry(WorkflowActionBean action) throws CommandException { 215 String errorCode = action.getErrorCode(); 216 Set<String> allowedRetryCode = LiteWorkflowStoreService.getUserRetryErrorCode(); 217 218 if ((allowedRetryCode.contains(LiteWorkflowStoreService.USER_ERROR_CODE_ALL) || allowedRetryCode.contains(errorCode)) 219 && action.getUserRetryCount() < action.getUserRetryMax()) { 220 LOG.info("Preparing retry this action [{0}], errorCode [{1}], userRetryCount [{2}], " 221 + "userRetryMax [{3}], userRetryInterval [{4}]", action.getId(), errorCode, action 222 .getUserRetryCount(), action.getUserRetryMax(), action.getUserRetryInterval()); 223 int interval = action.getUserRetryInterval() * 60 * 1000; 224 action.setStatus(WorkflowAction.Status.USER_RETRY); 225 action.incrmentUserRetryCount(); 226 action.setPending(); 227 queue(new ActionStartXCommand(action.getId(), action.getType()), interval); 228 return true; 229 } 230 return false; 231 } 232 233 /* 234 * In case of action error increment the error count for instrumentation 235 */ 236 private void incrActionErrorCounter(String type, String error, int count) { 237 getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#ex." + error, count); 238 } 239 240 /** 241 * Increment the action counter in the instrumentation log. indicating how 242 * many times the action was executed since the start Oozie server 243 */ 244 protected void incrActionCounter(String type, int count) { 245 getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#" + getName(), count); 246 } 247 248 /** 249 * Adding a cron for the instrumentation time for the given Instrumentation 250 * group 251 */ 252 protected void addActionCron(String type, Instrumentation.Cron cron) { 253 getInstrumentation().addCron(INSTRUMENTATION_GROUP, type + "#" + getName(), cron); 254 } 255 256 /* 257 * Returns the next retry time in milliseconds, based on retry policy algorithm. 258 */ 259 private long getRetryDelay(int retryCount, long retryInterval, ActionExecutor.RETRYPOLICY retryPolicy) { 260 switch (retryPolicy) { 261 case EXPONENTIAL: 262 long retryTime = ((long) Math.pow(2, retryCount) * retryInterval * 1000L); 263 return retryTime; 264 case PERIODIC: 265 return retryInterval * 1000L; 266 default: 267 throw new UnsupportedOperationException("Retry policy not supported"); 268 } 269 } 270 271 /** 272 * Workflow action executor context 273 * 274 */ 275 public static class ActionExecutorContext implements ActionExecutor.Context { 276 private final WorkflowJobBean workflow; 277 private Configuration protoConf; 278 private final WorkflowActionBean action; 279 private final boolean isRetry; 280 private final boolean isUserRetry; 281 private boolean started; 282 private boolean ended; 283 private boolean executed; 284 285 /** 286 * Constructing the ActionExecutorContext, setting the private members 287 * and constructing the proto configuration 288 */ 289 public ActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry, boolean isUserRetry) { 290 this.workflow = workflow; 291 this.action = action; 292 this.isRetry = isRetry; 293 this.isUserRetry = isUserRetry; 294 try { 295 protoConf = new XConfiguration(new StringReader(workflow.getProtoActionConf())); 296 } 297 catch (IOException ex) { 298 throw new RuntimeException("It should not happen", ex); 299 } 300 } 301 302 /* 303 * (non-Javadoc) 304 * @see org.apache.oozie.action.ActionExecutor.Context#getCallbackUrl(java.lang.String) 305 */ 306 public String getCallbackUrl(String externalStatusVar) { 307 return Services.get().get(CallbackService.class).createCallBackUrl(action.getId(), externalStatusVar); 308 } 309 310 /* 311 * (non-Javadoc) 312 * @see org.apache.oozie.action.ActionExecutor.Context#getProtoActionConf() 313 */ 314 public Configuration getProtoActionConf() { 315 return protoConf; 316 } 317 318 /* 319 * (non-Javadoc) 320 * @see org.apache.oozie.action.ActionExecutor.Context#getWorkflow() 321 */ 322 public WorkflowJob getWorkflow() { 323 return workflow; 324 } 325 326 /** 327 * Returns the workflow action of the given action context 328 * 329 * @return the workflow action of the given action context 330 */ 331 public WorkflowAction getAction() { 332 return action; 333 } 334 335 /* 336 * (non-Javadoc) 337 * @see org.apache.oozie.action.ActionExecutor.Context#getELEvaluator() 338 */ 339 public ELEvaluator getELEvaluator() { 340 ELEvaluator evaluator = Services.get().get(ELService.class).createEvaluator("workflow"); 341 DagELFunctions.configureEvaluator(evaluator, workflow, action); 342 return evaluator; 343 } 344 345 /* 346 * (non-Javadoc) 347 * @see org.apache.oozie.action.ActionExecutor.Context#setVar(java.lang.String, java.lang.String) 348 */ 349 public void setVar(String name, String value) { 350 name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name; 351 WorkflowInstance wfInstance = workflow.getWorkflowInstance(); 352 wfInstance.setVar(name, value); 353 workflow.setWorkflowInstance(wfInstance); 354 } 355 356 /* 357 * (non-Javadoc) 358 * @see org.apache.oozie.action.ActionExecutor.Context#getVar(java.lang.String) 359 */ 360 public String getVar(String name) { 361 name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name; 362 return workflow.getWorkflowInstance().getVar(name); 363 } 364 365 /* 366 * (non-Javadoc) 367 * @see org.apache.oozie.action.ActionExecutor.Context#setStartData(java.lang.String, java.lang.String, java.lang.String) 368 */ 369 public void setStartData(String externalId, String trackerUri, String consoleUrl) { 370 action.setStartData(externalId, trackerUri, consoleUrl); 371 started = true; 372 } 373 374 /** 375 * Setting the start time of the action 376 */ 377 public void setStartTime() { 378 Date now = new Date(); 379 action.setStartTime(now); 380 } 381 382 /* 383 * (non-Javadoc) 384 * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionData(java.lang.String, java.util.Properties) 385 */ 386 public void setExecutionData(String externalStatus, Properties actionData) { 387 action.setExecutionData(externalStatus, actionData); 388 executed = true; 389 } 390 391 /* 392 * (non-Javadoc) 393 * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionStats(java.lang.String) 394 */ 395 public void setExecutionStats(String jsonStats) { 396 action.setExecutionStats(jsonStats); 397 executed = true; 398 } 399 400 /* 401 * (non-Javadoc) 402 * @see org.apache.oozie.action.ActionExecutor.Context#setExternalChildIDs(java.lang.String) 403 */ 404 public void setExternalChildIDs(String externalChildIDs) { 405 action.setExternalChildIDs(externalChildIDs); 406 executed = true; 407 } 408 409 /* 410 * (non-Javadoc) 411 * @see org.apache.oozie.action.ActionExecutor.Context#setEndData(org.apache.oozie.client.WorkflowAction.Status, java.lang.String) 412 */ 413 public void setEndData(WorkflowAction.Status status, String signalValue) { 414 action.setEndData(status, signalValue); 415 ended = true; 416 } 417 418 /* 419 * (non-Javadoc) 420 * @see org.apache.oozie.action.ActionExecutor.Context#isRetry() 421 */ 422 public boolean isRetry() { 423 return isRetry; 424 } 425 426 /** 427 * Return if the executor invocation is a user retry or not. 428 * 429 * @return if the executor invocation is a user retry or not. 430 */ 431 public boolean isUserRetry() { 432 return isUserRetry; 433 } 434 435 /** 436 * Returns whether setStartData has been called or not. 437 * 438 * @return true if start completion info has been set. 439 */ 440 public boolean isStarted() { 441 return started; 442 } 443 444 /** 445 * Returns whether setExecutionData has been called or not. 446 * 447 * @return true if execution completion info has been set, otherwise false. 448 */ 449 public boolean isExecuted() { 450 return executed; 451 } 452 453 /** 454 * Returns whether setEndData has been called or not. 455 * 456 * @return true if end completion info has been set. 457 */ 458 public boolean isEnded() { 459 return ended; 460 } 461 462 public void setExternalStatus(String externalStatus) { 463 action.setExternalStatus(externalStatus); 464 } 465 466 @Override 467 public String getRecoveryId() { 468 return action.getId() + RECOVERY_ID_SEPARATOR + workflow.getRun(); 469 } 470 471 /* (non-Javadoc) 472 * @see org.apache.oozie.action.ActionExecutor.Context#getActionDir() 473 */ 474 public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException { 475 String name = getWorkflow().getId() + "/" + action.getName() + "--" + action.getType(); 476 FileSystem fs = getAppFileSystem(); 477 String actionDirPath = Services.get().getSystemId() + "/" + name; 478 Path fqActionDir = new Path(fs.getHomeDirectory(), actionDirPath); 479 return fqActionDir; 480 } 481 482 /* (non-Javadoc) 483 * @see org.apache.oozie.action.ActionExecutor.Context#getAppFileSystem() 484 */ 485 public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException { 486 WorkflowJob workflow = getWorkflow(); 487 URI uri = new URI(getWorkflow().getAppPath()); 488 HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); 489 Configuration fsConf = has.createJobConf(uri.getAuthority()); 490 return has.createFileSystem(workflow.getUser(), uri, fsConf); 491 492 } 493 494 /* (non-Javadoc) 495 * @see org.apache.oozie.action.ActionExecutor.Context#setErrorInfo(java.lang.String, java.lang.String) 496 */ 497 @Override 498 public void setErrorInfo(String str, String exMsg) { 499 action.setErrorInfo(str, exMsg); 500 } 501 } 502 503}