With the release of Hive 0.13.1 and HCatalog, a new Streaming API was released as a Technical Preview to support continuous data ingestion into Hive tables. This API is intended to let streaming clients like Flume or Storm write data into Hive, which has traditionally been a batch-oriented store.
Building on Hive's new ACID insert/update capabilities, the Streaming API breaks a stream of data down into smaller batches that are committed to the underlying storage in transactions. Once committed, the data immediately becomes visible to other queries.
Broadly speaking, the API consists of two parts: one handles the transactions while the other deals with the underlying storage (HDFS). Transactions in Hive are handled by the Metastore. Kerberos is supported from the beginning! A short sketch of this low-level API follows the list of limitations below.
Some of the current limitations are:
- Only delimited input data and JSON (strict syntax) are supported
- Only ORC is supported as the storage format
- The Hive table must be bucketed (partitioning is optional; unpartitioned tables are supported)
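To make the two parts mentioned above more concrete, here is a minimal sketch of the low-level Streaming API (org.apache.hive.hcatalog.streaming) that clients such as HiveBolt build on. The table, column, and partition values are illustrative and simply mirror the example used later in this post; checked exceptions are omitted for brevity.

```java
import java.util.Arrays;

import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.TransactionBatch;

// the endpoint describes the metastore, database, table and (optional) partition to stream into
HiveEndPoint endPoint = new HiveEndPoint(
        "thrift://sandbox.hortonworks.com:9083", "default", "stock_prices",
        Arrays.asList("AAPL"));                        // partition value for 'name' (illustrative)

StreamingConnection connection = endPoint.newConnection(true);   // true = create partition if missing

// maps delimited records onto the table columns
String[] colNames = {"day", "open", "high", "low", "close", "volume", "adj_close"};
DelimitedInputWriter writer = new DelimitedInputWriter(colNames, ",", endPoint);

// a transaction batch groups several transactions obtained from the metastore
TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
txnBatch.beginNextTransaction();
txnBatch.write("2014-12-01,41.2,42.1,41.0,41.8,1000000,41.8".getBytes());
txnBatch.commit();                                     // data becomes visible to queries here
txnBatch.close();
connection.close();
```

HiveBolt takes care of this bookkeeping for us, which is exactly why it is so convenient to use from Storm.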
In this post I would like to demonstrate the use of the newly created Storm HiveBolt, which makes use of the Streaming API and is quite straightforward to use. The source of the example described here is available on GitHub. To run this demo you will need an HDP 2.2 Sandbox, which can be downloaded for various virtualization environments here.
Stock Price Use Case
As a sample use case we will use stock price data downloaded from Yahoo! Finance to demonstrate a possible finance application in Storm. Rather than connecting to a live stream, we simply downloaded the data as a CSV file and use a simple Kafka producer to feed it line by line into a Kafka queue. Each line consists of 8 fields, namely: Date (date), Open (float), High (float), Low (float), Close (float), Volume (int), Adj Close (float), Name (string).

For Hive we map these 8 fields one-to-one into a stock_prices table. We also partition the table by name for demonstration purposes. While partitioning is not required for Hive Streaming, bucketing is; the table is therefore bucketed by day.
CREATE TABLE stock_prices(
  day DATE,
  open FLOAT,
  high FLOAT,
  low FLOAT,
  close FLOAT,
  volume INT,
  adj_close FLOAT
)
PARTITIONED BY (name STRING)
CLUSTERED BY (day) INTO 5 BUCKETS
STORED AS ORC
TBLPROPERTIES ("orc.compress"="SNAPPY");
We can create this table using HCatalog this way:
hcat -f hive_stock.schema
Here hive_stock.schema is a file containing the above SQL/DDL. Alternatively you can use the -e flag and paste in the DDL as a string.
hcat -e "CREATE TABLE ..."
Hive Streaming Topology
As illustrated by the diagram, our Storm topology is very simple and straightforward. A Kafka spout reads the stock price information from a given topic in a Kafka queue. To create the Kafka topic we use the command below:
/usr/hdp/2.2.0.0-2041/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic stock_topic
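The construction of the Kafka spout itself is not shown in this post. The following is a minimal sketch, assuming the storm-kafka spout with a plain StringScheme pointed at the topic created above; the ZooKeeper root path and spout id are illustrative only:

```java
import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

// ZooKeeper of the Sandbox, used by storm-kafka to look up the Kafka brokers
ZkHosts zkHosts = new ZkHosts("localhost:2181");

// topic name, ZooKeeper root path and spout id (the last two are arbitrary)
SpoutConfig spoutConfig = new SpoutConfig(zkHosts, "stock_topic", "/stock_topic", "stock_spout");

// emit each Kafka message as a single String field
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
```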
The Kafka producer that reads the information from the CSV file is also quite simple. After each line it pauses briefly, giving us enough time to query the stock_prices table while the topology is running. Since the data is streamed in, we expect intermediate queries to return increasing counts while the topology is busy.
The Kafka producer:
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.commons.io.IOUtils;

public class KafkaStockProducer {

    public static void main(String... args) throws IOException, InterruptedException {
        // Kafka Properties
        Properties props = new Properties();
        // HDP uses 6667 as the broker port. Sometimes the binding is not resolved as expected, therefore this list.
        props.put("metadata.broker.list", "10.0.2.15:6667,127.0.0.1:6667,sandbox.hortonworks.com:6667");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        // Topic Name
        String topic = "stock_topic";

        // Create Kafka Producer
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<>(config);

        // Read Data from File Line by Line
        List<String> stockPrices = IOUtils.readLines(
                KafkaStockProducer.class.getResourceAsStream("stock.csv"),
                Charset.forName("UTF-8")
        );

        // Send Each Line to Kafka and Sleep
        for (String line : stockPrices) {
            System.out.println(line);
            if (line.startsWith("Date")) {
                continue;   // skip the CSV header
            }
            producer.send(new KeyedMessage<String, String>(topic, line));
            Thread.sleep(10L);
        }
        producer.close();
    }
}
While the topology runs, we verify that our (yet to be configured) HiveBolt is working correctly by repeatedly counting the rows of the table:
USE default;
SET hive.execution.engine=mr;
SET hive.vectorized.execution.enabled=false;
SELECT COUNT(*) FROM stock_prices;
...
SELECT COUNT(*) FROM stock_prices;
We connect using beeline:
$ beeline
beeline> !connect jdbc:hive2://localhost:10000/default hive hive # no sasl
HiveBolt
First, the HiveBolt needs to be configured with the metastore to connect to as well as the database and table to write to.
String metaStoreURI = "thrift://sandbox.hortonworks.com:9083";
String dbName = "default";
String tblName = "stock_prices";
Next we configure which of the fields received from the previous bolt are used for the (optional) partition and which are used as column data.
String[] partNames = {"name"}; String[] colNames = {"day", "open", "high", "low", "close", "volume","adj_close"}; // Record Writer configuration DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper() .withColumnFields(new Fields(colNames)) .withPartitionFields(new Fields(partNames));
As the Hive Streaming API divides a stream into a set of batches, we also need to configure the parameters for these batches. We could additionally provide an appropriate Kerberos keytab file and principal if running in a secured cluster:
HiveOptions hiveOptions;
hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
        .withTxnsPerBatch(2)
        .withBatchSize(100)
        .withIdleTimeout(10);
        //.withKerberosKeytab(path_to_keytab)
        //.withKerberosPrincipal(krb_principal);
For all of these settings HiveBolt uses the helper class HiveOptions. Once configured correctly, there is little more to do than wire together our topology to make the whole thing fly.
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(KAFKA_SPOUT_ID, kafkaSpout);
builder.setBolt(STOCK_PROCESS_BOLT_ID, new StockDataBolt()).shuffleGrouping(KAFKA_SPOUT_ID);
builder.setBolt(HIVE_BOLT_ID, new HiveBolt(hiveOptions)).shuffleGrouping(STOCK_PROCESS_BOLT_ID);
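The StockDataBolt sitting between the Kafka spout and the HiveBolt is not shown in this post. The following is a minimal sketch of what such a bolt might look like, assuming it simply splits each CSV line into the fields expected by the DelimitedRecordHiveMapper (parsing and error handling are simplified):

```java
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class StockDataBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // each tuple carries one CSV line: Date,Open,High,Low,Close,Volume,Adj Close,Name
        String[] fields = tuple.getString(0).split(",");
        collector.emit(new Values(
                fields[0],                       // day
                Float.parseFloat(fields[1]),     // open
                Float.parseFloat(fields[2]),     // high
                Float.parseFloat(fields[3]),     // low
                Float.parseFloat(fields[4]),     // close
                Integer.parseInt(fields[5]),     // volume
                Float.parseFloat(fields[6]),     // adj_close
                fields[7]                        // name (partition column)
        ));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // field names must match the column and partition fields configured on the mapper
        declarer.declare(new Fields("day", "open", "high", "low", "close", "volume", "adj_close", "name"));
    }
}
```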
Giving It a Try
To give this a try we create the Kafka topic as described and deploy the topology to the Sandbox Storm cluster.
$ /usr/hdp/2.2.0.0-2041/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic stock_topic
$ /usr/hdp/2.2.0.0-2041/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
default
stock_topic

# deploy topology
$ /usr/hdp/2.2.0.0-2041/storm/bin/storm jar storm-hive-streaming-example-1.0-SNAPSHOT.jar storm_hive_streaming_example.Topology
Then we execute our Kafka producer,
$ java -cp storm-hive-streaming-example-1.0-SNAPSHOT.jar storm_hive_streaming_example.KafkaStockProducer
and watch the results using beeline:
0: jdbc:hive2://localhost:10000> SELECT COUNT(*) FROM stock_prices;
+------+--+
| _c0  |
+------+--+
| 550  |
+------+--+
1 row selected (468.833 seconds)
0: jdbc:hive2://localhost:10000> SELECT COUNT(*) FROM stock_prices;
....
+------+--+
| _c0  |
+------+--+
| 650  |
+------+--+
1 row selected (468.833 seconds)
0: jdbc:hive2://localhost:10000> SELECT COUNT(*) FROM stock_prices;
Thanks for sharing, I looked at your pom file on GitHub. I am trying to run mvn package against a 0.9.3 cluster and am running into a lot of "already available classes" issues. Could you help me with that? What was I doing wrong?
Do you have these issues when running
mvn clean package
or do you have them when deploying the topology? For the latter it could help to use <scope>provided</scope> for the remaining packages, like I did with the storm-core package. Let me know if this helps; if not, please provide some more details.
I have installed the latest 2.2 Sandbox and followed your steps, but received this error on the HiveBolt and no message was stored in the Hive table. Please find the error below:
2015-05-12 05:17:31 h.metastore [INFO] Trying to connect to metastore with URI thrift://sandbox.hortonworks.com:9083
2015-05-12 05:17:31 h.metastore [INFO] Connected to metastore.
2015-05-12 05:17:31 STDIO [ERROR] FAILED: NullPointerException Non-local session path expected to be non-null
2015-05-12 05:17:31 o.a.h.h.q.Driver [ERROR] FAILED: NullPointerException Non-local session path expected to be non-null
java.lang.NullPointerException: Non-local session path expected to be non-null
at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
at org.apache.hadoop.hive.ql.session.SessionState.getHDFSSessionPath(SessionState.java:590)
at org.apache.hadoop.hive.ql.Context.<init>(Context.java:129)
at org.apache.hadoop.hive.ql.Context.<init>(Context.java:116)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:382)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:303)
at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1067)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1129)
Can you go through the log file starting before that Exception to see if there is another error occurring that explains why there is no local session?
Look for an Exception during Bolt start-up.
What helps to surface the underlying error is to extend the connection timeout of the writer, like so:
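The original snippet did not survive on this page. As a sketch, the timeout would presumably be raised on the HiveOptions passed to the bolt, assuming storm-hive's withCallTimeout option; the value below is illustrative only:

```java
// illustrative only: raise the timeout (in milliseconds) for calls to the metastore/writer
HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
        .withTxnsPerBatch(2)
        .withBatchSize(100)
        .withIdleTimeout(10)
        .withCallTimeout(30000);   // assumed setter; value chosen for illustration
```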
I will update the source later today.
The most common issue is not having sufficient access rights to /tmp/hive on HDFS and on the local file system (file:///tmp/hive). Set the access rights to 777 like this:
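The commands themselves are missing from the page; they would presumably look like the following, run as a user with the necessary privileges:

```
$ hdfs dfs -chmod -R 777 /tmp/hive
$ chmod -R 777 /tmp/hive
```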
This worked.