001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.wf;
020
021import java.io.IOException;
022import java.io.StringReader;
023import java.net.URI;
024import java.net.URISyntaxException;
025import java.util.Date;
026import java.util.Properties;
027import java.util.Set;
028
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.oozie.DagELFunctions;
033import org.apache.oozie.ErrorCode;
034import org.apache.oozie.WorkflowActionBean;
035import org.apache.oozie.WorkflowJobBean;
036import org.apache.oozie.action.ActionExecutor;
037import org.apache.oozie.client.WorkflowAction;
038import org.apache.oozie.client.WorkflowJob;
039import org.apache.oozie.command.CommandException;
040import org.apache.oozie.service.CallbackService;
041import org.apache.oozie.service.ELService;
042import org.apache.oozie.service.HadoopAccessorException;
043import org.apache.oozie.service.HadoopAccessorService;
044import org.apache.oozie.service.JPAService;
045import org.apache.oozie.service.LiteWorkflowStoreService;
046import org.apache.oozie.service.Services;
047import org.apache.oozie.util.ELEvaluator;
048import org.apache.oozie.util.InstrumentUtils;
049import org.apache.oozie.util.Instrumentation;
050import org.apache.oozie.util.XConfiguration;
051import org.apache.oozie.workflow.WorkflowException;
052import org.apache.oozie.workflow.WorkflowInstance;
053import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
054
055/**
056 * Base class for Action execution commands. Provides common functionality to handle different types of errors while
057 * attempting to start or end an action.
058 */
059public abstract class ActionXCommand<T> extends WorkflowXCommand<Void> {
060    private static final String INSTRUMENTATION_GROUP = "action.executors";
061
062    protected static final String RECOVERY_ID_SEPARATOR = "@";
063
064    public ActionXCommand(String name, String type, int priority) {
065        super(name, type, priority);
066    }
067
068    /**
069     * Takes care of Transient failures. Sets the action status to retry and increments the retry count if not enough
070     * attempts have been made. Otherwise returns false.
071     *
072     * @param context the execution context.
073     * @param executor the executor instance being used.
074     * @param status the status to be set for the action.
075     * @return true if the action is scheduled for another retry. false if the number of retries has exceeded the
076     *         maximum number of configured retries.
077     * @throws CommandException thrown if unable to handle transient
078     */
079    protected boolean handleTransient(ActionExecutor.Context context, ActionExecutor executor,
080            WorkflowAction.Status status) throws CommandException {
081        LOG.debug("Attempting to retry");
082        ActionExecutorContext aContext = (ActionExecutorContext) context;
083        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
084        incrActionErrorCounter(action.getType(), "transient", 1);
085
086        int actionRetryCount = action.getRetries();
087        if (actionRetryCount >= executor.getMaxRetries()) {
088            LOG.warn("Exceeded max retry count [{0}]. Suspending Job", executor.getMaxRetries());
089            return false;
090        }
091        else {
092            action.setStatus(status);
093            action.setPending();
094            action.incRetries();
095            long retryDelayMillis = getRetryDelay(actionRetryCount, executor.getRetryInterval(), executor.getRetryPolicy());
096            action.setPendingAge(new Date(System.currentTimeMillis() + retryDelayMillis));
097            LOG.info("Next Retry, Attempt Number [{0}] in [{1}] milliseconds", actionRetryCount + 1, retryDelayMillis);
098            this.resetUsed();
099            queue(this, retryDelayMillis);
100            return true;
101        }
102    }
103
104    /**
105     * Takes care of non transient failures. The job is suspended, and the state of the action is changed to *MANUAL and
106     * set pending flag of action to false
107     *
108     * @param context the execution context.
109     * @param executor the executor instance being used.
110     * @param status the status to be set for the action.
111     * @throws CommandException thrown if unable to suspend job
112     */
113    protected void handleNonTransient(ActionExecutor.Context context, ActionExecutor executor,
114            WorkflowAction.Status status) throws CommandException {
115        ActionExecutorContext aContext = (ActionExecutorContext) context;
116        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
117        incrActionErrorCounter(action.getType(), "nontransient", 1);
118        WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
119        String id = workflow.getId();
120        action.setStatus(status);
121        action.resetPendingOnly();
122        LOG.warn("Suspending Workflow Job id=" + id);
123        try {
124            SuspendXCommand.suspendJob(Services.get().get(JPAService.class), workflow, id, action.getId(), null);
125        }
126        catch (Exception e) {
127            throw new CommandException(ErrorCode.E0727, id, e.getMessage());
128        }
129        finally {
130            updateParentIfNecessary(workflow, 3);
131        }
132    }
133
134    /**
135     * Takes care of errors. </p> For errors while attempting to start the action, the job state is updated and an
136     * {@link ActionEndCommand} is queued. </p> For errors while attempting to end the action, the job state is updated.
137     * </p>
138     *
139     * @param context the execution context.
140     * @param executor the executor instance being used.
141     * @param message
142     * @param isStart whether the error was generated while starting or ending an action.
143     * @param status the status to be set for the action.
144     * @throws CommandException thrown if unable to handle action error
145     */
146    protected void handleError(ActionExecutor.Context context, ActionExecutor executor, String message,
147            boolean isStart, WorkflowAction.Status status) throws CommandException {
148        LOG.warn("Setting Action Status to [{0}]", status);
149        ActionExecutorContext aContext = (ActionExecutorContext) context;
150        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
151
152        if (!handleUserRetry(action)) {
153            incrActionErrorCounter(action.getType(), "error", 1);
154            action.setPending();
155            if (isStart) {
156                action.setExecutionData(message, null);
157                queue(new ActionEndXCommand(action.getId(), action.getType()));
158            }
159            else {
160                action.setEndData(status, WorkflowAction.Status.ERROR.toString());
161            }
162        }
163    }
164
165    /**
166     * Fail the job due to failed action
167     *
168     * @param context the execution context.
169     * @throws CommandException thrown if unable to fail job
170     */
171    public void failJob(ActionExecutor.Context context) throws CommandException {
172        ActionExecutorContext aContext = (ActionExecutorContext) context;
173        WorkflowActionBean action = (WorkflowActionBean) aContext.getAction();
174        failJob(context, action);
175    }
176
177    /**
178     * Fail the job due to failed action
179     *
180     * @param context the execution context.
181     * @param action the action that caused the workflow to fail
182     * @throws CommandException thrown if unable to fail job
183     */
184    public void failJob(ActionExecutor.Context context, WorkflowActionBean action) throws CommandException {
185        WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
186        if (!handleUserRetry(action)) {
187            incrActionErrorCounter(action.getType(), "failed", 1);
188            LOG.warn("Failing Job due to failed action [{0}]", action.getName());
189            try {
190                workflow.getWorkflowInstance().fail(action.getName());
191                WorkflowInstance wfInstance = workflow.getWorkflowInstance();
192                ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.FAILED);
193                workflow.setWorkflowInstance(wfInstance);
194                workflow.setStatus(WorkflowJob.Status.FAILED);
195                action.setStatus(WorkflowAction.Status.FAILED);
196                action.resetPending();
197                queue(new WorkflowNotificationXCommand(workflow, action));
198                queue(new KillXCommand(workflow.getId()));
199                InstrumentUtils.incrJobCounter(INSTR_FAILED_JOBS_COUNTER_NAME, 1, getInstrumentation());
200            }
201            catch (WorkflowException ex) {
202                throw new CommandException(ex);
203            }
204        }
205    }
206
207    /**
208     * Execute retry for action if this action is eligible for user-retry
209     *
210     * @param context the execution context.
211     * @return true if user-retry has to be handled for this action
212     * @throws CommandException thrown if unable to fail job
213     */
214    public boolean handleUserRetry(WorkflowActionBean action) throws CommandException {
215        String errorCode = action.getErrorCode();
216        Set<String> allowedRetryCode = LiteWorkflowStoreService.getUserRetryErrorCode();
217
218        if ((allowedRetryCode.contains(LiteWorkflowStoreService.USER_ERROR_CODE_ALL) || allowedRetryCode.contains(errorCode))
219                && action.getUserRetryCount() < action.getUserRetryMax()) {
220            LOG.info("Preparing retry this action [{0}], errorCode [{1}], userRetryCount [{2}], "
221                    + "userRetryMax [{3}], userRetryInterval [{4}]", action.getId(), errorCode, action
222                    .getUserRetryCount(), action.getUserRetryMax(), action.getUserRetryInterval());
223            int interval = action.getUserRetryInterval() * 60 * 1000;
224            action.setStatus(WorkflowAction.Status.USER_RETRY);
225            action.incrmentUserRetryCount();
226            action.setPending();
227            queue(new ActionStartXCommand(action.getId(), action.getType()), interval);
228            return true;
229        }
230        return false;
231    }
232
233        /*
234         * In case of action error increment the error count for instrumentation
235         */
236    private void incrActionErrorCounter(String type, String error, int count) {
237        getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#ex." + error, count);
238    }
239
240        /**
241         * Increment the action counter in the instrumentation log. indicating how
242         * many times the action was executed since the start Oozie server
243         */
244    protected void incrActionCounter(String type, int count) {
245        getInstrumentation().incr(INSTRUMENTATION_GROUP, type + "#" + getName(), count);
246    }
247
248        /**
249         * Adding a cron for the instrumentation time for the given Instrumentation
250         * group
251         */
252    protected void addActionCron(String type, Instrumentation.Cron cron) {
253        getInstrumentation().addCron(INSTRUMENTATION_GROUP, type + "#" + getName(), cron);
254    }
255
256    /*
257     * Returns the next retry time in milliseconds, based on retry policy algorithm.
258     */
259    private long getRetryDelay(int retryCount, long retryInterval, ActionExecutor.RETRYPOLICY retryPolicy) {
260        switch (retryPolicy) {
261            case EXPONENTIAL:
262                long retryTime = ((long) Math.pow(2, retryCount) * retryInterval * 1000L);
263                return retryTime;
264            case PERIODIC:
265                return retryInterval * 1000L;
266            default:
267                throw new UnsupportedOperationException("Retry policy not supported");
268        }
269    }
270
271    /**
272     * Workflow action executor context
273     *
274     */
275    public static class ActionExecutorContext implements ActionExecutor.Context {
276        private final WorkflowJobBean workflow;
277        private Configuration protoConf;
278        private final WorkflowActionBean action;
279        private final boolean isRetry;
280        private final boolean isUserRetry;
281        private boolean started;
282        private boolean ended;
283        private boolean executed;
284
285                /**
286                 * Constructing the ActionExecutorContext, setting the private members
287                 * and constructing the proto configuration
288                 */
289        public ActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry, boolean isUserRetry) {
290            this.workflow = workflow;
291            this.action = action;
292            this.isRetry = isRetry;
293            this.isUserRetry = isUserRetry;
294            try {
295                protoConf = new XConfiguration(new StringReader(workflow.getProtoActionConf()));
296            }
297            catch (IOException ex) {
298                throw new RuntimeException("It should not happen", ex);
299            }
300        }
301
302        /*
303         * (non-Javadoc)
304         * @see org.apache.oozie.action.ActionExecutor.Context#getCallbackUrl(java.lang.String)
305         */
306        public String getCallbackUrl(String externalStatusVar) {
307            return Services.get().get(CallbackService.class).createCallBackUrl(action.getId(), externalStatusVar);
308        }
309
310        /*
311         * (non-Javadoc)
312         * @see org.apache.oozie.action.ActionExecutor.Context#getProtoActionConf()
313         */
314        public Configuration getProtoActionConf() {
315            return protoConf;
316        }
317
318        /*
319         * (non-Javadoc)
320         * @see org.apache.oozie.action.ActionExecutor.Context#getWorkflow()
321         */
322        public WorkflowJob getWorkflow() {
323            return workflow;
324        }
325
326        /**
327         * Returns the workflow action of the given action context
328         *
329         * @return the workflow action of the given action context
330         */
331        public WorkflowAction getAction() {
332            return action;
333        }
334
335        /*
336         * (non-Javadoc)
337         * @see org.apache.oozie.action.ActionExecutor.Context#getELEvaluator()
338         */
339        public ELEvaluator getELEvaluator() {
340            ELEvaluator evaluator = Services.get().get(ELService.class).createEvaluator("workflow");
341            DagELFunctions.configureEvaluator(evaluator, workflow, action);
342            return evaluator;
343        }
344
345        /*
346         * (non-Javadoc)
347         * @see org.apache.oozie.action.ActionExecutor.Context#setVar(java.lang.String, java.lang.String)
348         */
349        public void setVar(String name, String value) {
350            name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
351            WorkflowInstance wfInstance = workflow.getWorkflowInstance();
352            wfInstance.setVar(name, value);
353            workflow.setWorkflowInstance(wfInstance);
354        }
355
356        /*
357         * (non-Javadoc)
358         * @see org.apache.oozie.action.ActionExecutor.Context#getVar(java.lang.String)
359         */
360        public String getVar(String name) {
361            name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name;
362            return workflow.getWorkflowInstance().getVar(name);
363        }
364
365        /*
366         * (non-Javadoc)
367         * @see org.apache.oozie.action.ActionExecutor.Context#setStartData(java.lang.String, java.lang.String, java.lang.String)
368         */
369        public void setStartData(String externalId, String trackerUri, String consoleUrl) {
370            action.setStartData(externalId, trackerUri, consoleUrl);
371            started = true;
372        }
373
374        /**
375         * Setting the start time of the action
376         */
377        public void setStartTime() {
378            Date now = new Date();
379            action.setStartTime(now);
380        }
381
382        /*
383         * (non-Javadoc)
384         * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionData(java.lang.String, java.util.Properties)
385         */
386        public void setExecutionData(String externalStatus, Properties actionData) {
387            action.setExecutionData(externalStatus, actionData);
388            executed = true;
389        }
390
391        /*
392         * (non-Javadoc)
393         * @see org.apache.oozie.action.ActionExecutor.Context#setExecutionStats(java.lang.String)
394         */
395        public void setExecutionStats(String jsonStats) {
396            action.setExecutionStats(jsonStats);
397            executed = true;
398        }
399
400        /*
401         * (non-Javadoc)
402         * @see org.apache.oozie.action.ActionExecutor.Context#setExternalChildIDs(java.lang.String)
403         */
404        public void setExternalChildIDs(String externalChildIDs) {
405            action.setExternalChildIDs(externalChildIDs);
406            executed = true;
407        }
408
409        /*
410         * (non-Javadoc)
411         * @see org.apache.oozie.action.ActionExecutor.Context#setEndData(org.apache.oozie.client.WorkflowAction.Status, java.lang.String)
412         */
413        public void setEndData(WorkflowAction.Status status, String signalValue) {
414            action.setEndData(status, signalValue);
415            ended = true;
416        }
417
418        /*
419         * (non-Javadoc)
420         * @see org.apache.oozie.action.ActionExecutor.Context#isRetry()
421         */
422        public boolean isRetry() {
423            return isRetry;
424        }
425
426        /**
427         * Return if the executor invocation is a user retry or not.
428         *
429         * @return if the executor invocation is a user retry or not.
430         */
431        public boolean isUserRetry() {
432            return isUserRetry;
433        }
434
435        /**
436         * Returns whether setStartData has been called or not.
437         *
438         * @return true if start completion info has been set.
439         */
440        public boolean isStarted() {
441            return started;
442        }
443
444        /**
445         * Returns whether setExecutionData has been called or not.
446         *
447         * @return true if execution completion info has been set, otherwise false.
448         */
449        public boolean isExecuted() {
450            return executed;
451        }
452
453        /**
454         * Returns whether setEndData has been called or not.
455         *
456         * @return true if end completion info has been set.
457         */
458        public boolean isEnded() {
459            return ended;
460        }
461
462        public void setExternalStatus(String externalStatus) {
463            action.setExternalStatus(externalStatus);
464        }
465
466        @Override
467        public String getRecoveryId() {
468            return action.getId() + RECOVERY_ID_SEPARATOR + workflow.getRun();
469        }
470
471        /* (non-Javadoc)
472         * @see org.apache.oozie.action.ActionExecutor.Context#getActionDir()
473         */
474        public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException {
475            String name = getWorkflow().getId() + "/" + action.getName() + "--" + action.getType();
476            FileSystem fs = getAppFileSystem();
477            String actionDirPath = Services.get().getSystemId() + "/" + name;
478            Path fqActionDir = new Path(fs.getHomeDirectory(), actionDirPath);
479            return fqActionDir;
480        }
481
482        /* (non-Javadoc)
483         * @see org.apache.oozie.action.ActionExecutor.Context#getAppFileSystem()
484         */
485        public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException {
486            WorkflowJob workflow = getWorkflow();
487            URI uri = new URI(getWorkflow().getAppPath());
488            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
489            Configuration fsConf = has.createJobConf(uri.getAuthority());
490            return has.createFileSystem(workflow.getUser(), uri, fsConf);
491
492        }
493
494        /* (non-Javadoc)
495         * @see org.apache.oozie.action.ActionExecutor.Context#setErrorInfo(java.lang.String, java.lang.String)
496         */
497        @Override
498        public void setErrorInfo(String str, String exMsg) {
499            action.setErrorInfo(str, exMsg);
500        }
501    }
502
503}