Running PySpark with Virtualenv

Controlling the environment of an application is vital for its functionality and stability. Especially in a distributed environment, it is important for developers to have control over the versions of dependencies. In such a scenario it is a critical task to ensure that possibly conflicting requirements of multiple applications do not disturb each other.

That is why frameworks like YARN ensure that each application is executed in a self-contained environment – typically in a Linux (Java) container or Docker container – that is controlled by the developer. In this post we show what this means for the Python environments used by Spark.

YARN Application Deployment

As mentioned earlier, YARN executes each application in a self-contained environment on each host. This ensures execution in a controlled environment managed by individual developers. In a nutshell, the dependencies of an application are distributed to each node, typically via HDFS.

YARN Shipping Applications
This figure simplifies the fact that HDFS is actually being used to distribute the application. See HDFS distributed cache for reference.

The files are uploaded to a staging folder /user/${username}/.${application} of the submitting user in HDFS. Because of the distributed architecture of HDFS, it is ensured that multiple nodes have local copies of the files. In fact, to ensure that a large fraction of the cluster has a local copy of the application files and does not need to download them over the network, the HDFS replication factor is set much higher for these files than the default of 3. Often a replication factor between 10 and 20 is chosen.

During the preparation of the container on a node, you will notice commands similar to the following example being executed in the logs:
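A command of roughly this shape creates the link (the cache path and file ID are illustrative, not taken from a real log):

```shell
# Illustrative localization step performed by YARN (paths are examples):
ln -sf "/hadoop/yarn/local/usercache/vagrant/filecache/72/pyspark.zip" "pyspark.zip"
```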

The folder /hadoop/yarn/local/ is the configured location on each node where YARN stores its needed files and logs locally. Creating a symbolic link like this inside the container makes the content of the zip file available. It is then referenced as "pyspark.zip".

Using Virtualenv

For application developers this means that they can package and ship their controlled environment with each application. Other solutions like NFS or Amazon EFS shares are not needed; shared folders in particular make for a bad architecture that is not designed to scale well, and they make application development less agile.

The following example demonstrates the use of Python's virtualenv together with a PySpark application. This sample application uses the NLTK package, with the additional requirement of making tokenizer and tagger resources available to the application as well.

The sample application:
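As a sketch (the file name spark_nltk_sample.py, the input path, and the exact NLTK calls are assumptions of this rewrite, not taken from the original listing), the application could look like this:

```python
# spark_nltk_sample.py -- a minimal PySpark + NLTK sketch (names are illustrative)
from pyspark import SparkConf, SparkContext


def part_of_speech_tags(line):
    """Tokenize a line of text and tag each token with its part of speech."""
    import nltk  # imported inside the function so it resolves in the executor's env
    tokens = nltk.word_tokenize(line)  # needs the 'punkt' tokenizer resource
    return nltk.pos_tag(tokens)        # needs the tagger resources


if __name__ == "__main__":
    conf = SparkConf().setAppName("spark-nltk")
    sc = SparkContext(conf=conf)

    data = sc.textFile("/user/vagrant/nltk_data/corpora/state_union/1972-Nixon.txt")
    tagged = data.flatMap(part_of_speech_tags)
    print(tagged.take(10))
```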

Preparing the sample input data

For our example we use the provided samples of NLTK (http://www.nltk.org/nltk_data/) and upload them to HDFS:
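Sketched as commands (the resource names, local paths, and HDFS paths are assumptions; package names can differ between NLTK versions):

```shell
# Download the NLTK sample data locally (writes to ~/nltk_data by default)
python -m nltk.downloader punkt averaged_perceptron_tagger state_union

# Upload the sample corpus to HDFS so the Spark job can read it
hdfs dfs -mkdir -p /user/vagrant/nltk_data/corpora/state_union
hdfs dfs -put ~/nltk_data/corpora/state_union/*.txt /user/vagrant/nltk_data/corpora/state_union/
```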

No Hard (Absolute) Links!

Before we actually go and create our environment, let's first take a quick moment to recap how an environment is typically composed. On a machine the environment is made up of variables linking to different target folders that contain executables or other resource files. So if you execute a command, it is resolved via your PATH, PYTHONPATH, or another defined variable. These variables link to files in directories like /usr/bin, /usr/local/bin, or any other referenced location. In this post these are called hard links or absolute references, as they start from the root /.

Environments using such hard (absolute) links are not easily transportable, as they make strict assumptions about the overall execution environment (your OS, for example) they are being used in. It is therefore necessary to use relative links in a transportable/relocatable environment.

This is especially true for virtualenv, as it creates absolute links by default. By making the virtualenv relocatable, it can be used in an application by referencing it from the application root . (current dir) instead of the overall root /.

Creating our relocatable environment:
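A sketch of the commands (the environment name nltk_env is an assumption; note that the --relocatable option existed in classic virtualenv but was removed in virtualenv 20+):

```shell
# Create the environment, then rewrite its internal paths to be relative
virtualenv nltk_env
virtualenv --relocatable nltk_env
```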

Next we activate the environment and install the required packages (nltk + numpy in this case):
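For example (assuming the environment was created as nltk_env in the current directory):

```shell
# Activate the environment, then install into it
source nltk_env/bin/activate
pip install nltk numpy
```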

This also works for different Python versions, 3.x or 2.x!

Zip it and Ship it!

Now that we have our relocatable environment all set we are able to package it and ship it as part of our sample PySpark job.
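Packaging is just zipping the environment folder, for example (zipping from the parent directory, so the archive contains nltk_env/ at its top level – an assumption consistent with the ./NLTK/nltk_env/... paths used later):

```shell
# Package the whole environment folder into nltk_env.zip
zip -r nltk_env.zip nltk_env
```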

To make this environment available during the execution of our application in a YARN container, we have to, for one, distribute the package and, for another, change the default Python environment of Spark to our location.

The variable controlling the python environment for python applications in Spark is named PYSPARK_PYTHON.

Our virtual environment is shipped under the archive alias NLTK, which is why the path in PYSPARK_PYTHON points to ./NLTK/content/of/zip/... . The exact command being executed during container creation is something like this:
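Analogous to the pyspark.zip link mentioned earlier, the localization step could look like this (cache path and file ID are illustrative, not taken from a real log):

```shell
# The archive is linked under the alias given after '#' in --archives
ln -sf "/hadoop/yarn/local/usercache/vagrant/filecache/71/nltk_env.zip" "NLTK"
```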

Shipping additional resources with an application is controlled by the --files and --archives options, as shown here:
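A sketch of the submit command (script name and archive alias are assumptions; yarn-cluster was the Spark 1.x syntax, newer Spark uses --master yarn --deploy-mode cluster):

```shell
PYSPARK_PYTHON=./NLTK/nltk_env/bin/python spark-submit \
  --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./NLTK/nltk_env/bin/python \
  --master yarn --deploy-mode cluster \
  --archives nltk_env.zip#NLTK \
  spark_nltk_sample.py
```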

The options used here are documented in the Spark YARN configuration and Spark environment variables pages for reference.

Packaging tokenizer and taggers

Doing just the above will unfortunately fail, because using the NLTK parser the way we do in the example program has some additional dependencies. If you have followed the above steps, submitting the job to your YARN cluster will result in the following exception at container level:
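The failure surfaces as an NLTK LookupError, roughly of this shape (a sketch, not a captured log; the exact wording varies by NLTK version):

```
LookupError:
**********************************************************************
  Resource 'tokenizers/punkt/english.pickle' not found.  Please use
  the NLTK Downloader to obtain the resource.
  Searched in: ...
**********************************************************************
```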

The problem is that NLTK expects the resource tokenizers/punkt/english.pickle to be available in one of the following locations:

The good thing about this is that by now we should know how to ship the required dependency to our application: the same way we did with our Python environment.
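Sketched as commands (assuming the resources were downloaded locally to ~/nltk_data; the layout inside the zip must keep the tokenizers/punkt/... structure NLTK expects):

```shell
# Package the tokenizer so the archive contains tokenizers/punkt/... at its top level
cd ~/nltk_data
zip -r tokenizer.zip tokenizers
```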

Alternatively, you can set an environment variable to control the path for resources.

For NLTK you can use the environment variable NLTK_DATA to control the path. Setting this in Spark can be done similarly to the way we set PYSPARK_PYTHON:
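For instance (a sketch; spark.yarn.appMasterEnv.* sets the variable for the driver in cluster mode, spark.executorEnv.* for the executors):

```shell
spark-submit \
  --conf spark.yarn.appMasterEnv.NLTK_DATA=./ \
  --conf spark.executorEnv.NLTK_DATA=./ \
  ...
```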

Additionally, YARN exposes the container path via the environment variable PWD. This can be used in NLTK to add it to the search path as follows:
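A minimal sketch (the NLTK call is commented out so the snippet itself only needs the standard library; with NLTK installed, nltk.data.path is the list of directories NLTK searches for resources):

```python
import os

# YARN exposes the container's working directory via PWD; fall back to cwd elsewhere
container_dir = os.environ.get("PWD", os.getcwd())

# With NLTK installed, append it to the resource search path:
# import nltk
# nltk.data.path.append(container_dir)
```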

Again, it is important to be sure how the resource is going to be referenced. NLTK expects it by default in the current location under tokenizers/punkt/english.pickle, which is why we navigate into the folder for packaging and reference the zip file with tokenizer.zip#tokenizer.

At a later point our program will expect a tagger in the same fashion, as already demonstrated in the above snippet.

The submission of your application now becomes:
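Putting it all together, a hedged sketch of the full command (script name and archive aliases are assumptions; newer Spark uses --master yarn --deploy-mode cluster instead of yarn-cluster):

```shell
PYSPARK_PYTHON=./NLTK/nltk_env/bin/python spark-submit \
  --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./NLTK/nltk_env/bin/python \
  --conf spark.yarn.appMasterEnv.NLTK_DATA=./ \
  --conf spark.executorEnv.NLTK_DATA=./ \
  --master yarn --deploy-mode cluster \
  --archives nltk_env.zip#NLTK,tokenizer.zip#tokenizer,tagger.zip#tagger \
  spark_nltk_sample.py
```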

The expected result should be something like the following:

And:

Further Readings

7 thoughts on “Running PySpark with Virtualenv”

  1. Thanks for the nice article!
    One remark: I stumbled at “hard links”. That term already exists, and means something entirely different, so I’d advise to remove it from this context. Cheers!

    1. Felix, thanks for taking the time to comment. You are absolutely right, “hard link” is really not used properly here. I’ll try to fix it.
      Thanks again

  2. Hi ! First thanks for the article. I have a little problem though.

    When I try to launch the script with this command:

    PYSPARK_PYTHON=~/benchmark_env/bin/python3 spark-submit --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=~/benchmark_env/bin/python3 --master yarn --archives benchmark_env.zip --py-files filters.py sparker.py

    Nothing happens.

    But when I launch it without the first PYSPARK_PYTHON I get an import error telling me that one of the package I want to import is not found. I checked if the package not found is installed in the virtualenv and it is.

    Do you know what could be the problem ?

    Thanks a lot

    1. I think ~ in env variables could be an issue.
      Also, the package will be placed within the working directory on an arbitrary node, so at least for the 2nd PYSPARK_PYTHON it would be required to use the current dir ./ !?

      1. Thanks I tried to change it to the current dir but it still fails. 🙁

        How can I know where the python version I give to spark-submit with the PYSPARK_PYTHON variable is stored in the yarn app ? Because it cannot manage to find it when the app is launched

        I tried to launch the app with the following command:

        PYSPARK_PYTHON=./miniconda3/envs/sparkerConda/bin/python spark-submit --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./miniconda3/envs/sparkerConda/bin/python --master yarn --archives dependencies.zip sparker.py

        I know that the 2nd PYSPARK_PYTHON is wrong but I don’t understand how to fix it.

  3. Thank you for the article. I’ve tried this approach, but one issue I’m having is that the workers can’t find the python executable in the virtualenv. Do you have any suggestions to remedy this? Also, could you explain “Our virtual environment is linked by NLTK” — why do you use ./NLTK/nltk_env… rather than directly using ./nltk_env?

    1. I figured out what you meant and got the workers to find the executable, but now I get:

      error while loading shared libraries: libpython3.4m.so.1.0: cannot open shared object file: No such file or directory
