Package org.apache.hive.streaming

HCatalog Streaming API -- high level description

NOTE: The Streaming API feature is provided as a technology preview. The API may undergo incompatible changes in upcoming releases.

Traditionally, adding new data into Hive requires gathering a large amount of data onto HDFS and then periodically adding a new partition. This is essentially a batch insertion. Insertion of new data into an existing partition or table is not done in a way that gives consistent results to readers. The Hive Streaming API allows data to be pumped continuously into Hive. The incoming data can be continuously committed in small batches (of records) into a Hive partition. Once data is committed it becomes immediately visible to all Hive queries initiated subsequently.

This API is intended for streaming clients such as NiFi, Flume, and Storm, which continuously generate data. Streaming support is built on top of ACID-based insert/update support in Hive.

The classes and interfaces that make up the Hive streaming API fall broadly into two sets. The first provides support for connection and transaction management, while the second provides I/O support. Transactions are managed by the Hive MetaStore. Writes are performed to HDFS via Hive wrapper APIs that bypass the MetaStore.

Note on packaging: The APIs are defined in the org.apache.hive.streaming Java package and are packaged in the hive-streaming jar.

STREAMING REQUIREMENTS

A few things are currently required to use streaming.

  1. Currently, only the ORC storage format is supported, so 'stored as orc' must be specified during table creation.
  2. The Hive table may be bucketed but must not be sorted.
  3. The user of the client streaming process must have the necessary permissions to write to the table or partition and to create partitions in the table.
  4. Currently, when issuing queries on streaming tables, the query client must set
    1. hive.input.format = org.apache.hadoop.hive.ql.io.HiveInputFormat
  5. The above client settings are a temporary requirement and the intention is to drop the need for them in the near future.
  6. Settings required in hive-site.xml for Metastore:
    1. hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
    2. hive.support.concurrency = true
    3. hive.compactor.initiator.on = true
    4. hive.compactor.worker.threads > 0

Note: Streaming to unpartitioned tables is also supported.

Transaction and Connection management

The class HiveEndPoint describes a Hive endpoint to connect to: either a Hive table or a partition. An endpoint is cheap to create and does not internally hold on to any network connections. Invoking the newConnection method on it creates a new connection to the Hive MetaStore for streaming purposes and returns a StreamingConnection object. Multiple connections can be established on the same endpoint. A StreamingConnection can then be used to initiate new transactions for performing I/O.
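
For illustration, a minimal sketch of opening a streaming connection is shown below. The MetaStore URI, database, and table names are hypothetical, and the exact constructor and method signatures should be confirmed against the HiveEndPoint and StreamingConnection Javadocs:

    import org.apache.hive.streaming.HiveEndPoint;
    import org.apache.hive.streaming.StreamingConnection;

    public class StreamingConnectExample {
      public static void main(String[] args) throws Exception {
        // An endpoint names the target table (and optionally a partition).
        // Creating it is cheap and opens no network connections.
        HiveEndPoint endPoint = new HiveEndPoint(
            "thrift://metastore-host:9083",   // hypothetical MetaStore URI
            "default",                        // database
            "alerts",                         // hypothetical unpartitioned table
            null);                            // no partition values

        // newConnection() contacts the Hive MetaStore and returns a StreamingConnection.
        StreamingConnection connection = endPoint.newConnection(false);
        try {
          // ... fetch transaction batches and write records here (see below) ...
        } finally {
          connection.close();
        }
      }
    }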

Dynamic Partition Creation:

In a setup where data is being streamed continuously (e.g. using Flume), it is often desirable to have new partitions created automatically (say, on an hourly basis). In such cases, requiring the Hive admin to pre-create the necessary partitions may not be reasonable. Consequently, the streaming API allows streaming clients to create partitions as needed. HiveEndPoint.newConnection() accepts an argument to indicate whether the partition should be auto-created. Since partition creation is an atomic action, multiple clients can race to create the partition, but only one will succeed, so streaming clients need not synchronize when creating a partition. The user of the client process needs to be given write permissions on the Hive table in order to create partitions.
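
As a sketch, connecting to a partitioned endpoint with auto-creation enabled might look like the following. This continues the sketch above (and needs java.util.Arrays and java.util.List); the table and partition values are hypothetical:

    // Endpoint for a hypothetical table partitioned by (date, region).
    List<String> partitionVals = Arrays.asList("2016-01-01", "us-east");
    HiveEndPoint endPoint = new HiveEndPoint(
        "thrift://metastore-host:9083", "default", "page_views", partitionVals);

    // Passing true asks newConnection() to create the partition if it does not exist.
    // Racing clients are safe: partition creation is atomic and only one creator succeeds.
    StreamingConnection connection = endPoint.newConnection(true);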

Batching Transactions:

Transactions are implemented slightly differently than in traditional database systems. Multiple transactions are grouped into a transaction batch, and each transaction has an id. Data from each transaction batch is written to a single file on HDFS, which eventually gets compacted with other files into a larger file automatically for efficiency.

Basic Steps:

After a connection is established, a streaming client first requests a new batch of transactions. In response it receives a set of transaction ids that are part of the transaction batch. The client then consumes one transaction at a time by initiating new transactions. The client will write() one or more records per transaction and either commit or abort the current transaction before switching to the next one. Each TransactionBatch.write() invocation automatically associates the I/O attempt with the current transaction id. The user of the streaming client needs to have write permissions to the partition or table.
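
Putting this together, a sketch of the transaction batch loop is shown below. It continues the connection sketch above and assumes the delimited-input RecordWriter described under "Writing Data" below; the column names and record contents are hypothetical, and exact signatures should be checked against the class Javadocs:

    // A writer that maps delimited byte[] records onto the table's columns.
    String[] fieldNames = {"id", "msg"};              // hypothetical column names
    DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPoint);

    // Request a batch of, say, 10 transactions on the open connection.
    TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
    try {
      while (txnBatch.remainingTransactions() > 0) {
        txnBatch.beginNextTransaction();                  // switch to the next transaction id
        txnBatch.write("1,Hello streaming".getBytes());   // one or more write() calls
        txnBatch.write("2,Welcome to streaming".getBytes());
        txnBatch.commit();                                // or txnBatch.abort()
      }
    } finally {
      txnBatch.close();
    }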

Concurrency Note: I/O can be performed on multiple TransactionBatches concurrently. However, the transactions within a transaction batch must be consumed sequentially.

Writing Data

These classes and interfaces provide support for writing data to Hive within a transaction. RecordWriter is the interface implemented by all writers. A writer is responsible for taking a record in the form of a byte[] containing data in a known format (e.g. CSV) and writing it out in the format supported by Hive streaming. A RecordWriter may reorder or drop fields from the incoming record if necessary to map them to the corresponding columns in the Hive table. A streaming client will instantiate an appropriate RecordWriter type and pass it to StreamingConnection.fetchTransactionBatch(). The streaming client does not directly interact with the RecordWriter thereafter, but relies on the TransactionBatch to do so.

Currently, out of the box, the streaming API provides two implementations of the RecordWriter interface: one handles delimited input data (such as CSV, tab-separated, etc.) and the other handles JSON (strict syntax). Support for other input formats can be provided by additional implementations of the RecordWriter interface.
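
For example, the JSON writer might be used as sketched below. This continues the earlier sketches (the delimited writer was shown above), and the exact constructor signature should be confirmed against the StrictJsonWriter Javadoc; the record contents are hypothetical:

    // A writer for strict-syntax JSON records; column values are taken from the JSON keys.
    StrictJsonWriter jsonWriter = new StrictJsonWriter(endPoint);

    TransactionBatch txnBatch = connection.fetchTransactionBatch(10, jsonWriter);
    txnBatch.beginNextTransaction();
    txnBatch.write("{\"id\": 3, \"msg\": \"streamed as JSON\"}".getBytes());
    txnBatch.commit();
    txnBatch.close();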

Performance, Concurrency, Etc.

Each StreamingConnection writes data at the rate the underlying FileSystem can accept it. If that is not sufficient, multiple StreamingConnection objects can be created concurrently.

Each StreamingConnection can have at most one outstanding TransactionBatch, and each TransactionBatch may have at most two threads operating on it. See TransactionBatch.

Copyright © 2022 The Apache Software Foundation. All rights reserved.