If you look at the way TensorFlow distributes its computation across a cluster of processes, you will quickly ask how to schedule those resources as part of a training workflow on large-scale infrastructure. Early on, quite a few people answered this question by wrapping an additional computational framework around TensorFlow, degrading that wrapper to a mere distribution mechanism. Examples of such approaches can be found here and here. Both of them turn to Spark, which, just like TensorFlow, is a distributed computational framework that turns a set of statements into a DAG of execution. While this certainly works, a more straightforward approach is to turn to a cluster manager like Mesos, Kubernetes, or, in particular, YARN to distribute the workloads of a deep learning network. This is also the solution suggested in the TensorFlow documentation:
Note: Manually specifying these cluster specifications can be tedious, especially for large clusters. We are working on tools for launching tasks programmatically, e.g. using a cluster manager like Kubernetes. If there are particular cluster managers for which you’d like to see support, please raise a GitHub issue.
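To make that tedium concrete, here is a minimal sketch of such a manual cluster specification in TensorFlow; the hostnames are placeholders, not part of any real setup. Every process has to know the full list of hosts and ports up front:

import tensorflow as tf

# Every host:port pair has to be known and pinned in advance --
# exactly what becomes tedious on a large, dynamically scheduled cluster.
cluster = tf.train.ClusterSpec({
    "ps":     ["ps0.example.com:2222"],
    "worker": ["worker0.example.com:2222",
               "worker1.example.com:2222"]
})

# Each participating process starts a server for its own role and index.
server = tf.train.Server(cluster, job_name="worker", task_index=0)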
Turning to YARN for these kinds of workloads admittedly does not come naturally to most developers. YARN is closely linked to MapReduce, Spark, and the like, and is still not really seen as a general-purpose cluster management system. Additionally, it is often criticized for its tedious API.
some of these yarn APIs are only usable if you happen to know people on the West-Coast in person
— fs111 (@fs111) January 20, 2017
But YARN has been used successfully for various data applications, all coexisting on a shared infrastructure together with a vast amount of data. With TensorFlow on YARN you can start to leverage deep learning on top of your already existing data and workflows. Several ongoing efforts to support TensorFlow on YARN document the demand for a consolidated solution within the community.
YARN itself is undergoing active development, shaping its future as part of Hadoop 3.0. Noteworthy in the context of TensorFlow are the support for Docker containers and a simplified YARN API (Hadoop/YARN Assembly). In a recently published blog post here, Wangda Tan and Vinod Kumar Vavilapalli give a glimpse of what TensorFlow on YARN will look like with these upcoming changes to YARN.
This post uses Apache Slider, which will likely be discontinued in the future in favour of YARN Assembly, on top of a stable HDP distribution to train a simple TensorFlow example. Part of this is available as a Slider package contributed through SLIDER-xxxx. The aim here is not to train a sophisticated model, but to demonstrate the feasibility of distributing TensorFlow on YARN today.
The resources needed for a distributed training workflow are parameter servers (ps) and workers (worker). Additionally there is one special worker, the chief worker (chiefworker), which is needed, for example, to execute bootstrapping steps required by all workers exactly once. The resource configuration for Slider will therefore look similar to this:
{ "schema" : "http://example.org/specification/v2.0.0", "metadata" : { }, "global" : { }, "components": { "slider-appmaster": { }, "ps": { "yarn.role.priority": "1", "yarn.component.instances": "1", "yarn.memory": "1024", "yarn.vcores": "1", "yarn.container.failure.threshold": "0" }, "chiefworker": { "yarn.role.priority": "2", "yarn.component.instances": "1", "yarn.memory": "1024", "yarn.vcores": "1", "yarn.container.failure.threshold": "0" }, "worker": { "yarn.role.priority": "3", "yarn.component.instances": "2", "yarn.memory": "1024", "yarn.vcores": "1", "yarn.container.failure.threshold": "0" } } }
Distributing TensorFlow requires each process participating in the training to have a complete specification of the cluster available. With YARN we do not know upfront on which host or port each of the processes will run. A key aspect of the distribution therefore is to wait until all the resources have been allocated in the cluster.
while num_launched < num_allocated:
    print format("Waiting for all ports ({num_launched}/{num_allocated}) to be exported")
    time.sleep(5)
    ps_list, worker_list = functions.get_launched_instances()
    num_launched = len(ps_list) + len(worker_list)
Slider helps us with that, as the resources allocated to each component can be queried via a REST API. In the above snippet, which waits until all containers have been allocated, the function get_launched_instances() fetches the launched instances of parameter servers and workers with three simple REST calls:
$http_base + '/ws/v1/slider/publisher/exports/ps'
$http_base + '/ws/v1/slider/publisher/exports/chiefworker'
$http_base + '/ws/v1/slider/publisher/exports/worker'
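A possible implementation of get_launched_instances() is sketched below. Both the helper itself and the exact shape of Slider's export response are assumptions here, not the code shipped with the package; the idea is simply to poll the three export endpoints until every container has published its address.

import requests

# Placeholder for the Slider application master's REST endpoint (assumption).
http_base = 'http://<slider-am-host>:<port>'

def get_launched_instances():
    """Hypothetical sketch: query the Slider export endpoints and collect
    the host:port values published by the launched containers."""
    def exported(component):
        url = http_base + '/ws/v1/slider/publisher/exports/' + component
        # Assumed response shape: {"entries": {"<name>": [{"value": "host:port", ...}]}}
        entries = requests.get(url).json().get('entries', {})
        return [item['value'] for values in entries.values() for item in values]

    ps_list = exported('ps')
    worker_list = exported('chiefworker') + exported('worker')
    return ps_list, worker_list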
Once all the resources are available we can continue as with any other distributed TensorFlow scenario. On every host the same script is executed, given the list of workers, the list of parameter servers, the task index, and the run mode.
# Build clusterSpec and command
daemon_cmd = format("python {app_root}/{user_scripts_entry} "
                    "--ps_hosts={ps_hosts} --worker_hosts={worker_hosts} "
                    "--job_name={job_name} --task_index={task_index} "
                    "--work_dir={app_root}")
Execute(daemon_cmd)
Which training script is executed is an application configuration parameter, making it possible to distribute different kinds of training with the same Slider skeleton. The script is specified in appConfig.json as
"site.global.user.scripts.entry": "mnist1.py"
Running workflows like this on a shared infrastructure often requires that individual users control the complete environment, e.g. the version of TensorFlow, in which they perform the training. In the above scenario the same TensorFlow package needs to be installed on all hosts. There are a couple of solutions to meet this demand for control of the execution environment. One would be to distribute the complete environment as part of the distribution of the workload, which could for example be achieved by using a Python virtualenv as the environment. An often preferred alternative is to use a container environment like Docker and run the script inside it. To use Docker here, simply replace the daemon_cmd given above with a proper docker run execution, as sketched below.
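The following is one way such a replacement could look, assuming a stock TensorFlow image and that mounting the Slider application root into the container is sufficient; both the image name and the mounts are assumptions, not part of the contributed package.

# Hypothetical Docker-based variant of daemon_cmd; image name and mounts are assumptions.
daemon_cmd = format("docker run --rm --net=host "
                    "-v {app_root}:{app_root} "   # mount the Slider application root
                    "tensorflow/tensorflow "      # any image shipping the desired TensorFlow version
                    "python {app_root}/{user_scripts_entry} "
                    "--ps_hosts={ps_hosts} --worker_hosts={worker_hosts} "
                    "--job_name={job_name} --task_index={task_index} "
                    "--work_dir={app_root}")
Execute(daemon_cmd)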
TensorFlow reportedly runs much faster on GPUs. YARN offers node labels to support a scenario where, for example, the workers are executed on hosts with GPUs, while the parameter servers get scheduled on non-GPU hosts. Labeling the GPU hosts with 'gpu' and adding "yarn.label.expression": "gpu" to the resource description of the workers in resources.json ensures that the workers are scheduled on GPU hosts, while the other processes run wherever resources are available.
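Assuming the GPU hosts have been labelled gpu by the cluster administrator, the worker component in the resources.json shown earlier would just gain one additional property, roughly:

"worker": {
  "yarn.role.priority": "3",
  "yarn.component.instances": "2",
  "yarn.memory": "1024",
  "yarn.vcores": "1",
  "yarn.container.failure.threshold": "0",
  "yarn.label.expression": "gpu"
}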