Hadoop File Ingest and Hive

At the beginning of almost every Hadoop adventure stands the task of ingesting data into HDFS, ideally in a form that can be queried with Hive at any point in time. Chances are high that, at the start of a Hadoop project, most enterprise data resides inside RDBMS systems. Sqoop is the tool of choice within the Hadoop ecosystem for this kind of data, and it is also quite convenient to use with Hive directly.

As most business is inherently event driven, and more and more electronic devices are used to track these events, ingesting a stream of data into Hadoop is a common demand. In such a stream-processing scenario, a tool like Kafka would be used for data ingestion into Hadoop.

None of the methods mentioned above addresses the sheer amount of data stored in files today, not to mention the files newly created day by day. While WebHDFS or direct HDFS access certainly are convenient methods for file ingestion, they often require direct access to the cluster or a huge landing zone, itself with direct access to HDFS. Continuous data ingest is also not supported.

For such scenarios Apache Flume is a good option. Flume is capable of dealing with various continuous data sources. Sources can be piped together over several nodes through channels, writing data into various sinks. In this post we look at the possibility of defining a local directory where files can be dropped off, while Flume monitors that directory for new files and sinks them to HDFS.

The Use Case

For this demo we take stock data from Yahoo Finance stored as CSV files, similar to the scenario described here. The stock prices are supposed to be stored in HDFS in a way that lets us easily query them from Hive. From the schema below you can see that the data will be partitioned by year, so the directory structure will look something like this: /ingest/stocks/year=2014. Hive schema:

DROP TABLE IF EXISTS stocks;
CREATE EXTERNAL TABLE stocks (
    date STRING,
    open DOUBLE,
    high DOUBLE,
    low DOUBLE,
    close DOUBLE,
    volume BIGINT,
    adj_close DOUBLE)
PARTITIONED BY(year STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS SEQUENCEFILE
LOCATION '/ingest/stocks';
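
For reference, each CSV file exported from Yahoo Finance starts with a header line and contains one row per trading day. The rows below only illustrate the format; the values are made up for this example:

Date,Open,High,Low,Close,Volume,Adj Close
2015-05-08,126.68,127.62,126.11,127.62,55550000,127.62
2014-12-31,112.82,113.13,110.21,110.38,41403400,110.38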

Overview of the approach:

(Figure: Flume Ingest Data)

Here no user needs direct access to the landing zone, nor does the landing zone need to be sized for huge amounts of data, as we continuously store the data to HDFS. We could define multiple such hosts to scale out the data ingestion if needed.

Flume File Ingest

As the source we define a local directory on one of the nodes that have Flume installed. We use the spooldir source type and connect it to a memory channel.

flume-hive-ingest.sources.src1.type = spooldir
flume-hive-ingest.sources.src1.channels = chan1
flume-hive-ingest.sources.src1.spoolDir = /vagrant/flume_log
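
The agent and channel definitions themselves are not shown above; a minimal sketch of the remaining wiring, with placeholder capacity values, could look like this:

flume-hive-ingest.sources = src1
flume-hive-ingest.channels = chan1
flume-hive-ingest.sinks = sink1

# in-memory channel buffering events between the spooldir source and the HDFS sink
flume-hive-ingest.channels.chan1.type = memory
flume-hive-ingest.channels.chan1.capacity = 10000
flume-hive-ingest.channels.chan1.transactionCapacity = 100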

Two things are still missing here. For one, each CSV file contains a header line "Date,Open,High,Low,Close,Volume,Adj Close" that we do not want to add to the data. Also, our original design calls for the data to be partitioned by year, but how can we extract that information from the data?

For such cases Flume offers so-called interceptors that act on the data as it is read and can extract information from it or act on it. In this demo we define two interceptors for the source. One interceptor makes sure we skip the header line of each file. The other uses a regex to extract the year from each line so that it can later be used to store the event in the right partition.

flume-hive-ingest.sources.src1.interceptors = skipHeadI dateI

flume-hive-ingest.sources.src1.interceptors.skipHeadI.type = regex_filter
flume-hive-ingest.sources.src1.interceptors.skipHeadI.regex = ^Date.*
flume-hive-ingest.sources.src1.interceptors.skipHeadI.excludeEvents = true

flume-hive-ingest.sources.src1.interceptors.dateI.type = regex_extractor
flume-hive-ingest.sources.src1.interceptors.dateI.regex = ^(\\d+)-.*
flume-hive-ingest.sources.src1.interceptors.dateI.serializers = y
flume-hive-ingest.sources.src1.interceptors.dateI.serializers.y.name = year
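
With these two interceptors in place, the header line of every spooled file is dropped, and for a data line such as 2015-05-08,126.68,... (illustrative values) the regex_extractor puts a header year=2015 on the event, which the HDFS sink below references as %{year} in its path.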

Finally, the data is read from the channel by a sink and written to HDFS, using the year as the partition.

flume-hive-ingest.sinks.sink1.type = hdfs
flume-hive-ingest.sinks.sink1.channel = chan1
flume-hive-ingest.sinks.sink1.hdfs.path = /ingest/stocks/year=%{year}
flume-hive-ingest.sinks.sink1.hdfs.filePrefix = stocks-
flume-hive-ingest.sinks.sink1.hdfs.rollInterval = 0
flume-hive-ingest.sinks.sink1.hdfs.rollSize = 1024
flume-hive-ingest.sinks.sink1.hdfs.rollCount = 0
flume-hive-ingest.sinks.sink1.hdfs.idleTimeout = 0
flume-hive-ingest.sinks.sink1.hdfs.batchSize = 100
flume-hive-ingest.sinks.sink1.hdfs.threadsPoolSize = 2
flume-hive-ingest.sinks.sink1.hdfs.round = true
flume-hive-ingest.sinks.sink1.hdfs.roundUnit = hour
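
A note on the roll settings: rollInterval = 0 and rollCount = 0 disable time- and event-count-based rolling, so files are rolled purely by size once they exceed roughly 1 KB (rollSize is given in bytes), while idleTimeout = 0 keeps files open until they are rolled. This explains the many small files in the HDFS listing further below.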

This configuration can be executed with:

flume-ng agent -f data/flume-file-ingest.conf -n flume-hive-ingest
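
Assuming the Flume configuration directory lives under conf, the same agent can also be started with console logging for easier debugging; note that the agent name passed with -n must match the flume-hive-ingest prefix used in the properties file:

flume-ng agent --conf conf -f data/flume-file-ingest.conf -n flume-hive-ingest -Dflume.root.logger=INFO,console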

Hive External Table

After the data is stored to HDFS with the structure above, Hive can be used to query it.
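
A recursive listing of the ingest directory (for example with hdfs dfs -ls -R /ingest/stocks) shows the files written by the sink: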

drwxrwxrwx   - vagrant hdfs          0 2015-05-10 18:37 /ingest/stocks/2014
-rw-r--r--   3 vagrant hdfs        988 2015-05-10 18:37 /ingest/stocks/2014/stocks-.1431283066137
-rw-r--r--   3 vagrant hdfs       1543 2015-05-10 18:37 /ingest/stocks/2014/stocks-.1431283066138
-rw-r--r--   3 vagrant hdfs       1544 2015-05-10 18:37 /ingest/stocks/2014/stocks-.1431283066139
-rw-r--r--   3 vagrant hdfs       1546 2015-05-10 18:37 /ingest/stocks/2014/stocks-.1431283066140
-rw-r--r--   3 vagrant hdfs       1542 2015-05-10 18:37 /ingest/stocks/2014/stocks-.1431283066141
-rw-r--r--   3 vagrant hdfs       1198 2015-05-10 18:37 /ingest/stocks/2014/stocks-.1431283066142
-rw-r--r--   3 vagrant hdfs       1543 2015-05-10 18:37 /ingest/stocks/2014/stocks-.1431283066143
-rw-r--r--   3 vagrant hdfs       1539 2015-05-10 18:37 /ingest/stocks/2014/stocks-.1431283066144
-rw-r--r--   3 vagrant hdfs       1544 2015-05-10 18:37 /ingest/stocks/2014/stocks-.1431283066145
-rw-r--r--   3 vagrant hdfs       1544 2015-05-10 18:37 /ingest/stocks/2014/stocks-.1431283066146
-rw-r--r--   3 vagrant hdfs       1199 2015-05-10 18:37 /ingest/stocks/2014/stocks-.1431283066147
-rw-r--r--   3 vagrant hdfs       1542 2015-05-10 18:37 /ingest/stocks/2014/stocks-.1431283066148
-rw-r--r--   3 vagrant hdfs       1335 2015-05-10 18:37 /ingest/stocks/2014/stocks-.1431283066149.tmp
drwxrwxrwx   - vagrant hdfs          0 2015-05-10 18:37 /ingest/stocks/2015
-rw-r--r--   3 vagrant hdfs       1541 2015-05-10 18:37 /ingest/stocks/2015/stocks-.1431283063899
-rw-r--r--   3 vagrant hdfs       1607 2015-05-10 18:37 /ingest/stocks/2015/stocks-.1431283063900
-rw-r--r--   3 vagrant hdfs       1541 2015-05-10 18:37 /ingest/stocks/2015/stocks-.1431283063901
-rw-r--r--   3 vagrant hdfs       1543 2015-05-10 18:37 /ingest/stocks/2015/stocks-.1431283063902
-rw-r--r--   3 vagrant hdfs        233 2015-05-10 18:37 /ingest/stocks/2015/stocks-.1431283063903.tmp
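
The files ending in .tmp are still open and being written to by the sink; Flume drops the .tmp suffix once a file is rolled or the agent is stopped.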

Unfortunately, if we query the data at this point, no results are returned.

hive> SELECT * FROM stocks;
SELECT * FROM stocks;
OK
Time taken: 1.038 seconds

The problem is that we need to add the partitions manually so that Hive knows about the underlying directory structure. After adding a partition as below, the data in it can be queried.

hive> ALTER TABLE stocks ADD PARTITION (year='2015');         
ALTER TABLE stocks ADD PARTITION (year='2015');
OK
Time taken: 0.53 seconds

Alternatively, hive.exec.dynamic.partition=true lets Hive create partitions dynamically when data is loaded through Hive itself; for directories that Flume writes directly, the partitions still have to be registered in the metastore, either with ALTER TABLE ... ADD PARTITION as above or with MSCK REPAIR TABLE.
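
Assuming the sink writes into directories following the year=YYYY naming convention configured in hdfs.path above, all new partition directories can be registered in one go:

hive> MSCK REPAIR TABLE stocks;

MSCK REPAIR TABLE scans the table location for directories that look like partitions and adds any that are missing from the metastore.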
