Since Spark natively supports reading from any kind of Hadoop InputFormat, those data sources are also available for building DStreams in Spark Streaming applications. Using a plain text input format, an HDFS directory can be turned into a spooling directory for data ingestion.
Files added to that directory in an atomic way (a requirement, see the note below) are picked up by the running streaming context for processing. The data could, for example, be processed and stored in an external database like HBase or Hive.
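One way to satisfy the atomicity requirement is to upload new files to a staging directory first and then move them into the monitored directory, since a rename within the same HDFS filesystem is atomic. A minimal sketch, with placeholder paths:

$ hdfs dfs -put data/stocks.csv /tmp/staging/
$ hdfs dfs -mv /tmp/staging/stocks.csv /spark_log/

This way the streaming context only ever sees complete files, never files that are still being written.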
Such a streaming application is actually quite simple to assemble, yet it can have a huge impact on data ingestion approaches in Hadoop, because a typical landing zone could be avoided.
Today we often find such a landing zone residing on an edge node close to the cluster, running tools like Ab Initio or DataStage that are only capable of offline processing and treat Hadoop as “Yet-Another-Filesystem”. With Spark comes an alternative: such ETL workflows can be executed in parallel on the cluster itself.
A simple example of a spooling directory on HDFS could be established with the following code:
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkFileExample {
  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      System.err.println("Usage: <log-dir>")
      System.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("SpoolDirSpark")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    val inputDirectory = args(0)

    // fileStream monitors the directory and emits (byte offset, line) pairs
    // for every new file that appears there
    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](inputDirectory)
      .map { case (x, y) => (x.toString, y.toString) }

    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
This fairly simple example prints each line of a file copied into the HDFS directory to the screen.
Once built, it can be executed as follows, where /spark_log is a directory in HDFS:
spark-submit --master yarn-client --num-executors 2 --driver-memory 512m --executor-memory 512m --executor-cores 1 --class simpleexample.SparkFileExample spark-streaming-simple-example-0.1-SNAPSHOT.jar /spark_log
A file uploaded to the named directory is then printed line by line to the terminal (note that print() only shows the first few elements of each batch, not all lines):
Upload a file to the directory:
$ hdfs dfs -put data/bmw_stocks.csv /spark_log
Spark Streaming output:
15/05/15 12:48:28 INFO JobScheduler: Added jobs for time 1431694108000 ms
15/05/15 12:48:28 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on one.hdp:52897 (size: 1917.0 B, free: 267.3 MB)
15/05/15 12:48:28 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on one.hdp:52897 (size: 40.3 KB, free: 267.2 MB)
15/05/15 12:48:29 INFO DAGScheduler: Stage 0 (print at SparkFileExample.scala:29) finished in 2.281 s
15/05/15 12:48:29 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 2271 ms on one.hdp (1/1)
15/05/15 12:48:29 INFO YarnClientClusterScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/05/15 12:48:29 INFO DAGScheduler: Job 37 finished: print at SparkFileExample.scala:29, took 2.424158 s
-------------------------------------------
Time: 1431694106000 ms
-------------------------------------------
(0,Date,Open,High,Low,Close,Volume,Adj Close)
(42,2015-05-07,103.60,105.25,102.00,104.60,2172400,101.70)
(96,2015-05-06,106.50,108.45,101.50,104.10,3585700,101.214)
(151,2015-05-05,106.70,109.30,106.00,106.50,2045100,103.547)
(206,2015-05-04,106.95,108.10,105.25,106.75,1417200,103.79)
(260,2015-05-01,106.10,106.10,106.10,106.10,000,103.158)
(311,2015-04-30,106.50,108.10,104.70,106.10,2293500,103.158)
(366,2015-04-29,110.60,110.75,105.45,106.35,2720600,103.401)
(421,2015-04-28,113.75,114.25,108.75,109.85,2062300,106.804)
(476,2015-04-27,111.50,113.40,110.00,113.00,1523500,109.867)
...
Storing that data in an external database such as Hive or HBase can be achieved with the foreachRDD functionality:
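A minimal sketch of that pattern, assuming a Hive table named stock_quotes with matching columns already exists and a Spark version that provides the DataFrame writer API (1.4+); the table and column names are placeholders, not part of the example above:

// reuse the streaming context's SparkContext for a HiveContext (driver side)
val hiveContext = new HiveContext(ssc.sparkContext)
import hiveContext.implicits._

lines.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // turn the (offset, line) tuples of the current batch into a DataFrame
    val batch = rdd.toDF("offset", "line")
    // append the batch to the existing Hive table
    batch.write.mode("append").saveAsTable("stock_quotes")
  }
}

Creating the HiveContext once on the driver and reusing it across batches avoids re-initializing the metastore connection on every interval.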