001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.wf;
020
import java.io.IOException;
import java.io.StringReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Date;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.DagELFunctions;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.rest.JsonTags;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.service.CallbackService;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.ELService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.InstrumentUtils;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.JobUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.lite.LiteWorkflowApp;
import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
import org.apache.oozie.workflow.lite.NodeDef;
062
063/**
064 * Base class for Action execution commands. Provides common functionality to handle different types of errors while
065 * attempting to start or end an action.
066 */
067public abstract class ActionXCommand<T> extends WorkflowXCommand<T> {
068    private static final String INSTRUMENTATION_GROUP = "action.executors";
069    public static final String RETRY = "retry.";
070
071    protected static final String RECOVERY_ID_SEPARATOR = "@";
072
073    public ActionXCommand(String name, String type, int priority) {
074        super(name, type, priority);
075    }
076
077    /**
078     * Takes care of Transient failures. Sets the action status to retry and increments the retry count if not enough
079     * attempts have been made. Otherwise returns false.
080     *
081     * @param context the execution context.
082     * @param executor the executor instance being used.
083     * @param status the status to be set for the action.
084     * @return true if the action is scheduled for another retry. false if the number of retries has exceeded the
085     *         maximum number of configured retries.
086     * @throws CommandException thrown if unable to handle transient
087     */
088    protected boolean handleTransient(ActionExecutor.Context context, ActionExecutor executor,
089            WorkflowAction.Status status) throws CommandException {
090        LOG.debug("Attempting to retry");
091        ActionExecutorContext aContext = (ActionExecutorContext) context;
092        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
093        incrActionErrorCounter(action.getType(), "transient", 1);
094
095        int actionRetryCount = action.getRetries();
096        if (actionRetryCount >= executor.getMaxRetries()) {
097            LOG.warn("Exceeded max retry count [{0}]. Suspending Job", executor.getMaxRetries());
098            return false;
099        }
100        else {
101            action.setStatus(status);
102            action.setPending();
103            action.incRetries();
104            long retryDelayMillis = getRetryDelay(actionRetryCount, executor.getRetryInterval(), executor.getRetryPolicy());
105            action.setPendingAge(new Date(System.currentTimeMillis() + retryDelayMillis));
106            LOG.info("Next Retry, Attempt Number [{0}] in [{1}] milliseconds", actionRetryCount + 1, retryDelayMillis);
107            this.resetUsed();
108            queueCommandForTransientFailure(retryDelayMillis);
109            return true;
110        }
111    }
112
113    protected void queueCommandForTransientFailure(long retryDelayMillis){
114        queue(this, retryDelayMillis);
115    }
116    /**
117     * Takes care of non transient failures. The job is suspended, and the state of the action is changed to *MANUAL and
118     * set pending flag of action to false
119     *
120     * @param context the execution context.
121     * @param executor the executor instance being used.
122     * @param status the status to be set for the action.
123     * @throws CommandException thrown if unable to suspend job
124     */
125    protected void handleNonTransient(ActionExecutor.Context context, ActionExecutor executor,
126            WorkflowAction.Status status) throws CommandException {
127        ActionExecutorContext aContext = (ActionExecutorContext) context;
128        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
129        incrActionErrorCounter(action.getType(), "nontransient", 1);
130        WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
131        String id = workflow.getId();
132        action.setStatus(status);
133        action.resetPendingOnly();
134        LOG.warn("Suspending Workflow Job id=" + id);
135        try {
136            SuspendXCommand.suspendJob(Services.get().get(JPAService.class), workflow, id, action.getId(), null);
137        }
138        catch (Exception e) {
139            throw new CommandException(ErrorCode.E0727, id, e.getMessage());
140        }
141        finally {
142            updateParentIfNecessary(workflow, 3);
143        }
144    }
145
146    /**
147     * Takes care of errors. <p> For errors while attempting to start the action, the job state is updated and an
148     * {@link ActionEndXCommand} is queued. <p> For errors while attempting to end the action, the job state is updated.
149     * <p>
150     *
151     * @param context the execution context.
152     * @param executor the executor instance being used.
153     * @param message
154     * @param isStart whether the error was generated while starting or ending an action.
155     * @param status the status to be set for the action.
156     * @throws CommandException thrown if unable to handle action error
157     */
158    protected void handleError(ActionExecutor.Context context, ActionExecutor executor, String message,
159            boolean isStart, WorkflowAction.Status status) throws CommandException {
160        LOG.warn("Setting Action Status to [{0}]", status);
161        ActionExecutorContext aContext = (ActionExecutorContext) context;
162        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
163        if (!handleUserRetry(context, action)) {
164            incrActionErrorCounter(action.getType(), "error", 1);
165            action.setPending();
166            if (isStart) {
167                action.setExecutionData(message, null);
168                queue(new ActionEndXCommand(action.getId(), action.getType()));
169            }
170            else {
171                action.setEndData(status, WorkflowAction.Status.ERROR.toString());
172            }
173        }
174    }
175
176    /**
177     * Fail the job due to failed action
178     *
179     * @param context the execution context.
180     * @throws CommandException thrown if unable to fail job
181     */
182    public void failJob(ActionExecutor.Context context) throws CommandException {
183        ActionExecutorContext aContext = (ActionExecutorContext) context;
184        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
185        failJob(context, action);
186    }
187
188    /**
189     * Fail the job due to failed action
190     *
191     * @param context the execution context.
192     * @param action the action that caused the workflow to fail
193     * @throws CommandException thrown if unable to fail job
194     */
195    public void failJob(ActionExecutor.Context context, WorkflowActionBean action) throws CommandException {
196        WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
197        if (!handleUserRetry(context, action)) {
198            incrActionErrorCounter(action.getType(), "failed", 1);
199            LOG.warn("Failing Job due to failed action [{0}]", action.getName());
200            try {
201                workflow.getWorkflowInstance().fail(action.getName());
202                WorkflowInstance wfInstance = workflow.getWorkflowInstance();
203                ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED);
204                workflow.setWorkflowInstance(wfInstance);
205                workflow.setStatus(WorkflowJob.Status.FAILED);
206                action.setStatus(WorkflowAction.Status.FAILED);
207                action.resetPending();
208                queue(new WorkflowNotificationXCommand(workflow, action));
209                queue(new KillXCommand(workflow.getId()));
210                InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER_NAME, 1, getInstrumentation());
211            }
212            catch (WorkflowException ex) {
213                throw new CommandException(ex);
214            }
215        }
216    }
217
218    /**
219     * Execute retry for action if this action is eligible for user-retry
220     *
221     * @param action the Workflow action bean
222     * @return true if user-retry has to be handled for this action
223     * @throws CommandException thrown if unable to fail job
224     */
225    public boolean handleUserRetry(ActionExecutor.Context context, WorkflowActionBean action) throws CommandException {
226        WorkflowJobBean wfJob = (WorkflowJobBean) context.getWorkflow();
227        String errorCode = action.getErrorCode();
228        Set<String> allowedRetryCode = LiteWorkflowStoreService.getUserRetryErrorCode();
229
230        if ((allowedRetryCode.contains(LiteWorkflowStoreService.USER_ERROR_CODE_ALL) || allowedRetryCode.contains(errorCode))
231                && action.getUserRetryCount() < action.getUserRetryMax()) {
232            LOG.info("Preparing retry this action [{0}], errorCode [{1}], userRetryCount [{2}], "
233                    + "userRetryMax [{3}], userRetryInterval [{4}]", action.getId(), errorCode, action
234                    .getUserRetryCount(), action.getUserRetryMax(), action.getUserRetryInterval());
235            ActionExecutor.RETRYPOLICY retryPolicy = getUserRetryPolicy(action, wfJob);
236            long interval = getRetryDelay(action.getUserRetryCount(), action.getUserRetryInterval() * 60, retryPolicy);
237            action.setStatus(WorkflowAction.Status.USER_RETRY);
238            context.setVar(JobUtils.getRetryKey(action, JsonTags.WORKFLOW_ACTION_END_TIME), String.valueOf(new Date().getTime()));
239            action.incrmentUserRetryCount();
240            action.setPending();
241            queue(new ActionStartXCommand(action.getId(), action.getType()), interval);
242            return true;
243        }
244        return false;
245    }
246
247        /*
248         * In case of action error increment the error count for instrumentation
249         */
250    private void incrActionErrorCounter(String type, String error, int count) {
251        getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#ex." + error, count);
252    }
253
254        /**
255         * Increment the action counter in the instrumentation log. indicating how
256         * many times the action was executed since the start Oozie server
257         */
258    protected void incrActionCounter(String type, int count) {
259        getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#" + getName(), count);
260    }
261
262        /**
263         * Adding a cron for the instrumentation time for the given Instrumentation
264         * group
265         */
266    protected void addActionCron(String type, Instrumentation.Cron cron) {
267        getInstrumentation().addCron(INSTRUMENTATION_GROUP, type + "#" + getName(), cron);
268    }
269
270    /*
271     * Returns the next retry time in milliseconds, based on retry policy algorithm.
272     */
273    private long getRetryDelay(int retryCount, long retryInterval, ActionExecutor.RETRYPOLICY retryPolicy) {
274        switch (retryPolicy) {
275            case EXPONENTIAL:
276                long retryTime = ((long) Math.pow(2, retryCount) * retryInterval * 1000L);
277                return retryTime;
278            case PERIODIC:
279                return retryInterval * 1000L;
280            default:
281                throw new UnsupportedOperationException("Retry policy not supported");
282        }
283    }
284
285    /**
286     * Workflow action executor context
287     *
288     */
289    public static class ActionExecutorContext implements ActionExecutor.Context {
290        protected final WorkflowJobBean workflow;
291        private Configuration protoConf;
292        protected final WorkflowActionBean action;
293        private final boolean isRetry;
294        private final boolean isUserRetry;
295        private boolean started;
296        private boolean ended;
297        private boolean executed;
298        private boolean shouldEndWF;
299        private Job.Status jobStatus;
300
301        /**
302         * Constructing the ActionExecutorContext, setting the private members
303         * and constructing the proto configuration
304         */
305        public ActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry,
306                boolean isUserRetry) {
307            this.workflow = workflow;
308            this.action = action;
309            this.isRetry = isRetry;
310            this.isUserRetry = isUserRetry;
311            if (null != workflow.getProtoActionConf()) {
312                try {
313                    protoConf = new XConfiguration(new StringReader(workflow.getProtoActionConf()));
314                }
315                catch (IOException ex) {
316                    throw new RuntimeException("It should not happen", ex);
317                }
318            }
319        }
320
321        public ActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action) {
322            this(workflow, action, false, false);
323        }
324
325        /*
326         * (non-Javadoc)
327         * @see org.apache.oozie.action.ActionExecutor.Context#getCallbackUrl(java.lang.String)
328         */
329        public String getCallbackUrl(String externalStatusVar) {
330            return Services.get().get(CallbackService.class).createCallBackUrl(action.getId(), externalStatusVar);
331        }
332
333        /*
334         * (non-Javadoc)
335         * @see org.apache.oozie.action.ActionExecutor.Context#getProtoActionConf()
336         */
337        public Configuration getProtoActionConf() {
338            return protoConf;
339        }
340
341        /*
342         * (non-Javadoc)
343         * @see org.apache.oozie.action.ActionExecutor.Context#getWorkflow()
344         */
345        public WorkflowJob getWorkflow() {
346            return workflow;
347        }
348
349        /**
350         * Returns the workflow action of the given action context
351         *
352         * @return the workflow action of the given action context
353         */
354        public WorkflowAction getAction() {
355            return action;
356        }
357
358        /*
359         * (non-Javadoc)
360         * @see org.apache.oozie.action.ActionExecutor.Context#getELEvaluator()
361         */
362        public ELEvaluator getELEvaluator() {
363            ELEvaluator evaluator = Services.get().get(ELService.class).createEvaluator("workflow");
364            DagELFunctions.configureEvaluator(evaluator, workflow, action);
365            return evaluator;
366        }
367
368        /*
369         * (non-Javadoc)
370         * @see org.apache.oozie.action.ActionExecutor.Context#setVar(java.lang.String, java.lang.String)
371         */
372        public void setVar(String name, String value) {
373            setVarToWorkflow(name, value);
374        }
375
376        /**
377         * This is not thread safe, don't use if workflowjob is shared among multiple actions command
378         * @param name
379         * @param value
380         */
381        public void setVarToWorkflow(String name, String value) {
382            name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
383            WorkflowInstance wfInstance = workflow.getWorkflowInstance();
384            wfInstance.setVar(name, value);
385            workflow.setWorkflowInstance(wfInstance);
386        }
387
388        /*
389         * (non-Javadoc)
390         * @see org.apache.oozie.action.ActionExecutor.Context#getVar(java.lang.String)
391         */
392        public String getVar(String name) {
393            name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
394            return workflow.getWorkflowInstance().getVar(name);
395        }
396
397        /*
398         * (non-Javadoc)
399         * @see org.apache.oozie.action.ActionExecutor.Context#setStartData(java.lang.String, java.lang.String, java.lang.String)
400         */
401        public void setStartData(String externalId, String trackerUri, String consoleUrl) {
402            setVar(JobUtils.getRetryKey(action, JsonTags.WORKFLOW_ACTION_CONSOLE_URL), consoleUrl);
403            action.setStartData(externalId, trackerUri, consoleUrl);
404            started = true;
405        }
406
407        /**
408         * Setting the start time of the action
409         */
410        public void setStartTime() {
411            Date now = new Date();
412            action.setStartTime(now);
413        }
414
415        /*
416         * (non-Javadoc)
417         * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionData(java.lang.String, java.util.Properties)
418         */
419        public void setExecutionData(String externalStatus, Properties actionData) {
420            action.setExecutionData(externalStatus, actionData);
421            executed = true;
422        }
423
424        /*
425         * (non-Javadoc)
426         * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionStats(java.lang.String)
427         */
428        public void setExecutionStats(String jsonStats) {
429            action.setExecutionStats(jsonStats);
430            executed = true;
431        }
432
433        /*
434         * (non-Javadoc)
435         * @see org.apache.oozie.action.ActionExecutor.Context#setExternalChildIDs(java.lang.String)
436         */
437        public void setExternalChildIDs(String externalChildIDs) {
438            setVar(JobUtils.getRetryKey(action, JsonTags.WORKFLOW_ACTION_EXTERNAL_CHILD_IDS), externalChildIDs);
439            action.setExternalChildIDs(externalChildIDs);
440            executed = true;
441        }
442
443        /*
444         * (non-Javadoc)
445         * @see org.apache.oozie.action.ActionExecutor.Context#setEndData(org.apache.oozie.client.WorkflowAction.Status, java.lang.String)
446         */
447        public void setEndData(WorkflowAction.Status status, String signalValue) {
448            action.setEndData(status, signalValue);
449            ended = true;
450        }
451
452        /*
453         * (non-Javadoc)
454         * @see org.apache.oozie.action.ActionExecutor.Context#isRetry()
455         */
456        public boolean isRetry() {
457            return isRetry;
458        }
459
460        /**
461         * Return if the executor invocation is a user retry or not.
462         *
463         * @return if the executor invocation is a user retry or not.
464         */
465        public boolean isUserRetry() {
466            return isUserRetry;
467        }
468
469        /**
470         * Returns whether setStartData has been called or not.
471         *
472         * @return true if start completion info has been set.
473         */
474        public boolean isStarted() {
475            return started;
476        }
477
478        /**
479         * Returns whether setExecutionData has been called or not.
480         *
481         * @return true if execution completion info has been set, otherwise false.
482         */
483        public boolean isExecuted() {
484            return executed;
485        }
486
487        /**
488         * Returns whether setEndData has been called or not.
489         *
490         * @return true if end completion info has been set.
491         */
492        public boolean isEnded() {
493            return ended;
494        }
495
496        public void setExternalStatus(String externalStatus) {
497            action.setExternalStatus(externalStatus);
498        }
499
500        @Override
501        public String getRecoveryId() {
502            return action.getId() + RECOVERY_ID_SEPARATOR + workflow.getRun();
503        }
504
505        /* (non-Javadoc)
506         * @see org.apache.oozie.action.ActionExecutor.Context#getActionDir()
507         */
508        public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException {
509            String name = getWorkflow().getId() + "/" + action.getName() + "--" + action.getType();
510            FileSystem fs = getAppFileSystem();
511            String actionDirPath = Services.get().getSystemId() + "/" + name;
512            Path fqActionDir = new Path(fs.getHomeDirectory(), actionDirPath);
513            return fqActionDir;
514        }
515
516        /* (non-Javadoc)
517         * @see org.apache.oozie.action.ActionExecutor.Context#getAppFileSystem()
518         */
519        public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException {
520            WorkflowJob workflow = getWorkflow();
521            URI uri = new URI(getWorkflow().getAppPath());
522            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
523            Configuration fsConf = has.createConfiguration(uri.getAuthority());
524            return has.createFileSystem(workflow.getUser(), uri, fsConf);
525
526        }
527
528        /* (non-Javadoc)
529         * @see org.apache.oozie.action.ActionExecutor.Context#setErrorInfo(java.lang.String, java.lang.String)
530         */
531        @Override
532        public void setErrorInfo(String str, String exMsg) {
533            action.setErrorInfo(str, exMsg);
534        }
535
536        public boolean isShouldEndWF() {
537            return shouldEndWF;
538        }
539
540        public void setShouldEndWF(boolean shouldEndWF) {
541            this.shouldEndWF = shouldEndWF;
542        }
543
544        public Job.Status getJobStatus() {
545            return jobStatus;
546        }
547
548        public void setJobStatus(Job.Status jobStatus) {
549            this.jobStatus = jobStatus;
550        }
551    }
552
553    public static class ForkedActionExecutorContext extends ActionExecutorContext {
554        private Map<String, String> contextVariableMap = new HashMap<String, String>();
555
556        public ForkedActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry,
557                boolean isUserRetry) {
558            super(workflow, action, isRetry, isUserRetry);
559        }
560
561        public void setVar(String name, String value) {
562            if (value == null) {
563                contextVariableMap.remove(name);
564            }
565            else {
566                contextVariableMap.put(name, value);
567            }
568        }
569
570        public String getVar(String name) {
571            return contextVariableMap.get(name);
572        }
573
574        public Map<String, String> getContextMap() {
575            return contextVariableMap;
576        }
577    }
578
579    /*
580     * Returns user retry policy
581     */
582    private ActionExecutor.RETRYPOLICY getUserRetryPolicy(WorkflowActionBean wfAction, WorkflowJobBean wfJob) {
583        WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
584        LiteWorkflowApp wfApp = (LiteWorkflowApp) wfInstance.getApp();
585        NodeDef nodeDef = wfApp.getNode(wfAction.getName());
586        if (nodeDef == null) {
587            return ActionExecutor.RETRYPOLICY.valueOf(LiteWorkflowStoreService.DEFAULT_USER_RETRY_POLICY);
588        }
589        String userRetryPolicy = nodeDef.getUserRetryPolicy().toUpperCase();
590        String userRetryPolicyInSysConfig = ConfigurationService.get(LiteWorkflowStoreService.CONF_USER_RETRY_POLICY)
591                .toUpperCase();
592        if (isValidRetryPolicy(userRetryPolicy)) {
593            return ActionExecutor.RETRYPOLICY.valueOf(userRetryPolicy);
594        }
595        else if (isValidRetryPolicy(userRetryPolicyInSysConfig)) {
596            return ActionExecutor.RETRYPOLICY.valueOf(userRetryPolicyInSysConfig);
597        }
598        else {
599            return ActionExecutor.RETRYPOLICY.valueOf(LiteWorkflowStoreService.DEFAULT_USER_RETRY_POLICY);
600        }
601    }
602
603    /*
604     * Returns true if policy is valid, otherwise false
605     */
606    private static boolean isValidRetryPolicy(String policy) {
607        try {
608            ActionExecutor.RETRYPOLICY.valueOf(policy.toUpperCase().trim());
609        }
610        catch (IllegalArgumentException e) {
611            // Invalid Policy
612            return false;
613        }
614        return true;
615    }
616
617}