Hadoop File Ingest and Hive

At the beginning of every Hadoop adventure stands the task of ingesting data into HDFS, preferably in a way that lets Hive query it for analysis at any point in time. Chances are high that, at the start of any Hadoop project, most enterprise data resides in RDBMS systems. Sqoop is the tool of choice within the Hadoop ecosystem for this kind of data, and it is also quite convenient to use with Hive directly.

As most business is inherently event-driven and more and more electronic devices are used to track these events, ingesting a stream of data into Hadoop is a common demand. In such a stream-processing scenario, a tool like Kafka would typically be used for ingesting the data into Hadoop.

None of the methods mentioned above addresses the sheer amount of data stored in files today, not to mention the files newly created day by day. While WebHDFS or direct HDFS access are certainly convenient methods for file ingestion, they often require direct access to the cluster or a huge landing zone that itself needs direct access to HDFS. Continuous data ingest is also not supported.

For such scenarios Apache Flume is a good option. Flume is capable of dealing with a variety of continuous data sources. Sources can be chained together across several nodes through channels, writing data into various sinks. In this post we look at defining a local directory where files can be dropped off, while Flume monitors that directory for new files and sinks them to HDFS.

The Use Case

For this demo we take stock data from Yahoo Finance stored as CSV files, similar to the scenario described here. The stock prices are supposed to be stored in HDFS in a way that lets us easily query them from Hive. From the schema below you can see that the data will be partitioned by year, so the directory structure will look something like this: /ingest/stocks/year=2014. Hive schema:
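A minimal sketch of such a table definition; the table name stocks, the column names, and the types are assumptions derived from the CSV header described below:

    -- external, year-partitioned table over the Flume ingest directory
    CREATE EXTERNAL TABLE stocks (
      trade_date STRING,
      open       DOUBLE,
      high       DOUBLE,
      low        DOUBLE,
      close      DOUBLE,
      volume     BIGINT,
      adj_close  DOUBLE
    )
    PARTITIONED BY (year STRING)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    LOCATION '/ingest/stocks';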

Overview of the approach:

(Figure: Flume Ingest Data)

Here no user needs direct access to the landing zone, nor does the landing zone need to be sized for huge amounts of data, because files are continuously shipped to HDFS. We could set up multiple such hosts to scale out the data ingestion if needed.

Flume File Ingest

As the source we define a local directory on one of the nodes where Flume is installed. We use the spooldir source type and connect it to a memory channel.
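A minimal sketch of the source and channel definition, assuming an agent named agent1; the component names and the spooling directory path are illustrative:

    # components of the agent
    agent1.sources  = spool-src
    agent1.channels = mem-channel
    agent1.sinks    = hdfs-sink

    # spooling directory source watching the local drop-off directory
    agent1.sources.spool-src.type     = spooldir
    agent1.sources.spool-src.spoolDir = /var/flume/stocks-incoming
    agent1.sources.spool-src.channels = mem-channel

    # in-memory channel buffering events between source and sink
    agent1.channels.mem-channel.type                = memory
    agent1.channels.mem-channel.capacity            = 10000
    agent1.channels.mem-channel.transactionCapacity = 1000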

Two things are still missing here. For one, each CSV file contains the header line “Date,Open,High,Low,Close,Volume,Adj Close”, which we do not want to end up in the data. Also, our original design calls for the data to be partitioned by year, but how can we extract that information from the data?

For such cases Flume offers so-called interceptors that act on the events as they are read and can filter or enrich them. In this demo we attach two interceptors to the source. One interceptor makes sure we skip the header line of each file. The other uses a regex to extract the year from each line so that it can later be used to store the event in the right partition.
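A sketch of both interceptors, attached to the source defined above; the interceptor names are arbitrary and the regexes assume each line starts with an ISO date such as 2014-09-25:

    # attach both interceptors to the spooldir source
    agent1.sources.spool-src.interceptors = skip-header extract-year

    # drop the CSV header line of each file
    agent1.sources.spool-src.interceptors.skip-header.type          = regex_filter
    agent1.sources.spool-src.interceptors.skip-header.regex         = ^Date.*
    agent1.sources.spool-src.interceptors.skip-header.excludeEvents = true

    # extract the year from the leading date into an event header named "year"
    agent1.sources.spool-src.interceptors.extract-year.type               = regex_extractor
    agent1.sources.spool-src.interceptors.extract-year.regex              = ^(\\d{4})-\\d{2}-\\d{2}
    agent1.sources.spool-src.interceptors.extract-year.serializers        = y
    agent1.sources.spool-src.interceptors.extract-year.serializers.y.name = year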

Finally, the data is read from the channel by a sink and written to HDFS, using the extracted year to determine the partition.
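A sketch of the HDFS sink, writing plain text into a year partition taken from the event header set by the regex extractor; the roll settings are illustrative:

    # HDFS sink writing into the partitioned ingest directory
    agent1.sinks.hdfs-sink.type             = hdfs
    agent1.sinks.hdfs-sink.channel          = mem-channel
    agent1.sinks.hdfs-sink.hdfs.path        = /ingest/stocks/year=%{year}
    agent1.sinks.hdfs-sink.hdfs.fileType    = DataStream
    agent1.sinks.hdfs-sink.hdfs.writeFormat = Text
    # roll files every 5 minutes instead of by size or event count
    agent1.sinks.hdfs-sink.hdfs.rollInterval = 300
    agent1.sinks.hdfs-sink.hdfs.rollSize     = 0
    agent1.sinks.hdfs-sink.hdfs.rollCount    = 0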

This configuration can be executed with:
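For example, assuming the properties above were saved to a file named stocks-ingest.conf (the agent name must match the one used in the configuration):

    flume-ng agent --conf conf --conf-file stocks-ingest.conf --name agent1 -Dflume.root.logger=INFO,console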

Hive External Table

After the data has been stored to HDFS using the above schema, Hive can be used to query it.
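For example, using the table name from the sketch above:

    -- check the ingested prices for one year
    SELECT * FROM stocks WHERE year = '2014' LIMIT 10;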

Unfortunately, querying the data at this point returns no results.

The problem is that we need to add the partition manually so that Hive knows about the underlying directory structure. After adding a partition like the one below, the data can be queried.
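A statement along these lines, using the table name and path from the sketches above, registers the 2014 directory written by Flume:

    -- register the directory created by Flume as a partition of the table
    ALTER TABLE stocks ADD PARTITION (year = '2014') LOCATION '/ingest/stocks/year=2014';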

Alternatively, instead of adding every new partition by hand, you can let Hive discover the partition directories that Flume creates by running MSCK REPAIR TABLE; note that setting hive.exec.dynamic.partition=true only affects dynamic-partition inserts and does not make externally created directories visible to queries.
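For example, again using the table name from the sketch above:

    -- scan the table location and register any partition directories Hive does not know about yet
    MSCK REPAIR TABLE stocks;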
