Hive Integration

Overview

Falcon provides data management functions for feeds declaratively. It allows users to represent feed locations as time-based partition directories on HDFS containing files.

Hive provides its users with a simple, familiar, database-like tabular model of data management, backed by HDFS. It supports two classes of tables: managed tables and external tables.

Falcon allows users to represent feed locations as Hive tables. Falcon supports both managed and external tables and provides data management services for tables, such as replication, eviction and archival. Falcon notifies HCatalog as a side effect of acquiring, replicating or evicting a data set instance, and adds the missing capability of HCatalog table replication.

In the near future, Falcon will allow users to express pipeline processing in Hive scripts, in addition to Pig and Oozie workflows.

Assumptions

  • Date is a mandatory first-level partition for Hive tables.
    • Data availability triggers are based on the date pattern in Oozie.
  • Tables must be created in Hive before they are added as a Feed in Falcon.
    • Duplicating the table definition in Falcon would create confusion about the real source of truth. Also, propagating schema changes between systems is a hard problem.
  • Falcon does not know about the encoding of the data; the data should be in an HCatalog-supported format.

Configuration

Falcon provides a system level option to enable Hive integration. Falcon must be configured with an implementation for the catalog registry. The default implementation for Hive is shipped with Falcon.

catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService

Incompatible changes

Falcon depends heavily on data-availability triggers for scheduling Falcon workflows. Oozie must support data-availability triggers based on HCatalog partition availability. This is only available in Oozie 4.x.

Hence, Hive support in Falcon requires Oozie 4.x.

Oozie Shared Library setup

With Hive integration, Falcon depends heavily on the shared library feature of Oozie. Because HCatalog, Pig and Hive together require many tens of jars, redistributing the dependent jars from Falcon would be daunting.

Setting up the shared library is a one-time effort during Oozie setup and is quite straightforward.

Approach

Entity Changes

  • Cluster DSL will have an additional registry-interface section, specifying the endpoint for the HCatalog server. If this is absent, no HCatalog publication will be done from Falcon for this cluster.
thrift://hcatalog-server:port

  • Feed DSL will allow users to specify the URI (location) for HCatalog tables as:
catalog:database_name:table_name#partitions(key=value?)*

  • Failure to publish to HCatalog will be retried (configurable number of retries) with back-off. Permanent failures after all the retries are exhausted will fail the Falcon workflow.
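
The retry behaviour described in the last bullet can be sketched as follows. This is a minimal illustration in Python, not Falcon's implementation: the function name publish_with_retries, its parameters, and the doubling back-off schedule are all assumptions made for the example.

```python
import time


def publish_with_retries(publish, max_retries=3, base_delay=1.0, sleep=time.sleep):
    """Call publish() until it succeeds or the retries are exhausted.

    publish      -- hypothetical callable that registers a partition with HCatalog
    max_retries  -- number of retries after the first attempt (configurable)
    base_delay   -- seconds to wait before the first retry; doubled on each retry
    sleep        -- injectable so the sketch can be exercised without waiting
    """
    attempt = 0
    while True:
        try:
            return publish()
        except Exception:
            if attempt == max_retries:
                # Permanent failure: re-raise so the calling workflow fails.
                raise
            sleep(base_delay * (2 ** attempt))
            attempt += 1
```

Re-raising the last exception once the retries are used up mirrors the statement that a permanent failure fails the workflow instead of being silently ignored.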

Eviction

  • Falcon will construct DDL statements to filter candidate partitions eligible for eviction
  • Falcon will construct DDL statements to drop the eligible partitions
  • Additionally, Falcon will delete the data on HDFS for external tables
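
As a rough sketch of the eviction steps above, the following Python selects partitions older than a retention limit and builds one Hive DROP PARTITION statement per partition. It is an illustration only: the function name, the assumption that partition values encode the instance time as YEAR-MONTH-DAY-HOUR, and the choice to filter in Python instead of in a DDL filter are not taken from Falcon.

```python
from datetime import datetime, timedelta


def eviction_statements(table, partition_key, partition_values, now, retention,
                        value_format="%Y-%m-%d-%H"):
    """Return DROP PARTITION statements for partitions older than the retention limit."""
    cutoff = now - retention
    statements = []
    for value in sorted(partition_values):
        # Assumption: the partition value encodes the instance time, e.g. 2013-09-24-05.
        if datetime.strptime(value, value_format) < cutoff:
            statements.append(
                "ALTER TABLE {} DROP IF EXISTS PARTITION ({}='{}')".format(
                    table, partition_key, value))
    return statements
```

For an external table, dropping the partition only removes the metadata, which is why the data on HDFS has to be deleted as a separate step.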

Replication

  • Falcon will use the HCatalog (Hive) API to export the data for a given table and partition, which will result in a data collection that includes metadata on the data's storage format, the schema, how the data is sorted, what table the data came from, and values of any partition keys from that table.
  • Falcon will use the distcp tool to copy the exported data collection to a staging directory used by Falcon on the secondary cluster.
  • Falcon will then import the data into HCatalog (Hive) using the HCatalog (Hive) API. If the specified table does not yet exist, Falcon will create it, using the information in the imported metadata to set defaults for the table such as schema, storage format, etc.
  • The partition is not complete, and hence not visible to users, until all the data is committed on the secondary cluster (no dirty reads).
  • The data collection is staged by Falcon, and copy retries continue from where the previous attempt left off.
  • Failure to register with Hive will be retried. After all the attempts are exhausted, the data will be cleaned up by Falcon.
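
The export, copy and import sequence above can be pictured with the sketch below. Falcon is described as using the HCatalog (Hive) API; the sketch instead prints the roughly equivalent HiveQL EXPORT and IMPORT statements and a hadoop distcp command, purely as a stand-in. The function name, the tuple layout and the directory arguments are hypothetical.

```python
def replication_steps(source, target, partition_spec, export_dir, staging_dir):
    """Return (where, command) pairs for replicating one partition, in order.

    source, target -- (database, table) tuples on the two clusters
    partition_spec -- dict of partition key to value, e.g. {"ds": "2013-09-24-05"}
    export_dir     -- hypothetical export location on the source cluster
    staging_dir    -- hypothetical Falcon staging location on the target cluster
    """
    spec = ", ".join("{}='{}'".format(k, v) for k, v in partition_spec.items())
    src_db, src_table = source
    tgt_db, tgt_table = target
    return [
        # 1. Export the partition's data and metadata on the source cluster.
        ("source hive", "USE {}; EXPORT TABLE {} PARTITION ({}) TO '{}';".format(
            src_db, src_table, spec, export_dir)),
        # 2. Copy the exported collection into the staging directory.
        ("shell", "hadoop distcp {} {}".format(export_dir, staging_dir)),
        # 3. Import the staged collection into the target table.
        ("target hive", "USE {}; IMPORT TABLE {} PARTITION ({}) FROM '{}';".format(
            tgt_db, tgt_table, spec, staging_dir)),
    ]
```

Keeping the copy and the import as separate steps is what lets a failed copy be retried from staging without exposing a partially written partition on the target.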

Security

The user owns all data managed by Falcon. Falcon runs as the user who submitted the feed. Falcon will authenticate with HCatalog as the end user who owns the entity and the data.

For Hive managed tables, the table may be owned by the end user or “hive”. For tables owned by “hive”, the user will have to configure the feed as “hive”.

Load on HCatalog from Falcon

The load generally depends on the frequency of the feeds configured in Falcon and on how often data is ingested, replicated, or processed.

User Impact

  • There should not be any impact on users due to this integration.
  • Falcon will be fully backwards compatible.
  • Users can either continue to use file-based storage on HDFS, as they do today, or use HCatalog to access the data in tables.

Known Limitations

Oozie

  • Falcon with Hadoop 1.x requires copying the guava jars manually to the Oozie sharelib. Hadoop 2.x ships them.
  • hcatalog-pig-adapter needs to be copied manually to the Oozie sharelib.
bin/hadoop dfs -copyFromLocal $LFS/share/lib/hcatalog/hcatalog-pig-adapter-0.5.0-incubating.jar share/lib/hcatalog

  • Oozie 4.x with Hadoop-2.x
Replication jobs are submitted to Oozie on the destination cluster. Oozie runs a table export job on the RM of the source cluster. The Oozie server on the target cluster must be configured with the source Hadoop configs; otherwise jobs fail on both secure and non-secure clusters with errors such as:
org.apache.hadoop.security.token.SecretManager$InvalidToken: Password not found for ApplicationAttempt appattempt_1395965672651_0010_000002

Make sure all Oozie servers that Falcon talks to have the Hadoop configs configured in oozie-site.xml:

<property>
      <name>oozie.service.HadoopAccessorService.hadoop.configurations</name>
      <value>*=/etc/hadoop/conf,arpit-new-falcon-1.cs1cloud.internal:8020=/etc/hadoop-1,arpit-new-falcon-1.cs1cloud.internal:8032=/etc/hadoop-1,arpit-new-falcon-2.cs1cloud.internal:8020=/etc/hadoop-2,arpit-new-falcon-2.cs1cloud.internal:8032=/etc/hadoop-2,arpit-new-falcon-5.cs1cloud.internal:8020=/etc/hadoop-3,arpit-new-falcon-5.cs1cloud.internal:8032=/etc/hadoop-3</value>
      <description>
          Comma separated AUTHORITY=HADOOP_CONF_DIR, where AUTHORITY is the HOST:PORT of
          the Hadoop service (JobTracker, HDFS). The wildcard '*' configuration is
          used when there is no exact match for an authority. The HADOOP_CONF_DIR contains
          the relevant Hadoop *-site.xml files. If the path is relative, it is looked up within
          the Oozie configuration directory; the path can also be absolute (i.e. pointing
          to Hadoop client conf/ directories in the local filesystem).
      </description>
    </property>
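
Because the value above is a long comma-separated list that is easy to mistype, it can help to generate it. The following Python sketch builds the AUTHORITY=HADOOP_CONF_DIR string from a mapping of conf directories to authorities. The function name and the host names used in the usage note are hypothetical.

```python
def hadoop_configurations_value(default_conf_dir, cluster_conf_dirs):
    """Build the AUTHORITY=HADOOP_CONF_DIR list for oozie-site.xml.

    default_conf_dir  -- conf directory used for the '*' wildcard entry
    cluster_conf_dirs -- dict mapping a conf directory to the HOST:PORT
                         authorities (e.g. NameNode and ResourceManager)
                         of the cluster it describes
    """
    entries = ["*={}".format(default_conf_dir)]
    for conf_dir, authorities in cluster_conf_dirs.items():
        for authority in authorities:
            entries.append("{}={}".format(authority, conf_dir))
    return ",".join(entries)
```

For example, calling hadoop_configurations_value("/etc/hadoop/conf", {"/etc/hadoop-1": ["nn1.example.com:8020", "rm1.example.com:8032"]}) yields a wildcard entry followed by one entry per authority, all pointing at /etc/hadoop-1.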

Hive

  • Dated Partitions
Falcon does not work well when a table's partitions contain multiple dated columns; it only works with a single dated partition. This is being tracked in FALCON-357, which is a limitation in Oozie. An example of a table URI with multiple dated partition columns:
catalog:default:table4#year=${YEAR};month=${MONTH};day=${DAY};hour=${HOUR};minute=${MINUTE}
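
To check a feed for this limitation before submitting it, the table URI can be taken apart and its dated partition keys counted. The Python below is a minimal sketch, not Falcon's own parser: the function names are hypothetical, and the use of ';' as the separator between partition pairs is inferred from the example URI above.

```python
import re

# Date variables that may appear in partition values of a table URI.
_VARIABLE = re.compile(r"\$\{(YEAR|MONTH|DAY|HOUR|MINUTE)\}")


def parse_table_uri(uri):
    """Split 'catalog:db:table#k1=v1;k2=v2' into (db, table, partitions)."""
    scheme, database, rest = uri.split(":", 2)
    if scheme != "catalog":
        raise ValueError("not a catalog URI: " + uri)
    table, _, partition_part = rest.partition("#")
    partitions = {}
    for pair in filter(None, partition_part.split(";")):
        key, _, value = pair.partition("=")
        partitions[key] = value
    return database, table, partitions


def dated_keys(partitions):
    """Return the partition keys whose values reference a date variable."""
    return [key for key, value in partitions.items() if _VARIABLE.search(value)]
```

A URI whose dated_keys result has more than one entry, as with the year, month, day, hour and minute columns above, falls under this limitation. A single key such as ds=${YEAR}-${MONTH}-${DAY}-${HOUR} does not.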

For some arcane reason, Hive substitutes the output format for text and sequence files with a class name prefixed with Hive. Hive table import then fails, because it compares the input and output formats of the source and target tables and finds them different. For example, a table created without specifying the file format defaults to:
fileFormat=TextFile, inputformat=org.apache.hadoop.mapred.TextInputFormat, outputformat=org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat

However, when Hive fetches the table from the metastore, it replaces the output format with org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, and the comparison between the source and target tables fails.

org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer#checkTable
      // check IF/OF/Serde
      String existingifc = table.getInputFormatClass().getName();
      String importedifc = tableDesc.getInputFormat();
      String existingofc = table.getOutputFormatClass().getName();
      String importedofc = tableDesc.getOutputFormat();
      if ((!existingifc.equals(importedifc))
          || (!existingofc.equals(importedofc))) {
        throw new SemanticException(
            ErrorMsg.INCOMPATIBLE_SCHEMA
                .getMsg(" Table inputformat/outputformats do not match"));
      }

The above is not an issue with Hive 0.13.

Hive Examples

The following are example entity configurations for lifecycle management functions for tables in Hive.

Hive Table Lifecycle Management - Replication and Retention

Primary Cluster
<?xml version="1.0"?>
<!--
    Primary cluster configuration for demo vm
  -->
<cluster colo="west-coast" description="Primary Cluster"
         name="primary-cluster"
         xmlns="uri:falcon:cluster:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <interfaces>
        <interface type="readonly" endpoint="hftp://localhost:10070"
                   version="1.1.1" />
        <interface type="write" endpoint="hdfs://localhost:10020"
                   version="1.1.1" />
        <interface type="execute" endpoint="localhost:10300"
                   version="1.1.1" />
        <interface type="workflow" endpoint="http://localhost:11010/oozie/"
                   version="4.0.1" />
        <interface type="registry" endpoint="thrift://localhost:19083"
                   version="0.11.0" />
        <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
                   version="5.4.3" />
    </interfaces>
    <locations>
        <location name="staging" path="/apps/falcon/staging" />
        <location name="temp" path="/tmp" />
        <location name="working" path="/apps/falcon/working" />
    </locations>
</cluster>

BCP Cluster
<?xml version="1.0"?>
<!--
    BCP cluster configuration for demo vm
  -->
<cluster colo="east-coast" description="BCP Cluster"
         name="bcp-cluster"
         xmlns="uri:falcon:cluster:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <interfaces>
        <interface type="readonly" endpoint="hftp://localhost:20070"
                   version="1.1.1" />
        <interface type="write" endpoint="hdfs://localhost:20020"
                   version="1.1.1" />
        <interface type="execute" endpoint="localhost:20300"
                   version="1.1.1" />
        <interface type="workflow" endpoint="http://localhost:11020/oozie/"
                   version="4.0.1" />
        <interface type="registry" endpoint="thrift://localhost:29083"
                   version="0.11.0" />
        <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true"
                   version="5.4.3" />
    </interfaces>
    <locations>
        <location name="staging" path="/apps/falcon/staging" />
        <location name="temp" path="/tmp" />
        <location name="working" path="/apps/falcon/working" />
    </locations>
</cluster>

Feed with replication and eviction policy
<?xml version="1.0"?>
<!--
    Replicating Hourly customer table from primary to secondary cluster.
  -->
<feed description="Replicating customer table feed" name="customer-table-replicating-feed"
      xmlns="uri:falcon:feed:0.1">
    <frequency>hours(1)</frequency>
    <timezone>UTC</timezone>

    <clusters>
        <cluster name="primary-cluster" type="source">
            <validity start="2013-09-24T00:00Z" end="2013-10-26T00:00Z"/>
            <retention limit="hours(2)" action="delete"/>
        </cluster>
        <cluster name="bcp-cluster" type="target">
            <validity start="2013-09-24T00:00Z" end="2013-10-26T00:00Z"/>
            <retention limit="days(30)" action="delete"/>

            <table uri="catalog:tgt_demo_db:customer_bcp#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
        </cluster>
    </clusters>

    <table uri="catalog:src_demo_db:customer_raw#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />

    <ACL owner="seetharam" group="users" permission="0755"/>
    <schema location="" provider="hcatalog"/>
</feed>
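
To see which partition a given feed instance maps to, the date variables in the table URI can be substituted with the instance time. The Python sketch below illustrates this for the ds=${YEAR}-${MONTH}-${DAY}-${HOUR} pattern used in the feed above. The function name is hypothetical, and zero-padded two-digit month, day, hour and minute values are an assumption of the example.

```python
from datetime import datetime


def resolve_partition(template, instance_time):
    """Substitute ${YEAR}, ${MONTH}, ${DAY}, ${HOUR}, ${MINUTE} with zero-padded values."""
    values = {
        "${YEAR}": instance_time.strftime("%Y"),
        "${MONTH}": instance_time.strftime("%m"),
        "${DAY}": instance_time.strftime("%d"),
        "${HOUR}": instance_time.strftime("%H"),
        "${MINUTE}": instance_time.strftime("%M"),
    }
    for variable, value in values.items():
        template = template.replace(variable, value)
    return template
```

With an hourly frequency, each instance resolves to a distinct ds value, so replication and retention operate on one partition per hour.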

Hive Table used in Processing Pipelines

Primary Cluster

The cluster definition from the lifecycle example can be used.

Input Feed
<?xml version="1.0"?>
<feed description="clicks log table " name="input-table" xmlns="uri:falcon:feed:0.1">
    <groups>online,bi</groups>
    <frequency>hours(1)</frequency>
    <timezone>UTC</timezone>

    <clusters>
        <cluster name="##cluster##" type="source">
            <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/>
            <retention limit="hours(24)" action="delete"/>
        </cluster>
    </clusters>

    <table uri="catalog:falcon_db:input_table#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />

    <ACL owner="testuser" group="group" permission="0x755"/>
    <schema location="/schema/clicks" provider="protobuf"/>
</feed>

Output Feed
<?xml version="1.0"?>
<feed description="clicks log identity table" name="output-table" xmlns="uri:falcon:feed:0.1">
    <groups>online,bi</groups>
    <frequency>hours(1)</frequency>
    <timezone>UTC</timezone>

    <clusters>
        <cluster name="##cluster##" type="source">
            <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/>
            <retention limit="hours(24)" action="delete"/>
        </cluster>
    </clusters>

    <table uri="catalog:falcon_db:output_table#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />

    <ACL owner="testuser" group="group" permission="0x755"/>
    <schema location="/schema/clicks" provider="protobuf"/>
</feed>

Process
<?xml version="1.0"?>
<process name="##processName##" xmlns="uri:falcon:process:0.1">
    <clusters>
        <cluster name="##cluster##">
            <validity end="2012-04-22T00:00Z" start="2012-04-21T00:00Z"/>
        </cluster>
    </clusters>

    <parallel>1</parallel>
    <order>FIFO</order>
    <frequency>days(1)</frequency>
    <timezone>UTC</timezone>

    <inputs>
        <input end="today(0,0)" start="today(0,0)" feed="input-table" name="input"/>
    </inputs>

    <outputs>
        <output instance="now(0,0)" feed="output-table" name="output"/>
    </outputs>

    <properties>
        <property name="blah" value="blah"/>
    </properties>

    <workflow engine="pig" path="/falcon/test/apps/pig/table-id.pig"/>

    <retry policy="periodic" delay="minutes(10)" attempts="3"/>
</process>

Pig Script
A = load '$input_database.$input_table' using org.apache.hcatalog.pig.HCatLoader();
B = FILTER A BY $input_filter;
C = foreach B generate id, value;
store C into '$output_database.$output_table' USING org.apache.hcatalog.pig.HCatStorer('$output_dataout_partitions');