Hive Streaming with Storm

With the release of Hive 0.13.1 and HCatalog, a new Streaming API was released as a Technical Preview to support continuous data ingestion into Hive tables. This API is intended to enable streaming clients like Flume or Storm to store data in Hive, which has traditionally been a batch-oriented store.

Based on the new ACID insert/update capabilities of Hive, the Streaming API breaks a stream of data down into smaller batches that get committed as transactions to the underlying storage. Once committed, the data becomes immediately available to other queries.

Broadly speaking, the API consists of two parts: one handles the transactions while the other deals with the underlying storage (HDFS). Transactions in Hive are handled by the Metastore. Kerberos is supported from the beginning!
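
HiveBolt, which is used later in this post, hides these details, but to make the shape of the API more concrete, here is a rough sketch of how the raw streaming API from the hive-hcatalog-streaming package can be used directly. The table, columns, and partition value anticipate the stock_prices table created below; the class and method names come from the streaming package, while the wrapper class and the sample record are merely illustrative assumptions.

import java.util.Arrays;

import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.TransactionBatch;

public class HiveStreamingSketch {

    public static void main(String... args) throws Exception {
        // The end point identifies metastore, database, table and (optional) partition values
        HiveEndPoint endPoint = new HiveEndPoint(
                "thrift://sandbox.hortonworks.com:9083",
                "default", "stock_prices", Arrays.asList("AAPL"));
        // Open a connection, creating the partition if it does not exist yet
        StreamingConnection connection = endPoint.newConnection(true);
        // Writer for delimited input; the column list must match the table definition
        DelimitedInputWriter writer = new DelimitedInputWriter(
                new String[]{"day", "open", "high", "low", "close", "volume", "adj_close"},
                ",", endPoint);
        // A transaction batch groups several transactions against the same end point
        TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
        txnBatch.beginNextTransaction();
        // Write one made-up delimited record; the commit makes it visible to queries
        txnBatch.write("2014-11-03,110.2,111.0,109.8,110.5,1000000,110.5".getBytes());
        txnBatch.commit();
        txnBatch.close();
        connection.close();
    }
}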

Some of the current limitations are:

  • Only delimited input data and JSON (strict syntax) are supported
  • Only ORC is supported as the storage format
  • The Hive table must be bucketed (unpartitioned tables are supported)

In this post I would like to demonstrate the use of the newly created Storm HiveBolt, which makes use of the Streaming API and is quite straightforward to use. The source of the example described here is provided on GitHub. To run this demo you need an HDP 2.2 Sandbox, which can be downloaded for various virtualization environments.

Stock Price Use Case

As a sample use case we will use stock price data downloaded from Yahoo! Finance to demonstrate a possible finance application in Storm. In this scenario we simply downloaded the data as a CSV file rather than connecting to a live stream, and use a simple Kafka producer to feed this information line by line into a Kafka topic. Each line consists of 8 fields, namely: Date (date), Open (float), High (float), Low (float), Close (float), Volume (int), Adj Close (float), Name (string).

Storm Hive Streaming Topology

For Hive we map these 8 fields one-to-one into a stock_prices table. We also partition the table by name for demonstration purposes. While partitioning is not required for Hive Streaming, bucketing is; the table is therefore bucketed by day.

CREATE TABLE stock_prices(
  day DATE,
  open FLOAT,
  high FLOAT,
  low FLOAT,
  close FLOAT,
  volume INT,
  adj_close FLOAT
)
PARTITIONED BY (name STRING)
CLUSTERED BY (day) INTO 5 BUCKETS
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");

We can create this table using HCatalog this way:

hcat -f hive_stock.schema

Here hive_stock.schema is a file containing the above DDL. Alternatively you can use the -e flag and pass the above DDL in as a string.

hcat -e "CREATE TABLE ..."

Hive Streaming Topology

As illustrated by the diagram above, our Storm topology is very simple and straightforward. A Kafka Spout reads the stock price information from a given topic of the Kafka queue. To create the topic we use the command below:

/usr/hdp/2.2.0.0-2041/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic stock_topic

The Kafka producer that reads the information from the CSV file is also quite simple. After each line it pauses for a short while, just to give us enough time to verify the different results a query on the stock_prices table reveals while our topology is running. Since we are streaming, we expect intermediate queries to return growing counts while the topology is busy.

The Kafka producer:

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Properties;

import org.apache.commons.io.IOUtils;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaStockProducer {

    public static void main(String... args) throws IOException, InterruptedException {
        // Kafka Properties
        Properties props = new Properties();
        // HDP uses 6667 as the broker port. Sometimes the binding is not resolved as expected, hence this list.
        props.put("metadata.broker.list", "10.0.2.15:6667,127.0.0.1:6667,sandbox.hortonworks.com:6667");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");     
        // Topic Name
        String topic = "stock_topic";        
        // Create Kafka Producer
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<>(config);     
        // Read Data from File Line by Line
        List<String> stockPrices = IOUtils.readLines(
                KafkaStockProducer.class.getResourceAsStream("stock.csv"),
                Charset.forName("UTF-8")
        );
        // Send Each Line to Kafka Producer and Sleep
        for(String line : stockPrices){
            System.out.println(line);
            if(line.startsWith("Date")) continue;
            producer.send(new KeyedMessage<String, String>(topic, line));
            Thread.sleep(10L);
        }
        producer.close();
    }
}

While the topology runs, we will verify that our (yet to be configured) HiveBolt is working correctly by querying the table and counting the rows:

USE default;
SET hive.execution.engine=mr;
SET hive.vectorized.execution.enabled=false;
SELECT COUNT(*) FROM stock_prices;
...
SELECT COUNT(*) FROM stock_prices;

We connect using beeline:

$ beeline
beeline> !connect jdbc:hive2://localhost:10000/default hive hive # no sasl

HiveBolt

First, the HiveBolt needs to be configured with the metastore to connect to as well as the database and table to write to.

String metaStoreURI = "thrift://sandbox.hortonworks.com:9083";
String dbName = "default";
String tblName = "stock_prices";

Next we configure which of the fields received from the previous bolt are used as partition values (partitions are not required) and which are used as column data.

String[] partNames = {"name"};
String[] colNames = {"day", "open", "high", "low", "close", "volume","adj_close"};
// Record Writer configuration
DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
       .withColumnFields(new Fields(colNames))
       .withPartitionFields(new Fields(partNames));
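
The previous bolt (StockDataBolt in this topology) therefore has to emit tuples whose declared output fields carry exactly these names. Its implementation is not shown in this post; the following is only a minimal sketch of what such a bolt could look like, assuming the CSV layout described above (the actual bolt on GitHub may differ, and error handling is left out):

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class StockDataBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // The Kafka spout emits the raw CSV line as a single string field
        String[] parts = tuple.getString(0).split(",");
        // CSV layout: Date, Open, High, Low, Close, Volume, Adj Close, Name
        collector.emit(new Values(parts[0],
                Float.parseFloat(parts[1]), Float.parseFloat(parts[2]),
                Float.parseFloat(parts[3]), Float.parseFloat(parts[4]),
                Integer.parseInt(parts[5]), Float.parseFloat(parts[6]),
                parts[7]));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Field names must match the column and partition fields configured for the mapper
        declarer.declare(new Fields("day", "open", "high", "low", "close",
                "volume", "adj_close", "name"));
    }
}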

As the Hive Streaming API divides a stream into a set of batches, we also need to configure the parameters for these batches. We could additionally provide an appropriate Kerberos keytab file and principal if running in a secured cluster:

HiveOptions hiveOptions;
hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
      .withTxnsPerBatch(2)
      .withBatchSize(100)
      .withIdleTimeout(10);
      //.withKerberosKeytab(path_to_keytab)
      //.withKerberosPrincipal(krb_principal);

HiveBolt takes all of its configuration through the helper class HiveOptions. Once everything is configured correctly, there is little left to do but wire our topology together to make the whole thing fly.

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(KAFKA_SPOUT_ID, kafkaSpout);
builder.setBolt(STOCK_PROCESS_BOLT_ID, new StockDataBolt()).shuffleGrouping(KAFKA_SPOUT_ID);
builder.setBolt(HIVE_BOLT_ID, new HiveBolt(hiveOptions)).shuffleGrouping(STOCK_PROCESS_BOLT_ID);
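
The kafkaSpout referenced above is not shown in this post either; it can be created with the storm-kafka spout that ships with HDP before the topology is wired together. The following snippet is only a sketch: the ZooKeeper root path, the spout id, and the topology name are arbitrary choices, and the relevant classes live in the storm.kafka and backtype.storm packages.

// Spout configuration: ZooKeeper connection, topic, ZK root path and client id (names are assumptions)
BrokerHosts zkHosts = new ZkHosts("sandbox.hortonworks.com:2181");
SpoutConfig spoutConfig = new SpoutConfig(zkHosts, "stock_topic", "/kafka_storm", "StockSpout");
// Emit the raw CSV line as a plain string
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

// Submit the wired topology to the cluster
Config config = new Config();
StormSubmitter.submitTopology("hive-streaming-example", config, builder.createTopology());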

Giving It a Try

To give this a try we create the Kafka topic as described and deploy the topology to the Sandbox Storm cluster.

$ /usr/hdp/2.2.0.0-2041/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic stock_topic
$ /usr/hdp/2.2.0.0-2041/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
default
stock_topic

# deploy topology
$ /usr/hdp/2.2.0.0-2041/storm/bin/storm jar storm-hive-streaming-example-1.0-SNAPSHOT.jar storm_hive_streaming_example.Topology

Then we execute our Kafka producer,

$ java -cp storm-hive-streaming-example-1.0-SNAPSHOT.jar storm_hive_streaming_example.KafkaStockProducer

and watch the results using beeline:

0: jdbc:hive2://localhost:10000> SELECT COUNT(*) FROM stock_prices;
+------+--+
| _c0  |
+------+--+
| 550  |
+------+--+
1 row selected (468.833 seconds)
0: jdbc:hive2://localhost:10000> SELECT COUNT(*) FROM stock_prices;
....
+------+--+
| _c0  |
+------+--+
| 650  |
+------+--+
1 row selected (468.833 seconds)
0: jdbc:hive2://localhost:10000> SELECT COUNT(*) FROM stock_prices;

10 thoughts on “Hive Streaming with Storm”

  1. Thanks for sharing. I looked at your pom file on GitHub. I am trying to run mvn package against a 0.9.3 cluster and am running into a lot of issues with already available classes. Could you help me with what I was doing wrong?

    1. Do you have these issues when running mvn clean package, or do you have them when deploying the topology? For the latter it could help to use <scope>provided</scope> for the remaining packages, like I did with the storm-core package. Let me know if this helps; if not, please provide some more details.

  2. I have installed the latest 2.2 Sandbox and followed your steps, but received this error in the HiveBolt and no message was stored in the Hive table. Please find the error below:
    2015-05-12 05:17:31 h.metastore [INFO] Trying to connect to metastore with URI thrift://sandbox.hortonworks.com:9083
    2015-05-12 05:17:31 h.metastore [INFO] Connected to metastore.
    2015-05-12 05:17:31 o.a.h.h.q.l.PerfLogger [INFO]
    2015-05-12 05:17:31 o.a.h.h.q.l.PerfLogger [INFO]
    2015-05-12 05:17:31 o.a.h.h.q.l.PerfLogger [INFO]
    2015-05-12 05:17:31 STDIO [ERROR] FAILED: NullPointerException Non-local session path expected to be non-null
    2015-05-12 05:17:31 o.a.h.h.q.Driver [ERROR] FAILED: NullPointerException Non-local session path expected to be non-null
    java.lang.NullPointerException: Non-local session path expected to be non-null
    at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
    at org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:590)
    at org.apache.hadoop.hive.ql.Context.<init>(Context.java:129)
    at org.apache.hadoop.hive.ql.Context.<init>(Context.java:116)
    at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:382)
    at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:303)
    at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1067)
    at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1129)

    1. Can you go through the log file starting before that Exception to see if there is another error occurring that explains why there is no local session?
      Look for an Exception during Bolt start-up.

    2. What helps to surface the underlying error is to extend the call timeout of the writer like so:

      hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
                      .withTxnsPerBatch(2)
                      .withBatchSize(100)
                      .withIdleTimeout(10)
                      .withCallTimeout(10000000);
      

      I will update the source later today.

      The most common issue is not having sufficient access rights to /tmp/hive on HDFS and file:///tmp/hive. Set the access rights to 777 like this:

      $ sudo su - hdfs -c "hdfs dfs -chmod 777 /tmp/hive"
      $ sudo chmod 777 /tmp/hive
      
