Broadcast Join with Spark

With a broadcast join, one side of the join is materialized and sent to all mappers. It is therefore considered a map-side join, which can bring significant performance improvements by omitting the otherwise required sort-and-shuffle phase of a reduce step. In this post we discuss broadcast joins in Spark with both the DataFrame and RDD APIs in Scala.

To improve the performance of join operations in Spark, developers can decide to materialize one side of the join for a map-only join, avoiding an expensive sort-and-shuffle phase. The table is sent to all mappers as a file and joined while the parts of the other table are read. Because the data set is materialized and sent over the network, it only brings a significant performance improvement if it is considerably small. Another constraint is that it also needs to fit completely into the memory of each executor. Not to forget, it also needs to fit into the memory of the driver!

In Spark, broadcast variables are shared among executors using the Torrent protocol, a peer-to-peer protocol known to perform very well at distributing data sets across multiple peers. Its advantage is that peers exchange blocks of a file among each other instead of relying on a central entity holding all the blocks.

In a previous blog post we already discussed the different Hive join strategies available for MapReduce processing. Much of this applies to Spark as well, but in this post we only discuss the broadcast join. The bucket join discussed for Hive is another fast map-side-only join and relates to the co-partition join strategy available in Spark.

Sample Data

For this example we use a simple data set modelling an employee-to-department relationship. Employees of an imaginary company are assigned to the one department they work for, referenced by an ID (depID). As we have just a few departments but many employees, we pick the department table for broadcasting.

We can use Spark's parallelize to quickly turn our sample data into RDDs and, with toDF, into DataFrames as well:
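
What follows is a minimal sketch; the case classes, names, and values are illustrative, as the original sample data is not shown (assuming a Spark 1.x shell where sc and sqlContext are predefined):

```scala
// Illustrative schema and sample data (not the original post's values)
case class Employee(name: String, depID: Int)
case class Department(depID: Int, depName: String)

val employees = sc.parallelize(Seq(
  Employee("Alice", 1),
  Employee("Bob", 2),
  Employee("Carol", 1)
))
val departments = sc.parallelize(Seq(
  Department(1, "Engineering"),
  Department(2, "Sales")
))

// toDF becomes available after importing the SQL context implicits
import sqlContext.implicits._
val employeesDF   = employees.toDF()
val departmentsDF = departments.toDF()
```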

DataFrame API

The DataFrame API of Spark makes it very concise to create a broadcast variable out of the department DataFrame.
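
A sketch of how this could look, using the broadcast function from org.apache.spark.sql.functions to hint the optimizer to broadcast the smaller side (names follow the illustrative schema above):

```scala
import org.apache.spark.sql.functions.broadcast

// Mark the small departments DataFrame for a broadcast (map-side) join
val joinedDF = employeesDF.join(broadcast(departmentsDF), "depID")
joinedDF.show()
```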

Below is the output of our broadcast join:
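
With the illustrative data above it would look roughly like this (row order may differ):

```
+-----+-----+-----------+
|depID| name|    depName|
+-----+-----+-----------+
|    1|Alice|Engineering|
|    2|  Bob|      Sales|
|    1|Carol|Engineering|
+-----+-----+-----------+
```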

You should be able to find the broadcast happening in the log files during the execution of the job. An example is given below:
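
The following lines are illustrative only; exact wording, addresses, and sizes vary with the Spark version, but the TorrentBroadcast entries show the broadcast blocks being distributed:

```
INFO spark.SparkContext: Created broadcast 1 from join at <console>:35
INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.12:53254 (size: 2.2 KB, free: 511.1 MB)
INFO broadcast.TorrentBroadcast: Started reading broadcast variable 1
INFO broadcast.TorrentBroadcast: Reading broadcast variable 1 took 12 ms
```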

RDD API

With the RDD API we first have to create a pair RDD from our departments data, which gives us the possibility to broadcast the departments table as a Map for quick lookups by department ID.
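
A sketch, reusing the illustrative departments RDD from above; collectAsMap pulls the key-value pairs to the driver so the resulting Map can be broadcast:

```scala
// Key the departments by their ID and collect them as a Map on the driver
val depMap = departments
  .map(d => (d.depID, d.depName))
  .collectAsMap()

// Ship the Map to all executors as a broadcast variable
val depBroadcast = sc.broadcast(depMap)
```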

The broadcast variable can then be used during our map phase, returning a joined record or None:
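
Continuing the sketch, each employee is looked up in the broadcast Map; a match yields the joined record wrapped in Some, a miss yields None:

```scala
val joined = employees.map { e =>
  depBroadcast.value.get(e.depID) match {
    case Some(depName) => Some((e.name, e.depID, depName))
    case None          => None
  }
}
joined.collect().foreach(println)
```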

The output should look similar to this:
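
With the illustrative data, roughly (order may differ):

```
Some((Alice,1,Engineering))
Some((Bob,2,Sales))
Some((Carol,1,Engineering))
```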

