HDFS Spooling Directory with Spark

Since Spark natively supports reading from any Hadoop InputFormat, those data sources are also available to form DStreams for Spark Streaming applications. By using a simple text file input format, an HDFS directory can be turned into a spooling directory for data ingestion.

Files newly added to that directory in an atomic way (this is a requirement) are picked up by the running streaming context for processing. The data could, for example, be transformed and stored in an external database like HBase or Hive.
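A simple way to satisfy the atomicity requirement is to first upload a file to a staging directory and then move it into the monitored directory, since a rename within HDFS is atomic. The /staging path below is purely illustrative; /spark_log is the monitored directory used in the example further down:

$ hdfs dfs -put data/bmw_stocks.csv /staging
$ hdfs dfs -mv /staging/bmw_stocks.csv /spark_log/bmw_stocks.csv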

Such a streaming application is actually quite simple to assemble, yet it can have a huge impact on data ingestion approaches in Hadoop, because a typical landing zone can be bypassed.

Today such a landing zone often resides on an edge node close to the cluster, where tools like Ab Initio or DataStage that are only capable of offline processing treat Hadoop as a “yet another filesystem”. With Spark comes an alternative: such ETL workflows can be executed in parallel on the cluster itself.

A simple spooling directory on HDFS can be established with the following code:

package simpleexample

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkFileExample {

  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      System.err.println("Usage: SparkFileExample <log-dir>")
      System.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("SpoolDirSpark")
    // Micro-batch interval of 2 seconds
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    val inputDirectory = args(0)

    // Watch the HDFS directory for newly added files; TextInputFormat yields
    // (byte offset, line) pairs for each new file
    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](inputDirectory)
      .map { case (offset, line) => (offset.toString, line.toString) }

    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

This fairly simple example prints each line of a file copied into the HDFS directory to the screen.

Once built, it can be executed as follows, where /spark_log is a directory in HDFS:

spark-submit --master yarn-client \
     --num-executors 2 \
     --driver-memory 512m \
     --executor-memory 512m \
     --executor-cores 1 \
     --class simpleexample.SparkFileExample \
     spark-streaming-simple-example-0.1-SNAPSHOT.jar /spark_log

A file uploaded to the named directory is then printed line by line to the terminal (note that print() only shows the first ten elements of each batch):

Upload file to the dir:

$ hdfs dfs -put data/bmw_stocks.csv /spark_log

Spark Streaming output:

15/05/15 12:48:28 INFO JobScheduler: Added jobs for time 1431694108000 ms
15/05/15 12:48:28 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on one.hdp:52897 (size: 1917.0 B, free: 267.3 MB)
15/05/15 12:48:28 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on one.hdp:52897 (size: 40.3 KB, free: 267.2 MB)
15/05/15 12:48:29 INFO DAGScheduler: Stage 0 (print at SparkFileExample.scala:29) finished in 2.281 s
15/05/15 12:48:29 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 2271 ms on one.hdp (1/1)
15/05/15 12:48:29 INFO YarnClientClusterScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool 
15/05/15 12:48:29 INFO DAGScheduler: Job 37 finished: print at SparkFileExample.scala:29, took 2.424158 s
-------------------------------------------
Time: 1431694106000 ms
-------------------------------------------
(0,Date,Open,High,Low,Close,Volume,Adj Close)
(42,2015-05-07,103.60,105.25,102.00,104.60,2172400,101.70)
(96,2015-05-06,106.50,108.45,101.50,104.10,3585700,101.214)
(151,2015-05-05,106.70,109.30,106.00,106.50,2045100,103.547)
(206,2015-05-04,106.95,108.10,105.25,106.75,1417200,103.79)
(260,2015-05-01,106.10,106.10,106.10,106.10,000,103.158)
(311,2015-04-30,106.50,108.10,104.70,106.10,2293500,103.158)
(366,2015-04-29,110.60,110.75,105.45,106.35,2720600,103.401)
(421,2015-04-28,113.75,114.25,108.75,109.85,2062300,106.804)
(476,2015-04-27,111.50,113.40,110.00,113.00,1523500,109.867)
...

Storing any of that data in an external database, for example Hive or HBase, can be achieved by using the foreachRDD functionality:
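A minimal sketch of the Hive variant, written against the Spark 1.3-era DataFrame API, could look like the following. The table name stock_quotes and the column names are illustrative assumptions, not part of the original example:

import org.apache.spark.sql.SaveMode

// Reuse a single HiveContext on the driver; the foreachRDD closure also runs there
val hiveContext = new HiveContext(ssc.sparkContext)
import hiveContext.implicits._

lines.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {                              // skip empty micro-batches
    val df = rdd.toDF("offset", "line")              // (file offset, raw CSV line)
    df.saveAsTable("stock_quotes", SaveMode.Append)  // append the batch to a Hive-managed table
  }
}

This snippet would take the place of lines.print() in the example above, before the streaming context is started.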
