Streaming applications in Spark can be written in Scala, Java, and Python, which lets developers reuse existing code. An important caveat about Python with Spark in general is that its API lags behind the other APIs by several months. For Spark Streaming, only basic input sources are supported in Python; sources such as Flume and Kafka might not be available. For now only text file and text socket inputs are supported (Kafka support is available as of Spark 1.3). A general fileStream is not supported, only textFileStream.
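To try the text socket input locally, something has to serve newline-terminated text over TCP. The sketch below is one way to do that with the standard library only. The helper name `serve_lines` is made up for this illustration and is not part of Spark.

```python
import socket
import threading


def serve_lines(lines, host="127.0.0.1", port=0):
    """Serve newline-terminated UTF-8 lines to the first client that connects.

    Hypothetical test helper, not part of Spark. Port 0 lets the OS pick a
    free port. Returns the bound port and the server thread.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    bound_port = server.getsockname()[1]

    def _run():
        # Block until one client (for example a socket text stream) connects.
        conn, _ = server.accept()
        try:
            for line in lines:
                conn.sendall((line + "\n").encode("utf-8"))
        finally:
            conn.close()
            server.close()

    thread = threading.Thread(target=_run)
    thread.daemon = True
    thread.start()
    return bound_port, thread
```

A streaming job could then read from it with `ssc.socketTextStream("127.0.0.1", port)`, assuming `ssc` is a StreamingContext and `port` is the value returned by the helper.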
Text files and sockets are fairly flexible input formats, which is why they are sufficient for some Hadoop-related use cases. PySpark also supports Hadoop-compatible OutputFormats, so storing data in HBase is possible as well. Similar to the spooling directory use case, with Python we can establish a file-based stream input like this:
    import sys

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    if __name__ == "__main__":
        if len(sys.argv) != 2:
            sys.stderr.write("Usage: hdfs_streaming.py <dir>\n")
            sys.exit(-1)

        sc = SparkContext(appName="HDFSWordCount")
        # Process the stream in batches of one second.
        ssc = StreamingContext(sc, 1)

        # Pick up new text files that appear in the given directory.
        lines = ssc.textFileStream(sys.argv[1])
        counts = (lines.flatMap(lambda line: line.split(" "))
                       .map(lambda x: (x, 1))
                       .reduceByKey(lambda a, b: a + b))
        # Print the first elements of each batch's result.
        counts.pprint()

        ssc.start()
        ssc.awaitTermination()
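To see what the flatMap, map, and reduceByKey chain computes for a single batch, the same logic can be sketched in plain Python without a cluster. The function name `count_words` is invented for this illustration; it only mimics the semantics and does not use Spark.

```python
from collections import defaultdict


def count_words(batch_lines):
    """Mimic flatMap -> map -> reduceByKey for one batch of text lines."""
    # flatMap: split every line on a single space and flatten the result.
    words = [word for line in batch_lines for word in line.split(" ")]
    # map: pair each word with an initial count of 1.
    pairs = [(word, 1) for word in words]
    # reduceByKey: add up the counts of identical words.
    totals = defaultdict(int)
    for word, count in pairs:
        totals[word] += count
    return dict(totals)


if __name__ == "__main__":
    print(count_words(["to be or", "not to be"]))
```

Note that splitting on a single space means an empty line contributes one empty-string "word", which is also what the streaming version would count.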
To store values in HBase, a setup like the one below can be used:
    # host and table are assumed to be defined beforehand (for example from
    # sys.argv[1] and sys.argv[2]); sc is an existing SparkContext.
    conf = {"hbase.zookeeper.quorum": host,
            "hbase.mapred.outputtable": table,
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}

    # Converter classes from the Spark examples package; they must be on the classpath.
    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

    # Use the first remaining argument as the key and the whole list as the value.
    sc.parallelize([sys.argv[3:]]).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset(
        conf=conf,
        keyConverter=keyConv,
        valueConverter=valueConv)
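The shape of the record handed to saveAsNewAPIHadoopDataset is easy to get wrong, so it can help to build it in small, testable functions. The sketch below assumes that the remaining command-line arguments are row key, column family, qualifier, and value, in that order; this layout is an assumption, not something the snippet above states. The names `build_hbase_conf` and `to_hbase_record` are invented for this illustration.

```python
def build_hbase_conf(host, table):
    """Return the Hadoop job configuration for writing to an HBase table."""
    return {
        "hbase.zookeeper.quorum": host,
        "hbase.mapred.outputtable": table,
        "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
        "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable",
    }


def to_hbase_record(row, family, qualifier, value):
    """Build the (key, value) pair in the same shape as lambda x: (x[0], x)."""
    # Assumed cell layout: [row key, column family, qualifier, value].
    cells = [row, family, qualifier, value]
    return (cells[0], cells)
```

With these helpers, the write could look like `sc.parallelize([to_hbase_record(...)]).saveAsNewAPIHadoopDataset(conf=build_hbase_conf(host, table), keyConverter=keyConv, valueConverter=valueConv)`, given an existing SparkContext `sc`.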