Hive Streaming with Storm

With the release of Hive 0.13.1 and HCatalog, a new Streaming API was released as a Technical Preview to support continuous data ingestion into Hive tables. The API is intended to enable streaming clients like Flume or Storm to store data in Hive, which has traditionally been a batch-oriented store.

Building on the newly added ACID insert/update capabilities of Hive, the Streaming API breaks a stream of data down into smaller batches, which are committed in a transaction to the underlying storage. Once committed, the data becomes immediately available to other queries.

Broadly speaking, the API consists of two parts: one part handles the transactions while the other deals with the underlying storage (HDFS). Transactions in Hive are handled by the Metastore. Kerberos is supported from the beginning!

Some of the current limitations are:

  • Only delimited input data and JSON (strict syntax) are supported
  • Only the ORC format is supported
  • The Hive table must be bucketed (unpartitioned tables are supported)

In this post I would like to demonstrate the use of the newly created Storm HiveBolt, which makes use of the Streaming API and is quite straightforward to use. The source code of the example described here is provided at GitHub. To run this demo you will need an HDP 2.2 Sandbox, which can be downloaded for various virtualization environments here.

Stock Price Use Case

As a sample use case we will use some stock price data downloaded from Yahoo! Finance to demonstrate a possible finance application in Storm. In this scenario we simply downloaded the data as a CSV file rather than connecting to a live stream, and use a simple Kafka producer to feed this information line by line into a Kafka queue. Each line consists of 8 fields, namely: Date (date), Open (float), High (float), Low (float), Close (float), Volume (int), Adj Close (float), Name (string).

Storm Hive Streaming Topology

For Hive we will map these 8 fields one-to-one into a stock_prices table. We will also partition the table by name for demonstration purposes. While partitioning is not required for Hive Streaming, bucketing is. The table is therefore bucketed by day.

We can create this table using HCatalog this way:
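
A sketch of what the DDL could look like; the bucket count and the transactional table property are assumptions of this sketch, not values taken from the original listing:

    -- hive_stock.schema: stock_prices, partitioned by name, bucketed by day, stored as ORC
    CREATE TABLE stock_prices (
      day DATE,
      open FLOAT,
      high FLOAT,
      low FLOAT,
      close FLOAT,
      volume INT,
      adj_close FLOAT
    )
    PARTITIONED BY (name STRING)
    CLUSTERED BY (day) INTO 5 BUCKETS
    STORED AS ORC
    TBLPROPERTIES ('transactional'='true');

Executed with HCatalog:

    hcat -f hive_stock.schema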

Where hive_stock.schema is a file containing the above SQL/DDL. Alternatively you can use the -e flag and pass in the above SQL/DDL as a string.

Hive Streaming Topology

As illustrated by the diagram, our Storm topology is very simple and straightforward. A Kafka spout reads the stock price information from a given topic in a Kafka queue. To create the Kafka queue we use the below command:
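
On the HDP 2.2 Sandbox this could look roughly as follows; the topic name stock_topic as well as the partition and replication settings are assumptions of this sketch:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create \
      --zookeeper sandbox.hortonworks.com:2181 \
      --replication-factor 1 --partitions 1 \
      --topic stock_topic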

The Kafka producer implemented to read the information from a CSV file is also quite simple. After each line it pauses for a few seconds, just to give us enough time to verify the different results a query on the stock_prices table reveals while our topology is running. As we are streaming, we would expect intermediate queries to return different counts while the whole topology is busy.

The Kafka producer:
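
A minimal sketch of such a producer, using the Kafka 0.8 producer API shipped with the Sandbox; the class name, topic name, broker address and sleep interval are assumptions of this sketch:

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class StockPriceProducer {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Kafka broker of the HDP 2.2 Sandbox (host and port are assumptions)
            props.put("metadata.broker.list", "sandbox.hortonworks.com:6667");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            Producer<String, String> producer =
                    new Producer<String, String>(new ProducerConfig(props));

            // Replay the CSV file line by line, pausing between messages so that
            // intermediate counts on the stock_prices table can be observed.
            BufferedReader reader = new BufferedReader(new FileReader(args[0]));
            String line;
            while ((line = reader.readLine()) != null) {
                producer.send(new KeyedMessage<String, String>("stock_topic", line));
                Thread.sleep(2000);
            }
            reader.close();
            producer.close();
        }
    }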

To verify that our yet-to-be-configured HiveBolt is working correctly, we will query the table while the topology runs and count the rows:
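
A simple count query serves this purpose:

    SELECT COUNT(*) FROM stock_prices;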

We connect using beeline:
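
On the Sandbox the connection could be established roughly like this (host, port and user are assumptions):

    beeline -u jdbc:hive2://sandbox.hortonworks.com:10000 -n hive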


First the HiveBolt needs to be configured so that it can connect to the metastore and knows which database and table to write to.
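
A sketch of these settings, assuming the Sandbox's default metastore port and the table created above:

    // Metastore and target table for the HiveBolt (host name is an assumption)
    String metaStoreURI = "thrift://sandbox.hortonworks.com:9083";
    String dbName = "default";
    String tblName = "stock_prices";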

Next we configure which of the fields received from the previous bolt are used as part of a possible partition (partitions are not necessary) and which of the fields are used as column data.
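
With the DelimitedRecordHiveMapper provided by storm-hive this mapping could be expressed roughly as follows; the tuple field names are assumptions and have to match the fields declared by the upstream bolt:

    // Map tuple fields to the Hive columns and the partition column
    DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
            .withColumnFields(new Fields("day", "open", "high", "low",
                                         "close", "volume", "adj_close"))
            .withPartitionFields(new Fields("name"));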

As the Hive Streaming API divides a stream into a set of batches, we also need to configure the parameters for these batches. We could additionally provide an appropriate Kerberos keytab file and principal, if running in a secured cluster:
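
Continuing the sketch above, the batching and security settings could look like this; the batch sizes, timeouts and the keytab/principal values are placeholders, not recommendations:

    HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
            .withTxnsPerBatch(2)        // transactions opened per batch
            .withBatchSize(100)         // tuples written per transaction
            .withIdleTimeout(10)        // seconds before an idle writer is closed
            .withCallTimeout(10000);    // ms to wait for metastore/HDFS calls

    // On a Kerberos-secured cluster (hypothetical keytab and principal):
    // hiveOptions.withKerberosKeytab("/etc/security/keytabs/storm.keytab")
    //            .withKerberosPrincipal("storm@EXAMPLE.COM");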

For all of these configurations the HiveBolt uses a helper class, HiveOptions. Configured correctly, there is little more to do but wire up our topology and make the whole thing fly.
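
Wiring the topology together could then look roughly like this; the spout configuration, the component names and the intermediate ParseStockDataBolt (a hypothetical bolt that splits each CSV line into the tuple fields expected by the mapper) are assumptions of this sketch:

    // Kafka spout reading raw CSV lines from the stock topic (hosts and ids are assumptions)
    BrokerHosts zkHosts = new ZkHosts("sandbox.hortonworks.com:2181");
    SpoutConfig spoutConfig = new SpoutConfig(zkHosts, "stock_topic", "/kafka_storm", "stock_spout_id");
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("stock_spout", new KafkaSpout(spoutConfig));
    // Hypothetical bolt that parses each CSV line into the declared fields
    builder.setBolt("parse_bolt", new ParseStockDataBolt())
           .shuffleGrouping("stock_spout");
    builder.setBolt("hive_bolt", new HiveBolt(hiveOptions))
           .shuffleGrouping("parse_bolt");

    StormSubmitter.submitTopology("hive-streaming-example", new Config(),
                                  builder.createTopology());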

Giving It a Try

To give this a try we create the Kafka queue as described and deploy the topology to the Sandbox Storm cluster.
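
(A sketch with hypothetical jar and main class names; the actual names depend on the project build.)

    storm jar storm-hive-streaming-example-1.0-SNAPSHOT.jar my.example.HiveStreamingTopology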

Then we execute our Kafka producer,
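
(The jar and class names below are again hypothetical placeholders.)

    java -cp storm-hive-streaming-example-1.0-SNAPSHOT.jar my.example.StockPriceProducer data/stock_prices.csv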

and watch the results using beeline:
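
(Host, port and user are again assumptions; the count should grow while the topology is running.)

    beeline -u jdbc:hive2://sandbox.hortonworks.com:10000 -n hive \
      -e "SELECT COUNT(*) FROM stock_prices;"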

Further Readings

9 thoughts on “Hive Streaming with Storm”

  1. Thanks for sharing, I looked at your pom file on GitHub. I am trying to compile with mvn package on a 0.9.3 cluster and am running into a lot of already-available-classes issues. Could you help me with that? What was I doing wrong?

    1. Do you have these issues when running mvn clean package, or do you have them when deploying the Topology? For the latter it could help to use <scope>provided</scope> for the remaining packages, like I did with the storm-core package. Let me know if this helps; if not, please provide some more details.

  2. I have installed the latest 2.2 sandbox and followed your steps, but received this error on the hive bolt and no message was stored in the Hive table. Please find the error below.
    2015-05-12 05:17:31 h.metastore [INFO] Trying to connect to metastore with URI thrift://
    2015-05-12 05:17:31 h.metastore [INFO] Connected to metastore.
    2015-05-12 05:17:31 o.a.h.h.q.l.PerfLogger [INFO]
    2015-05-12 05:17:31 o.a.h.h.q.l.PerfLogger [INFO]
    2015-05-12 05:17:31 o.a.h.h.q.l.PerfLogger [INFO]
    2015-05-12 05:17:31 STDIO [ERROR] FAILED: NullPointerException Non-local session path expected to be non-null
    2015-05-12 05:17:31 o.a.h.h.q.Driver [ERROR] FAILED: NullPointerException Non-local session path expected to be non-null
    java.lang.NullPointerException: Non-local session path expected to be non-null
    at org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(
    at org.apache.hadoop.hive.ql.Context.(
    at org.apache.hadoop.hive.ql.Context.(
    at org.apache.hadoop.hive.ql.Driver.compile(
    at org.apache.hadoop.hive.ql.Driver.compile(
    at org.apache.hadoop.hive.ql.Driver.compileInternal(
    at org.apache.hadoop.hive.ql.Driver.runInternal(

    1. Can you go through the log file starting before that Exception to see if there is another error occurring that explains why there is no local session?
      Look for an Exception during Bolt start-up.

    2. What helps to surface the underlying error is to extend the connection timeout of the Writer, like so:
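
      Presumably by extending the call timeout on the HiveOptions, for example (the value here is only an illustration):

          hiveOptions.withCallTimeout(30000);  // call timeout in ms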

      I will update the source later today.

      The most common issue is not having sufficient access rights for hdfs://tmp/hive and file:///tmp/hive. Set the access rights to 777 like this:
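
      Presumably with commands along these lines:

          hdfs dfs -chmod -R 777 /tmp/hive
          chmod -R 777 /tmp/hive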
