Python Virtualenv with Hadoop Streaming

If you are using Python with Hadoop Streaming a lot, you might know the trouble of keeping all nodes up to date with the required packages. A nice way to work around this is to use a Virtualenv per streaming project. Besides removing the hurdle of keeping all nodes in sync with the necessary libraries, Virtualenv also lets you seamlessly try different versions and setups within the same project.

In this example we are going to create a Python job that counts the n-grams of hotel names in relation to the country the hotel is located in. Besides using a Virtualenv with NLTK installed, we are going to demonstrate the use of Avro as input for a Python streaming job, as well as secondary sorting with KeyFieldBasedPartitioner and KeyFieldBasedComparator.

Relocatable Environment

The basic idea is to package the virtual environment and distribute it together with the job. Afterwards it can be referenced from the shebang of the mapper and reducer.

Usually Virtualenv creates environments with paths that are absolute to the current system. To avoid this, the created environment has to be made relocatable. The following commands create a relocatable Virtualenv in the current directory:
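The commands might look like the following; the interpreter path and the environment name demoenv are assumptions for a CentOS install:

```shell
# create a Python 2.7 environment in ./demoenv (interpreter path may differ)
virtualenv -p /usr/bin/python2.7 demoenv
# rewrite the activation scripts to use relative paths
virtualenv --relocatable demoenv
# activate the environment for the following install steps
source demoenv/bin/activate
```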

On the CentOS system where this is executed, both Python 3 and Python 2.7 are installed. In this example we explicitly create a Python 2.7 environment, make it relocatable afterwards, and activate it using source. From here we are able to install all required packages into the currently activated environment. For our purpose it is enough to simply install NLTK.

Install NLTK:
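With the environment activated, installing NLTK is a single pip call:

```shell
pip install nltk
```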

Now that we have our environment set up, we can package it for distribution. For this we zip all files within the created directory so that we don’t package the directory itself.
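For example, zipping from inside the directory keeps the archive root flat:

```shell
# zip the contents of the environment, not the demoenv directory itself
cd demoenv
zip -r ../demoenv.zip *
cd ..
```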

This setup can be distributed with our streaming job. To do this we can, for example, upload the package to HDFS and include it as an archive in the job using the -archives option.
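A sketch of the upload; the HDFS target path is an assumption:

```shell
# place the packaged environment on HDFS so the job can pull it in
hdfs dfs -mkdir -p /user/hk/virtualenv
hdfs dfs -put demoenv.zip /user/hk/virtualenv/demoenv.zip
```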

Counting Hotel Name NGrams by Country

In this example job we are going to read the names of hotels, split them into n-grams, and count those by country. The hotels are stored in an Avro file on the cluster. After reading them using  org.apache.avro.mapred.AvroAsTextInputFormat the input for the mapper will look something like this:
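For illustration, a record could look like this; the field names name and country_code are assumptions about the hotel schema:

```
{"name": "My Demo Hotel Berlin", "country_code": "DE"}
{"name": "Grand Demo Hotel", "country_code": "AT"}
```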

In the mapper we are going to split the name into tri-grams, emitting them together with the ISO country code plus a count of 1, so that the output of our  token_mapper.py will look like this:
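Illustrative output for the hypothetical record above (§ separates the parts of the composite key, a tab separates key and count):

```
DE§my 	1
DE§y d	1
DE§ de	1
DE§dem	1
```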

By looking at the output you can see that we applied a composite key of country code and tri-gram, with § being our delimiter. As we want to count the tri-grams by country, the data needs to be partitioned only by the country ISO code, for example DE. On the other hand, since we want to count the tri-grams at the reducer, these entries have to be sorted by the country ISO code AND the tri-gram. Both the sorting and the partitioning can be achieved by applying a so-called KeyFieldBasedPartitioner and KeyFieldBasedComparator, so that our job setting will look like this:
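A sketch of the relevant streaming options; on older Hadoop versions the equivalent mapred.* property names apply instead:

```shell
hadoop jar hadoop-streaming.jar \
    -D map.output.key.field.separator=§ \
    -D mapreduce.partition.keypartitioner.options=-k1,1 \
    -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
    -D mapreduce.partition.keycomparator.options=-k1,2 \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
    ...
```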

We can now assume a correctly sorted order in our reducer, in addition to the fact that the tri-grams are distributed to the reducers by country code only. With this in mind, counting the tri-grams by country becomes straightforward. The resulting output should look like this:
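Continuing the hypothetical example, with illustrative counts:

```
AT§dem	1
DE§ber	1
DE§hot	2
```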

Using the Distributed Environment

In addition to the job configuration posted above, it is important to also include our packaged environment. This is possible by providing the -archives option together with the HDFS path to our zip.
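A sketch of the option, reusing the upload path assumed earlier:

```shell
-archives hdfs://[host]:8020/user/hk/virtualenv/demoenv.zip#demoenv
```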

Notice the symlink (#) at the end of the last line? This is the name under which we can reference the Virtualenv in our scripts. It also means that we can, in general, provide different environments for a single job, as long as we use different identifiers/symlinks. The shebang in our script will look like the following:
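Assuming demoenv as the symlink name, the shebang points at the interpreter inside the unpacked archive:

```
#!./demoenv/bin/python
```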

What happens here is that our package is distributed to each node that will execute our mapper and reducer scripts. We are then able to access the content of the package through the provided symlink, demoenv. Inside it we find the Python executable that we use to run our scripts.

Lastly, we are going to look at how we can use Avro files as input to our streaming jobs. For streaming, the Avro project provides an  AvroAsTextInputFormat class that reads the file with the given schema and exposes the input as plain JSON to the underlying mapper.

There also exists an  AvroTextOutputFormat  class, which can be used to write Avro back to the cluster. Unfortunately it is currently only capable of writing the output with a bytes schema, which basically means it writes it out as a JSON string. There is an open ticket, AVRO-1067, to fix that.

To use the AvroAsTextInputFormat class we have to provide the required jars to the job along with the -inputformat configuration. With the jars being in the same folder, the configuration looks like this:
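A sketch of those options; the jar names and versions are assumptions:

```shell
hadoop jar hadoop-streaming.jar \
    -libjars avro-1.7.7.jar,avro-mapred-1.7.7-hadoop2.jar \
    -inputformat org.apache.avro.mapred.AvroAsTextInputFormat \
    ...
```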

The Complete Job

For completeness, here is the complete job I have used for this example, including the code for the mapper and reducer:

Job Conf

Executable script to run the whole job:
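A sketch of the full job script, piecing together the options discussed above; the jar names/versions and HDFS paths are assumptions:

```shell
#!/bin/bash
# run the hotel tri-gram counting job; adjust paths and jar versions
hadoop jar hadoop-streaming.jar \
    -D map.output.key.field.separator=§ \
    -D mapreduce.partition.keypartitioner.options=-k1,1 \
    -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
    -D mapreduce.partition.keycomparator.options=-k1,2 \
    -libjars avro-1.7.7.jar,avro-mapred-1.7.7-hadoop2.jar \
    -archives hdfs://[host]:8020/user/hk/virtualenv/demoenv.zip#demoenv \
    -files token_mapper.py,token_reducer.py \
    -inputformat org.apache.avro.mapred.AvroAsTextInputFormat \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
    -mapper token_mapper.py \
    -reducer token_reducer.py \
    -input /user/hk/hotels \
    -output /user/hk/token_out
```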

Token Mapper
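A sketch of the mapper; the Avro field names (name, country_code) are assumptions about the hotel schema, and a plain tri-gram fallback stands in where NLTK is not installed:

```python
#!./demoenv/bin/python
# token_mapper.py -- emits one 'CC§trigram<TAB>1' record per character tri-gram
import json
import sys

try:
    from nltk.util import ngrams  # provided by the shipped virtualenv
except ImportError:
    # plain fallback so the sketch also runs without NLTK
    def ngrams(sequence, n):
        sequence = list(sequence)
        return (tuple(sequence[i:i + n])
                for i in range(len(sequence) - n + 1))


def map_line(line):
    """Yield composite-key records for one JSON hotel record."""
    hotel = json.loads(line)
    name = hotel["name"].lower()
    country = hotel["country_code"]
    for gram in ngrams(name, 3):
        yield u"%s\u00a7%s\t1" % (country, u"".join(gram))


# only run the stdin loop when invoked as the streaming script
if __name__ == "__main__" and "token_mapper" in sys.argv[0]:
    for raw in sys.stdin:
        raw = raw.strip()
        if raw:
            for record in map_line(raw):
                print(record)
```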

Token Reducer
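A sketch of the reducer; it relies on the secondary sort so that equal composite keys arrive consecutively:

```python
#!./demoenv/bin/python
# token_reducer.py -- sums the counts of consecutive identical keys
import sys


def reduce_lines(lines):
    """Yield 'key<TAB>total' for each run of identical keys in sorted input."""
    current, total = None, 0
    for line in lines:
        key, _, count = line.rstrip("\n").partition("\t")
        if key != current:
            if current is not None:
                yield "%s\t%d" % (current, total)
            current, total = key, 0
        total += int(count or 1)
    if current is not None:
        yield "%s\t%d" % (current, total)


# only run the stdin loop when invoked as the streaming script
if __name__ == "__main__" and "token_reducer" in sys.argv[0]:
    for out in reduce_lines(sys.stdin):
        print(out)
```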

Further Readings

7 thoughts on “Python Virtualenv with Hadoop Streaming”

  1. Wow, this is really nice! The exact same magic worked in Pig (I needed map-only), using

    set mapred.create.symlink yes;
    set mapred.cache.archives hdfs://[host]:8020/user/hk/virtualenv/demoenv.zip#demoenv;
    DEFINE CMD `token_mapper.py` ship('token_mapper.py');
    A = load '/user/hk/hotels' using PigStorage(',');
    B = stream A through CMD;
    store B into '/user/hk/token_out';

    see further details here: http://ragrawal.wordpress.com/2014/03/25/apache-pig-and-distributed-cache/

  2. This blog assumes that the client host and the cluster hosts have the same Python version installed in the same location. It is possible to distribute a different Python version in a streaming job if you:
    – create the virtualenv, and make it relocatable, with a Python install that is the same as on the cluster
    – install the different Python version on the client host without shared libraries (to be used with the virtualenv -p option)
    – manually add the missing Python libs to the virtualenv (the virtualenv holds a subset of libraries and refers to the common Python install for the others)

    1. I use virtualenv -p /path/to/python3.5 venv, as some library requires this version.

      But unfortunately, the Hadoop streaming job reports an error like "no module named json".

      Should I install Python 3.5 on the cluster machines to share the common libs?

      I assumed the archive zip file included all the files needed to run Python 3.5.

      Thanks.
