HiveSink for Flume

With the most recent release of HDP (v2.2.4), Hive Streaming is shipped as a technical preview. It can, for example, be used with Storm to ingest streaming data collected from Kafka, as demonstrated here. But it still has some serious limitations and, in the case of Storm, a major bug. Nevertheless, Hive Streaming is likely to become the tool of choice when it comes to streamlining data ingestion into Hadoop, so it is worth exploring already today.

Flume's upcoming release 1.6 will contain a HiveSink capable of leveraging Hive Streaming for data ingestion. In the following post we will use it as a replacement for the HDFS sink used in a previous post here. Other than replacing the HDFS sink with a HiveSink, none of the previous setup will change, except for the Hive table schema, which needs to be adjusted to meet the requirements that currently exist around Hive Streaming. So let's get started by looking into these restrictions.

Hive Streaming Limitations

The only file format supported is ORC. So the original schema of the stocks table needs to be adjusted to reflect that:
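
A sketch of what the adjusted DDL could look like is shown below. The column names are assumptions based on the stocks CSV from the previous post and may differ from your data; the essential parts are the ORC storage format, the bucketing clause, and the transactional table property:

    -- Sketch of the adjusted stocks table for Hive Streaming (column names assumed)
    CREATE TABLE stocks (
      `date` STRING,
      open DOUBLE,
      high DOUBLE,
      low DOUBLE,
      close DOUBLE,
      volume BIGINT,
      adj_close DOUBLE
    )
    CLUSTERED BY (`date`) INTO 3 BUCKETS   -- bucketing is required by Hive Streaming
    STORED AS ORC                          -- ORC is the only supported file format
    TBLPROPERTIES ("transactional" = "true");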

As you can see from the schema, the table is now also bucketed, which is required by Hive Streaming. Furthermore we need to set the following:

  1. hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
  2. hive.compactor.initiator.on = true
  3. hive.compactor.worker.threads > 0 (e.g. 5)

Configuration diff as given by Ambari:

[Screenshot: Ambari configuration diff of the Hive Streaming settings]
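
Outside of Ambari, the same settings correspond to the following hive-site.xml entries (a sketch using the values listed above):

    <property>
      <name>hive.txn.manager</name>
      <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
    </property>
    <property>
      <name>hive.compactor.initiator.on</name>
      <value>true</value>
    </property>
    <property>
      <name>hive.compactor.worker.threads</name>
      <value>5</value>
    </property>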

Also important to know is that the current Streaming API only supports delimited input data (CSV, tab separated) or JSON (strict syntax).

Flume Hive Sink

For the Flume Hive Sink the following parameters, with their defaults where applicable, can or must be configured:

  • hive.metastore
  • hive.database
  • hive.table
  • hive.partition
  • hive.txnsPerBatchAsk = 100
  • batchSize = 15000
  • serializer (delimited | json)
  • serializer.delimiter = ,
  • serializer.fieldnames
  • idleTimeout = 0
  • callTimeout = 10000
  • heartBeatInterval = 240
  • maxOpenConnections
  • useLocalTimeStamp
  • timeZone
  • round
  • roundUnit (second | minute | hour)
  • roundValue

With the previous example we can use the following Flume configuration. The batch size and the number of transactions per batch are not set very high here; in a production setup these would probably be different, depending on the expected data stream.
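
The configuration could look roughly like the sketch below. The agent and component names, the spooling directory, and the field names are assumptions carried over from the setup of the previous post; adjust them to your environment:

    # Sketch of a Flume agent using the Hive sink (names and paths are assumptions)
    agent.sources = spoolSrc
    agent.channels = memChannel
    agent.sinks = hiveSink

    # spooling directory source, as used in the previous post
    agent.sources.spoolSrc.type = spooldir
    agent.sources.spoolSrc.spoolDir = /tmp/stocks
    agent.sources.spoolSrc.channels = memChannel

    # in-memory channel buffering events between source and sink
    agent.channels.memChannel.type = memory
    agent.channels.memChannel.capacity = 10000
    agent.channels.memChannel.transactionCapacity = 1000

    # Hive sink writing via the Hive Streaming API
    agent.sinks.hiveSink.type = hive
    agent.sinks.hiveSink.channel = memChannel
    agent.sinks.hiveSink.hive.metastore = thrift://localhost:9083
    agent.sinks.hiveSink.hive.database = default
    agent.sinks.hiveSink.hive.table = stocks
    agent.sinks.hiveSink.hive.txnsPerBatchAsk = 2
    agent.sinks.hiveSink.batchSize = 100
    agent.sinks.hiveSink.serializer = DELIMITED
    agent.sinks.hiveSink.serializer.delimiter = ,
    agent.sinks.hiveSink.serializer.fieldnames = date,open,high,low,close,volume,adj_close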

Before starting a Flume agent with this configuration you might need to set HIVE_HOME and HCAT_HOME, as flume-ng will only put the required Hive jars on the classpath with this logic:
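
The relevant part of the bin/flume-ng startup script behaves roughly like the following sketch (paraphrased, not a verbatim copy of the script): the Hive and HCatalog jars are only added to the classpath when the corresponding environment variables point to existing directories.

    # Paraphrased sketch of the classpath handling in bin/flume-ng (not verbatim)
    if [ -n "${HIVE_HOME}" ] && [ -d "${HIVE_HOME}/lib" ]; then
      FLUME_CLASSPATH="$FLUME_CLASSPATH:${HIVE_HOME}/lib/*"
    fi
    if [ -n "${HCAT_HOME}" ] && [ -d "${HCAT_HOME}/share/hcatalog" ]; then
      FLUME_CLASSPATH="$FLUME_CLASSPATH:${HCAT_HOME}/share/hcatalog/*"
    fi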

Setting them in my case was pretty straightforward:
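
On an HDP installation the two variables can typically be pointed at the Hive client and WebHCat directories; the paths below are assumptions and depend on your installation:

    # Assumed HDP client paths; adjust to your installation
    export HIVE_HOME=/usr/hdp/current/hive-client
    export HCAT_HOME=/usr/hdp/current/hive-webhcat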

Now we can start the Flume agent, obviously after we have created the stocks table:
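
For example, assuming the agent configuration above is stored in a file named flume_hive_stocks.conf and the agent is named agent to match it:

    # agent name (-n) and config file (-f) are assumptions matching the sketch above
    flume-ng agent -n agent -c conf -f flume_hive_stocks.conf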

When everything is working correctly, you should be able to see output similar to this once you copy the stocks data into the spooling directory:

Troubleshooting

If something goes wrong, for example a failing connection to the metastore, please:

  1. Check the requirements posted here or on the Hive wiki.
  2. Also check that your schema is bucketed and read the Exception message carefully.
  3. Increase the timeout for the HiveWriter to connect to the Metastore and again read the Exception message carefully.
  4. Make /tmp/hive writable on HDFS as well as on the local file system (e.g. chmod 777); see the commands below.
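
A minimal sketch of the permission commands for item 4, assuming the default scratch directory locations:

    # Assuming default Hive scratch directories; adjust paths if you changed them
    hdfs dfs -chmod -R 777 /tmp/hive    # scratch directory on HDFS
    chmod -R 777 /tmp/hive              # local scratch directory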

A typical error message could look like this:

Further Readings

13 thoughts on “HiveSink for Flume”

  1. Hello,

    I have set up Hive Streaming from Flume using the Hive sink for weblogs. It is creating one delta file per insert for weblogs (avg ~20 mil records a day). I added 5 compactors running every 5 seconds, but it does not seem to be able to keep up with compacting/cleaning out the delta files as it runs out of memory.

    Any suggestions?

  2. I am facing this error, any idea what I am doing wrong?

    I have set HIVE_HOME and HADOOP_HOME

    under flume-env.sh

    2015-11-22 21:04:45,524 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145)] Failed to start agent because dependencies were not found in classpath. Error follows.
    java.lang.NoClassDefFoundError: org/apache/hive/hcatalog/streaming/RecordWriter
    at org.apache.flume.sink.hive.HiveSink.createSerializer(HiveSink.java:220)
    at org.apache.flume.sink.hive.HiveSink.configure(HiveSink.java:203)
    at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
    at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:413)
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:98)
    at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.ClassNotFoundException: org.apache.hive.hcatalog.streaming.RecordWriter
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)

      1. Hi Henning,

        Thanks for the reply, I could solve that problem by setting $HCAT_HOME.

        But now I am facing another problem, see the logs below:

        2015-11-23 18:45:19,274 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:323)] hdfs-Cluster1-sink : Failed connecting to EndPoint {metaStoreUri='thrift://192.168.65.128:9083', database='default', table='hasan', partitionVals=[] }
        org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to EndPoint {metaStoreUri='thrift://192.168.65.128:9083', database='default', table='hasan', partitionVals=[] }
        at org.apache.flume.sink.hive.HiveWriter.(HiveWriter.java:98)
        at org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:343)
        at org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:296)
        at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:254)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
        Caused by: org.apache.flume.sink.hive.HiveWriter$TxnBatchException: Failed acquiring Transaction Batch from EndPoint: {metaStoreUri='thrift://192.168.65.128:9083', database='default', table='hasan', partitionVals=[] }
        at org.apache.flume.sink.hive.HiveWriter.nextTxnBatch(HiveWriter.java:338)
        at org.apache.flume.sink.hive.HiveWriter.(HiveWriter.java:90)
        … 6 more
        Caused by: org.apache.hive.hcatalog.streaming.TransactionBatchUnAvailable: Unable to acquire transaction batch on end point: {metaStoreUri='thrift://192.168.65.128:9083', database='default', table='hasan', partitionVals=[] }
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.(HiveEndPoint.java:511)
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.(HiveEndPoint.java:461)
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.fetchTransactionBatchImpl(HiveEndPoint.java:345)
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.fetchTransactionBatch(HiveEndPoint.java:325)
        at org.apache.flume.sink.hive.HiveWriter$7.call(HiveWriter.java:332)
        at org.apache.flume.sink.hive.HiveWriter$7.call(HiveWriter.java:329)
        at org.apache.flume.sink.hive.HiveWriter$9.call(HiveWriter.java:366)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        … 1 more
        Caused by: org.apache.thrift.TApplicationException: Internal error processing open_txns
        at org.apache.thrift.TApplicationException.read(TApplicationException.java:108)
        at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:71)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_open_txns(ThriftHiveMetastore.java:3834)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.open_txns(ThriftHiveMetastore.java:3821)
        at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.openTxns(HiveMetaStoreClient.java:1841)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:152)
        at com.sun.proxy.$Proxy7.openTxns(Unknown Source)
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.openTxnImpl(HiveEndPoint.java:520)
        at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.(HiveEndPoint.java:504)
        … 10 more
        2015-11-23 18:45:19,298 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
        org.apache.flume.EventDeliveryException: org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to EndPoint {metaStoreUri='thrift://192.168.65.128:9083', database='default', table='hasan', partitionVals=[] }
        at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:268)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
        Caused by: org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to EndPoint {metaStoreUri='thrift://192.168.65.128:9083', database='default', table='hasan', partitionVals=[] }
        at org.apache.flume.sink.hive.HiveWriter.(HiveWriter.java:98)
        at org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:343)
        at org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:296)
        at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:254)
        … 3 more
        Caused by: org.apache.flume.sink.hive.HiveWriter$TxnBatchException: Failed acquiring Transaction Batch from EndPoint: {metaStoreUri='thrift://192.168.65.128:9083', database='default', table='hasan', partitionVals=[] }
        at org.apache.flume.sink.hive.HiveWriter.nextTxnBatch(HiveWriter.java:338)
        at org.apache.flume.sink.hive.HiveWriter.(HiveWriter.java:90)

        What could be wrong here?

        1. Hasan, please check your hive.metastore settings. Since you have not set hive.database, make sure the table hasan that you are trying to connect to exists in the default database.

  3. Hi henning,

    I'm getting the following error when trying to use the Hive sink.

    16/04/07 17:42:26 INFO hive.metastore: Trying to connect to metastore with URI thrift://149.195.62.179:9083
    16/04/07 17:42:26 INFO hive.metastore: Connected to metastore.
    16/04/07 17:42:28 INFO hive.HiveWriter: Acquired Txn Batch TxnIds=[12043...13042] on endPoint = {metaStoreUri='thrift://149.195.62.179:9083', database='default', table='flume_hive', partitionVals=[] }. Switching to first txn
    16/04/07 17:42:31 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
    org.apache.flume.EventDeliveryException: java.lang.ArrayIndexOutOfBoundsException: 1
    at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:268)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
    at org.apache.hive.hcatalog.streaming.DelimitedInputWriter.reorderFields(DelimitedInputWriter.java:179)
    at org.apache.hive.hcatalog.streaming.DelimitedInputWriter.write(DelimitedInputWriter.java:212)
    at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.write(HiveEndPoint.java:629)
    at org.apache.flume.sink.hive.HiveDelimitedTextSerializer.write(HiveDelimitedTextSerializer.java:51)
    at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:153)
    at org.apache.flume.sink.hive.HiveWriter$1.call(HiveWriter.java:147)
    at org.apache.flume.sink.hive.HiveWriter$9.call(HiveWriter.java:366)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    … 1 more

    Any idea what could be wrong? It is accepting only one record from the file; with more than one record it fails with the above exception.

    1. Sashi, you should check your serializer.fieldnames setting. The number of fields mentioned in it seems to be larger than the actual number of fields in the records fed to the Hive sink.
