Simple Spark Streaming & Kafka Example in a Zeppelin Notebook

Apache Zeppelin is a web-based, multi-purpose notebook for data discovery, prototyping, reporting, and visualization. With its Spark interpreter, Zeppelin can also be used for rapid prototyping of streaming applications as well as for streaming-based reports.

In this post we will walk through a simple example of creating a Spark Streaming application based on Apache Kafka.

Creating a Notebook

For our example we first need to create a new notebook, which we’ll name “Simple Spark Streaming Kafka Example”:

Naming the notebook:

Adding Dependencies to Spark Interpreter

For our Kafka example we rely on dependencies that are not necessarily included with the SparkContext created by the Zeppelin interpreter. Zeppelin allows you to import arbitrary packages available in any Maven repository. Any non-standard/non-public repository needs to be configured in Zeppelin first.

We can reach the interpreter settings through the notebook options as shown in the picture above. Here we can set the default interpreter for our notebook, but we can also enter the settings page for all interpreters:

Find the Spark interpreter in the list of available Zeppelin interpreters for editing:

Scroll to the bottom, where you should find the Dependencies section, in which you can add additional packages:

These are the packages needed, depending on your Kafka distribution as well as the Scala release you are using for Spark:

org.apache.spark:spark-streaming-kafka_2.10:1.6.2
org.apache.kafka:kafka_2.10:0.8.2.2
org.apache.kafka:kafka-clients:0.8.2.2

Save and restart the interpreter.
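
Alternatively, depending on your Zeppelin release, the packages can also be loaded at runtime through the %dep interpreter, in a paragraph executed before the first Spark paragraph. A minimal sketch for the versions listed above:

%dep
z.reset()
z.load("org.apache.spark:spark-streaming-kafka_2.10:1.6.2")
z.load("org.apache.kafka:kafka_2.10:0.8.2.2")
z.load("org.apache.kafka:kafka-clients:0.8.2.2")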

Preparing Kafka Topic

For this example we create a simple topic named “spark-test-topic” with just one partition:

$ cd /usr/hdp/current/kafka-broker/
$ bin/kafka-topics.sh --create \
> --topic spark-test-topic \
> --zookeeper node1.hdp:2181 \
> --partitions 1 \
> --replication-factor 1
Created topic "spark-test-topic".
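
To double-check the topic and its partition assignment, we can describe it with the same script:

$ bin/kafka-topics.sh --describe \
> --topic spark-test-topic \
> --zookeeper node1.hdp:2181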

In our example we will use a Spark Streaming app to read the key-value messages sent to this topic’s partition and split them by spaces, simply printing out the individual words of each interval.

For this we don’t need a dedicated producer but can simply reuse the existing console producer, which sends each entered line as one message to the topic:

$ bin/kafka-console-producer.sh --broker-list node1.hdp:6667 --topic spark-test-topic
word word hello
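
To sanity-check that the messages actually arrive, independently of Spark, you can attach the console consumer in a second terminal (Kafka 0.8 consumes via Zookeeper); it should echo the messages sent above:

$ bin/kafka-console-consumer.sh --zookeeper node1.hdp:2181 \
> --topic spark-test-topic --from-beginning
word word hello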

Simple Spark Streaming Application

For our example we need a couple of dependencies not already imported into the created SparkContext: the Kafka message decoders as well as KafkaUtils from Spark’s Kafka streaming package.

import _root_.kafka.serializer.DefaultDecoder
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._

We set the log level to ERROR to keep INFO messages from polluting the notebook output. The StreamingContext will run with a 5-second batch interval:

sc.setLogLevel("ERROR")  // prevent INFO logging from polluting the output

val ssc = new StreamingContext(sc, Seconds(5))    // create the StreamingContext with a 5-second batch interval

For our stream we need the Kafka configuration of the topic we subscribe to, which we hold in the kafkaConf map:

val kafkaConf = Map(
    "metadata.broker.list" -> "node1.hdp:6667",
    "zookeeper.connect" -> "node1.hdp:2181",
    "group.id" -> "kafka-streaming-example",
    "zookeeper.connection.timeout.ms" -> "1000"
)

Finally we create a DStream and flatMap the messages into individual words separated by spaces:

val lines = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
    ssc,
    kafkaConf,
    Map("spark-test-topic" -> 1),   // subscripe to topic and partition 1
    StorageLevel.MEMORY_ONLY
)

val words = lines.flatMap { case (_, message) => message.split(" ") }

words.print()
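
As a side note, the spark-streaming-kafka package also provides a receiver-less direct stream, which consumes straight from the brokers in metadata.broker.list instead of going through a Zookeeper-based receiver. A minimal sketch reusing the kafkaConf from above (with String keys instead of the byte-array keys used before):

// receiver-less variant: offsets are tracked by Spark instead of Zookeeper
val directLines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc,
    kafkaConf,
    Set("spark-test-topic")
)

val directWords = directLines.flatMap { case (_, message) => message.split(" ") }
directWords.print()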

The complete notebook:

The complete code example:

%spark
import _root_.kafka.serializer.DefaultDecoder
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._

// prevent INFO logging from polluting the output
sc.setLogLevel("ERROR")

// create the StreamingContext with a 5-second batch interval
val ssc = new StreamingContext(sc, Seconds(5))

val kafkaConf = Map(
    "metadata.broker.list" -> "node1.hdp:6667",
    "zookeeper.connect" -> "node1.hdp:2181",
    "group.id" -> "kafka-streaming-example",
    "zookeeper.connection.timeout.ms" -> "1000"
)

val lines = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
    ssc,
    kafkaConf,
    Map("spark-test-topic" -> 1),   // subscripe to topic and partition 1
    StorageLevel.MEMORY_ONLY
)

val words = lines.flatMap { case (_, message) => message.split(" ") }

words.print()

ssc.start()
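
If the paragraph returns immediately without printing any batches, it can help to replace the bare ssc.start() with a variant that blocks the paragraph for a bounded time before shutting the stream down. A minimal sketch, with an arbitrary 60-second timeout:

ssc.start()
ssc.awaitTerminationOrTimeout(60 * 1000)   // block the paragraph for up to 60 seconds
ssc.stop(stopSparkContext = false)         // stop the stream but keep the notebook's SparkContext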

Done.

9 thoughts on “Simple Spark Streaming & Kafka Example in a Zeppelin Notebook”

  1. Hi !

    Thank you very much for your guide, I find it very useful.
    Although I can run all the code, I think I am missing something else, because I cannot see the output of the script in my Zeppelin Notebook. In fact, I tried to run the same code in the spark-shell and it does not print any result either.

    First I thought it was due to communication issues; however, my Zeppelin (a docker container) can reach Spark, Kafka and Zookeeper (also other containers).
    My second thought is that it connects but does not get the data.
    Kafka works fine. I can use producers and consumers in other applications.

    Would you have any idea why is this happening?

    Find below my versions:
    Spark: 2.1.0
    Kafka: 0.9.0.1
    Zeppelin: 0.7
    Zookeeper: 3.4.9
    org.apache.spark:spark-streaming-kafka_2.11:2.1.0
    org.apache.kafka:kafka_2.11:0.9.0.1
    org.apache.kafka:kafka-clients:0.9.0.1

    Thank you very much !!

  2. Yeah, thank you for the wonderful tutorial!

    I also got the same problem: no output.

    I added ssc.awaitTermination() but that did not help.

    Any suggestion?

    Thanks

    John Lee

  3. The same issue for me as well. However, when I run the notebook there are no errors, it just hangs, although the topic has data streaming in and I can see that in the Kafka consumer console. Any help would be great.

  4. The same issue here. After adding ssc.awaitTermination() at the end, I can see a timestamp printed every 5 seconds, but obviously it doesn’t read messages from Kafka. With exactly the same code in the Eclipse Scala IDE, it runs and consumes messages from Kafka successfully. Weird. Any advice?

    1. You just need to write new words in the Kafka console producer, because the consumer only listens to the newest messages (those sent after its start).
      Try to write:
      word1 word2 word3
      and tell me if it works.

  5. thanks for your wonderful guide
    however I have the same problem as the others: when I submit the job with spark-submit it works, but with Zeppelin there is no output
