Onboarding Steps

  • Create a cluster definition for the cluster, specifying the name node, job tracker, workflow engine and messaging endpoints. Refer to the cluster definition for details.
  • Create feed definitions for each of the inputs and outputs, specifying frequency, data path and ownership. Refer to the feed definition for details.
  • Create a process definition for your job. The process defines the configuration for the workflow job; the important attributes are frequency, inputs/outputs and workflow path. Refer to the process definition for details.
  • Define the workflow for your job using the workflow engine (only Oozie is supported as of now). Refer to the Oozie Workflow Specification. The libraries required by the workflow should be available in the lib folder under the workflow path.
  • Set up the workflow definition, libraries and referenced scripts on Hadoop.
  • Submit the cluster definition.
  • Submit and schedule the feed and process definitions (a command-line sketch of these last steps follows the list).
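
A minimal command-line sketch of the last three steps, assuming the definitions below are saved locally as cluster.xml, input-feed.xml, output-feed.xml and process.xml (the local file names are illustrative) and that the falcon CLI is configured against your Falcon server:

# Stage the workflow definition, libraries and referenced scripts on HDFS
# (paths match the sample pipeline below; newer Hadoop versions need mkdir -p)
hadoop fs -mkdir /projects/bootcamp/workflow/lib
hadoop fs -put workflow.xml script.pig /projects/bootcamp/workflow/
hadoop fs -put dependent.jar /projects/bootcamp/workflow/lib/

# Submit the cluster definition
falcon entity -type cluster -submit -file cluster.xml

# Submit and schedule the feed and process definitions
falcon entity -type feed -submit -file input-feed.xml
falcon entity -type feed -submit -file output-feed.xml
falcon entity -type process -submit -file process.xml
falcon entity -type feed -schedule -name SampleInput
falcon entity -type feed -schedule -name SampleOutput
falcon entity -type process -schedule -name SampleProcess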

Sample Pipeline

Cluster

Cluster definition that contains the endpoints for the name node, job tracker, Oozie and the JMS server:

<?xml version="1.0"?>
<!--
    Cluster configuration
  -->
<cluster colo="ua2" description="" name="corp" xmlns="uri:falcon:cluster:0.1"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">    
    <interfaces>
        <interface type="readonly" endpoint="hftp://name-node.com:50070" version="0.20.2-cdh3u0" />

        <interface type="write" endpoint="hdfs://name-node.com:54310" version="0.20.2-cdh3u0" />

        <interface type="execute" endpoint="job-tracker:54311" version="0.20.2-cdh3u0" />

        <interface type="workflow" endpoint="http://oozie.com:11000/oozie/" version="3.1.4" />

        <interface type="messaging" endpoint="tcp://jms-server.com:61616?daemon=true" version="5.1.6" />
    </interfaces>

    <locations>
        <location name="staging" path="/projects/falcon/staging" />
        <location name="temp" path="/tmp" />
        <location name="working" path="/projects/falcon/working" />
    </locations>
</cluster>

Input Feed

Hourly feed that defines feed path, frequency, ownership and validity:

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Hourly sample input data
  -->

<feed description="sample input data" name="SampleInput" xmlns="uri:falcon:feed:0.1"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <groups>group</groups>

    <frequency>hours(1)</frequency>

    <late-arrival cut-off="hours(6)" />

    <clusters>
        <cluster name="corp" type="source">
            <validity start="2009-01-01T00:00Z" end="2099-12-31T00:00Z" timezone="UTC" />
            <retention limit="months(24)" action="delete" />
        </cluster>
    </clusters>

    <locations>
        <location type="data" path="/projects/bootcamp/data/${YEAR}-${MONTH}-${DAY}-${HOUR}/SampleInput" />
        <location type="stats" path="/projects/bootcamp/stats/SampleInput" />
        <location type="meta" path="/projects/bootcamp/meta/SampleInput" />
    </locations>

    <ACL owner="suser" group="users" permission="0755" />

    <schema location="/none" provider="none" />
</feed>

Output Feed

Daily feed that defines feed path, frequency, ownership and validity:

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Daily sample output data
  -->

<feed description="sample output data" name="SampleOutput" xmlns="uri:falcon:feed:0.1"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <groups>group</groups>

    <frequency>days(1)</frequency>

    <late-arrival cut-off="hours(6)" />

    <clusters>
        <cluster name="corp" type="source">
            <validity start="2009-01-01T00:00Z" end="2099-12-31T00:00Z" timezone="UTC" />
            <retention limit="months(24)" action="delete" />
        </cluster>
    </clusters>

    <locations>
        <location type="data" path="/projects/bootcamp/output/${YEAR}-${MONTH}-${DAY}/SampleOutput" />
        <location type="stats" path="/projects/bootcamp/stats/SampleOutput" />
        <location type="meta" path="/projects/bootcamp/meta/SampleOutput" />
    </locations>

    <ACL owner="suser" group="users" permission="0755" />

    <schema location="/none" provider="none" />
</feed>

Process

Sample process that runs daily at the 6th hour on the corp cluster. It takes one input - SampleInput for the previous day (24 hourly instances) - and generates one output - SampleOutput for the previous day. For example, the run with nominal time 2012-04-04T06:00Z reads the hourly instances from 2012-04-03T00:00Z through 2012-04-03T23:00Z and writes the 2012-04-03 output instance. The workflow is defined at /projects/bootcamp/workflow/workflow.xml, and any libraries required by the workflow should be placed under /projects/bootcamp/workflow/lib. The process also defines the properties queueName, ssh.host and fileTimestamp, which are passed to the workflow. In addition, Falcon exposes the following properties to the workflow: nameNode and jobTracker (Hadoop properties), and input and output (input/output properties).

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Daily sample process. Runs at the 6th hour every day. Input - last day's hourly data. Generates output for yesterday.
 -->
<process name="SampleProcess">
    <cluster name="corp" />

    <frequency>days(1)</frequency>

    <validity start="2012-04-03T06:00Z" end="2022-12-30T00:00Z" timezone="UTC" />

    <inputs>
        <input name="input" feed="SampleInput" start="yesterday(0,0)" end="today(-1,0)" />
    </inputs>

    <outputs>
            <output name="output" feed="SampleOutput" instance="yesterday(0,0)" />
    </outputs>

    <properties>
        <property name="queueName" value="reports" />
        <property name="ssh.host" value="host.com" />
        <property name="fileTimestamp" value="${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd')}" />
    </properties>

    <workflow engine="oozie" path="/projects/bootcamp/workflow" />

    <retry policy="backoff" delay="minutes(5)" attempts="3" />
    
    <late-process policy="exp-backoff" delay="hours(1)">
        <late-input input="input" workflow-path="/projects/bootcamp/workflow/lateinput" />
    </late-process>
</process>

Oozie Workflow

The sample user workflow contains three actions:

  • Pig action - executes the Pig script /projects/bootcamp/workflow/script.pig
  • concatenator - Java action that concatenates the part files and generates a single file
  • file upload - SSH action that gets the concatenated file from Hadoop and sends it to a remote host

<workflow-app xmlns="uri:oozie:workflow:0.2" name="sample-wf">
        <start to="pig" />

        <action name="pig">
                <pig>
                        <job-tracker>${jobTracker}</job-tracker>
                        <name-node>${nameNode}</name-node>
                        <prepare>
                                <delete path="${output}"/>
                        </prepare>
                        <configuration>
                                <property>
                                        <name>mapred.job.queue.name</name>
                                        <value>${queueName}</value>
                                </property>
                                <property>
                                        <name>mapreduce.fileoutputcommitter.marksuccessfuljobs</name>
                                        <value>true</value>
                                </property>
                        </configuration>
                        <script>${nameNode}/projects/bootcamp/workflow/script.pig</script>
                        <param>input=${input}</param>
                        <param>output=${output}</param>
                        <file>lib/dependent.jar</file>
                </pig>
                <ok to="concatenator" />
                <error to="fail" />
        </action>

        <action name="concatenator">
                <java>
                        <job-tracker>${jobTracker}</job-tracker>
                        <name-node>${nameNode}</name-node>
                        <prepare>
                                <delete path="${nameNode}/projects/bootcamp/concat/data-${fileTimestamp}.csv"/>
                        </prepare>
                        <configuration>
                                <property>
                                        <name>mapred.job.queue.name</name>
                                        <value>${queueName}</value>
                                </property>
                        </configuration>
                        <main-class>com.wf.Concatenator</main-class>
                        <arg>${output}</arg>
                        <arg>${nameNode}/projects/bootcamp/concat/data-${fileTimestamp}.csv</arg>
                </java>
                <ok to="fileupload" />
                <error to="fail"/>
        </action>
                        
        <action name="fileupload">
                <ssh>
                        <host>localhost</host>
                        <command>/tmp/fileupload.sh</command>
                        <args>${nameNode}/projects/bootcamp/concat/data-${fileTimestamp}.csv</args>
                        <args>${wf:conf("ssh.host")}</args>
                        <capture-output/>
                </ssh>
                <ok to="fileUploadDecision" />
                <error to="fail"/>
        </action>

        <decision name="fileUploadDecision">
                <switch>
                        <case to="end">
                                ${wf:actionData('fileupload')['output'] == '0'}
                        </case>
                        <default to="fail"/>
                </switch>
        </decision>

        <kill name="fail">
                <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
        </kill>

        <end name="end" />
</workflow-app>

File Upload Script

The script copies the file from Hadoop to the local /tmp directory, rsyncs it to /tmp on the remote host, and then deletes the file from Hadoop:

#!/bin/bash

trap 'code=$?; echo "output=$code"; exit $code' ERR INT TERM

echo "Arguments: $@"
SRCFILE=$1
DESTHOST=$2

FILENAME=`basename $SRCFILE`
rm -f /tmp/$FILENAME
hadoop fs -copyToLocal $SRCFILE /tmp/
echo "Copied $SRCFILE to /tmp"

rsync -ztv --rsh=ssh --stats /tmp/$FILENAME $DESTHOST:/tmp
echo "rsynced $FILENAME to $DESTUSER@$DESTHOST:$DESTFILE"

hadoop fs -rmr $SRCFILE
echo "Deleted $SRCFILE"

rm -f /tmp/$FILENAME
echo "output=0"