---++ Contents
   * Cluster Specification
   * Feed Specification
   * Process Specification

---++ Cluster Specification
The cluster XSD specification is available here:

A cluster contains different interfaces which are used by Falcon, such as readonly, write, workflow and messaging. A cluster is referenced by name from the feeds and processes that are on-boarded to Falcon.

Following are the tags defined in a cluster.xml:

The colo specifies the colo to which this cluster belongs, and name is the name of the cluster, which has to be unique.

---+++ Interfaces

A cluster has various interfaces as described below:

A readonly interface specifies the endpoint for Hadoop's HFTP protocol. It is used in the context of feed replication.

A write interface specifies the interface to write to hdfs; its endpoint is the value of fs.defaultFS. Falcon uses this interface to write system data to hdfs, and feeds referencing this cluster are written to hdfs using the same write interface.

An execute interface specifies the interface for the job tracker; its endpoint is the value of mapreduce.jobtracker.address. Falcon uses this interface to submit the processes as jobs on the !JobTracker defined here.

A workflow interface specifies the interface for the workflow engine; an example of its endpoint is the value of OOZIE_URL. Falcon uses this interface to schedule the processes referencing this cluster on the workflow engine defined here.

A registry interface specifies the interface for the metadata catalog, such as Hive Metastore (or HCatalog). Falcon uses this interface to register/de-register partitions for a given database and table. It also uses this information to schedule data availability events based on partitions in the workflow engine. Although Hive metastore supports both RPC and HTTP, Falcon comes with an implementation for RPC over thrift.

A messaging interface specifies the interface for sending feed availability messages; its endpoint is the broker url with a tcp address.

---+++ Locations

A cluster has a list of locations defined:

A location has a name and a path. The name is the type of location; allowed values of name are staging, temp and working. The path is the hdfs path for each location. Falcon uses these locations for intermediate processing of entities in hdfs, and hence Falcon should have read/write/execute permission on them. These locations MUST be created prior to submitting a cluster entity to Falcon.
   * *staging* should have 777 permissions and is a mandatory location. The parent dirs must have execute permissions so multiple users can write to this location.
   * *working* must have 755 permissions and is an optional location. If *working* is not specified, Falcon creates a sub directory in the *staging* location with 755 perms. The parent dir for *working* must have execute permissions so multiple users can read from this location.

---+++ ACL

A cluster has an ACL (Access Control List), which is useful for implementing permission requirements and provides a way to set different permissions for specific users or named groups.

ACL indicates the access control list for this cluster. owner is the owner of this entity. group is the group which has read access. permission indicates the permission.

---+++ Custom Properties

A cluster has a list of properties: key-value pairs which are propagated to the workflow engine. Ideally, the JMS impl class name of the messaging engine (brokerImplClass) should be defined here.

---++ Feed Specification
The Feed XSD specification is available here.

A feed defines various attributes of a feed, such as feed location, frequency, late-arrival handling and retention policies. A feed can be scheduled on a cluster; once a feed is scheduled, its retention and replication processes are triggered in the given cluster.

A feed should have a unique name, and this name is referenced by processes as an input or output feed.

---+++ Storage
Falcon introduces a new abstraction to encapsulate the storage for a given feed, which can be expressed either as a path on the file system (File System Storage) or as a table in a catalog such as Hive (Catalog Storage).

A feed should contain exactly one of the two storage options: locations on the file system or a table in a catalog.

---++++ File System Storage

..... more clusters

A feed references a cluster by its name; before submitting a feed, all the referenced clusters should be submitted to Falcon.

type: specifies whether the referenced cluster should be treated as a source or a target for the feed. A feed can have multiple source and target clusters. If the type of a cluster is not specified, then the cluster is not considered for replication.

Validity of a feed on a cluster specifies the duration for which this feed is valid on this cluster.

Retention specifies how long the feed is retained on this cluster and the action to be taken on the feed after the expiry of the retention period. The retention limit is specified by the expression frequency(times), ex: if the feed should be retained for at least 6 hours then retention's limit="hours(6)".

The field partitionExp contains partition tags. The number of partition tags has to be equal to the number of partitions specified in the feed schema. A partition tag can be a wildcard(*), a static string or an expression. At least one of the strings has to be an expression.

sla specifies the sla for the feed on this cluster. This is an optional parameter, and the sla can be the same as or different from the global sla tag (mentioned outside the clusters tag). This tag gives the user the flexibility to have a different sla for different clusters, e.g. in case of replication. If this attribute is missing, then the default global sla is picked from the feed definition.

Location specifies where the feed is available on this cluster. This is an optional parameter, and the path can be the same as or different from the global locations tag value (mentioned outside the clusters tag).
This tag gives the user the flexibility to have the feed at different locations on different clusters. If this attribute is missing, then the default global location is picked from the feed definition. The individual location tags data, stats and meta are also optional.

A location tag specifies the type of location, such as data, meta or stats, and the corresponding path. A feed should at least define the location for type data, which specifies the HDFS path pattern where the feed is generated periodically. ex: type="data" path="/projects/TrafficHourly/${YEAR}-${MONTH}-${DAY}/traffic"

The granularity of the date pattern in the path should be at least that of the frequency of the feed. The other supported location types are stats and meta; if a process references a feed, then the meta and stats paths are available as properties in the process.

---++++ Catalog Storage (Table)

A table tag specifies the table URI in the catalog registry as:

catalog:$database-name:$table-name#(partition-key=partition-value);(partition-key=partition-value);*

This is modeled as a URI (similar to an ISBN URI). It does not have any reference to Hive or HCatalog. It is quite generic, so it can be tied to other implementations of a catalog registry. The catalog implementation specified in the startup config provides the implementation for the catalog URI.

The top-level partition has to be a dated pattern, and the granularity of the date pattern should be at least that of the frequency of the feed.

catalog specifies the uri of a Hive table along with the partition spec.

uri="catalog:$database:$table#(partition-key=partition-value);+"

Example: catalog:logs-db:clicks#ds=${YEAR}-${MONTH}-${DAY}

Examples:

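The table URI layout described above can be illustrated with a minimal Python sketch. It is not Falcon's own parser: the names =CatalogTable= and =parse_catalog_uri= are hypothetical, and the sketch assumes the simple form catalog:$database:$table#key=value;key=value without the optional parentheses around each partition pair.

```python
from typing import Dict, NamedTuple


class CatalogTable(NamedTuple):
    """Hypothetical container for the parts of a catalog table URI."""
    database: str
    table: str
    partitions: Dict[str, str]  # partition key -> static value or date pattern


def parse_catalog_uri(uri: str) -> CatalogTable:
    """Split catalog:$database:$table#key=value;key=value into its parts."""
    scheme, sep, rest = uri.partition(":")
    if scheme != "catalog" or not sep:
        raise ValueError(f"not a catalog URI: {uri!r}")

    # Everything after '#' is the partition spec; before it is database:table.
    location, _, partition_spec = rest.partition("#")
    database, sep, table = location.partition(":")
    if not sep or not database or not table:
        raise ValueError(f"expected catalog:$database:$table, got {uri!r}")

    partitions: Dict[str, str] = {}
    for pair in filter(None, partition_spec.split(";")):
        key, sep, value = pair.partition("=")
        if not sep or not key:
            raise ValueError(f"bad partition spec: {pair!r}")
        partitions[key] = value
    return CatalogTable(database, table, partitions)


if __name__ == "__main__":
    print(parse_catalog_uri("catalog:logs-db:clicks#ds=${YEAR}-${MONTH}-${DAY}"))
```

Applied to the example URI above, the sketch yields the database logs-db, the table clicks and a single partition key ds whose value is the dated pattern. The pattern itself is left unexpanded, since the text says it is resolved per feed instance.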
---+++ Partitions

A feed can define multiple partitions. If a referenced cluster defines partitions, then the number of partitions in the feed has to be equal to or more than the cluster partitions.

*Note:* This applies only to !FileSystem storage and not to Table storage, as partitions are defined and maintained in the Hive (HCatalog) registry.

---+++ Groups

online,bi

A feed specifies a list of comma separated groups. A group is a logical grouping of feeds, and a group is said to be available if all the feeds belonging to the group are available. The frequency of all the feeds which belong to the same group must be the same.

---+++ Availability Flags

_SUCCESS

An availabilityFlag specifies the name of a file which, when present/created in a feed's data directory, marks the feed as available. ex: _SUCCESS. If this element is omitted, then Falcon considers the presence of the feed's data directory as feed availability.

---+++ Frequency

minutes(20)

A feed has a frequency which specifies how often this feed is generated. ex: it can be generated every hour, every 5 minutes, daily, weekly etc. Valid frequency types for a feed are minutes, hours, days and months. The values can be negative, zero or positive.

---+++ SLA

A feed can have an SLA, and each SLA has two properties - slaLow and slaHigh. Both slaLow and slaHigh are written using expressions like frequency. slaLow is intended for alerting on feed instances which are in danger of missing their availability SLAs. slaHigh is intended for reporting the feeds which missed their SLAs. SLAs are relative to feed instance time.

---+++ Late Arrival

A late-arrival specifies the cut-off period up to which the feed may arrive late. Processes referring to the feed as an input should honor it by rerunning the affected instances when data arrives late within the cut-off period.
The cut-off period is specified by the expression frequency(times), ex: if the feed can arrive late by up to 8 hours then late-arrival's cut-off="hours(8)".

*Note:* This applies only to !FileSystem storage and not to Table storage until a future time.

---+++ ACL

A feed has an ACL (Access Control List), which is useful for implementing permission requirements and provides a way to set different permissions for specific users or named groups.

ACL indicates the access control list for this feed. owner is the owner of this entity. group is the group which has read access. permission indicates the permission.

---+++ Custom Properties

Custom properties are key-value pairs which are propagated to the workflow engine.

"queueName" and "jobPriority" are special properties available to the user to specify the Hadoop job queue and priority; the same values are used by Falcon's launcher job.

"timeout", "parallel" and "order" are other special properties: timeout decides the replication instance's timeout value while waiting for the feed instance, parallel decides the number of concurrent replication instances that can run at any given time, and order decides the execution order for replication instances, such as FIFO, LIFO and LAST_ONLY.

"maxMaps" represents the maximum number of maps used during replication. "mapBandwidth" represents the bandwidth in MB/s used by each mapper during replication.

---++ Process Specification
A process defines the configuration for a workflow. A workflow is a directed acyclic graph (DAG) which defines the job for the workflow engine. A process definition defines the configurations required to run the workflow job. For example, a process defines the frequency at which the workflow should run, the clusters on which the workflow should run, the inputs and outputs for the workflow, how workflow failures should be handled, how late inputs should be handled and so on.

The different details of a process are:

---+++ Name
Each process is identified with a unique name.

Syntax: ...

---+++ Tags
An optional list of comma separated tags which are used for classification of processes.

Syntax: ... consumer=consumer@xyz.com, owner=producer@xyz.com, department=forecasting

---+++ Pipelines
An optional list of comma separated word strings which specifies the data processing pipeline(s) to which this process belongs. Only letters, numbers and underscores are allowed in a pipeline string.

Syntax: ... test_Pipeline, dataReplication, clickStream_pipeline

---+++ Cluster
The cluster on which the workflow should run. A process should contain one or more clusters. The cluster definition for the cluster name gives the end points for workflow execution, name node, job tracker, messaging and so on. Each cluster in turn has a validity, which gives the times between which the job should run on that cluster.

Syntax: ... .... .... ...

---+++ Parallel
Parallel defines how many instances of the workflow can run concurrently. It should be a positive integer > 0. For example, a parallel of 1 ensures that only one instance of the workflow can run at a time. The next instance will start only after the running instance completes.

Syntax: ... [parallel] ...

---+++ Order
Order defines the order in which the ready instances are picked up. The possible values are FIFO (First In First Out), LIFO (Last In First Out), and LAST_ONLY (Last Only).

Syntax: ... [order] ...

---+++ Timeout
An optional timeout specifies the maximum time an instance waits for a dataset before being killed by the workflow engine. A timeout is specified like a frequency. If the timeout is not specified, Falcon computes a default timeout for a process based on its frequency: six times the frequency of the process, or 30 minutes if the computed timeout is less than 30 minutes.

... [timeunit]([frequency]) ...

---+++ Frequency
Frequency defines how frequently the workflow job should run. For example, hours(1) defines the frequency as hourly, and days(7) defines a weekly frequency.
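
As a rough illustration of the [timeunit]([frequency]) expressions and of the default timeout rule from the Timeout section, the following Python sketch parses a frequency and derives the default timeout. The function names are hypothetical and this is not Falcon's implementation; months(...) is deliberately left out because its length depends on the calendar.

```python
import re
from datetime import timedelta

# Assumption: only fixed-length units are handled in this sketch.
_UNIT_MINUTES = {"minutes": 1, "hours": 60, "days": 24 * 60}
_FREQUENCY = re.compile(r"^(minutes|hours|days)\((\d+)\)$")


def parse_frequency(expr: str) -> timedelta:
    """Turn an expression such as hours(1) or days(7) into a timedelta."""
    match = _FREQUENCY.match(expr.strip())
    if match is None:
        raise ValueError(f"unsupported frequency expression: {expr!r}")
    unit, count = match.group(1), int(match.group(2))
    if count <= 0:
        raise ValueError("the frequency number should be a positive integer > 0")
    return timedelta(minutes=count * _UNIT_MINUTES[unit])


def default_timeout(frequency: str) -> timedelta:
    """Six times the frequency, or 30 minutes if that would be shorter."""
    return max(parse_frequency(frequency) * 6, timedelta(minutes=30))


if __name__ == "__main__":
    for expr in ("minutes(2)", "hours(1)", "days(7)"):
        print(expr, "->", parse_frequency(expr), "default timeout", default_timeout(expr))
```

With these assumptions, a process running every 2 minutes falls back to the 30 minute floor, while an hourly process gets a default timeout of 6 hours.
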
The values for timeunit can be minutes/hours/days/months, and the frequency number should be a positive integer > 0.

Syntax: ... [timeunit]([frequency]) ...

---+++ SLA
A process can have an SLA, which is defined by 2 optional attributes - shouldStartIn and shouldEndIn. All the attributes are written using expressions like frequency. shouldStartIn is the time by which the process should have started. shouldEndIn is the time by which the process should have finished.

---+++ Validity
Validity defines how long the workflow should run. It has 3 components - start time, end time and timezone. Start time and end time are timestamps defined in yyyy-MM-dd'T'HH:mm'Z' format and should always be in UTC. Timezone is used to compute the next instances starting from the start time. The workflow will start at the start time and end before the end time specified on a given cluster. So, there will not be a workflow instance at the end time.

Syntax: ... ...

Examples:

... days(1) ...

The daily workflow will start on Jan 1st 2012 at 00:40 UTC; it will run at the 40th minute of every hour, and the last instance will be on March 31st 2012 at 23:40 UTC.

... hours(1) ...

The hourly workflow will start on March 11th 2012 at 00:40 PST; the next instances will be at 01:40 PST, 03:40 PDT, 04:40 PDT and so on till 23:40 PDT. So, there will be just 23 instances of the workflow for March 11th 2012 because of the DST switch.

---+++ Inputs
Inputs define the input data for the workflow. The workflow job will start executing only after the schedule time and when all the inputs are available. There can be 0 or more inputs, and each input maps to a feed. The path and frequency of the input data are picked up from the feed definition. Each input should also define start and end instances in terms of [[FalconDocumentation][EL expressions]] and can optionally specify the specific partition of the input that the workflow requires. The components in the partition should be a subset of the partitions defined in the feed.
For each input, Falcon will create a property with the input name that contains the comma separated list of input paths. This property can be used in workflow actions like pig scripts and so on.

Syntax: ... ... ...

Example: ... hours(1) ... ... ... ... ...

The input for the workflow is an hourly feed and takes the 0th and 1st hour data of today (the day when the workflow runs). If the workflow is running for 2012-03-01T06:40Z, the inputs are /projects/bootcamp/feed1/2012-03-01-00/*/US and /projects/bootcamp/feed1/2012-03-01-01/*/US. The property for this input is

input1=/projects/bootcamp/feed1/2012-03-01-00/*/US,/projects/bootcamp/feed1/2012-03-01-01/*/US

Also, feeds with Hive table storage can be used as inputs to a process. Several parameters from inputs are passed as params to the user workflow or pig script.
   * ${wf:conf('falcon_input_database')} - database name associated with the feed for a given input
   * ${wf:conf('falcon_input_table')} - table name associated with the feed for a given input
   * ${wf:conf('falcon_input_catalog_url')} - Hive metastore URI for this input feed
   * ${wf:conf('falcon_input_partition_filter_pig')} - value of ${coord:dataInPartitionFilter('$input', 'pig')}
   * ${wf:conf('falcon_input_partition_filter_hive')} - value of ${coord:dataInPartitionFilter('$input', 'hive')}
   * ${wf:conf('falcon_input_partition_filter_java')} - value of ${coord:dataInPartitionFilter('$input', 'java')}

*NOTE:* input is the name of the input configured in the process, which is input.getName().

Example workflow configuration:
   * falcon_input_database: falcon_db
   * falcon_input_table: input_table
   * falcon_input_catalog_url: thrift://localhost:29083
   * falcon_input_storage_type: TABLE
   * feedInstancePaths: hcat://localhost:29083/falcon_db/output_table/ds=2012-04-21-00
   * falcon_input_partition_filter_java: (ds='2012-04-21-00')
   * falcon_input_partition_filter_hive: (ds='2012-04-21-00')
   * falcon_input_partition_filter_pig: (ds=='2012-04-21-00')

...

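The way the input property in the file system example of this section is assembled can be sketched in Python. This is only an illustration under stated assumptions: the path template /projects/bootcamp/feed1/${YEAR}-${MONTH}-${DAY}-${HOUR} is inferred from the resolved paths shown in the example, and the function names and the explicit first/last hour arguments are hypothetical stand-ins for the start and end EL expressions of the input.

```python
from datetime import datetime, timedelta, timezone
from string import Template


def render_instance(path_template: str, instance: datetime) -> str:
    """Fill the date variables of a feed path pattern for one feed instance."""
    return Template(path_template).substitute(
        YEAR=f"{instance:%Y}",
        MONTH=f"{instance:%m}",
        DAY=f"{instance:%d}",
        HOUR=f"{instance:%H}",
        MINUTE=f"{instance:%M}",
    )


def input_property(path_template: str, partition: str, nominal_time: datetime,
                   first_hour: int, last_hour: int) -> str:
    """Comma separated input paths for the given hours of the nominal day."""
    day_start = nominal_time.replace(hour=0, minute=0, second=0, microsecond=0)
    paths = []
    for hour in range(first_hour, last_hour + 1):
        path = render_instance(path_template, day_start + timedelta(hours=hour))
        # The optional partition narrows each instance path, e.g. */US.
        paths.append(f"{path}/{partition}" if partition else path)
    return ",".join(paths)


if __name__ == "__main__":
    nominal = datetime(2012, 3, 1, 6, 40, tzinfo=timezone.utc)
    template = "/projects/bootcamp/feed1/${YEAR}-${MONTH}-${DAY}-${HOUR}"
    print("input1=" + input_property(template, "*/US", nominal, 0, 1))
```

For the nominal time 2012-03-01T06:40Z and hours 0 to 1, the sketch reproduces the input1 value given in the example.
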
---+++ Optional Inputs
A user can mark one or more inputs as optional. In such cases the job does not wait on the inputs which are marked as optional. If they are present, the job considers them; otherwise it continues with the compulsory ones.

Example: ... hours(1) ... ... ... ... ...

*Note:* This is only supported for !FileSystem storage and not for Table storage at this point.

---+++ Outputs
Outputs define the output data that is generated by the workflow. A process can define 0 or more outputs. Each output is mapped to a feed, and the output path is picked up from the feed definition. The output instance that should be generated is specified in terms of an [[FalconDocumentation][EL expression]].

For each output, Falcon creates a property with the output name that contains the path of the output data. This can be used in workflows to store data in that path.

Syntax: ... ... ...

Example: ... days(1) ... ... ... ... ...

The output of the workflow is the feed instance for today. If the workflow is running for 2012-03-01T06:40Z, the workflow generates the output /projects/bootcamp/feed2/2012-03-01. The property for this output that is available to the workflow is: output1=/projects/bootcamp/feed2/2012-03-01

Also, feeds with Hive table storage can be used as outputs to a process. Several parameters from outputs are passed as params to the user workflow or pig script.
   * ${wf:conf('falcon_output_database')} - database name associated with the feed for a given output
   * ${wf:conf('falcon_output_table')} - table name associated with the feed for a given output
   * ${wf:conf('falcon_output_catalog_url')} - Hive metastore URI for the given output feed
   * ${wf:conf('falcon_output_dataout_partitions')} - value of ${coord:dataOutPartitions('$output')}

*NOTE:* output is the name of the output configured in the process, which is output.getName().

Example workflow configuration:
   * falcon_output_database: falcon_db
   * falcon_output_table: output_table
   * falcon_output_catalog_url: thrift://localhost:29083
   * falcon_output_storage_type: TABLE
   * feedInstancePaths: hcat://localhost:29083/falcon_db/output_table/ds=2012-04-21-00
   * falcon_output_dataout_partitions: 'ds=2012-04-21-00'

....

---+++ Custom Properties
The properties are key value pairs that are passed to the workflow. These properties are optional and can be used to parameterize the workflow.

Syntax: ... ... ...

queueName and jobPriority are special properties which, when present, are used by Falcon's launcher job. The same properties are also available in the workflow, where they can be propagated to a pig or M/R job.

---+++ Workflow

The workflow defines the workflow engine that should be used and the path to the workflow on hdfs. The workflow definition on hdfs contains the actual job that should run, and it should conform to the workflow specification of the engine specified. The libraries required by the workflow should be in the lib folder inside the workflow path.

The properties defined in the cluster and cluster properties (nameNode and jobTracker) will also be available for the workflow.

Three engines are supported today: Oozie, Pig and Hive.

---++++ Oozie

As part of the oozie workflow engine support, users can embed an oozie workflow. Refer to the oozie [[http://oozie.apache.org/docs/4.0.1/DG_Overview.html][workflow overview]] and [[http://oozie.apache.org/docs/4.0.1/WorkflowFunctionalSpec.html][workflow specification]] for details.

Syntax: ... ...

Example: ... ...

This defines the workflow engine to be oozie, and the workflow xml is defined at /projects/bootcamp/workflow/workflow.xml. The libraries are at /projects/bootcamp/workflow/lib.

---++++ Pig

Falcon also adds the Pig engine, which enables users to embed a Pig script as a process.

Example: ... ...

This defines the workflow engine to be pig, and the pig script is defined at /projects/bootcamp/pig.script.

Feeds with Hive table storage will send one more parameter apart from the general ones: $input_filter

---++++ Hive

Falcon also adds the Hive engine as part of Hive Integration, which enables users to embed a Hive script as a process. This enables users to create materialized queries in a declarative way.

Example: ... ...

This defines the workflow engine to be hive, and the hive script is defined at /projects/bootcamp/hive-script.hql.

Feeds with Hive table storage will send one more parameter apart from the general ones: $input_filter

---+++ Retry
Retry policy defines how workflow failures should be handled. Two retry policies are defined: backoff and exp-backoff (exponential backoff). Depending on the delay and the number of attempts, the workflow is re-tried after specific intervals.

Syntax: ... ...

Examples: ... ...

The workflow is re-tried after 10 mins, 20 mins and 30 mins. With exponential backoff, the workflow will be re-tried after 10 mins, 20 mins and 40 mins.

---+++ Late data
Late data handling defines how late data should be handled. Each feed is defined with a late cut-off value which specifies the time till which late data is valid. For example, a late cut-off of hours(6) means that data for the nth hour can be delayed by up to 6 hours. The late data specification in the process defines how this late data is handled.

The late data policy defines how frequently a check is done to detect late data. The policies supported are: backoff, exp-backoff (exponential backoff) and final (at the feed's late cut-off). The policy along with the delay defines the interval at which the late data check is done.

The late input specification for each input defines the workflow that should run when late data is detected for that input.

Syntax: ... ... ...

Example: ... hours(1) ... ... ... ... ...

This late handling specifies that late data detection should run at the feed's late cut-off, which is 6 hours in this case.
If there is late data, Falcon should run the workflow specified at /projects/bootcamp/workflow/lateinput1/workflow.xml

*Note:* This is only supported for !FileSystem storage and not for Table storage at this point.

---+++ ACL

A process has an ACL (Access Control List), which is useful for implementing permission requirements and provides a way to set different permissions for specific users or named groups.

ACL indicates the access control list for this process. owner is the owner of this entity. group is the group which has read access. permission indicates the permission.