Spark Streaming with Python

Streaming applications in Spark can be written in Scala, Java, and Python, giving developers the possibility to reuse existing code. An important note about Python with Spark in general is that its API lags behind the development of the other APIs by several months. For Spark Streaming only basic input sources are supported: for now just text file and text socket inputs, while sources like Flume are not supported and Kafka support only becomes available with Spark 1.3. A general fileStream is not supported, just textFileStream.
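
To illustrate the socket source, a minimal sketch could look like the following (the host and port are placeholders; a simple text server such as nc -lk 9999 would do for testing):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="SocketExample")
ssc = StreamingContext(sc, 1)  # batch interval of 1 second

# Every line received on the socket becomes one record of the DStream.
lines = ssc.socketTextStream("localhost", 9999)
lines.pprint()

ssc.start()
ssc.awaitTermination()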

Text file and socket input are fairly flexible formats, which is why they are sufficient for quite a few Hadoop-related use cases. PySpark also supports Hadoop-compatible OutputFormats, so that storing data to HBase is possible as well. Similar to the spooling directory use case, with Python we are able to establish a file based stream input like this:

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print >> sys.stderr, "Usage: hdfs_streaming.py <dir>"
        exit(-1)

    sc = SparkContext(appName="HDFSWordCount")
    ssc = StreamingContext(sc, 1)  # batch interval of 1 second

    # Monitor the given directory; newly arriving files are read line by line.
    lines = ssc.textFileStream(sys.argv[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
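
Assuming the script is saved as hdfs_streaming.py, it can be submitted in the usual way, for example spark-submit hdfs_streaming.py /user/streaming/input. Note that textFileStream only picks up files that are atomically moved or renamed into the monitored directory, which matches the spooling directory pattern nicely.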

To store values in HBase an example setup like the one below can be used, where host holds the ZooKeeper quorum and table the name of the target HBase table:

conf = {"hbase.zookeeper.quorum": host,
        "hbase.mapred.outputtable": table,
        "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
        "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

# sys.argv[3:] is expected to hold row key, column family, qualifier and value,
# which is the layout the StringListToPutConverter turns into an HBase Put.
sc.parallelize([sys.argv[3:]]).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset(
    conf=conf,
    keyConverter=keyConv,
    valueConverter=valueConv)
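
To combine this with the streaming part, each micro-batch of a DStream can be pushed to HBase through foreachRDD. The following is just a sketch: it assumes the conf, keyConv and valueConv definitions from above, and a hypothetical DStream called events whose records are already [row, family, qualifier, value] lists as expected by the StringListToPutConverter:

def save_to_hbase(rdd):
    # Key each record by its row key and write the whole batch
    # using the TableOutputFormat configuration defined above.
    if not rdd.isEmpty():
        rdd.map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset(
            conf=conf,
            keyConverter=keyConv,
            valueConverter=valueConv)

events.foreachRDD(save_to_hbase)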
