[[index][::Go back to Oozie Documentation Index::]]

-----

---+!! Oozie Coordinator Specification

The goal of this document is to define a coordinator engine system specialized in submitting workflows based on time and data triggers.

*Authors:* Anil Pillai and Alejandro Abdelnur

%TOC%

---++ Changelog

---+++!! 26/AUG/2010:

   * Update coordinator rerun

---+++!! 09/JUN/2010:

   * Clean up unsupported functions

---+++!! 02/JUN/2010:

   * Update all EL functions in CoordFunctionSpec with "coord:" prefix

---+++!! 02/OCT/2009:

   * Added Appendix A, Oozie Coordinator XML-Schema
   * Change #5.3., Datasets definition supports 'include' element

---+++!! 29/SEP/2009:

   * Change #4.4.1, added =${coord:endOfDays(int n)}= EL function
   * Change #4.4.2, added =${coord:endOfMonths(int n)}= EL function

---+++!! 11/SEP/2009:

   * Change #6.6.4. =${coord:tzOffset()}= EL function now returns offset in minutes. Added more explanation on behavior
   * Removed 'oozie' URL from action workflow invocation, per arch review feedback coord&wf run on the same instance

---+++!! 07/SEP/2009:

   * Full rewrite of sections #4 and #7
   * Added sections #6.1.7, #6.6.2, #6.6.3 & #6.6.4
   * Rewording through the spec definitions
   * Updated all examples and syntax to latest changes

---+++!! 03/SEP/2009:

   * Change #2. Definitions. Some rewording in the definitions
   * Change #6.6.4. Replaced =${coord:next(int n)}= with =${coord:version(int n)}= EL function
   * Added #6.6.5. Dataset Instance Resolution for Instances Before the Initial Instance

---++ 1. Coordinator Overview

Users typically run map-reduce, hadoop-streaming, hdfs and/or Pig jobs on the grid. Several of these jobs can be combined to form a workflow job. [[https://issues.apache.org/jira/browse/HADOOP-5303][Oozie, Hadoop Workflow System]] defines a workflow system that runs such jobs.

Commonly, workflow jobs are run based on regular time intervals and/or data availability. And, in some cases, they can be triggered by an external event.
Expressing the condition(s) that trigger a workflow job can be modeled as a predicate that has to be satisfied. The workflow job is started after the predicate is satisfied. A predicate can reference data, time and/or external events. In the future, the model can be extended to support additional event types.

It is also necessary to connect workflow jobs that run regularly, but at different time intervals. The outputs of multiple subsequent runs of a workflow become the input to the next workflow. For example, the outputs of the last 4 runs of a workflow that runs every 15 minutes become the input of another workflow that runs every 60 minutes. Chaining these workflows together is referred to as a data application pipeline.

The Oozie *Coordinator* system allows the user to define and execute recurrent and interdependent workflow jobs (data application pipelines).

Real world data application pipelines have to account for reprocessing, late processing, catchup, partial processing, monitoring, notification and SLAs.

This document defines the functional specification for the Oozie Coordinator system.

---++ 2. Definitions

*Actual time:* The actual time indicates the time when something actually happens.

*Nominal time:* The nominal time specifies the time when something should happen. In theory the nominal time and the actual time should match; however, in practice, due to delays, the actual time may occur later than the nominal time.

*Dataset:* Collection of data referred to by a logical name. A dataset normally has several instances of data and each one of them can be referred to individually. Each dataset instance is represented by a unique set of URIs.

*Synchronous Dataset:* Synchronous dataset instances are generated at fixed time intervals and there is a dataset instance associated with each time interval. Synchronous dataset instances are identified by their nominal time.
For example, in the case of a file system based dataset, the nominal time would be somewhere in the file path of the dataset instance: =hdfs://foo:9000/usr/logs/2009/04/15/23/30= .

*Coordinator Action:* A coordinator action is a workflow job that is started when a set of conditions are met (input dataset instances are available).

*Coordinator Application:* A coordinator application defines the conditions under which coordinator actions should be created (the frequency) and when the actions can be started. The coordinator application also defines a start and an end time. Normally, coordinator applications are parameterized. A coordinator application is written in XML.

*Coordinator Job:* A coordinator job is an executable instance of a coordinator application definition. A job submission is done by submitting a job configuration that resolves all parameters in the application definition.

*Data pipeline:* A data pipeline is a connected set of coordinator applications that consume and produce interdependent datasets.

*Coordinator Definition Language:* The language used to describe datasets and coordinator applications.

*Coordinator Engine:* A system that executes coordinator jobs.

---++ 3. Expression Language for Parameterization

Coordinator application definitions can be parameterized with variables, built-in constants and built-in functions. At execution time all the parameters are resolved into concrete values.

The parameterization of workflow definitions is done using JSP Expression Language syntax from the [[http://jcp.org/aboutJava/communityprocess/final/jsr152/][JSP 2.0 Specification (JSP.2.3)]], allowing not only variables as parameters but also functions and complex expressions.

EL expressions can be used in XML attribute values and XML text element values. They cannot be used in XML element and XML attribute names.

Refer to section #6.5 'Parameterization of Coordinator Applications' for more details.

---++ 4. Datetime, Frequency and Time-Period Representation

---+++ 4.1. Datetime

All datetime values are always in [[http://en.wikipedia.org/wiki/Coordinated_Universal_Time][UTC]] and [[http://www.w3.org/TR/NOTE-datetime][W3C Date-Time format]] down to a minute precision, 'YYYY-MM-DDTHH:mmZ'. For example =2009-08-10T13:10Z= is August 10th 2009 at 13:10 UTC.

---++++ 4.1.1 End of the day in Datetime Values

It is valid to express the end of day as a '24:00' hour (i.e. =2009-08-10T24:00Z=). However, for all calculations and display, Oozie resolves such dates as the zero hour of the following day (i.e. =2009-08-11T00:00Z=).

---+++ 4.2. Timezone Representation

There is no widely accepted standard to identify timezones. Oozie Coordinator will understand the following timezone identifiers:

   * Generic NON-DST timezone identifier: =GMT[+/-]##:##= (i.e.: GMT+05:30)
   * ZoneInfo identifiers, with DST support, understood by Java JDK (about 600 IDs) (i.e.: America/Los_Angeles)

Oozie Coordinator must provide a tool for developers to list all supported timezone identifiers.

---+++ 4.3. Timezones and Daylight-Saving

While the Oozie coordinator engine works in UTC, it provides DST support for coordinator applications.

The baseline datetime for datasets and coordinator applications is expressed in UTC. The baseline datetime is the time of the first occurrence.

Datasets and coordinator applications also contain a timezone indicator.

The use of UTC as baseline enables a simple way of mixing and matching datasets and coordinator applications that use a different timezone by just adding the timezone offset.

The timezone indicator enables the Oozie coordinator engine to properly compute frequencies that are daylight-saving sensitive. For example: a daily frequency can be 23, 24 or 25 hours for timezones that observe daylight-saving. Weekly and monthly frequencies are also affected by this as the number of hours in the day may change.
Section #7 'Handling Timezones and Daylight Saving Time' explains how coordinator applications can be written to handle timezones and daylight-saving-time properly.

---+++ 4.4. Frequency and Time-Period Representation

Frequency is used to capture the periodic intervals at which datasets are produced and coordinator applications are scheduled to run.

This time-period representation is also used to specify non-recurrent time-periods, for example a timeout interval.

For datasets and coordinator applications the frequency time-period is applied =N= times to the baseline datetime to compute recurrent times.

Frequency is always expressed in minutes.

Because the number of minutes in a day may vary for timezones that observe daylight-saving time, constants cannot be used to express frequencies greater than a day for datasets and coordinator applications for such timezones. For such use cases, Oozie coordinator provides 2 EL functions, =${coord:days(int n)}= and =${coord:months(int n)}=.

Frequencies can be expressed using EL constants and EL functions that evaluate to a positive integer number.

*%GREEN% Examples: %ENDCOLOR%*

| *EL Constant* | *Value* | *Example* |
| =${coord:minutes(int n)}= | _n_ | =${coord:minutes(45)}= --> =45= |
| =${coord:hours(int n)}= | _n * 60_ | =${coord:hours(3)}= --> =180= |
| =${coord:days(int n)}= | _variable_ | =${coord:days(2)}= --> minutes in 2 full days from the current date |
| =${coord:months(int n)}= | _variable_ | =${coord:months(1)}= --> minutes in 1 full month from the current date |

---++++ 4.4.1. The coord:days(int n) and coord:endOfDays(int n) EL functions

The =${coord:days(int n)}= and =${coord:endOfDays(int n)}= EL functions should be used to handle day based frequencies.

Constant values should not be used to indicate a day based frequency (every 1 day, every 1 week, etc) because the number of hours in every day is not always the same for timezones that observe daylight-saving time.
It is a good practice to always use these EL functions instead of a constant expression (i.e. =24 * 60=), even if the timezone for which the application is being written does not observe daylight-saving time. This makes the application robust to changes in daylight-saving legislation and also makes it portable across timezones.

---+++++ 4.4.1.1. The coord:days(int n) EL function

The =${coord:days(int n)}= EL function returns the number of minutes for 'n' complete days starting with the day of the specified nominal time for which the computation is being done.

The =${coord:days(int n)}= EL function includes *all* the minutes of the current day, regardless of the time of the day of the current nominal time.

*%GREEN% Examples: %ENDCOLOR%*

| *Starting Nominal UTC time* | *Timezone* | *Usage* | *Value* | *First Occurrence* | *Comments* |
| =2009-01-01T08:00Z= | =UTC= | =${coord:days(1)}= | 1440 | =2009-01-01T08:00Z= | total minutes on 2009JAN01 UTC time |
| =2009-01-01T08:00Z= | =America/Los_Angeles= | =${coord:days(1)}= | 1440 | =2009-01-01T08:00Z= | total minutes in 2009JAN01 PST8PDT time |
| =2009-01-01T08:00Z= | =America/Los_Angeles= | =${coord:days(2)}= | 2880 | =2009-01-01T08:00Z= | total minutes in 2009JAN01 and 2009JAN02 PST8PDT time |
| |||||
| =2009-03-08T08:00Z= | =UTC= | =${coord:days(1)}= | 1440 | =2009-03-08T08:00Z= | total minutes on 2009MAR08 UTC time |
| =2009-03-08T08:00Z= | =Europe/London= | =${coord:days(1)}= | 1440 | =2009-03-08T08:00Z= | total minutes in 2009MAR08 BST1BDT time |
| =2009-03-08T08:00Z= | =America/Los_Angeles= | =${coord:days(1)}= | 1380 | =2009-03-08T08:00Z= | total minutes in 2009MAR08 PST8PDT time
(2009MAR08 is DST switch in the US) | | =2009-03-08T08:00Z= | =UTC= | =${coord:days(2)}= | 2880 | =2009-03-08T08:00Z= | total minutes in 2009MAR08 and 2009MAR09 UTC time | | =2009-03-08T08:00Z= | =America/Los_Angeles= | =${coord:days(2)}= | 2820 | =2009-03-08T08:00Z= | total minutes in 2009MAR08 and 2009MAR09 PST8PDT time
(2009MAR08 is DST switch in the US) | | =2009-03-09T08:00Z= | =America/Los_Angeles= | =${coord:days(1)}= | 1440 | =2009-03-09T07:00Z= | total minutes in 2009MAR09 PST8PDT time
(2009MAR08 is DST ON, frequency tick is earlier in UTC) |

For all these examples, except the last one, the first occurrence of the frequency will be at =08:00Z= (UTC time).

---+++++ 4.4.1.2. The coord:endOfDays(int n) EL function

The =${coord:endOfDays(int n)}= EL function is identical to the =${coord:days(int n)}= EL function except that it shifts the first occurrence to the end of the day for the specified timezone before computing the interval in minutes.

*%GREEN% Examples: %ENDCOLOR%*

| *Starting Nominal UTC time* | *Timezone* | *Usage* | *Value* | *First Occurrence* | *Comments* |
| =2009-01-01T08:00Z= | =UTC= | =${coord:endOfDays(1)}= | 1440 | =2009-01-02T00:00Z= | first occurrence in 2009JAN02 00:00 UTC time,
first occurrence shifted to the end of the UTC day | | =2009-01-01T08:00Z= | =America/Los_Angeles= | =${coord:endOfDays(1)}= | 1440 | =2009-01-02T08:00Z= | first occurrence in 2009JAN02 08:00 UTC time,
first occurrence shifted to the end of the PST8PDT day | | =2009-01-01T08:01Z= | =America/Los_Angeles= | =${coord:endOfDays(1)}= | 1440 | =2009-01-02T08:00Z= | first occurrence in 2009JAN02 08:00 UTC time,
first occurrence shifted to the end of the PST8PDT day | | =2009-01-01T18:00Z= | =America/Los_Angeles= | =${coord:endOfDays(1)}= | 1440 | =2009-01-02T08:00Z= | first occurrence in 2009JAN02 08:00 UTC time,
first occurrence shifted to the end of the PST8PDT day | | ||||| | =2009-03-07T09:00Z= | =America/Los_Angeles= | =${coord:endOfDays(1)}= | 1380 | =2009-03-08T08:00Z= | first occurrence in 2009MAR08 08:00 UTC time
first occurrence shifted to the end of the PST8PDT day | | =2009-03-08T07:00Z= | =America/Los_Angeles= | =${coord:endOfDays(1)}= | 1440 | =2009-03-08T08:00Z= | first occurrence in 2009MAR08 08:00 UTC time
first occurrence shifted to the end of the PST8PDT day | | =2009-03-09T07:00Z= | =America/Los_Angeles= | =${coord:endOfDays(1)}= | 1440 | =2009-03-10T07:00Z= | first occurrence in 2009MAR10 07:00 UTC time
(2009MAR08 is DST switch in the US),
first occurrence shifted to the end of the PST8PDT day |

*%GREEN% Example: %ENDCOLOR%*

<verbatim>
<coordinator-app name="[NAME]" frequency="${coord:endOfDays(1)}"
                 start="[DATETIME]" end="[DATETIME]" timezone="[TIMEZONE]"
                 xmlns="uri:oozie:coordinator:0.1">
  <controls>
    <timeout>10</timeout>
    <concurrency>${concurrency_level}</concurrency>
    <execution>${execution_order}</execution>
  </controls>
  <datasets>
    <dataset name="din" frequency="${coord:endOfDays(1)}"
             initial-instance="[DATETIME]" timezone="[TIMEZONE]">
      <uri-template>${baseFsURI}/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
    </dataset>
    <dataset name="dout" frequency="${coord:endOfDays(1)}"
             initial-instance="[DATETIME]" timezone="[TIMEZONE]">
      <uri-template>${baseFsURI}/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
    </dataset>
  </datasets>
  <input-events>
    <data-in name="input" dataset="din">
      <instance>${coord:current(0)}</instance>
    </data-in>
  </input-events>
  <output-events>
    <data-out name="output" dataset="dout">
      <instance>${coord:current(1)}</instance>
    </data-out>
  </output-events>
  <action>
    <workflow>
      <app-path>${wf_app_path}</app-path>
      <configuration>
        <property>
          <name>wfInput</name>
          <value>${coord:dataIn('input')}</value>
        </property>
        <property>
          <name>wfOutput</name>
          <value>${coord:dataOut('output')}</value>
        </property>
      </configuration>
    </workflow>
  </action>
</coordinator-app>
</verbatim>

---++++ 4.4.2. The coord:months(int n) and coord:endOfMonths(int n) EL functions

The =${coord:months(int n)}= and =${coord:endOfMonths(int n)}= EL functions should be used to handle month based frequencies.

Constant values cannot be used to indicate a month based frequency because the number of days in a month changes month to month and on leap years; plus the number of hours in every day of the month is not always the same for timezones that observe daylight-saving time.

---+++++ 4.4.2.1. The coord:months(int n) EL function

The =${coord:months(int n)}= EL function returns the number of minutes for 'n' complete months starting with the month of the current nominal time for which the computation is being done.

The =${coord:months(int n)}= EL function includes *all* the minutes of the current month, regardless of the day of the month of the current nominal time.
*%GREEN% Examples: %ENDCOLOR%* | *Starting Nominal UTC time* | *Timezone* | *Usage* | *Value* | *First Occurrence* | *Comments* | | =2009-01-01T08:00Z= | =UTC= | =${coord:months(1)}= | 44640 | =2009-01-01T08:00Z= |total minutes for 2009JAN UTC time | | =2009-01-01T08:00Z= | =America/Los_Angeles= | =${coord:months(1)}= | 44640 | =2009-01-01T08:00Z= | total minutes in 2009JAN PST8PDT time | | =2009-01-01T08:00Z= | =America/Los_Angeles= | =${coord:months(2)}= | 84960 | =2009-01-01T08:00Z= | total minutes in 2009JAN and 2009FEB PST8PDT time | | ||||| | =2009-03-08T08:00Z= | =UTC= | =${coord:months(1)}= | 44640 | =2009-03-08T08:00Z= | total minutes on 2009MAR UTC time | | =2009-03-08T08:00Z= | =Europe/London= | =${coord:months(1)}= | 44580 | =2009-03-08T08:00Z= | total minutes in 2009MAR BST1BDT time
(2009MAR29 is DST switch in Europe) | | =2009-03-08T08:00Z= | =America/Los_Angeles= | =${coord:months(1)}= | 44580 | =2009-03-08T08:00Z= | total minutes in 2009MAR PST8PDT time
(2009MAR08 is DST switch in the US) | | =2009-03-08T08:00Z= | =UTC= | =${coord:months(2)}= | 87840 | =2009-03-08T08:00Z= | total minutes in 2009MAR and 2009APR UTC time | | =2009-03-08T08:00Z= | =America/Los_Angeles= | =${coord:months(2)}= | 87780 | =2009-03-08T08:00Z= | total minutes in 2009MAR and 2009APR PST8PDT time
(2009MAR08 is DST switch in US) |

---+++++ 4.4.2.2. The coord:endOfMonths(int n) EL function

The =${coord:endOfMonths(int n)}= EL function is identical to the =${coord:months(int n)}= EL function except that it shifts the first occurrence to the end of the month for the specified timezone before computing the interval in minutes.

*%GREEN% Examples: %ENDCOLOR%*

| *Starting Nominal UTC time* | *Timezone* | *Usage* | *Value* | *First Occurrence* | *Comments* |
| =2009-01-01T00:00Z= | =UTC= | =${coord:endOfMonths(1)}= | 40320 | =2009-02-01T00:00Z= | first occurrence in 2009FEB 00:00 UTC time |
| =2009-01-01T08:00Z= | =UTC= | =${coord:endOfMonths(1)}= | 40320 | =2009-02-01T00:00Z= | first occurrence in 2009FEB 00:00 UTC time |
| =2009-01-31T08:00Z= | =UTC= | =${coord:endOfMonths(1)}= | 40320 | =2009-02-01T00:00Z= | first occurrence in 2009FEB 00:00 UTC time |
| =2009-01-01T08:00Z= | =America/Los_Angeles= | =${coord:endOfMonths(1)}= | 40320 | =2009-02-01T08:00Z= | first occurrence in 2009FEB 08:00 UTC time |
| =2009-02-02T08:00Z= | =America/Los_Angeles= | =${coord:endOfMonths(1)}= | 44580 | =2009-03-01T08:00Z= | first occurrence in 2009MAR 08:00 UTC time |
| =2009-02-01T08:00Z= | =America/Los_Angeles= | =${coord:endOfMonths(1)}= | 44580 | =2009-03-01T08:00Z= | first occurrence in 2009MAR 08:00 UTC time |

*%GREEN% Example: %ENDCOLOR%*

<verbatim>
<coordinator-app name="[NAME]" frequency="${coord:endOfMonths(1)}"
                 start="[DATETIME]" end="[DATETIME]" timezone="[TIMEZONE]"
                 xmlns="uri:oozie:coordinator:0.1">
  <controls>
    <timeout>10</timeout>
    <concurrency>${concurrency_level}</concurrency>
    <execution>${execution_order}</execution>
  </controls>
  <datasets>
    <dataset name="din" frequency="${coord:endOfMonths(1)}"
             initial-instance="[DATETIME]" timezone="[TIMEZONE]">
      <uri-template>${baseFsURI}/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
    </dataset>
    <dataset name="dout" frequency="${coord:endOfMonths(1)}"
             initial-instance="[DATETIME]" timezone="[TIMEZONE]">
      <uri-template>${baseFsURI}/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
    </dataset>
  </datasets>
  <input-events>
    <data-in name="input" dataset="din">
      <instance>${coord:current(0)}</instance>
    </data-in>
  </input-events>
  <output-events>
    <data-out name="output" dataset="dout">
      <instance>${coord:current(1)}</instance>
    </data-out>
  </output-events>
  <action>
    <workflow>
      <app-path>${wf_app_path}</app-path>
      <configuration>
        <property>
          <name>wfInput</name>
          <value>${coord:dataIn('input')}</value>
        </property>
        <property>
          <name>wfOutput</name>
          <value>${coord:dataOut('output')}</value>
        </property>
      </configuration>
    </workflow>
  </action>
</coordinator-app>
</verbatim>

---++ 5. Dataset

A dataset is a collection of data referred to by a logical name.

A dataset instance is a particular occurrence of a dataset and it is represented by a unique set of URIs. A dataset instance can be referred to individually. Dataset instances for datasets containing ranges are identified by a set of unique URIs, otherwise a dataset instance is identified by a single unique URI.
Datasets are typically defined in some central place for a business domain and can be accessed by the coordinator. Because of this, they can be defined once and used many times.

A dataset is a synchronous (produced at regular time intervals, it has an expected frequency) input.

A dataset instance is considered to be immutable while it is being consumed by coordinator jobs.

---+++ 5.1. Synchronous Datasets

Instances of synchronous datasets are produced at regular time intervals, at an expected frequency. They are also referred to as "clocked datasets".

Synchronous dataset instances are identified by their nominal creation time. The nominal creation time is normally specified in the dataset instance URI.

A synchronous dataset definition contains the following information:

   * *%BLUE% name: %ENDCOLOR%* The dataset name. It must be a valid Java identifier.
   * *%BLUE% frequency: %ENDCOLOR%* It represents the rate, in minutes, at which data is _periodically_ created. The granularity is in minutes and can be expressed using EL expressions, for example: ${5 * HOUR}.
   * *%BLUE% initial-instance: %ENDCOLOR%* The UTC datetime of the initial instance of the dataset. The initial-instance also provides the baseline datetime to compute instances of the dataset using multiples of the frequency.
   * *%BLUE% timezone:%ENDCOLOR%* The timezone of the dataset.
   * *%BLUE% uri-template:%ENDCOLOR%* The URI template that identifies the dataset and can be resolved into concrete URIs to identify a particular dataset instance. The URI template is constructed using:
      * *%BLUE% constants %ENDCOLOR%* See the allowable EL Time Constants below. Ex: ${YEAR}/${MONTH}.
      * *%BLUE% variables %ENDCOLOR%* Variables must be resolved at the time a coordinator job is submitted to the coordinator engine. They are normally provided as job parameters (configuration properties).
Ex: ${market}/${language}

The following EL constants can be used within synchronous dataset URI templates:

| *EL Constant* | *Resulting Format* | *Comments* |
| =YEAR= | _YYYY_ | 4 digits representing the year |
| =MONTH= | _MM_ | 2 digits representing the month of the year, January = 1 |
| =DAY= | _DD_ | 2 digits representing the day of the month |
| =HOUR= | _HH_ | 2 digits representing the hour of the day, in 24 hour format, 0 - 23 |
| =MINUTE= | _mm_ | 2 digits representing the minute of the hour, 0 - 59 |

*%PURPLE% Syntax: %ENDCOLOR%*

<verbatim>
  <dataset name="[NAME]" frequency="[FREQUENCY]"
           initial-instance="[DATETIME]" timezone="[TIMEZONE]">
    <uri-template>[URI TEMPLATE]</uri-template>
  </dataset>
</verbatim>

IMPORTANT: The values of the EL constants in the dataset URIs (in HDFS) are expected in UTC. Oozie Coordinator takes care of the timezone conversion when performing calculations.

*%GREEN% Examples: %ENDCOLOR%*

1. *A dataset produced once every day at 00:15 PST8PDT:*

<verbatim>
  hdfs://foo:9000/app/logs/${market}/${YEAR}${MONTH}/${DAY}/data
</verbatim>

2. *A dataset available on the 10th of each month:*

<verbatim>
  hdfs://foo:9000/usr/app/stats/${YEAR}/${MONTH}/data
</verbatim>

3. *A dataset available at the end of every quarter:*

<verbatim>
  hdfs://foo:9000/usr/app/stats/${YEAR}/${MONTH}/data
</verbatim>

4. *Normally the URI template of a dataset has a precision similar to the frequency:*

<verbatim>
  hdfs://foo:9000/usr/app/logs/${YEAR}/${MONTH}/${DAY}/data
</verbatim>

The dataset would resolve to the following URIs:

<verbatim>
  hdfs://foo:9000/usr/app/logs/2009/01/01/data
  hdfs://foo:9000/usr/app/logs/2009/01/02/data
  hdfs://foo:9000/usr/app/logs/2009/01/03/data
  ...
</verbatim>

5. *However, if the URI template has a finer precision than the dataset frequency:*

<verbatim>
  hdfs://foo:9000/usr/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/data
</verbatim>

The dataset resolves to the following URIs with fixed values for the finer precision template variables:

<verbatim>
  hdfs://foo:9000/usr/app/logs/2009/01/01/10/30/data
  hdfs://foo:9000/usr/app/logs/2009/01/02/10/30/data
  hdfs://foo:9000/usr/app/logs/2009/01/03/10/30/data
  ...
</verbatim>

---+++ 5.2. Asynchronous Datasets

   * TBD

---+++ 5.3. Dataset Definitions

Dataset definitions are grouped in XML files.
*%PURPLE% Syntax: %ENDCOLOR%*

<verbatim>
<datasets>
  <include>[SHARED_DATASETS]</include>
  ...
  <dataset name="[NAME]" frequency="[FREQUENCY]"
           initial-instance="[DATETIME]" timezone="[TIMEZONE]">
    <uri-template>[URI TEMPLATE]</uri-template>
  </dataset>
  ...
</datasets>
</verbatim>

*%GREEN% Example: %ENDCOLOR%*

<verbatim>
<datasets>
  <include>hdfs://foo:9000/app/dataset-definitions/globallogs.xml</include>

  <dataset name="[NAME]" frequency="[FREQUENCY]"
           initial-instance="[DATETIME]" timezone="[TIMEZONE]">
    <uri-template>
      hdfs://foo:9000/app/logs/${market}/${YEAR}${MONTH}/${DAY}/${HOUR}/${MINUTE}/data
    </uri-template>
  </dataset>

  <dataset name="[NAME]" frequency="[FREQUENCY]"
           initial-instance="[DATETIME]" timezone="[TIMEZONE]">
    <uri-template>hdfs://foo:9000/usr/app/stats/${YEAR}/${MONTH}/data</uri-template>
  </dataset>
</datasets>
</verbatim>

---++ 6. Coordinator Application

---+++ 6.1. Concepts

---++++ 6.1.1. Coordinator Application

A coordinator application is a program that triggers actions (commonly workflow jobs) when a set of conditions are met. Conditions can be a time frequency, the availability of new dataset instances or other external events.

Types of coordinator applications:

   * *Synchronous:* Its coordinator actions are created at specified time intervals.

Coordinator applications are normally parameterized.

---++++ 6.1.2. Coordinator Job

To create a coordinator job, a job configuration that resolves all coordinator application parameters must be provided to the coordinator engine.

A coordinator job is an executing instance of a coordinator application, running from a start time to an end time.

At any time, a coordinator job is in one of the following statuses: *PREP, PREMATER, RUNNING, SUSPENDED, SUCCEEDED, KILLED, FAILED*.

Valid coordinator job status transitions are:

   * *PREP --> PREMATER | SUSPENDED | KILLED*
   * *PREMATER --> RUNNING | SUSPENDED | KILLED | FAILED | SUCCEEDED*
   * *RUNNING --> PREMATER | SUSPENDED | KILLED*
   * *SUSPENDED --> PREP | KILLED*

NOTE: when a coordinator job is resumed, it will transition to *PREP* status.

A coordinator job creates coordinator actions (commonly workflow jobs) only for the duration of the coordinator job and only if the coordinator job is in *RUNNING* status. If the coordinator job has been suspended, when resumed it will create all the coordinator actions that should have been created during the time it was suspended; actions will not be lost, they will be delayed.

---++++ 6.1.3. Coordinator Action

A coordinator job creates and executes coordinator actions.
A coordinator action is normally a workflow job that consumes and produces dataset instances.

Once a coordinator action is created (this is also referred to as the action being materialized), the coordinator action will remain in waiting until all required inputs for execution are satisfied or until the waiting times out.

---+++++ 6.1.3.1. Coordinator Action Creation (Materialization)

A coordinator job has one driver event that determines the creation (materialization) of its coordinator actions (typically a workflow job).

   * For synchronous coordinator jobs the driver event is the frequency of the coordinator job.

---+++++ 6.1.3.2. Coordinator Action Status

Once a coordinator action has been created (materialized) the coordinator action qualifies for execution. At this point, the action status is *WAITING*.

A coordinator action in *WAITING* status must wait until all its input events are available before it is ready for execution. When a coordinator action is ready for execution its status is *READY*.

A coordinator action in *WAITING* status may timeout before it becomes ready for execution. Then the action status is *TIMEDOUT*.

A coordinator action may remain in *READY* status for a while, without starting execution, due to the concurrency execution policies of the coordinator job.

A coordinator action in *READY* status changes to *SUBMITTED* status if the total number of current *RUNNING* and *SUBMITTED* actions is less than the concurrency execution limit.

A coordinator action in *SUBMITTED* status changes to *RUNNING* status when the workflow engine starts execution of the coordinator action.

A coordinator action is in *RUNNING* status until the associated workflow job completes its execution. Depending on the workflow job completion status, the coordinator action will be in *SUCCEEDED*, *KILLED* or *FAILED* status.

A coordinator action in *WAITING*, *READY*, *SUBMITTED* or *RUNNING* status can be killed, changing to *KILLED* status.
A coordinator action in *SUBMITTED* or *RUNNING* status can also fail, changing to *FAILED* status.

Valid coordinator action status transitions are:

   * *WAITING --> READY | TIMEDOUT | KILLED*
   * *READY --> SUBMITTED | KILLED*
   * *SUBMITTED --> RUNNING | KILLED | FAILED*
   * *RUNNING --> SUCCEEDED | KILLED | FAILED*

---++++ 6.1.4. Input Events

The input events of a coordinator application specify the input conditions that are required in order to execute a coordinator action.

In the current specification input events are restricted to dataset instances availability.

All the dataset instances defined as input events must be available for the coordinator action to be ready for execution ( *READY* status).

Input events are normally parameterized. For example, the last 24 hourly instances of the 'searchlogs' dataset.

Input events can refer to multiple instances of multiple datasets. For example, the last 24 hourly instances of the 'searchlogs' dataset and the last weekly instance of the 'celebrityRumours' dataset.

---++++ 6.1.5. Output Events

A coordinator action can produce one or more dataset instances as output.

Dataset instances produced as output by one coordinator action may be consumed as input by another coordinator action(s) of other coordinator job(s).

The chaining of coordinator jobs via the datasets they produce and consume is referred to as a *data pipeline*.

In the current specification coordinator job output events are restricted to dataset instances.

---++++ 6.1.6. Coordinator Action Execution Policies

The execution policies for the actions of a coordinator job can be defined in the coordinator application.

   * Timeout: A coordinator job can specify the timeout for its coordinator actions, that is, how long the coordinator action will be in *WAITING* or *READY* status before giving up on its execution.
   * Concurrency: A coordinator job can specify the concurrency for its coordinator actions, that is, how many coordinator actions are allowed to run concurrently ( *RUNNING* status) before the coordinator engine starts throttling them.
   * Execution strategy: A coordinator job can specify the execution strategy of its coordinator actions when there is a backlog of coordinator actions in the coordinator engine. The different execution strategies are 'oldest first', 'newest first' and 'last one only'. A backlog normally happens because of delayed input data, concurrency control or because of manual re-runs of coordinator jobs.

---++++ 6.1.7. Data Pipeline Application

Commonly, multiple workflow applications are chained together to form a more complex application.

Workflow applications are run on a regular basis, each one of them at its own frequency. The data consumed and produced by these workflow applications is relative to the nominal time of the workflow job that is processing the data. This is a *coordinator application*.

The output of multiple workflow jobs of a single workflow application is then consumed by a single workflow job of another workflow application; this is done on a regular basis as well. These workflow jobs are triggered by recurrent actions of coordinator jobs. This is a set of *coordinator jobs* that inter-depend on each other via the data they produce and consume.

This set of interdependent *coordinator applications* is referred to as a *data pipeline application*.

---+++ 6.2. Synchronous Coordinator Application Example

   * The =checkouts= synchronous dataset is created every 15 minutes by an online checkout store.
   * The =hourlyRevenue= synchronous dataset is created every hour and contains the hourly revenue.
   * The =dailyRevenue= synchronous dataset is created every day and contains the daily revenue.
   * The =monthlyRevenue= synchronous dataset is created every month and contains the monthly revenue.
   * The =revenueCalculator-wf= workflow consumes checkout data and produces as output the corresponding revenue.
   * The =rollUpRevenue-wf= workflow consumes revenue data and produces a consolidated output.
   * The =hourlyRevenue-coord= coordinator job triggers, every hour, a =revenueCalculator-wf= workflow. It specifies as input the last 4 =checkouts= dataset instances and it specifies as output a new instance of the =hourlyRevenue= dataset.
   * The =dailyRollUpRevenue-coord= coordinator job triggers, every day, a =rollUpRevenue-wf= workflow. It specifies as input the last 24 =hourlyRevenue= dataset instances and it specifies as output a new instance of the =dailyRevenue= dataset.
   * The =monthlyRollUpRevenue-coord= coordinator job triggers, once a month, a =rollUpRevenue-wf= workflow. It specifies as input all the =dailyRevenue= dataset instances of the month and it specifies as output a new instance of the =monthlyRevenue= dataset.

This example describes all the components that form a data pipeline: datasets, coordinator jobs and coordinator actions (workflows).

The coordinator actions (the workflows) are completely agnostic of datasets and their frequencies; they just use them as input and output data (i.e. HDFS files or directories). Furthermore, as the example shows, the same workflow can be used to process similar datasets of different frequencies.

The frequency of the =hourlyRevenue-coord= coordinator job is 1 hour; this means that every hour a coordinator action is created. A coordinator action will be executed only when the 4 =checkouts= dataset instances for the corresponding last hour are available; until then the coordinator action will remain as created (materialized), in *WAITING* status. Once the 4 dataset instances for the corresponding last hour are available, the coordinator action will be executed and it will start a =revenueCalculator-wf= workflow job.

---+++ 6.3. Synchronous Coordinator Application Definition

A synchronous coordinator application is defined by a name, start time and end time, the frequency of creation of its coordinator actions, the input events, the output events and action control information:

   * *%BLUE% start: %ENDCOLOR%* The start datetime for the job. Starting at this time actions will be materialized. Refer to section #3 'Datetime Representation' for syntax details.
   * *%BLUE% end: %ENDCOLOR%* The end datetime for the job. When actions will stop being materialized. Refer to section #3 'Datetime Representation' for syntax details.
   * *%BLUE% timezone:%ENDCOLOR%* The timezone of the coordinator application.
   * *%BLUE% frequency: %ENDCOLOR%* The frequency, in minutes, to materialize actions. Refer to section #4 'Time Interval Representation' for syntax details.
   * Control information:
      * *%BLUE% timeout: %ENDCOLOR%* The maximum time, in minutes, that a materialized action will wait for the additional conditions to be satisfied before being discarded. A timeout of =0= indicates that if all the input events are not satisfied at the time of action materialization, the action should timeout immediately. A timeout of =-1= indicates no timeout, the materialized action will wait forever for the other conditions to be satisfied. The default value is =-1=.
      * *%BLUE% concurrency: %ENDCOLOR%* The maximum number of actions for this job that can be running at the same time. This value allows Oozie to materialize and submit multiple instances of the coordinator app, and allows operations to catch up on delayed processing. The default value is =1=.
      * *%BLUE% execution: %ENDCOLOR%* Specifies the execution order if multiple instances of the coordinator job have satisfied their execution criteria. Valid values are:
         * =FIFO= (oldest first) *default*.
         * =LIFO= (newest first).
* =ONLYLAST= (discards all older materializations). * *%BLUE% datasets: %ENDCOLOR%* The datasets the coordinator application uses. * *%BLUE% input-events: %ENDCOLOR%* The coordinator job input events. * *%BLUE% data-in: %ENDCOLOR%* It defines one job input condition that resolves to one or more instances of a dataset. * *%BLUE% name: %ENDCOLOR%* input condition name. * *%BLUE% dataset: %ENDCOLOR%* dataset name. * *%BLUE% instance: %ENDCOLOR%* refers to a single dataset instance (the time for a synchronous dataset). * *%BLUE% start-instance: %ENDCOLOR%* refers to the beginning of an instance range (the time for a synchronous dataset). * *%BLUE% end-instance: %ENDCOLOR%* refers to the end of an instance range (the time for a synchronous dataset). * *%BLUE% output-events: %ENDCOLOR%* The coordinator job output events. * *%BLUE% data-out: %ENDCOLOR%* It defines one job output that resolves to a dataset instance. * *%BLUE% name: %ENDCOLOR%* output name. * *%BLUE% dataset: %ENDCOLOR%* dataset name. * *%BLUE% instance: %ENDCOLOR%* dataset instance that will be generated by the coordinator action. * *%BLUE% action: %ENDCOLOR%* The coordinator action to execute. * *%BLUE% workflow: %ENDCOLOR%* The workflow job invocation. Workflow job properties can refer to the defined data-in and data-out elements. *%PURPLE% Syntax: %ENDCOLOR%*

<verbatim>
   <coordinator-app name=[NAME] frequency=[FREQUENCY]
                    start=[DATETIME] end=[DATETIME] timezone=[TIMEZONE]
                    xmlns="uri:oozie:coordinator:0.1">
      <controls>
        <timeout>[TIME_PERIOD]</timeout>
        <concurrency>[CONCURRENCY]</concurrency>
        <execution>[EXECUTION_STRATEGY]</execution>
      </controls>
      <datasets>
        <include>[SHARED_DATASETS]</include>
        ...
        <dataset name=[NAME] frequency=[FREQUENCY] initial-instance=[DATETIME] timezone=[TIMEZONE]>
          <uri-template>[URI_TEMPLATE]</uri-template>
        </dataset>
        ...
      </datasets>
      <input-events>
        <data-in name=[NAME] dataset=[DATASET]>
          <instance>[INSTANCE]</instance>
          ...
        </data-in>
        ...
        <data-in name=[NAME] dataset=[DATASET]>
          <start-instance>[INSTANCE]</start-instance>
          <end-instance>[INSTANCE]</end-instance>
        </data-in>
        ...
      </input-events>
      <output-events>
        <data-out name=[NAME] dataset=[DATASET]>
          <instance>[INSTANCE]</instance>
        </data-out>
        ...
      </output-events>
      <action>
        <workflow>
          <app-path>[WF-APPLICATION-PATH]</app-path>
          <configuration>
            <property>
              <name>[PROPERTY-NAME]</name>
              <value>[PROPERTY-VALUE]</value>
            </property>
            ...
          </configuration>
        </workflow>
      </action>
   </coordinator-app>
</verbatim>

*%GREEN% Examples: %ENDCOLOR%* *1. A Coordinator Job that creates and executes a single coordinator action:* The following example describes a synchronous coordinator application that runs once a day for 1 day at the end of the day. It consumes an instance of a daily 'logs' dataset and produces an instance of a daily 'siteAccessStats' dataset.
*Coordinator application definition:* hdfs://bar:9000/app/logs/${YEAR}${MONTH}/${DAY}/data hdfs://bar:9000/app/stats/${YEAR}/${MONTH}/${DAY}/data 2009-01-02T08:00Z 2009-01-02T08:00Z hdfs://bar:9000/usr/tucu/logsprocessor-wf wfInput ${coord:dataIn('input')} wfOutput ${coord:dataOut('output')} There are 2 synchronous datasets with a daily frequency and they are expected at the end of each PST8PDT day. This coordinator job runs for 1 day on January 1st 2009 at 24:00 PST8PDT. The workflow job invocation for the single coordinator action would resolve to: hdfs://bar:9000/usr/tucu/logsprocessor-wf wfInput hdfs://bar:9000/app/logs/200901/02/data wfOutput hdfs://bar:9000/app/stats/2009/01/02/data IMPORTANT: Note that Oozie works in UTC datetimes; all URI templates resolve to UTC datetime values. Because of the timezone difference between UTC and PST8PDT, the URIs resolve to =2009-01-02T08:00Z= (UTC), which is equivalent to =2009-01-01T24:00 PST8PDT=. There is a single input event, which resolves to the January 1st PST8PDT instance of the 'logs' dataset. There is a single output event, which resolves to the January 1st PST8PDT instance of the 'siteAccessStats' dataset. The =${coord:dataIn(String name)}= and =${coord:dataOut(String name)}= EL functions resolve to the URIs of the corresponding dataset instances. These EL functions are properly defined in a subsequent section. Because the =${coord:dataIn(String name)}= and =${coord:dataOut(String name)}= EL functions resolve to URIs, which are HDFS URIs, the workflow job itself does not deal with dataset instances, just HDFS URIs. *2. A Coordinator Job that executes its coordinator action multiple times:* A more realistic version of the previous example would be a coordinator job that runs for a year, creating a daily action and consuming the daily 'logs' dataset instance and producing the daily 'siteAccessStats' dataset instance.
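The UTC/PST8PDT equivalence above can be checked with a short Python sketch (illustrative only, not Oozie code; =zoneinfo='s 'America/Los_Angeles' stands in for PST8PDT, and "24:00" on January 1st is modeled as the zero hour of January 2nd, which is how Oozie treats it internally):

```python
from datetime import datetime
from zoneinfo import ZoneInfo

# 2009-01-01T24:00 PST8PDT == 2009-01-02T00:00 local time in Los Angeles.
pst8pdt = ZoneInfo("America/Los_Angeles")
end_of_jan_1st = datetime(2009, 1, 2, 0, 0, tzinfo=pst8pdt)

# In January PST is UTC-8, so the local midnight maps to 08:00 UTC.
utc = end_of_jan_1st.astimezone(ZoneInfo("UTC"))
print(utc.isoformat())  # 2009-01-02T08:00:00+00:00, i.e. 2009-01-02T08:00Z
```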
The coordinator application is identical, except for the frequency, 'end' date and parameterization in the input and output events sections: hdfs://bar:9000/app/logs/${YEAR}${MONTH}/${DAY}/data hdfs://bar:9000/app/stats/${YEAR}/${MONTH}/${DAY}/data ${coord:current(0)} ${coord:current(0)} hdfs://bar:9000/usr/tucu/logsprocessor-wf wfInput ${coord:dataIn('input')} wfOutput ${coord:dataOut('output')} The =${coord:current(int offset)}= EL function resolves to the coordinator action creation time, that is, the current day at the time the coordinator action is created: =2009-01-02T08:00 ... 2010-01-01T08:00=. This EL function is properly defined in a subsequent section. There is a single input event, which resolves to the current day instance of the 'logs' dataset. There is a single output event, which resolves to the current day instance of the 'siteAccessStats' dataset. The workflow job invocation for the first coordinator action would resolve to: hdfs://bar:9000/usr/tucu/logsprocessor-wf wfInput hdfs://bar:9000/app/logs/200901/02/data wfOutput hdfs://bar:9000/app/stats/2009/01/02/data For the second coordinator action it would resolve to: hdfs://bar:9000/usr/tucu/logsprocessor-wf wfInput hdfs://bar:9000/app/logs/200901/03/data wfOutput hdfs://bar:9000/app/stats/2009/01/03/data And so on. *3. A Coordinator Job that executes its coordinator action multiple times and as input takes multiple dataset instances:* The following example is a variation of example #2 where the synchronous coordinator application runs weekly. It consumes the last 7 instances of a daily 'logs' dataset and produces an instance of a weekly 'weeklySiteAccessStats' dataset. 'logs' is a synchronous dataset with a daily frequency and it is expected at the end of each day (24:00). 'weeklystats' is a synchronous dataset with a weekly frequency and it is expected at the end (24:00) of every 7th day.
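The URI templates shown in these examples are resolved by substituting the date EL constants (=${YEAR}=, =${MONTH}=, =${DAY}=) with the nominal time of each dataset instance. A minimal Python sketch of that substitution (illustrative only, not Oozie code), resolving the 7 daily 'logs' instances the weekly example consumes:

```python
from datetime import datetime, timedelta

def resolve_uri(template: str, ts: datetime) -> str:
    # Substitute the date EL constants used in uri-templates.
    return (template.replace("${YEAR}", f"{ts.year:04d}")
                    .replace("${MONTH}", f"{ts.month:02d}")
                    .replace("${DAY}", f"{ts.day:02d}"))

template = "hdfs://bar:9000/app/logs/${YEAR}${MONTH}/${DAY}"

# The first weekly action materializes on the 7th day of the year and
# consumes coord:current(-6) .. coord:current(0) of the daily dataset.
day7 = datetime(2009, 1, 7)
uris = [resolve_uri(template, day7 + timedelta(days=n)) for n in range(-6, 1)]
print(uris[0])   # hdfs://bar:9000/app/logs/200901/01
print(uris[-1])  # hdfs://bar:9000/app/logs/200901/07
```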
The coordinator application frequency is weekly and it starts on the 7th day of the year: hdfs://bar:9000/app/logs/${YEAR}${MONTH}/${DAY} hdfs://bar:9000/app/weeklystats/${YEAR}/${MONTH}/${DAY} ${coord:current(-6)} ${coord:current(0)} ${coord:current(0)} hdfs://bar:9000/usr/tucu/logsprocessor-wf wfInput ${coord:dataIn('input')} wfOutput ${coord:dataOut('output')} The =${coord:current(int offset)}= EL function resolves to the coordinator action creation time minus the specified offset multiplied by the dataset frequency. This EL function is properly defined in a subsequent section. The input event, instead of resolving to a single 'logs' dataset instance, refers to a range of 7 dataset instances - the instance from 6 days ago, 5 days ago, ... and today's instance. The output event resolves to the current day instance of the 'weeklySiteAccessStats' dataset. As the coordinator job will create a coordinator action every 7 days, dataset instances for the 'weeklySiteAccessStats' dataset will be created every 7 days. The workflow job invocation for the first coordinator action would resolve to: hdfs://bar:9000/usr/tucu/logsprocessor-wf wfInput hdfs://bar:9000/app/logs/200901/01,hdfs://bar:9000/app/logs/200901/02, hdfs://bar:9000/app/logs/200901/03,hdfs://bar:9000/app/logs/200901/04, hdfs://bar:9000/app/logs/200901/05,hdfs://bar:9000/app/logs/200901/06, hdfs://bar:9000/app/logs/200901/07 wfOutput hdfs://bar:9000/app/stats/2009/01/07 For the second coordinator action it would resolve to: hdfs://bar:9000/usr/tucu/logsprocessor-wf wfInput hdfs://bar:9000/app/logs/200901/08,hdfs://bar:9000/app/logs/200901/09, hdfs://bar:9000/app/logs/200901/10,hdfs://bar:9000/app/logs/200901/11, hdfs://bar:9000/app/logs/200901/12,hdfs://bar:9000/app/logs/200901/13, hdfs://bar:9000/app/logs/200901/14 wfOutput hdfs://bar:9000/app/stats/2009/01/14 And so on. ---+++ 6.4. Asynchronous Coordinator Application Definition * TBD ---+++ 6.5.
Parameterization of Coordinator Applications When a coordinator job is submitted to Oozie, the submitter may specify as many coordinator job configuration properties as required (similar to Hadoop JobConf properties). Configuration properties that are a valid Java identifier, [A-Za-z_][0-9A-Za-z_]*, are available as =${NAME}= variables within the coordinator application definition. Configuration properties that are not a valid Java identifier, for example =job.tracker=, are available via the =${coord:conf(String name)}= function. Valid Java identifier properties are available via this function as well. Using properties that are valid Java identifiers results in a more readable and compact definition. Dataset definitions can also be parameterized; the parameters are resolved using the properties of the job configuration used to submit the coordinator job. If a configuration property used in the definitions is not provided with the job configuration used to submit a coordinator job, the value of the parameter will be undefined and the job submission will fail. *%GREEN% Example: %ENDCOLOR%* Coordinator application definition: hdfs://bar:9000/app/logs/${market}/${language}/${YEAR}${MONTH}/${DAY}/${HOUR} ${coord:current(-23)} ${coord:current(0)} ... In the above example there are 6 configuration parameters (variables) that have to be provided when submitting a job: * =jobStart= : start datetime for the job, in UTC * =jobEnd= : end datetime for the job, in UTC * =logsInitialInstance= : expected time of the first logs instance, in UTC * =timezone= : timezone for the job and the dataset * =market= : market to compute by this job, used in the uri-template * =language= : language to compute by this job, used in the uri-template IMPORTANT: Note that this example is not completely correct, as it always consumes the last 24 instances of the 'logs' dataset, assuming that all days have 24 hours.
For timezones that observe daylight saving, this application will not work as expected, as it will consume the wrong number of dataset instances on DST switch days. To be able to handle these scenarios, the =${coord:hoursInDay(int n)}= and =${coord:daysInMonth(int n)}= EL functions must be used (refer to sections #6.6.2 and #6.6.3). ---+++ 6.6. Parameterization of Dataset Instances in Input and Output Events A coordinator application typically launches several coordinator actions during its lifetime. A coordinator action typically uses its creation (materialization) time to resolve the specific dataset instances required for its input and output events. The following EL functions are the means for binding the coordinator action creation time to the dataset instances of its input and output events. ---++++ 6.6.1. coord:current(int n) EL Function for Synchronous Datasets =${coord:current(int n)}= represents the nth dataset instance for a *synchronous* dataset, relative to the coordinator action creation (materialization) time. The coordinator action creation (materialization) time is computed based on the coordinator job start time and its frequency. The nth dataset instance is computed based on the dataset's initial-instance datetime, its frequency and the (current) coordinator action creation (materialization) time. =n= can be a negative integer, zero or a positive integer. =${coord:current(int n)}= returns the nominal datetime for the nth dataset instance relative to the coordinator action creation (materialization) time. =${coord:current(int n)}= performs the following calculation:

<verbatim>
   DS_II  : dataset initial-instance (datetime)
   DS_FREQ: dataset frequency (minutes)
   CA_NT  : coordinator action creation (materialization) nominal time

   coord:current(int n) = DS_II + DS_FREQ * ( (CA_NT - DS_II) div DS_FREQ + n )
</verbatim>

NOTE: The formula above is not 100% correct; because of DST changes, the calculation has to account for hour shifts.
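Ignoring the DST adjustment mentioned in the note, the calculation can be sketched in Python (an illustrative model, not Oozie code; the dataset values model the daily 'logs' dataset and the weekly 'weeklySiteAccessStats' dataset used in the examples, with "24:00" expressed as the zero hour of the next day):

```python
from datetime import datetime, timedelta

def coord_current(ds_ii: datetime, ds_freq_min: int, ca_nt: datetime, n: int) -> datetime:
    """DST-naive model of:
    coord:current(n) = DS_II + DS_FREQ * ((CA_NT - DS_II) div DS_FREQ + n)
    """
    elapsed_min = int((ca_nt - ds_ii).total_seconds() // 60)
    k = elapsed_min // ds_freq_min  # 'div': whole dataset periods elapsed
    return ds_ii + timedelta(minutes=ds_freq_min * (k + n))

# Action nominal time 2009-05-29T24:00Z == 2009-05-30 00:00Z.
ca_nt = datetime(2009, 5, 30)
daily_ii = datetime(2009, 1, 2)    # daily dataset, initial instance 2009-01-01T24:00Z
weekly_ii = datetime(2009, 1, 8)   # weekly dataset, initial instance 2009-01-07T24:00Z

print(coord_current(daily_ii, 24 * 60, ca_nt, -1))      # 2009-05-29 00:00:00
print(coord_current(weekly_ii, 7 * 24 * 60, ca_nt, 0))  # 2009-05-28 00:00:00
```

The two datasets resolve to different datetimes for the same argument because each uses its own initial-instance and frequency, as the spec describes.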
Oozie Coordinator must make the correct calculation, accounting for DST hour shifts. When a positive integer is used with =${coord:current(int n)}=, it refers to a dataset instance in the future from the coordinator action creation (materialization) time. This can be useful when creating dataset instances for future use by other systems. =${coord:current(int n)}= returns the exact datetime for the computed dataset instance. *IMPORTANT:* The coordinator engine does use output events to keep track of new dataset instances. Workflow jobs triggered from coordinator actions can leverage the coordinator engine capability to synthesize dataset instance URIs to create output directories. *%GREEN% Examples: %ENDCOLOR%* 1. *=${coord:current(int n)}= datetime calculation:* Datasets Definition: . hdfs://bar:9000/app/logs/${YEAR}${MONTH}/${DAY} . hdfs://bar:9000/app/weeklystats/${YEAR}/${MONTH}/${DAY} . For a coordinator action creation time of =2009-05-29T24:00Z=, the =${coord:current(int n)}= EL function would resolve to the following datetime values for the 'logs' and 'weeklySiteAccessStats' datasets:

| *${coord:current(int offset)}* | *Dataset 'logs'* | *Dataset 'weeklySiteAccessStats'* |
| =${coord:current(0)}= | =2009-05-29T24:00Z= | =2009-05-27T24:00Z= |
| =${coord:current(1)}= | =2009-05-30T24:00Z= | =2009-06-03T24:00Z= |
| =${coord:current(-1)}= | =2009-05-28T24:00Z= | =2009-05-20T24:00Z= |
| =${coord:current(-3)}= | =2009-05-26T24:00Z= | =2009-05-06T24:00Z= |

Note, in the example above, how the datetimes resolved for the 2 datasets differ when the =${coord:current(int n)}= function is invoked with the same argument. This is because the =${coord:current(int n)}= function takes into consideration the initial-instance and the frequency of the dataset for which it is performing the calculation. Datasets Definition file 'datasets.xml': hdfs://bar:9000/app/logs/${YEAR}${MONTH}/${DAY}/${HOUR} a.
Coordinator application definition that creates a coordinator action once a day for a year, that is 365 coordinator actions: datasets.xml ${coord:current(-23)} ${coord:current(0)} ... Each coordinator action will require as input events the last 24 (-23 to 0) dataset instances of the 'logs' dataset. Because the dataset 'logs' is an hourly dataset, this means all its instances for the last 24 hours. In this case, the dataset instances are used in a rolling window fashion. b. Coordinator application definition that creates a coordinator action once an hour for a year, that is 8760 (24*365) coordinator actions: datasets.xml ${coord:current(-23)} ${coord:current(0)} ... Each coordinator action will require as input events the last 24 (-23 to 0) dataset instances of the 'logs' dataset. Similarly to the previous coordinator application example, this means all its instances for the last 24 hours. However, because the frequency is hourly instead of daily, each coordinator action will use the last 23 dataset instances used by the previous coordinator action plus a new one. In this case, the dataset instances are used in a sliding window fashion. 3. *Using =${coord:current(int n)}= to specify dataset instances created by a coordinator application:* Datasets Definition file 'datasets.xml': . hdfs://bar:9000/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR} . hdfs://bar:9000/app/logs/${YEAR}/${MONTH}/${DAY} . Coordinator application definition: datasets.xml ${coord:current(-23)} ${coord:current(0)} ${coord:current(0)} ... This coordinator application creates a coordinator action once a day for a year, that is 365 coordinator actions. Each coordinator action will require as input events the last 24 (-23 to 0) dataset instances of the 'logs' dataset. Each coordinator action will create as output event a new dataset instance of the 'stats' dataset. Note that the 'stats' dataset initial-instance and frequency match the coordinator application start and frequency. 4.
*Using =${coord:current(int n)}= to create a data-pipeline using a coordinator application:* This example shows how to chain together coordinator applications to create a data pipeline. Dataset definitions file 'datasets.xml': . hdfs://bar:9000/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE} . hdfs://bar:9000/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR} . hdfs://bar:9000/app/logs/${YEAR}/${MONTH}/${DAY} Coordinator application definitions. A data-pipeline with two coordinator applications, one scheduled to run every hour, and another scheduled to run every day: datasets.xml ${coord:current(-3)} ${coord:current(0)} ${coord:current(0)} ... datasets.xml ${coord:current(-23)} ${coord:current(0)} ${coord:current(0)} ... The 'app-coord-hourly' coordinator application runs every hour and uses 4 instances of the dataset "15MinLogs" to create one instance of the dataset "1HourLogs". The 'app-coord-daily' coordinator application runs every day and uses 24 instances of "1HourLogs" to create one instance of "1DayLogs". The output datasets of the 'app-coord-hourly' coordinator application are the input to the 'app-coord-daily' coordinator application, thereby forming a simple data-pipeline application. ---++++ 6.6.2. coord:hoursInDay(int n) EL Function for Synchronous Datasets The =${coord:hoursInDay(int n)}= EL function returns the number of hours for the specified day, in a timezone/daylight-saving sensitive way. =n= is an offset (in days) from the current nominal time. A negative value is the nth previous day. Zero is the current day. A positive number is the nth next day. The returned value is calculated taking into account timezone daylight-saving information. Normally it returns =24=; only on DST switch days for the timezone in question will it return either =23= or =25=.
*%GREEN% Examples: %ENDCOLOR%*

| *Nominal UTC time* | *Timezone* | *EndOfFlag* | *Usage* | *Value* | *Comments* |
| =2009-01-01T08:00Z= | =UTC= | =NO= | =${coord:hoursInDay(0)}= | 24 | hours in 2009JAN01 UTC |
| =2009-01-01T08:00Z= | =America/Los_Angeles= | =NO= | =${coord:hoursInDay(0)}= | 24 | hours in 2009JAN01 PST8PDT time |
| =2009-01-01T08:00Z= | =America/Los_Angeles= | =NO= | =${coord:hoursInDay(-1)}= | 24 | hours in 2008DEC31 PST8PDT time |
| ||||| |
| =2009-03-08T08:00Z= | =UTC= | =NO= | =${coord:hoursInDay(0)}= | 24 | hours in 2009MAR08 UTC time |
| =2009-03-08T08:00Z= | =Europe/London= | =NO= | =${coord:hoursInDay(0)}= | 24 | hours in 2009MAR08 BST1BDT time |
| =2009-03-08T08:00Z= | =America/Los_Angeles= | =NO= | =${coord:hoursInDay(0)}= | 23 | hours in 2009MAR08 PST8PDT time (2009MAR08 is the DST switch in the US) |
| =2009-03-08T08:00Z= | =America/Los_Angeles= | =NO= | =${coord:hoursInDay(1)}= | 24 | hours in 2009MAR09 PST8PDT time |
| =2009-03-07T08:00Z= | =America/Los_Angeles= | =EndOfDay= | =${coord:hoursInDay(0)}= | 24 | hours in 2009MAR07 PST8PDT time |
| =2009-03-07T08:00Z= | =America/Los_Angeles= | =EndOfDay= | =${coord:hoursInDay(1)}= | 23 | hours in 2009MAR08 PST8PDT time (2009MAR08 is the DST switch in the US) |

Coordinator application definition: hdfs://bar:9000/app/logs/${market}/${language}/${YEAR}${MONTH}/${DAY}/${HOUR} ${coord:current( -(coord:hoursInDay(0) - 1) )} ${coord:current(0)} ... This example is the example of section #6.5 but with a minor change. Instead of using =-23=, the argument for the =${coord:current(int n)}= function in the 'start-instance' element is now =-(coord:hoursInDay(0) - 1)=. This simple change fully enables this coordinator application to handle daily data (produced hourly) for any timezone, whether or not the timezone observes daylight saving. For timezones observing daylight saving, on the days of the DST switch, the function will resolve to =23= or =25=, thus the dataset instances used will be for the day in the DST sense. For timezones not observing daylight saving, it always returns =24=. ---++++ 6.6.3. coord:daysInMonth(int n) EL Function for Synchronous Datasets The =${coord:daysInMonth(int n)}= EL function returns the number of days in the month of the specified day. =n= is an offset (in months) from the current nominal time. A negative value is the nth previous month. Zero is the current month. A positive number is the nth next month. The returned value is calculated taking leap year information into account. The =${coord:daysInMonth(int n)}= EL function can be used to express monthly ranges for dataset instances.
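Setting aside the EndOfFlag variants, the month-offset arithmetic behind =${coord:daysInMonth(int n)}= can be sketched in Python (illustrative only, not Oozie code; =calendar.monthrange= supplies the leap-year handling):

```python
import calendar
from datetime import datetime

def days_in_month(nominal: datetime, n: int) -> int:
    # Shift the nominal time's month by n, then ask the calendar how
    # long that month is; leap years fall out of monthrange().
    month_index = nominal.year * 12 + (nominal.month - 1) + n
    year, month0 = divmod(month_index, 12)
    return calendar.monthrange(year, month0 + 1)[1]

print(days_in_month(datetime(2009, 2, 1), 0))   # 28 (2009FEB)
print(days_in_month(datetime(2009, 2, 1), -1))  # 31 (2009JAN)
print(days_in_month(datetime(2008, 2, 1), 0))   # 29 (2008FEB, leap year)
```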
*%GREEN% Examples: %ENDCOLOR%*

| *Nominal UTC time* | *Timezone* | *EndOfFlag* | *Usage* | *Value* | *Comments* |
| =2008-02-01T00:00Z= | =UTC= | =NO= | =${coord:daysInMonth(0)}= | 29 | days in 2008FEB UTC time |
| =2009-02-01T00:00Z= | =UTC= | =NO= | =${coord:daysInMonth(0)}= | 28 | days in 2009FEB UTC time |
| =2009-02-01T00:00Z= | =UTC= | =NO= | =${coord:daysInMonth(-1)}= | 31 | days in 2009JAN UTC time |
| =2009-03-01T00:00Z= | =UTC= | =NO= | =${coord:daysInMonth(1)}= | 30 | days in 2009APR UTC time |
| =2009-02-01T00:00Z= | =America/Los_Angeles= | =NO= | =${coord:daysInMonth(0)}= | 31 | days in 2009JAN PST8PDT time, note that the nominal time is UTC |
| ||||| |
| =2008-02-01T00:00Z= | =UTC= | =EndOfMonth= | =${coord:daysInMonth(0)}= | 29 | days in 2008FEB UTC time |
| =2008-02-01T00:00Z= | =UTC= | =EndOfMonth= | =${coord:daysInMonth(-1)}= | 31 | days in 2008JAN UTC time |
| =2009-02-01T00:00Z= | =UTC= | =EndOfMonth= | =${coord:daysInMonth(0)}= | 28 | days in 2009FEB UTC time |
| =2009-02-01T00:00Z= | =UTC= | =EndOfMonth= | =${coord:daysInMonth(-1)}= | 31 | days in 2009JAN UTC time |
| =2009-03-01T00:00Z= | =UTC= | =EndOfMonth= | =${coord:daysInMonth(1)}= | 30 | days in 2009APR UTC time |
| =2009-02-01T00:00Z= | =America/Los_Angeles= | =EndOfMonth= | =${coord:daysInMonth(0)}= | 31 | days in 2009JAN PST8PDT time, note that the nominal time is UTC |

Coordinator application definition: hdfs://bar:9000/app/logs/${market}/${language}/${YEAR}${MONTH}/${DAY} ${coord:current( -(coord:daysInMonth(0) - 1) )} ${coord:current(0)} ... This example is a coordinator application that runs monthly and consumes the daily feeds for the last month. ---++++ 6.6.4. coord:tzOffset() EL Function for Synchronous Datasets The =${coord:tzOffset()}= EL function returns the difference in *minutes* between a dataset timezone and the coordinator job timezone at the current nominal time.
This EL function is useful when dealing with datasets from multiple timezones that are processed in a different timezone.

<verbatim>
   DS_TZ : dataset TZ offset in minutes at the current nominal time (UTC offset)
   JOB_TZ: coordinator job TZ offset in minutes at the current nominal time (UTC offset)

   coord:tzOffset() = DS_TZ - JOB_TZ
</verbatim>

For example: the Los Angeles Winter offset (no DST) is =-480= (-08:00 hours). The India offset is =330= (+05:30 hours). The value returned by this function may change because of the daylight saving rules of the 2 timezones. For example, between Continental Europe and the U.S. West coast, most of the year the timezone difference is 9 hours, but for a few days or weeks each year, while the DST switch dates of the two timezones do not coincide, it is 8 hours. IMPORTANT: While the offset is a multiple of 60 for most timezones, it can be a multiple of 30 minutes when one of the timezones has a =##:30= offset (e.g. India). Refer to section #7, 3rd use case, for a detailed example. ---++++ 6.6.5. coord:latest(int n) EL Function for Synchronous Datasets =${coord:latest(int n)}= represents the nth latest currently available instance of a *synchronous* dataset. =${coord:latest(int n)}= is not relative to the coordinator action creation (materialization) time; it is the nth latest instance available when the action is started (when the workflow job is started). If a coordinator job is suspended, when resumed all usages of =${coord:latest(int n)}= will be resolved to the instances existing at that time. Finally, it is not possible to re-resolve the latest dataset when execution reaches a node in the workflow job; the resolution of latest dataset instances happens at action start time (workflow job start time). The parameter =n= can be a negative integer or zero: =0= means the latest instance available, =-1= means the second latest instance available, etc. =${coord:latest(int n)}= ignores gaps in dataset instances; it just looks for the latest nth instance available.
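The gap-skipping behavior of =${coord:latest(int n)}= can be sketched in Python (illustrative only, not Oozie code; the instance list is a hypothetical HDFS listing in the style of the example that follows, with the gaps already absent):

```python
# Only instances that actually exist in HDFS participate; gaps in the
# nominal sequence simply do not appear in this list.
available = [
    "hdfs://bar:9000/app/logs/2009/01/01",
    "hdfs://bar:9000/app/logs/2009/01/02",
    "hdfs://bar:9000/app/logs/2009/01/05",
    "hdfs://bar:9000/app/logs/2009/01/07",
    "hdfs://bar:9000/app/logs/2009/01/10",
]

def latest(n: int) -> str:
    # n is zero or negative: 0 = latest available, -1 = second latest, ...
    assert n <= 0, "coord:latest takes zero or a negative integer"
    return sorted(available)[n - 1]

print(latest(0))   # hdfs://bar:9000/app/logs/2009/01/10
print(latest(-2))  # hdfs://bar:9000/app/logs/2009/01/05
```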
*%GREEN% Example: %ENDCOLOR%* Coordinator application definition: hdfs://bar:9000/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR} ${coord:latest(-2)} ${coord:latest(0)} ... If the available dataset instances in HDFS at the time a coordinator action is executed are: hdfs://bar:9000/app/logs/2009/01/01 hdfs://bar:9000/app/logs/2009/01/02 hdfs://bar:9000/app/logs/2009/01/03 (missing) hdfs://bar:9000/app/logs/2009/01/05 (missing) hdfs://bar:9000/app/logs/2009/01/07 (missing) (missing) hdfs://bar:9000/app/logs/2009/01/10 Then the dataset instances for the input events for the coordinator action will be: hdfs://bar:9000/app/logs/2009/01/05 hdfs://bar:9000/app/logs/2009/01/10 ---++++ 6.6.6. coord:future(int n, int limit) EL Function for Synchronous Datasets =${coord:future(int n, int limit)}= represents the nth currently available future instance of a *synchronous* dataset, while looking ahead for 'limit' number of instances. =${coord:future(int n, int limit)}= is relative to the coordinator action creation (materialization) time. The coordinator action creation (materialization) time is computed based on the coordinator job start time and its frequency. The nth dataset instance is computed based on the dataset's initial-instance datetime, its frequency and the (current) coordinator action creation (materialization) time. =n= can be zero or a positive integer: =0= means the immediate instance available, =1= means the second next instance available, etc. =limit= must be a positive integer: a limit of =3= means the search for the nth next instance will not check beyond 3 instances. =${coord:future(int n, int limit)}= ignores gaps in dataset instances; it just looks for the next nth instance available. *%GREEN% Example: %ENDCOLOR%* Coordinator application definition: hdfs://bar:9000/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR} ${coord:future(0, 10)} ${coord:future(2, 10)} ...
If the available dataset instances in HDFS at the time a coordinator action is executed are: hdfs://bar:9000/app/logs/2009/02/01 (missing) (missing) (missing) hdfs://bar:9000/app/logs/2009/02/04 (missing) (missing) hdfs://bar:9000/app/logs/2009/02/07 (missing) (missing) (missing) hdfs://bar:9000/app/logs/2009/02/11 (missing) (missing) hdfs://bar:9000/app/logs/2009/02/14 (missing) hdfs://bar:9000/app/logs/2009/02/16 Then the dataset instances for the input events for the coordinator action will be: hdfs://bar:9000/app/logs/2009/02/01 hdfs://bar:9000/app/logs/2009/02/07 ---++++ 6.6.7. coord:version(int n) EL Function for Asynchronous Datasets * TBD ---++++ 6.6.8. coord:latest(int n) EL Function for Asynchronous Datasets * TBD ---++++ 6.6.9. Dataset Instance Resolution for Instances Before the Initial Instance When defining input events that refer to dataset instances, it is possible that the resolution of an instance falls below the dataset's lower bound. This scenario is likely to happen when the instance resolution is very close to the initial-instance. It is useful for bootstrapping the application. To address this edge scenario, Oozie Coordinator silently ignores dataset instances that are out of bounds. *%GREEN% Example: %ENDCOLOR%* Coordinator application definition: hdfs://bar:9000/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR} ${coord:current(-23)} ${coord:current(0)} ... In the case of the synchronous 'logs' dataset, for the first action of this coordinator job, the instances referred to in the input events will resolve to just 1 instance. For the second action they will resolve to 2 instances. And so on. Only from the 24th action on will the input events constantly resolve to 24 instances. In other words, while =${coord:current(-23)}= resolves to datetimes prior to the 'initial-instance', the required range will start from the 'initial-instance', '2009-01-01T00:00Z' in this example. ---+++ 6.7.
Parameterization of Coordinator Application Actions Actions started by a coordinator application normally require access to the dataset instances resolved by the input and output events to be able to propagate them to the workflow job as parameters. The following EL functions are the mechanism that enables this propagation. ---++++ 6.7.1. coord:dataIn(String name) EL Function The =${coord:dataIn(String name)}= EL function resolves to all the URIs for the dataset instances specified in an input event dataset section. The =${coord:dataIn(String name)}= function is commonly used to pass the URIs of the dataset instances that will be consumed by the workflow job triggered by a coordinator action. *%GREEN% Example: %ENDCOLOR%* Coordinator application definition: hdfs://bar:9000/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR} ${coord:current( -(coord:hoursInDay(0) - 1) )} ${coord:current(-1)} hdfs://bar:9000/usr/tucu/logsprocessor-wf wfInput ${coord:dataIn('inputLogs')} In this example, each coordinator action will use as input events the last day's hourly instances of the 'logs' dataset. The =${coord:dataIn(String name)}= function enables the coordinator application to pass the URIs of all the dataset instances for the last day to the workflow job triggered by the coordinator action. For the =2009-01-02T00:00Z= run, the =${coord:dataIn('inputLogs')}= function will resolve to: hdfs://bar:9000/app/logs/2009/01/01/01, hdfs://bar:9000/app/logs/2009/01/01/02, ... hdfs://bar:9000/app/logs/2009/01/01/23, hdfs://bar:9000/app/logs/2009/01/02/00 The =${coord:dataIn('inputLogs')}= value is used for the workflow job configuration property 'wfInput' of the workflow job that will be submitted by the coordinator action on January 2nd 2009. Thus, when the workflow job gets started, the 'wfInput' workflow job configuration property will contain all the above URIs. Note that all the URIs form a single string value and the URIs are separated by commas.
Multiple HDFS URIs separated by commas can be specified as input data to a Map/Reduce job. ---++++ 6.7.2. coord:dataOut(String name) EL Function The =${coord:dataOut(String name)}= EL function resolves to all the URIs for the dataset instance specified in an output event dataset section. The =${coord:dataOut(String name)}= function is commonly used to pass the URIs of the dataset instance that will be produced by the workflow job triggered by a coordinator action. *%GREEN% Example: %ENDCOLOR%* Datasets Definition file 'datasets.xml' . hdfs://bar:9000/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR} . hdfs://bar:9000/app/daily-logs/${YEAR}/${MONTH}/${DAY} Coordinator application definition: datasets.xml ${coord:current( -(coord:hoursInDay(0) -1) )} ${coord:current(0)} ${coord:current(0)} hdfs://bar:9000/usr/tucu/logsaggretor-wf wfInput ${coord:dataIn('inputLogs')} wfOutput ${coord:dataOut('outputLogs')} In this example, each coordinator action will use as input events the last 24 hourly instances of the 'hourlyLogs' dataset to create a 'dailyLogs' dataset instance. The =${coord:dataOut(String name)}= function enables the coordinator application to pass the URI of the dataset instance that will be created by the workflow job triggered by the coordinator action. For the =2009-01-01T24:00Z= run, the =${coord:dataOut('dailyLogs')}= function will resolve to: hdfs://bar:9000/app/daily-logs/2009/01/02 NOTE: The use of =24:00= as the hour is useful for humans to denote the end of the day, but internally Oozie handles it as the zero hour of the next day. The =${coord:dataOut('dailyLogs')}= value is used for the workflow job configuration property 'wfOutput' of the workflow job that will be submitted by the coordinator action on January 2nd 2009. Thus, when the workflow job gets started, the 'wfOutput' workflow job configuration property will contain the above URI. ---++++ 6.7.3.
coord:nominalTime() EL Function The =${coord:nominalTime()}= EL function resolves to the coordinator action creation (materialization) datetime. The nominal time is always the coordinator job start datetime plus a multiple of the coordinator job frequency; that is, the time for which the coordinator action was created based on the driver event. For synchronous coordinator applications this is every tick of the frequency. *%GREEN% Example: %ENDCOLOR%* Coordinator application definition: hdfs://bar:9000/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR} ${coord:current(-23)} ${coord:current(0)} ... The nominal times for the coordinator actions of this coordinator application example are: 2009-01-02T00:00Z 2009-01-03T00:00Z 2009-01-04T00:00Z ... 2010-01-01T00:00Z These are the times the actions were created (materialized). ---+++ 6.8. Parameterization of Coordinator Application This section describes the EL functions that can be used to parameterize both datasets and coordinator application actions. ---++++ 6.8.1. coord:dateOffset(String baseDate, int instance, String timeUnit) EL Function The =${coord:dateOffset(String baseDate, int instance, String timeUnit)}= EL function calculates a date based on the following equation: =newDate = baseDate + instance * timeUnit= For example, if baseDate is '2009-01-01T00:00Z', instance is '2' and timeUnit is 'MONTH', the returned date will be '2009-03-01T00:00Z'. *%GREEN% Example: %ENDCOLOR%* ... hdfs://bar:9000/usr/tucu/logsaggretor-wf nextInstance ${coord:dateOffset(coord:nominalTime(), 1, 'DAY')} previousInstance ${coord:dateOffset(coord:nominalTime(), -1, 'DAY')} In this example, the value of 'nextInstance' will be '2009-01-02T24:00Z' for the first action, and the value of 'previousInstance' will be '2008-12-31T24:00Z' for the same action. ---++ 7.
Handling Timezones and Daylight Saving Time As mentioned in section #4.1.1 'Timezones and Daylight-Saving', the coordinator engine works exclusively in UTC, and dataset and application definitions are always expressed in UTC. ---+++ 7.1. Handling Timezones with No Daylight Saving Time For timezones that don't observe daylight saving time, handling timezone offsets is trivial. In dataset and application definitions for these timezones, it suffices to express datetimes taking the timezone offset into account. *%GREEN% Example: %ENDCOLOR%*: Coordinator application definition: A daily coordinator job for the India timezone (+05:30) that consumes 24 hourly dataset instances from the previous day, starting at the beginning of 2009 and running for a full year. hdfs://bar:9000/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR} ${coord:current(-23)} ${coord:current(0)} ... ---+++ 7.2. Handling Timezones with Daylight Saving Time Oozie Coordinator provides all the necessary functionality to write coordinator applications that work properly when data and processing span multiple timezones and different daylight saving rules. The following 3 use cases will be used to show how Oozie Coordinator built-in functionality can be used to handle such cases: 1 Process hourly log data from the last day from the US East coast 1 Process hourly log data from the last day from the US East coast and the US West coast 1 Process hourly log data from the last day from the US East coast and Continental Europe *1.
Process hourly log data from the last day from the US East coast:* hdfs://bar:9000/app/logs/eastcoast/${YEAR}/${MONTH}/${DAY}/${HOUR} ${coord:current( -(coord:hoursInDay(0) - 1) )} ${coord:current(0)} hdfs://bar:9000/usr/tucu/logsaggretor-wf wfInput ${coord:dataIn('EC')} Because the =${coord:days(1)}= EL function is used to specify the job frequency, each coordinator action will be materialized (created) at 00:00 EST5EDT regardless of timezone daylight-saving adjustments (05:00 UTC in winter and 04:00 UTC in summer). The =${coord:hoursInDay(-1)}= EL function will resolve to the number of hours of the previous day, taking into account daylight-saving changes, if any. It will resolve to =24= (on regular days), =23= (on the spring-forward day) or =25= (on the fall-backward day). Because of the use of the =${coord:hoursInDay(-1)}= EL function, the dataset instances range resolves to [-24 .. -1], [-23 .. -1] or [-25 .. -1]. Thus, it will resolve to the exact number of dataset instances for the day, taking daylight-saving adjustments into account. Note that because the coordinator application and the dataset are in the same timezone, there is no need to do any hour offset corrections in the dataset instances being used as input for each coordinator action. *2. Process hourly log data from the last day from the US East coast and the US West coast:* hdfs://bar:9000/app/logs/eastcoast/${YEAR}/${MONTH}/${DAY}/${HOUR} hdfs://bar:9000/app/logs/westcoast/${YEAR}/${MONTH}/${DAY}/${HOUR} ${coord:current( -(coord:hoursInDay(0) - 1) -3)} ${coord:current(-3)} ${coord:current( -(coord:hoursInDay(0) - 1) )} ${coord:current(0)} hdfs://bar:9000/usr/tucu/logsaggretor-wf wfInput ${coord:dataIn('EC')},${coord:dataIn('WC')} The additional complexity of this use case over the first use case is that the job and the datasets are not all in the same timezone. The corresponding timezone offset has to be accounted for.
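The XML markup of the second example above was stripped during extraction; the following is a minimal sketch of its input events only, using the element names of the Oozie coordinator schema. The dataset names 'EC' and 'WC' match the =coord:dataIn()= calls in the text, while the dataset identifiers 'eastlogs' and 'westlogs' are hypothetical placeholders:

```xml
<!-- Sketch only: the data-in names 'EC' and 'WC' come from the text;
     the referenced dataset names 'eastlogs'/'westlogs' are assumptions. -->
<input-events>
  <!-- East coast dataset: shifted back 3 hours to cover the previous EST5EDT day -->
  <data-in name="EC" dataset="eastlogs">
    <start-instance>${coord:current( -(coord:hoursInDay(0) - 1) - 3)}</start-instance>
    <end-instance>${coord:current(-3)}</end-instance>
  </data-in>
  <!-- West coast dataset: same timezone as the job, so no offset correction needed -->
  <data-in name="WC" dataset="westlogs">
    <start-instance>${coord:current( -(coord:hoursInDay(0) - 1) )}</start-instance>
    <end-instance>${coord:current(0)}</end-instance>
  </data-in>
</input-events>
```

The two ranges differ only by the constant -3 offset, because both coasts observe DST on the same days and their offset never changes.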
As the use case requires processing all the daily data for the East coast and the West coast, the processing has to be adjusted to the West coast end of the day because the day there finishes 3 hours later, and processing has to wait until then. The data input range for the East coast dataset must be adjusted (with -3) in order to take the data for the previous EST5EDT day. *3. Process hourly log data from the last day from the US East coast and Continental Europe:* hdfs://bar:9000/app/logs/eastcoast/${YEAR}/${MONTH}/${DAY}/${HOUR} hdfs://bar:9000/app/logs/europe/${YEAR}/${MONTH}/${DAY}/${HOUR} ${coord:current( -(coord:hoursInDay(0) - 1) )} ${coord:current(-1)} ${coord:current( -(coord:hoursInDay(0) -1) - coord:tzOffset()/60)} ${coord:current( - coord:tzOffset()/60)} hdfs://bar:9000/usr/tucu/logsaggretor-wf wfInput ${coord:dataIn('EC')} The additional complexity of this use case over the second use case is that the timezones used for the job and the datasets do not follow the same daylight saving rules (Europe and the US apply the DST changes on different days). Because of this, the timezone offset between Europe and the US is not constant. To obtain the current timezone offset between the coordinator job and a dataset, the =${coord:tzOffset()}= EL function must be used. As the use case requires processing all the daily data for the East coast and Continental Europe, the processing happens on East coast time (thus having daily data already available for both Europe and the East coast). The data input range for the Europe dataset must be adjusted with the =${coord:tzOffset()}= EL function in order to take the data for the previous EST5EDT day. IMPORTANT: The =${coord:tzOffset()}= function returns the offset in minutes, and the datasets in the example are hourly datasets. Because of this, the offset must be divided by =60= to compute the instance offset. ---+++ 7.3.
Timezone and Daylight Saving Tools The Coordinator engine should provide tools to help developers convert and compute UTC datetimes to timezone datetimes and to daylight saving aware timezones. ---++ 8. Operational Considerations ---+++ 8.1. Reprocessing * TBD ---++ 9. User Propagation When submitting a coordinator job, the configuration must contain a =user.name= property. If security is enabled, Oozie must ensure that the value of the =user.name= property in the configuration matches the user credentials present in the protocol (web services) request. When submitting a coordinator job, the configuration may contain a =group.name= property. If security is enabled, Oozie must ensure that the user of the request belongs to the specified group. The specified user and group names are assigned to the created coordinator job. Oozie must propagate the specified user and group to the system executing the actions (workflow jobs). ---++ 10. Coordinator Application Deployment Coordinator applications consist exclusively of dataset definitions and coordinator application definitions. They must be installed in an HDFS directory. To submit a job for a coordinator application, the full HDFS path to the coordinator application definition must be specified. ---+++ 10.1. Organizing Coordinator Applications The usage of Oozie Coordinator can be categorized in 3 different segments: * *Small:* consisting of a single coordinator application with embedded dataset definitions * *Medium:* consisting of a single shared dataset definitions file and a few coordinator applications * *Large:* consisting of one or multiple shared dataset definitions files and several coordinator applications Systems that fall in the *medium* and (especially) in the *large* categories are usually referred to as data pipeline systems. Oozie Coordinator definition XML schemas provide a convenient and flexible mechanism for all 3 system categories defined above.
For *small* systems: All dataset definitions and the coordinator application definition can be defined in a single XML file. The XML definition file is commonly in its own HDFS directory. For *medium* systems: A single datasets XML file defines all shared/public datasets. Each coordinator application has its own definition file; it may have embedded/private datasets and it may refer, via inclusion, to the shared datasets XML file. All the XML definition files are grouped in a single HDFS directory. For *large* systems: Multiple datasets XML files define all shared/public datasets. Each coordinator application has its own definition file; it may have embedded/private datasets and it may refer, via inclusion, to multiple shared datasets XML files. XML definition files are logically grouped in different HDFS directories. NOTE: Oozie Coordinator does not enforce any specific organization, grouping or naming for datasets and coordinator application definition files. The fact that each coordinator application is in a separate XML definition file simplifies coordinator job submission, monitoring and managing of jobs. Tools to support groups of jobs can be built on top of the basic, per-job commands provided by the Oozie coordinator engine. ---++++ 10.1.1. Dataset Names Collision Resolution Embedded dataset definitions within a coordinator application cannot have the same name. Dataset definitions within a dataset definition XML file cannot have the same name. If a coordinator application includes one or more dataset definition XML files, there cannot be datasets with the same name across the included dataset definition XML files. If any dataset name collision occurs, the coordinator job submission must fail.
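The 'include' mechanism used by medium and large systems to pull shared datasets into a coordinator application looks, in sketch form, like this. The HDFS path and the dataset name are made-up examples, not values from this specification:

```xml
<!-- Sketch only: a coordinator application's datasets section including a
     shared datasets file; the path and 'privateLogs' name are hypothetical. -->
<datasets>
  <include>hdfs://foo:9000/user/tucu/shared/datasets.xml</include>
  <!-- embedded/private dataset definitions may coexist with included ones -->
  <dataset name="privateLogs" frequency="${coord:days(1)}"
           initial-instance="2009-01-01T00:00Z" timezone="UTC">
    <uri-template>hdfs://foo:9000/user/tucu/private/${YEAR}/${MONTH}/${DAY}</uri-template>
  </dataset>
</datasets>
```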
If a coordinator application includes one or more dataset definition XML files and it has embedded dataset definitions, in case of a dataset name collision between the included and the embedded definitions, the embedded dataset takes precedence over the included dataset. ---++ 11. Coordinator Job Submission When a coordinator job is submitted to Oozie Coordinator, the submitter must specify all the required job properties plus the HDFS path to the coordinator application definition for the job. The coordinator application definition HDFS path must be specified in the 'oozie.coord.application.path' job property. All the coordinator job properties, the HDFS path for the coordinator application, the 'user.name' and 'group.name' must be submitted to the Oozie coordinator engine using an XML configuration file (Hadoop XML configuration file). *%GREEN% Example: %ENDCOLOR%*: user.name tucu oozie.coord.application.path hdfs://foo:9000/user/tucu/myapps/hello-coord.xml ... ---++ 12. SLA Handling Oozie 2.0 is integrated with GMS (Grid Monitoring System). If you add *sla* tags to the Coordinator or Workflow XML files, the SLA information will be propagated to the GMS system. ---+++ Coordinator SLA Example hdfs://bar:9000/app/logs/${YEAR}/${MONTH}/${DAY}/${HOUR}/data ${coord:current( -(coord:hoursInDay(0) - 1) )} ${coord:current(0)} hdfs://bar:9000/usr/tucu/hello-wf input ${coord:dataIn('input')} ${coord:nominalTime()} ${5 * MINUTES} ${55 * MINUTES} log processor run for: ${coord:nominalTime()} tucu@yahoo-inc.com abc@yahoo-inc.com abc@yahoo-inc.com abc@yahoo-inc.com application-a,application-b 99 ${24 * LAST_HOUR} ---+++ Workflow SLA Example ${jobtracker} ${namenode} mapred.input.dir ${input} mapred.output.dir /usr/foo/${wf:id()}/temp1 ${nominal-time} ${10 * MINUTES} ${30 * MINUTES} abc.grouper for input ${input} tucu@yahoo-inc.com abc@yahoo-inc.com abc@yahoo-inc.com abc@yahoo-inc.com application-a,application-b 99 ${24 * LAST_HOUR} * TBD ---++ 13.
Web Services API ---+++ 13.1 System Status *Request:* GET oozie/v1/admin/status *Response:* {"systemMode":"NORMAL"} ---+++ 13.2 List Jobs ---++++ Workflow Jobs *Request:* POST oozie/v1/jobs?jobtype=wf&len=50&offset=1 *Response:* { "total":2, "workflows":[ { "appPath":null, "status":"KILLED", "createdTime":"Tue, 27 Apr 2010 01:50:45 GMT", "conf":null, "lastModTime":"Tue, 27 Apr 2010 01:51:04 GMT", "endTime":"Tue, 27 Apr 2010 01:51:04 GMT", "run":0, "externalId":null, "appName":"map-reduce-wf", "id":"0000000-100426185037406-oozie-dani-W", "startTime":"Tue, 27 Apr 2010 01:50:47 GMT", "group":"users", "consoleUrl":"http:\/\/localhost:8080\/oozie?job=jobid1-W", "user":"danielwo", "actions":[ ] }, { "appPath":null, "status":"KILLED", "createdTime":"Mon, 26 Apr 2010 22:31:15 GMT", "conf":null, "lastModTime":"Wed, 28 Apr 2010 22:39:18 GMT", "endTime":"Wed, 28 Apr 2010 22:39:18 GMT", "run":0, "externalId":null, "appName":"map-reduce-wf", "id":"0000005-100426151754515-oozie-dani-W", "startTime":null, "group":"users", "consoleUrl":"http:\/\/localhost:8080\/oozie?job=jobd2-W", "user":"danielwo", "actions":[ ] } ], "len":50, "offset":1 } ---++++ Coordinator Jobs *Request:* POST oozie/v1/jobs?jobtype=coord&len=50&offset=1 *Response:* { "total":2, "coordinatorjobs":[ { "lastAction":"Sun, 01 Feb 2009 01:00:00 GMT", "coordJobName":"my_coord_job", "status":"SUCCEEDED", "coordJobPath":"hdfs:\/\/localhost:9000\/user\/danielwo\/coord", "timeZone":"UTC", "conf":null, "frequency":60, "endTime":"Sun, 01 Feb 2009 00:05:00 GMT", "executionPolicy":"LIFO", "startTime":"Sun, 01 Feb 2009 00:00:00 GMT", "timeOut":-1, "nextMaterializedTime":"Sun, 01 Feb 2009 01:00:00 GMT", "timeUnit":"MINUTE", "concurrency":-1, "coordJobId":"0000000-100426180048624-oozie-dani-C", "coordExternalId":null, "group":"users", "consoleUrl":null, "user":"danielwo", "actions":[ ] }, { "lastAction":"Sun, 01 Feb 2009 01:00:00 GMT", "coordJobName":"my_coord_job", "status":"SUCCEEDED", 
"coordJobPath":"hdfs:\/\/localhost:9000\/user\/danielwo\/coord", "timeZone":"UTC", "conf":null, "frequency":60, "endTime":"Sun, 01 Feb 2009 00:05:00 GMT", "executionPolicy":"LIFO", "startTime":"Sun, 01 Feb 2009 00:00:00 GMT", "timeOut":-1, "nextMaterializedTime":"Sun, 01 Feb 2009 01:00:00 GMT", "timeUnit":"MINUTE", "concurrency":-1, "coordJobId":"0000000-100426145525486-oozie-dani-C", "coordExternalId":null, "group":"users", "consoleUrl":null, "user":"danielwo", "actions":[ ] } ], "len":50, "offset":1 } ---+++ 13.3 Job Submission PUT oozie/v1/jobs?action=start ---+++ 13.4 Job Information ---++++ Workflow Job Information *Request:* GET oozie/v1/job/0000000-100426185037406-oozie-jobid-W?show=info&len=0&offset=0 *Response:* { "appPath":"hdfs:\/\/localhost:9000\/user\/danielwo\/workflow", "status":"KILLED", "createdTime":"Tue, 27 Apr 2010 01:50:45 GMT", "conf":"...<\/configuration>", "lastModTime":"Tue, 27 Apr 2010 01:51:04 GMT", "endTime":"Tue, 27 Apr 2010 01:51:04 GMT", "run":0, "externalId":null, "appName":"map-reduce-wf", "id":"0000000-100426185037406-oozie-dani-W", "startTime":"Tue, 27 Apr 2010 01:50:47 GMT", "group":"users", "consoleUrl":"http:\/\/localhost:8080\/oozie?job=...", "user":"danielwo", "actions":[ { "errorMessage":"Output directory already exists", "status":"ERROR", "data":null, "transition":"fail", "externalStatus":"FAILED\/KILLED", "conf":" ...<\/map-reduce>", "type":"map-reduce", "endTime":"Tue, 27 Apr 2010 01:51:04 GMT", "externalId":"job_201004261212_0025", "id":"0000000-100426185037406-oozie-dani-W@hadoop1", "startTime":"Tue, 27 Apr 2010 01:50:49 GMT", "name":"hadoop1", "errorCode":"JA018", "retries":0, "trackerUri":"localhost:9001", "consoleUrl":"http:\/\/localhost:50030\/jobdetails.jsp?jobid=..." 
} ] } ---++++ Coordinator Job Information *Request:* GET oozie/v1/job/0000000-100426185037406-oozie-jobid-C?show=info&len=0&offset=0 *Response:* { "lastAction":null, "coordJobName":"my_coord_job", "status":"SUCCEEDED", "coordJobPath":"hdfs:\/\/localhost:9000\/user\/danielwo\/coord", "timeZone":"UTC", "conf":"...<\/configuration>", "frequency":60, "endTime":null, "executionPolicy":"LIFO", "startTime":null, "timeOut":-1, "nextMaterializedTime":null, "timeUnit":"MINUTE", "concurrency":-1, "coordJobId":"0000000-100426180048624-oozie-dani-C", "coordExternalId":null, "group":"users", "consoleUrl":null, "user":"danielwo", "actions":[ { "errorMessage":null, "lastModifiedTime":"Tue, 27 Apr 2010 01:01:30 GMT", "createdTime":"Tue, 27 Apr 2010 01:01:12 GMT", "status":"KILLED", "externalStatus":null, "type":null, "externalId":"0000001-100426180048624-oozie-dani-W", "id":"0000000-100426180048624-oozie-dani-C@1", "createdConf":"...<\/configuration>", "actionNumber":1, "errorCode":null, "trackerUri":null, "coordJobId":"0000000-100426180048624-oozie-dani-C", "consoleUrl":null } ] } ---+++ 13.5 Job Definition GET oozie/v1/job/0000005-100426151754515-oozie-jobid-W?show=definition ---+++ 13.6 Job Log GET oozie/v1/job/0000005-100426151754515-oozie-jobid-W?show=log ---+++ 13.7 Kill a Job PUT oozie/v1/job/0000005-100426151754515-oozie-jobid-W?action=kill ---++ 14. Coordinator Rerun ---+++ Rerunning a Coordinator Action or Multiple Actions Example: $oozie job -rerun [-nocleanup] [-refresh] [-action 1, 3-4, 7-40] (-action or -date is required to rerun.) [-date 2009-01-01T01:00Z::2009-05-31T23:59Z, 2009-11-10T01:00Z, 2009-12-31T22:00Z] (if neither -action nor -date is given, an exception will be thrown.) The =rerun= option reruns a terminated (=TIMEDOUT=, =SUCCEEDED=, =KILLED=, =FAILED=) coordinator action when the coordinator job is not in =FAILED= or =KILLED= state. After the command is executed, the rerun coordinator action will be in =WAITING= status.
Refer to [[DG_CoordinatorRerun][Rerunning Coordinator Actions]] for details on rerun. ---++ Appendixes ---+++ Appendix A, Oozie Coordinator XML-Schema ---++++ Oozie Coordinator Schema ---++++ Oozie SLA Schema [[index][::Go back to Oozie Documentation Index::]]