001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.action;
020
021import org.apache.commons.lang.StringUtils;
022import org.apache.hadoop.fs.FileSystem;
023import org.apache.hadoop.fs.Path;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.oozie.client.WorkflowAction;
026import org.apache.oozie.client.WorkflowJob;
027import org.apache.oozie.service.ConfigurationService;
028import org.apache.oozie.util.ELEvaluator;
029import org.apache.oozie.util.ParamChecker;
030import org.apache.oozie.util.XLog;
031import org.apache.oozie.service.HadoopAccessorException;
032import org.apache.oozie.service.Services;
033
034import java.io.ByteArrayOutputStream;
035import java.io.IOException;
036import java.io.PrintStream;
037import java.net.URISyntaxException;
038import java.util.HashMap;
039import java.util.Map;
040import java.util.Properties;
041import java.util.LinkedHashMap;
042
043/**
044 * Base action executor class. <p/> All the action executors should extend this class.
045 */
046public abstract class ActionExecutor {
047
    /**
     * Configuration prefix for action executor (sub-classes) properties.
     */
        public static final String CONF_PREFIX = "oozie.action.";

    /**
     * Configuration property read by the constructor to initialize the maximum number of retries.
     */
    public static final String MAX_RETRIES = CONF_PREFIX + "retries.max";

    /**
     * Configuration property read by the constructor for the retry interval. A value that is not positive makes
     * the constructor fall back to the default interval it was given.
     */
    public static final String ACTION_RETRY_INTERVAL = CONF_PREFIX + "retry.interval";

    /**
     * Configuration property naming the retry policy, one of the {@link RETRYPOLICY} constants. A blank or
     * unrecognized value resolves to {@link RETRYPOLICY#PERIODIC}.
     */
    public static final String ACTION_RETRY_POLICY = CONF_PREFIX + "retry.policy";

    /**
     * Generic error code for exceptions that have no registered error information.
     * <p/>
     * NOTE(review): {@link #convertException} in this class does not reference this constant, it derives the error
     * code from the simple class name of the exception. Confirm which callers still rely on it.
     */
    public static final String ERROR_OTHER = "OTHER";

    // NOTE(review): not read anywhere in this class; presumably tells callers whether the executor needs
    // NameNode/JobTracker settings -- confirm against the code that consumes it.
    public boolean requiresNNJT = false;
065
    /**
     * Retry policies that can be selected through {@link #ACTION_RETRY_POLICY}.
     * <p/>
     * NOTE(review): how each policy spaces the retries is decided by the code that consumes
     * {@link #getRetryPolicy()}, which is not part of this class -- confirm there.
     */
    public static enum RETRYPOLICY {
        EXPONENTIAL, PERIODIC
    }
069
070    private static class ErrorInfo {
071        ActionExecutorException.ErrorType errorType;
072        String errorCode;
073        Class<?> errorClass;
074
075        private ErrorInfo(ActionExecutorException.ErrorType errorType, String errorCode, Class<?> errorClass) {
076            this.errorType = errorType;
077            this.errorCode = errorCode;
078            this.errorClass = errorClass;
079        }
080    }
081
    // True while error registration is allowed. resetInitInfo() and registerError() throw IllegalStateException
    // when it is false; it is toggled by enableInit() and disableInit().
    private static boolean initMode = false;
    // Error registry: action executor type -> (exception class name -> error info). The inner maps are
    // LinkedHashMaps created by initActionType(), so lookups by parent class follow registration order.
    // NOTE(review): plain HashMap in a static field with no synchronization visible here -- confirm that
    // registration only happens during single-threaded initialization.
    private static Map<String, Map<String, ErrorInfo>> ERROR_INFOS = new HashMap<String, Map<String, ErrorInfo>>();
084
085    /**
086     * Context information passed to the ActionExecutor methods.
087     */
088    public interface Context {
089
090        /**
091         * Create the callback URL for the action.
092         *
093         * @param externalStatusVar variable for the caller to inject the external status.
094         * @return the callback URL.
095         */
096        public String getCallbackUrl(String externalStatusVar);
097
098        /**
099         * Return a proto configuration for actions with auth properties already set.
100         *
101         * @return a proto configuration for actions with auth properties already set.
102         */
103        public Configuration getProtoActionConf();
104
105        /**
106         * Return the workflow job.
107         *
108         * @return the workflow job.
109         */
110        public WorkflowJob getWorkflow();
111
112        /**
113         * Return an ELEvaluator with the context injected.
114         *
115         * @return configured ELEvaluator.
116         */
117        public ELEvaluator getELEvaluator();
118
119        /**
120         * Set a workflow action variable. <p/> Convenience method that prefixes the variable name with the action name
121         * plus a '.'.
122         *
123         * @param name variable name.
124         * @param value variable value, <code>null</code> removes the variable.
125         */
126        public void setVar(String name, String value);
127
128        /**
129         * Get a workflow action variable. <p/> Convenience method that prefixes the variable name with the action name
130         * plus a '.'.
131         *
132         * @param name variable name.
133         * @return the variable value, <code>null</code> if not set.
134         */
135        public String getVar(String name);
136
137        /**
138         * Set the action tracking information for an successfully started action.
139         *
140         * @param externalId the action external ID.
141         * @param trackerUri the action tracker URI.
142         * @param consoleUrl the action console URL.
143         */
144        void setStartData(String externalId, String trackerUri, String consoleUrl);
145
146        /**
147         * Set the action execution completion information for an action. The action status is set to {@link
148         * org.apache.oozie.client.WorkflowAction.Status#DONE}
149         *
150         * @param externalStatus the action external end status.
151         * @param actionData the action data on completion, <code>null</code> if none.
152         */
153        void setExecutionData(String externalStatus, Properties actionData);
154
155        /**
156         * Set execution statistics information for a particular action. The action status is set to {@link
157         * org.apache.oozie.client.WorkflowAction.Status#DONE}
158         *
159         * @param jsonStats the JSON string representation of the stats.
160         */
161        void setExecutionStats(String jsonStats);
162
163        /**
164         * Set external child IDs for a particular action (Eg: pig). The action status is set to {@link
165         * org.apache.oozie.client.WorkflowAction.Status#DONE}
166         *
167         * @param externalChildIDs the external child IDs as a comma-delimited string.
168         */
169        void setExternalChildIDs(String externalChildIDs);
170
171        /**
172         * Set the action end completion information for a completed action.
173         *
174         * @param status the action end status, it can be {@link org.apache.oozie.client.WorkflowAction.Status#OK} or
175         * {@link org.apache.oozie.client.WorkflowAction.Status#ERROR}.
176         * @param signalValue the action external end status.
177         */
178        void setEndData(WorkflowAction.Status status, String signalValue);
179
180        /**
181         * Return if the executor invocation is a retry or not.
182         *
183         * @return if the executor invocation is a retry or not.
184         */
185        boolean isRetry();
186
187        /**
188         * Sets the external status for the action in context.
189         *
190         * @param externalStatus the external status.
191         */
192        void setExternalStatus(String externalStatus);
193
194        /**
195         * Get the Action Recovery ID.
196         *
197         * @return recovery ID.
198         */
199        String getRecoveryId();
200
201        /*
202         * @return the path that will be used to store action specific data
203         * @throws IOException @throws URISyntaxException @throws HadoopAccessorException
204         */
205        public Path getActionDir() throws HadoopAccessorException, IOException, URISyntaxException;
206
207        /**
208         * @return filesystem handle for the application deployment fs.
209         * @throws IOException
210         * @throws URISyntaxException
211         * @throws HadoopAccessorException
212         */
213        public FileSystem getAppFileSystem() throws HadoopAccessorException, IOException, URISyntaxException;
214
215        public void setErrorInfo(String str, String exMsg);
216    }
217
218
    /**
     * Default interval, in seconds, between retries. Used by the single argument constructor.
     */
    public static final long RETRY_INTERVAL = 60;

    // Action executor type, validated as non-empty by the constructor.
    private String type;
    // Maximum number of retries, initialized from the MAX_RETRIES configuration property.
    private int maxRetries;
    // Interval between retries in seconds: the configured value when positive, else the constructor default.
    private long retryInterval;
    // Retry policy, initialized from the ACTION_RETRY_POLICY configuration property.
    private RETRYPOLICY retryPolicy;
228
    /**
     * Create an action executor with default retry parameters. <p/> Delegates to
     * {@link #ActionExecutor(String, long)} using {@link #RETRY_INTERVAL} as the default retry interval.
     *
     * @param type action executor type, it cannot be empty.
     */
    protected ActionExecutor(String type) {
        this(type, RETRY_INTERVAL);
    }
237
238    /**
239     * Create an action executor.
240     *
241     * @param type action executor type.
242     * @param defaultRetryInterval retry interval, in seconds.
243     */
244    protected ActionExecutor(String type, long defaultRetryInterval) {
245        this.type = ParamChecker.notEmpty(type, "type");
246        this.maxRetries = ConfigurationService.getInt(MAX_RETRIES);
247        int retryInterval = ConfigurationService.getInt(ACTION_RETRY_INTERVAL);
248        this.retryInterval = retryInterval > 0 ? retryInterval : defaultRetryInterval;
249        this.retryPolicy = getRetryPolicyFromConf();
250    }
251
252    private RETRYPOLICY getRetryPolicyFromConf() {
253        String retryPolicy = ConfigurationService.get(ACTION_RETRY_POLICY);
254        if (StringUtils.isBlank(retryPolicy)) {
255            return RETRYPOLICY.PERIODIC;
256        } else {
257            try {
258                return RETRYPOLICY.valueOf(retryPolicy.toUpperCase().trim());
259            } catch (IllegalArgumentException e) {
260                return RETRYPOLICY.PERIODIC;
261            }
262        }
263    }
264
265    /**
266     * Clear all init settings for all action types.
267     */
268    public static void resetInitInfo() {
269        if (!initMode) {
270            throw new IllegalStateException("Error, action type info locked");
271        }
272        ERROR_INFOS.clear();
273    }
274
275    /**
276     * Enable action type initialization.
277     */
278    public static void enableInit() {
279        initMode = true;
280    }
281
282    /**
283     * Disable action type initialization.
284     */
285    public static void disableInit() {
286        initMode = false;
287    }
288
289    /**
290     * Invoked once at system initialization time. <p/> It can be used to register error information for the expected
291     * exceptions. Exceptions should be register from subclasses to superclasses to ensure proper detection, same thing
292     * that it is done in a normal catch. <p/> This method should invoke the {@link #registerError} method to register
293     * all its possible errors. <p/> Subclasses overriding must invoke super.
294     */
295    public void initActionType() {
296        XLog.getLog(getClass()).trace(" Init Action Type : [{0}]", getType());
297        ERROR_INFOS.put(getType(), new LinkedHashMap<String, ErrorInfo>());
298    }
299
300    /**
301     * Return the system ID, this ID is defined in Oozie configuration.
302     *
303     * @return the system ID.
304     */
305    public String getOozieSystemId() {
306        return Services.get().getSystemId();
307    }
308
309    /**
310     * Return the runtime directory of the Oozie instance. <p/> The directory is created under TMP and it is always a
311     * new directory per system initialization.
312     *
313     * @return the runtime directory of the Oozie instance.
314     */
315    public String getOozieRuntimeDir() {
316        return Services.get().getRuntimeDir();
317    }
318
319    /**
320     * Return Oozie configuration. <p/> This is useful for actions that need access to configuration properties.
321     *
322     * @return Oozie configuration.
323     */
324    public Configuration getOozieConf() {
325        return Services.get().getConf();
326    }
327
328    /**
329     * Register error handling information for an exception.
330     *
331     * @param exClass excpetion class name (to work in case of a particular exception not being in the classpath, needed
332     * to be able to handle multiple version of Hadoop  or other JARs used by executors with the same codebase).
333     * @param errorType error type for the exception.
334     * @param errorCode error code for the exception.
335     */
336    protected void registerError(String exClass, ActionExecutorException.ErrorType errorType, String errorCode) {
337        if (!initMode) {
338            throw new IllegalStateException("Error, action type info locked");
339        }
340        try {
341            Class errorClass = Thread.currentThread().getContextClassLoader().loadClass(exClass);
342            Map<String, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType());
343            executorErrorInfo.put(exClass, new ErrorInfo(errorType, errorCode, errorClass));
344        }
345        catch (ClassNotFoundException cnfe) {
346            XLog.getLog(getClass()).warn(
347                    "Exception [{0}] not in classpath, ActionExecutor [{1}] will handle it as ERROR", exClass,
348                    getType());
349        }
350        catch (java.lang.NoClassDefFoundError err) {
351            ByteArrayOutputStream baos = new ByteArrayOutputStream();
352            err.printStackTrace(new PrintStream(baos));
353            XLog.getLog(getClass()).warn(baos.toString());
354        }
355    }
356
357    /**
358     * Return the action executor type.
359     *
360     * @return the action executor type.
361     */
362    public String getType() {
363        return type;
364    }
365
366    /**
367     * Return the maximum number of retries for the action executor.
368     *
369     * @return the maximum number of retries for the action executor.
370     */
371    public int getMaxRetries() {
372        return maxRetries;
373    }
374
375    /**
376     * Set the maximum number of retries for the action executor.
377     *
378     * @param maxRetries the maximum number of retries.
379     */
380    public void setMaxRetries(int maxRetries) {
381        this.maxRetries = maxRetries;
382    }
383
384    /**
385     * Return the retry policy for the action executor.
386     *
387     * @return the retry policy for the action executor.
388     */
389    public RETRYPOLICY getRetryPolicy() {
390        return retryPolicy;
391    }
392
393    /**
394     * Sets the retry policy for the action executor.
395     *
396     * @param retryPolicy retry policy for the action executor.
397     */
398    public void setRetryPolicy(RETRYPOLICY retryPolicy) {
399        this.retryPolicy = retryPolicy;
400    }
401
402    /**
403     * Return the retry interval for the action executor in seconds.
404     *
405     * @return the retry interval for the action executor in seconds.
406     */
407    public long getRetryInterval() {
408        return retryInterval;
409    }
410
411    /**
412     * Sets the retry interval for the action executor.
413     *
414     * @param retryInterval retry interval in seconds.
415     */
416    public void setRetryInterval(long retryInterval) {
417        this.retryInterval = retryInterval;
418    }
419
420    /**
421     * Utility method to handle exceptions in the {@link #start}, {@link #end}, {@link #kill} and {@link #check} methods
422     * <p/> It uses the error registry to convert exceptions to {@link ActionExecutorException}s.
423     *
424     * @param ex exception to convert.
425     * @return ActionExecutorException converted exception.
426     */
427    @SuppressWarnings({"ThrowableInstanceNeverThrown"})
428    protected ActionExecutorException convertException(Exception ex) {
429        if (ex instanceof ActionExecutorException) {
430            return (ActionExecutorException) ex;
431        }
432
433        ActionExecutorException aee = null;
434        // Check the cause of the exception first
435        if (ex.getCause() != null) {
436            aee = convertExceptionHelper(ex.getCause());
437        }
438        // If the cause isn't registered or doesn't exist, check the exception itself
439        if (aee == null) {
440            aee = convertExceptionHelper(ex);
441            // If the cause isn't registered either, then just create a new ActionExecutorException
442            if (aee == null) {
443                String exClass = ex.getClass().getName();
444                String errorCode = exClass.substring(exClass.lastIndexOf(".") + 1);
445                aee = new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, errorCode, "{0}", ex.getMessage(), ex);
446            }
447        }
448        return aee;
449    }
450
451    private ActionExecutorException convertExceptionHelper(Throwable ex) {
452        Map<String, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType());
453        // Check if we have registered ex
454        ErrorInfo classErrorInfo = executorErrorInfo.get(ex.getClass().getName());
455        if (classErrorInfo != null) {
456            return new ActionExecutorException(classErrorInfo.errorType, classErrorInfo.errorCode, "{0}", ex.getMessage(), ex);
457        }
458        // Else, check if a parent class of ex is registered
459        else {
460            for (ErrorInfo errorInfo : executorErrorInfo.values()) {
461                if (errorInfo.errorClass.isInstance(ex)) {
462                    return new ActionExecutorException(errorInfo.errorType, errorInfo.errorCode, "{0}", ex.getMessage(), ex);
463                }
464            }
465        }
466        return null;
467    }
468
469    /**
470     * Convenience method that return the signal for an Action based on the action status.
471     *
472     * @param status action status.
473     * @return the action signal.
474     */
475    protected String getActionSignal(WorkflowAction.Status status) {
476        switch (status) {
477            case OK:
478                return "OK";
479            case ERROR:
480            case KILLED:
481                return "ERROR";
482            default:
483                throw new IllegalArgumentException("Action status for signal can only be OK or ERROR");
484        }
485    }
486
487    /**
488     * Return the path that will be used to store action specific data
489     *
490     * @param jobId Worfklow ID
491     * @param action Action
492     * @param key An Identifier
493     * @param temp temp directory flag
494     * @return A string that has the path
495     */
496    protected String getActionDirPath(String jobId, WorkflowAction action, String key, boolean temp) {
497        String name = jobId + "/" + action.getName() + "--" + key;
498        if (temp) {
499            name += ".temp";
500        }
501        return getOozieSystemId() + "/" + name;
502    }
503
504    /**
505     * Return the path that will be used to store action specific data.
506     *
507     * @param jobId Workflow ID
508     * @param action Action
509     * @param key An identifier
510     * @param temp Temp directory flag
511     * @return Path to the directory
512     */
513    public Path getActionDir(String jobId, WorkflowAction action, String key, boolean temp) {
514        return new Path(getActionDirPath(jobId, action, key, temp));
515    }
516
    /**
     * Start an action. <p/> The {@link Context#setStartData} method must be called within this method. <p/> If the
     * action has completed, the {@link Context#setExecutionData} method must be called within this method as well.
     *
     * @param context executor context.
     * @param action the action to start.
     * @throws ActionExecutorException thrown if the action could not start.
     */
    public abstract void start(Context context, WorkflowAction action) throws ActionExecutorException;
526
    /**
     * End an action after it has executed. <p/> The {@link Context#setEndData} method must be called within this
     * method.
     *
     * @param context executor context.
     * @param action the action to end.
     * @throws ActionExecutorException thrown if the action could not end.
     */
    public abstract void end(Context context, WorkflowAction action) throws ActionExecutorException;
536
    /**
     * Check if an action has completed. This method must be implemented by Async Action Executors. <p/> If the action
     * has completed, the {@link Context#setExecutionData} method must be called within this method. <p/> If the action
     * has not completed, the {@link Context#setExternalStatus} method must be called within this method.
     *
     * @param context executor context.
     * @param action the action to check.
     * @throws ActionExecutorException thrown if the action could not be checked.
     */
    public abstract void check(Context context, WorkflowAction action) throws ActionExecutorException;
547
    /**
     * Kill an action. <p/> The {@link Context#setEndData} method must be called within this method.
     *
     * @param context executor context.
     * @param action the action to kill.
     * @throws ActionExecutorException thrown if the action could not be killed.
     */
    public abstract void kill(Context context, WorkflowAction action) throws ActionExecutorException;
556
    /**
     * Tell whether an external status indicates that the action has completed. <p/> Which external status values
     * count as completed is defined by each action executor implementation.
     *
     * @param externalStatus external status to check.
     * @return <code>true</code> if the external status indicates that the action has completed, <code>false</code>
     * otherwise.
     */
    public abstract boolean isCompleted(String externalStatus);
564
565}