Falcon provides data management functions for feeds declaratively. It allows users to represent feed locations as time-based partition directories on HDFS containing files.
Hive provides a simple and familiar database-like tabular model of data management to its users, backed by HDFS. It supports two classes of tables: managed tables and external tables.
Falcon allows users to represent feed locations as Hive tables. Falcon supports both managed and external tables and provides data management services for tables, such as replication, eviction, and archival. Falcon notifies HCatalog as a side effect of acquiring, replicating, or evicting a data set instance, and adds the missing capability of HCatalog table replication.
In the near future, Falcon will allow users to express pipeline processing in Hive scripts, in addition to Pig scripts and Oozie workflows.
Falcon provides a system-level option to enable Hive integration. Falcon must be configured with an implementation for the catalog registry; the default implementation for Hive is shipped with Falcon:
catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService
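In Falcon's startup.properties, this setting typically carries the component-scope prefix used throughout that file; the exact prefix may vary by Falcon version, so the following line is a sketch:
*.catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService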
Falcon depends heavily on data-availability triggers for scheduling its workflows. Oozie must support data-availability triggers based on HCatalog partition availability, which is only available in Oozie 4.x.
Hence, Hive support in Falcon requires Oozie 4.x.
With Hive integration, Falcon depends heavily on the shared library feature of Oozie. Since HCatalog, Pig, and Hive together pull in many tens of jars, it is quite daunting to redistribute the dependent jars from Falcon.
Installing the Oozie shared library is a one-time setup effort and is quite straightforward, as shown below.
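As a sketch of that one-time setup: Oozie 4.x ships an oozie-setup.sh utility that can install the share lib on HDFS. The NameNode URI below is a placeholder for your deployment; consult the Oozie documentation for your version:

# Run from the Oozie installation directory, as the user that runs Oozie
bin/oozie-setup.sh sharelib create -fs hdfs://namenode:8020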
The cluster entity carries a registry interface that specifies the endpoint for the HCatalog server (see the registry interface in the cluster examples below). If this interface is absent, no HCatalog publication is done from Falcon for that cluster:
thrift://hcatalog-server:port
The feed entity allows users to specify the location of an HCatalog table as a URI of the form:
catalog:database_name:table_name#partitions(key=value?)*
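For example, a feed stored in an hourly-partitioned table might use the following URI (the database and table names here are illustrative):
catalog:logs_db:clicks#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}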
The user owns all data managed by Falcon. Falcon runs as the user who submitted the feed, and authenticates with HCatalog as the end user who owns the entity and the data.
For Hive managed tables, the table may be owned by the end user or "hive". For "hive"-owned tables, the user will have to configure the feed as "hive".
The load Falcon places on HCatalog generally depends on the frequency of the feeds configured in Falcon and how often data is ingested, replicated, or processed.
Certain dependent jars, such as the HCatalog Pig adapter, may also need to be copied into the Oozie shared library on HDFS, for example:
bin/hadoop dfs -copyFromLocal $LFS/share/lib/hcatalog/hcatalog-pig-adapter-0.5.0-incubating.jar share/lib/hcatalog
Replication jobs can fail on both secure and non-secure clusters with errors such as the following when the Oozie server is not configured with the Hadoop configurations of all clusters involved:
org.apache.hadoop.security.token.SecretManager$InvalidToken: Password not found for ApplicationAttempt appattempt_1395965672651_0010_000002
Make sure all Oozie servers that Falcon talks to have the Hadoop configurations registered in oozie-site.xml:
<property>
    <name>oozie.service.HadoopAccessorService.hadoop.configurations</name>
    <value>*=/etc/hadoop/conf,arpit-new-falcon-1.cs1cloud.internal:8020=/etc/hadoop-1,arpit-new-falcon-1.cs1cloud.internal:8032=/etc/hadoop-1,arpit-new-falcon-2.cs1cloud.internal:8020=/etc/hadoop-2,arpit-new-falcon-2.cs1cloud.internal:8032=/etc/hadoop-2,arpit-new-falcon-5.cs1cloud.internal:8020=/etc/hadoop-3,arpit-new-falcon-5.cs1cloud.internal:8032=/etc/hadoop-3</value>
    <description>
        Comma separated AUTHORITY=HADOOP_CONF_DIR, where AUTHORITY is the HOST:PORT of
        the Hadoop service (JobTracker, HDFS). The wildcard '*' configuration is used
        when there is no exact match for an authority. The HADOOP_CONF_DIR contains
        the relevant Hadoop *-site.xml files. If the path is relative, it is looked up
        within the Oozie configuration directory; though the path can be absolute
        (i.e. to point to Hadoop client conf/ directories in the local filesystem).
    </description>
</property>
Falcon does not work well when a table partition contains multiple dated columns; it only works with a single dated partition. For example, a table specification such as the following is not supported:
catalog:default:table4#year=${YEAR};month=${MONTH};day=${DAY};hour=${HOUR};minute=${MINUTE}
Hive table import can fail for tables created with the default text file format using the HCatalog API (see HIVE-5550). The exported table is created with:
fileFormat=TextFile, inputformat=org.apache.hadoop.mapred.TextInputFormat, outputformat=org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
However, when Hive fetches the table from the metastore, it replaces the output format with org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, so the comparison between the source and target tables fails in:
org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer#checkTable

    // check IF/OF/Serde
    String existingifc = table.getInputFormatClass().getName();
    String importedifc = tableDesc.getInputFormat();
    String existingofc = table.getOutputFormatClass().getName();
    String importedofc = tableDesc.getOutputFormat();
    if ((!existingifc.equals(importedifc))
            || (!existingofc.equals(importedofc))) {
        throw new SemanticException(
            ErrorMsg.INCOMPATIBLE_SCHEMA
                .getMsg(" Table inputformat/outputformats do not match"));
    }
The above is not an issue with Hive 0.13.
The following is an example entity configuration for lifecycle management functions (replication and retention) for tables in Hive: a primary cluster, a BCP (target) cluster, and a feed that replicates an hourly customer table from the primary to the BCP cluster.
<?xml version="1.0"?> <!-- Primary cluster configuration for demo vm --> <cluster colo="west-coast" description="Primary Cluster" name="primary-cluster" xmlns="uri:falcon:cluster:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> <interfaces> <interface type="readonly" endpoint="hftp://localhost:10070" version="1.1.1" /> <interface type="write" endpoint="hdfs://localhost:10020" version="1.1.1" /> <interface type="execute" endpoint="localhost:10300" version="1.1.1" /> <interface type="workflow" endpoint="http://localhost:11010/oozie/" version="4.0.1" /> <interface type="registry" endpoint="thrift://localhost:19083" version="0.11.0" /> <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true" version="5.4.3" /> </interfaces> <locations> <location name="staging" path="/apps/falcon/staging" /> <location name="temp" path="/tmp" /> <location name="working" path="/apps/falcon/working" /> </locations> </cluster>
<?xml version="1.0"?> <!-- BCP cluster configuration for demo vm --> <cluster colo="east-coast" description="BCP Cluster" name="bcp-cluster" xmlns="uri:falcon:cluster:0.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> <interfaces> <interface type="readonly" endpoint="hftp://localhost:20070" version="1.1.1" /> <interface type="write" endpoint="hdfs://localhost:20020" version="1.1.1" /> <interface type="execute" endpoint="localhost:20300" version="1.1.1" /> <interface type="workflow" endpoint="http://localhost:11020/oozie/" version="4.0.1" /> <interface type="registry" endpoint="thrift://localhost:29083" version="0.11.0" /> <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true" version="5.4.3" /> </interfaces> <locations> <location name="staging" path="/apps/falcon/staging" /> <location name="temp" path="/tmp" /> <location name="working" path="/apps/falcon/working" /> </locations> </cluster>
<?xml version="1.0"?> <!-- Replicating Hourly customer table from primary to secondary cluster. --> <feed description="Replicating customer table feed" name="customer-table-replicating-feed" xmlns="uri:falcon:feed:0.1"> <frequency>hours(1)</frequency> <timezone>UTC</timezone> <clusters> <cluster name="primary-cluster" type="source"> <validity start="2013-09-24T00:00Z" end="2013-10-26T00:00Z"/> <retention limit="hours(2)" action="delete"/> </cluster> <cluster name="bcp-cluster" type="target"> <validity start="2013-09-24T00:00Z" end="2013-10-26T00:00Z"/> <retention limit="days(30)" action="delete"/> <table uri="catalog:tgt_demo_db:customer_bcp#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" /> </cluster> </clusters> <table uri="catalog:src_demo_db:customer_raw#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" /> <ACL owner="seetharam" group="users" permission="0755"/> <schema location="" provider="hcatalog"/> </feed>
<?xml version="1.0"?> <feed description="clicks log table " name="input-table" xmlns="uri:falcon:feed:0.1"> <groups>online,bi</groups> <frequency>hours(1)</frequency> <timezone>UTC</timezone> <clusters> <cluster name="##cluster##" type="source"> <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/> <retention limit="hours(24)" action="delete"/> </cluster> </clusters> <table uri="catalog:falcon_db:input_table#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" /> <ACL owner="testuser" group="group" permission="0x755"/> <schema location="/schema/clicks" provider="protobuf"/> </feed>
<?xml version="1.0"?> <feed description="clicks log identity table" name="output-table" xmlns="uri:falcon:feed:0.1"> <groups>online,bi</groups> <frequency>hours(1)</frequency> <timezone>UTC</timezone> <clusters> <cluster name="##cluster##" type="source"> <validity start="2010-01-01T00:00Z" end="2012-04-21T00:00Z"/> <retention limit="hours(24)" action="delete"/> </cluster> </clusters> <table uri="catalog:falcon_db:output_table#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" /> <ACL owner="testuser" group="group" permission="0x755"/> <schema location="/schema/clicks" provider="protobuf"/> </feed>
<?xml version="1.0"?> <process name="##processName##" xmlns="uri:falcon:process:0.1"> <clusters> <cluster name="##cluster##"> <validity end="2012-04-22T00:00Z" start="2012-04-21T00:00Z"/> </cluster> </clusters> <parallel>1</parallel> <order>FIFO</order> <frequency>days(1)</frequency> <timezone>UTC</timezone> <inputs> <input end="today(0,0)" start="today(0,0)" feed="input-table" name="input"/> </inputs> <outputs> <output instance="now(0,0)" feed="output-table" name="output"/> </outputs> <properties> <property name="blah" value="blah"/> </properties> <workflow engine="pig" path="/falcon/test/apps/pig/table-id.pig"/> <retry policy="periodic" delay="minutes(10)" attempts="3"/> </process>