Python Virtualenv with Hadoop Streaming

If you use Python with Hadoop Streaming a lot, you probably know the trouble of keeping all nodes up to date with the required packages. A nice way to work around this is to use a Virtualenv for each streaming project. Besides removing the hurdle of keeping all nodes in sync with the necessary libraries, another advantage of using Virtualenv is the possibility to try different versions and setups within the same project seamlessly.

In this example we are going to create a Python job that counts the n-grams of hotel names by the country the hotel is located in. Besides using a Virtualenv in which we install NLTK, we are going to demonstrate the use of Avro as input for a Python streaming job, as well as secondary sorting with KeyFieldBasedPartitioner and KeyFieldBasedComparator.

Relocatable Environment

The basic idea is to package the virtual environment and distribute it together with the job. Afterwards it can be referenced from the shebang of the mapper and reducer.

Usually Virtualenv creates paths that are absolute to the current system. To avoid this, the created environment has to be made relocatable. The following commands create a relocatable Virtualenv in the current directory:

[hk@host]$ virtualenv-2.7 demoenv
New python executable in demoenv/bin/python2.7 ... done.
[hk@host]$ virtualenv-2.7 --relocatable demoenv
[hk@host]$ source demoenv/bin/activate
(demoenv)[hk@host]$ 

On the (CentOS) system this is executed on, both Python 3 and 2.7 exist. In this example we explicitly create a Python 2.7 environment. We make it relocatable afterwards and activate it using source. From here we are able to install all required packages into the currently activated environment. For our purpose it is enough to simply install NLTK.

Install NLTK:

(demoenv)[hk@host]$ pip install nltk
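
To verify that we really are working inside the environment and that NLTK resolves from it, a quick check with the activated interpreter can look like this (just an optional sanity check, not required for the job):

import sys
import nltk

# both should point into the demoenv directory if the environment is active
print(sys.prefix)
print(nltk.__file__)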

Now that we have our environment set up we can package it for distribution. For this we zip all files within the created directory so that we don’t package the directory itself.

(demoenv)[hk@host]$ cd demoenv
(demoenv)[hk@host]$ zip -r ../demoenv.zip *
(demoenv)[hk@host]$ cd ..
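
A quick way to double-check the archive layout (again just my own optional sanity check) is to list the top-level entries of the zip. bin, lib and so on have to sit at the root of the archive, not below a demoenv/ directory, otherwise the ./demoenv/bin/python shebang used later will not resolve once Hadoop unpacks the archive:

# optional sanity check: the interpreter must sit at bin/... at the top level
import zipfile

with zipfile.ZipFile('demoenv.zip') as zf:
    top_level = set(name.split('/')[0] for name in zf.namelist())
    print(top_level)  # expect 'bin', 'lib', ... but not 'demoenv'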

This archive can now be distributed with our streaming job. To do this we can, for example, upload the package to HDFS and include it in the job as an archive using the -archives option.

(demoenv)[hk@host]$ hdfs dfs -put demoenv.zip /user/hk/virtualenv/

Counting Hotel Name NGrams by Country

In this example job we are going to read the names of hotels, split them into n-grams and count them by country. The hotels are stored in an Avro file on the cluster. After reading them using org.apache.avro.mapred.AvroAsTextInputFormat the input for the mapper will look something like this:

{"name": "Hotel München", "iso2_country": "DE"}
{"name": "Hotel Leipzig", "iso2_country": "DE"}
{"name": "Hostel Hamburg", "iso2_country": "DE"}
{"name": "Hotel Palama", "iso2_country": "ES"}
{"name": "Motel Barcelona", "iso2_country": "ES"}
{"name": "Holiday INN Paris", "iso2_country": "FR"}

In the mapper we are going to split the name into tri-grams and emit them together with the ISO country code plus a count of 1, so that the output of our token_mapper.py will look like this:

DE§  h   1
DE§ ho  1
DE§hot  1
DE§ote  1
DE§tel  1
DE§el   1
DE§l m  1
DE§ mü  1
DE§mün  1
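
To make clear where these keys come from, here is a small pure-Python stand-in for the NLTK call used in the mapper below (the padding keyword names of nltk.ngrams differ between NLTK versions, so take this only as an illustration of the idea):

# -*- coding: utf-8 -*-
# illustrative stand-in for nltk.ngrams(..., pad_left=True, pad_right=True, pad_symbol=' ')
def char_trigrams(text, n=3, pad=u' '):
    padded = pad * (n - 1) + text.lower() + pad * (n - 1)
    return [padded[i:i + n] for i in range(len(padded) - n + 1)]

for gram in char_trigrams(u'Hotel München'):
    print(u'DE§{0}\t1'.format(gram))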

By looking at the output you can see that we applied a composite key of country code and tri-gram, with § being our delimiter. As we want to count the tri-grams by country, the records need to be partitioned only by the country ISO code, for example DE. On the other hand, since we want to count the tri-grams at the reducer, the entries have to be sorted by the country ISO code AND the tri-gram. Both the sorting and the partitioning can be achieved by applying a so-called KeyFieldBasedPartitioner and KeyFieldBasedComparator, so that our job setup will look like this:

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapred.job.name="tokenize names" \
-D mapred.reduce.tasks=5 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D map.output.key.field.separator=§ \
-D mapred.text.key.comparator.options=-k1,2 \
-D mapred.text.key.partitioner.options=-k1 \
-files token_reducer.py,token_mapper.py \
....
-inputformat org.apache.avro.mapred.AvroAsTextInputFormat \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
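
What these options mean can be illustrated with a few lines of Python (only a sketch of the idea, of course not how Hadoop computes partitions internally): the composite key is split at §, field 1 alone decides which reducer a record goes to, while fields 1 and 2 together decide the sort order of the reducer input.

# -*- coding: utf-8 -*-
# illustration only: -k1 routes on the country, -k1,2 sorts on country and tri-gram
keys = [u'ES§hot', u'DE§tel', u'DE§hot']
num_reducers = 5  # matches mapred.reduce.tasks above

for key in sorted(keys, key=lambda k: tuple(k.split(u'§', 1))):  # comparator: -k1,2
    country = key.split(u'§', 1)[0]                              # partitioner: -k1
    print(u'{0} -> reducer {1}'.format(key, hash(country) % num_reducers))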

We can now assume a correctly sorted order in our reducer, in addition to the fact that the tri-grams are distributed to the reducers by the country code only. With this in mind, counting the tri-grams by country becomes straightforward. The resulting output should look like this:

CN    ng  22547
US  ote 23811
US  tel 24144
US  el  24843
US   ho 25291
CN  l   32464
US  nn  34155
US  inn 35099
US   in 35810
CN  hot 38213
CN  ote 38855
CN  el  40884
CN  tel 41121
CN   ho 45821

Using the Distributed Environment

In addition to the previously posted job configuration it is important to also include our packaged environment. This is done by providing the -archives option together with the HDFS path to our zip.

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar 
....
-archives hdfs://[host]:8020/user/hk/virtualenv/demoenv.zip#demoenv 

Notice the symlink (#demoenv) at the end of the last line? This is the name under which we can reference the Virtualenv in our scripts. It also means that we can, in general, provide different environments for a single job, as long as we use different identifiers/symlinks. The shebang in our scripts will look like the following:

#!./demoenv/bin/python
# -*- coding: utf-8 -*-

import logging
import nltk
....

What happens here is that our package is distributed to each node that executes our mapper and reducer scripts. We are then able to reference the content of the package through the provided symlink, demoenv. In there we find our Python executable, which we use to run our scripts.

Last we are going to look at how we can use Avro files as input to our streaming jobs. For streaming, the Avro project provides an AvroAsTextInputFormat class that reads the file with the given schema and exposes the input as plain JSON to the underlying mapper.

There also exists an AvroTextOutputFormat class which can be used to write Avro back to the cluster. Unfortunately it is currently only capable of writing the output with a bytes schema, which basically means it writes it out as a JSON string. There is an open ticket to fix that: AVRO-1067.

To use the AvroAsTextInputFormat class we have to provide the required jars to the job along with the -inputformat configuration. With the jars being in the same folder, the configuration looks like this:

hadoop jar ....
-libjars avro-1.7.6.jar,avro-mapred-1.7.6-hadoop2.jar 
-inputformat org.apache.avro.mapred.AvroAsTextInputFormat 
...
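
If you want to produce a small test file like the hotels input yourself, the Python avro package (the counterpart to the 1.7.x jars above) can write one. Note that the schema below is my own guess derived from the JSON shown earlier, not the actual schema of the data set:

# -*- coding: utf-8 -*-
# minimal sketch for writing a test Avro file with the Python avro package
import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter

# hypothetical schema, reconstructed from the JSON records shown above
schema = avro.schema.parse('''{
  "type": "record", "name": "Hotel",
  "fields": [{"name": "name", "type": "string"},
             {"name": "iso2_country", "type": "string"}]
}''')

writer = DataFileWriter(open('hotels.avro', 'wb'), DatumWriter(), schema)
writer.append({"name": u"Hotel München", "iso2_country": "DE"})
writer.append({"name": u"Hotel Palama", "iso2_country": "ES"})
writer.close()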

The Complete Job

For completeness, here is the complete job I used for this example, including the code for the mapper and reducer:

Job Conf

Executable script to run the whole job:

#!/bin/bash
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapred.job.name="tokenize names" \
-D mapred.reduce.tasks=5 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D map.output.key.field.separator=§ \
-D mapred.text.key.comparator.options=-k1,2 \
-D mapred.text.key.partitioner.options=-k1 \
-files token_reducer.py,token_mapper.py \
-libjars avro-1.7.6.jar,avro-mapred-1.7.6-hadoop2.jar \
-archives hdfs://[host]:8020/user/hk/virtualenv/demoenv.zip#demoenv \
-input /user/hk/hotels \
-output /user/hk/token_out \
-mapper token_mapper.py \
-reducer token_reducer.py \
-inputformat org.apache.avro.mapred.AvroAsTextInputFormat \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

Token Mapper

#!./demoenv/bin/python
# -*- coding: utf-8 -*-

import logging
import nltk
from sys import stdin, stdout
from json import loads

if __name__ == '__main__':
  for line in stdin:
    try:
      # AvroAsTextInputFormat hands us one JSON record per line
      hotel = loads(line)
      hotel_name = hotel.get('name', None)
      if hotel_name is not None:
        # strip our key delimiter from the name and lowercase it
        name = hotel_name.replace(u'§', '').lower()
        name_chars = list(name)
        ngr = nltk.ngrams(name_chars, 3, pad_left=True, pad_right=True, pad_symbol=' ')
        for t in ngr:
          # emit the composite key country§trigram, a tab and a count of 1
          stdout.write(u'{0}§{1}\t{2}\n'.format(hotel.get('iso2_country', u'N/A'), u''.join(t), 1).encode('utf8'))
    except Exception as e:
      logging.error(e)

Token Reducer

#!./demoenv/bin/python
# -*- coding: utf-8 -*-

import logging
from sys import stdin, stdout
from json import loads


class Reducer:
  
  def __init__(self):
    self.current_iso2 = None
    self.current_ngram = None
    self.ngram_cnt = 0
  
  def print_result(self, iso2, ngram):
    # flush the finished (country, tri-gram) count and start a new group;
    # every incoming line carries a count of 1 since no combiner is used
    stdout.write(u'{0}\t{1}\t{2}\n'.format(
                    self.current_iso2,
                    self.current_ngram,
                    self.ngram_cnt))
    self.current_iso2 = iso2
    self.current_ngram = ngram
    self.ngram_cnt = 1

  def reduce(self):
    iso2 = ''
    ngram = ''
    for line in stdin:
      key, value = line.strip().split('\t', 1)
      iso2, ngram = key.split(u'§', 1)
      if self.current_iso2 == iso2:
        if self.current_ngram == ngram:
          self.ngram_cnt += int(value)
        else:
          self.print_result(iso2, ngram)
      else:
        if self.current_iso2 is not None:
          self.print_result(iso2, ngram)
        else:
          self.current_iso2 = iso2
          self.current_ngram = ngram
          self.ngram_cnt = 1
    # flush the last group, unless there was no input at all
    if self.current_iso2 is not None:
      self.print_result(iso2, ngram)

if __name__ == '__main__':
  import codecs
  stdout = codecs.getwriter('utf-8')(stdout)
  stdin = codecs.getreader('utf-8')(stdin)
  app = Reducer()
  app.reduce()
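
Both scripts can be smoke-tested locally before submitting the job by emulating the shuffle phase with a plain sort. This sketch assumes a file sample.json with one JSON record per line, executable bits on both scripts and an unpacked demoenv/ directory next to them so that the shebang resolves:

# rough local smoke test; sort stands in for the Hadoop shuffle
import subprocess

pipeline = 'cat sample.json | ./token_mapper.py | sort | ./token_reducer.py'
print(subprocess.check_output(pipeline, shell=True))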


Comments

  1. Wow, this is really nice! + the same exact magic worked in pig (I needed map-only), using

    set mapred.create.symlink yes;
    set mapred.cache.archives hdfs://[host]:8020/user/hk/virtualenv/demoenv.zip#demoenv;
    DEFINE CMD `token_mapper.py` ship('token_mapper.py');
    A = load '/user/hk/hotels' using PigStorage(',');
    B = stream A through CMD;
    store B into '/user/hk/token_out';

    see further details here: http://ragrawal.wordpress.com/2014/03/25/apache-pig-and-distributed-cache/


  2. This blog assumes that the client host and the cluster hosts have the same Python version installed in the same location. It is possible to distribute a different Python version in a streaming job if you:
    – create the virtualenv, and make it relocatable, with a Python install that is the same as on the cluster
    – install the different Python version on the client host without shared libraries (to be used with the virtualenv -p option)
    – manually add the missing Python libs to the virtualenv (virtualenv copies only a subset of the libraries and refers to the common Python install for the others)


    1. I use virtualenv -p /path/to/python3.5 venv as some library required this version.

      But unfortunately, the Hadoop streaming job reports an error like "no module named json".

      Should I install python3.5 in the cluster machines to share the common libs?

      I assumed the archive zip file included all the files needed to run python3.5.

      Thanks.

