# Parallel Processing with Dask on GridEngine Clusters

written by on 2018-10-11 | tags: parallel dask gridengine data science optimization

I recently just figured out how to get this working... and it's awesome! :D

## Motivation

If I'm developing an analysis in the Jupyter notebook, and I have one semi-long-running function (e.g. takes dozens of seconds) that I need to run over tens to hundreds of thousands of similar inputs, it'll take ages for this to complete in serial. For a sense of scale, a function that takes ~20 seconds per call run serially over 10,000 similar inputs would take 200,000 seconds, which is 2 days of run-time (not including any other overhead). That's not feasible for interactive exploration of data. If I could somehow parallelize just the function over 500 compute nodes, we could take the time down to 7 minutes.

GridEngine-based compute clusters are one of many options for parallelizing work. During grad school at MIT, and at work at Novartis, the primary compute cluster environment that I've encountered has been GridEngine-based. However, because they are designed for batch jobs, as a computational scientist, we have to jump out of whatever development environment we're currently using, and move to custom scripts.

In order to do parallelism with traditional GridEngine systems, I would have to jump out of the notebook and start writing job submission scripts, which disrupts my workflow. I would be disrupting my thought process, and lose the interactivity that I might need to prototype my work faster.

dask, alongside dask-jobqueue enables computational scientists like myself to take advantage of existing GridEngine setups to do interactive, parallel work. As long as I have a Jupyter notebook server running on a GridEngine-connected compute node, I can submit functions to the GridEngine cluster and collect back those results to do further processing, in a fraction of the time that it would take, thus enabling me to do my science faster than if I did everything single core/single node.

In this blog post, I'd like to share an annotated, minimal setup for using Dask on a GridEngine cluster. (Because we use Dask, more complicated pipelines are possible as well - I would encourage you to read the Dask docs for more complex examples.) I will assume that you are working in a Jupyter notebook environment, and that the notebook you are working out of is hosted on a GridEngine-connected compute node, from which you are able to qsub tasks. Don't worry, you won't be qsub-ing anything though!

## Setup

To start, we need a cell that houses the following code block:

from dask_jobqueue import SGECluster

cluster = SGECluster(queue='default.q',
walltime="1500000",
processes=1,
memory='1GB',
cores=1,
env_extra=['source /path/to/custom/script.sh',
'export ENV_VARIABLE="SOMETHING"']
)


Here, we are instantiating an SGECluster object under the variable name cluster. What cluster stores is essentially a configuration for a block of worker nodes that you will be requesting. Under the hood, what dask-jobqueue is doing is submitting jobs to the GridEngine scheduler, which will block off a specified amount of compute resources (e.g. number of cores, amount of RAM, whether you want GPUs or not, etc.) for a pre-specified amount of time, on which Dask then starts a worker process to communicate with the head process coordinating tasks amongst workers.

As such, you do need to know two pieces of information:

1. queue: The queue that jobs are to be submitted to. Usually, it is named something like default.q, but you will need to obtain this through GridEngine. If you have the ability to view all jobs that are running, you can call qstat at the command line to see what queues are being used. Otherwise, you might have to ping your system administrator for this information.
2. walltime: You will also need to pre-estimate the wall clock time, in seconds, that you want the worker node to be alive for. It should be significantly longer than the expected time you think you will need, so that your function call doesn't timeout unexpectedly. I have defaulted to 1.5 million seconds, which is about 18 days of continual runtime. In practice, I usually kill those worker processes after just a few hours.

Besides that, you also need to specify the resources that you need per worker process. In my example above, I'm asking for each worker process to use only 1GB of RAM, 1 core, and to use only 1 process per worker (i.e. no multiprocessing, I think).

Finally, I can also specify extra environment setups that I will need. Because each worker process is a new process that has no knowledge of the parent process' environment, you might have to source some bash script, or activate a Python environment, or export some environment variable. This can be done under the env_extra keyword, which accepts a list of strings.

## Request for worker compute "nodes"

I put "nodes" in quotation marks, because they are effectively logical nodes, rather than actual compute nodes. (Technically, I think a compute node minimally means one physical hardware unit with CPUs and RAM).

In order to request for worker nodes to run your jobs, you need the next line of code:

cluster.scale(500)


With this line, under the hood, dask-jobqueue will start submitting 500 jobs, each requesting 1GB of RAM and 1 core, populating my compute environment according to the instructions I provided under env_extra.

At the end of this, I effectively have a 500-node cluster on the larger GridEngine cluster (let's call this a "virtual cluster"), each with 1GB of RAM and 1 core available to it, on which I can submit functions to run.

## Start a client process

In order to submit jobs to my virtual cluster, I have to instantiate a client that is connected to the cluster, and is responsible for sending functions there.

from dask.distributed import Client

client = Client(cluster)


## Compute!

With this setup complete (I have it stored as a TextExpander snippets), we can now start submitting functions to the virtual cluster!

To simulate this, let's define a square-rooting function that takes 2-3 seconds to run each time it is called, and returns the square of its inputs. This simulates a function call that is computationally semi-expensive to run a few times, but because call on this hundreds of thousands of time, the total running time to run it serially would be too much.

from time import sleep
from math import sqrt
from random import random
def slow_sqrt(x):
"""
Simulates the run time needed for a semi-expensive function call.
"""
assert x > 0
# define sleeping time in seconds, between 2-3 seconds.
t = 2 + random()
sleep(t)

return sqrt(x)


### Serial Execution

In a naive, serial setting, we would call on the function in a for-loop:

results = []
for i in range(10000):
results.append(slow_sqrt(i))


This would take us anywhere between 20,000 to 30,000 seconds (approximately 8 hours, basically).

### Parallel Execution

In order to execute this in parallel instead, we could do one of the following three ways:

#### map

sq_roots = client.map(slow_sqrt, range(10000))
sq_roots = client.gather(sq_roots)


#### for-loop

sq_roots = []
for i in range(10000):
sq_roots.append(client.submit(slow_sqrt, i, restart=20))  # submit the function as first argument, then rest of arguments
sq_roots = client.gather(sq_roots)


#### delayed

sq_roots = []
for i in range(10000):
sq_roots.append(delayed(slow_sqrt)(i))
sq_roots = compute(*sq_roots)


I have some comments on each of the three methods, each of which I have used.

First off, each of them do require us to change the code that we would have written in serial. This little bit of overhead is the only tradeoff we really need to make in order to gain parallelism.

In terms of readability, all of them are quite readable, though in my case, I tend to favour the for-loop with client.submit. Here is why.

For readability, the for-loop explicitly indicates that we are looping over something. It's probably more easy for novices to approach my code that way.

For debuggability, client.submit returns a Futures object (same goes for client.map). A "Futures" object might be confusing at first, so let me start by demystifying that. A Futures object promises that the result that is computed from slow_sqrt will exist, and actually contains a ton of diagnostic information, including the type of the object (which can be useful for diagnosing whether my function actually ran correctly). In addition to that, I can call on Futures.result() to inspect the actual result (in this case, sq_roots[0].result()). This is good for debugging the function call, in case there are issues when scaling up. (At work, I was pinging a database in parallel, and sometimes the ping would fail; debugging led me to include some failsafe code, including retries and sleeps with random lengths to stagger out database calls.)

Finally, the Futures interface is non-blocking on my Jupyter notebook session. Once I've submitted the jobs, I can continue with other development work in my notebook in later cells, and check back when the Dask dashboard indicates that the jobs are done.

That said, I like the delayed interface as well. Once I was done debugging and confident that my own data pipeline at work wouldn't encounter the failure modes I was seeing, I switched over to the delayed interface and scaled up my analysis. I was willing to trade in the interactivity using the Futures interface for the automation provided by the delayed interface. (I also first used Dask on a single node through the delayed interface as well).

Of course, there's something also to be said for the simplicity of two lines of code for parallelism (with the client.map example).

The final line in each of the code blocks allows us to "gather" the results back into my coordinator node's memory, thus completing the function call and giving us the result we needed.

## Conclusions

That concludes it! The two key ideas illustrated in this blog post were:

1. To set up a virtual cluster on a GridEngine system, we essentially harness the existing job submission system to generate workers that listen for tasks.
2. A useful programming pattern is to submit functions using the client object using client.submit(func, *args, **kwargs). This requires minimal changes from serial code.

## Practical Tips

Here's some tips for doing parallel processing, which I've learned over the years.

Firstly, never prematurely parallelize. It's as bad as prematurely optimizing code. If your code is running slowly, check first to make sure that there aren't algorithmic complexity issues, or bandwidths being clogged up (e.g. I/O bound). As the Dask docs state, it is easier to achieve those gains first before doing parallelization.

Secondly, when developing parallel workflows, make sure to test the pipeline on subsets of input data first, and slowly scale up. It is during this period that you can also profile memory usage to check to see if you need to request for more RAM per worker.

Thirdly, for GridEngine clusters, it is usually easier to request for many small worker nodes that consume few cores and small amounts of RAM. If your job is trivially parallelizable, this may be a good thing.

Fourthly, it's useful to have realistic expectations on the kinds of speed-ups you can expect to gain. At work, through some ad-hoc profiling, I quickly came to the realization that concurrent database pings were the most likely bottleneck in my code's speed, and that nothing apart from increasing the number of concurrent database pings allowed would make my parallel code go faster.

Finally, on a shared cluster, be respectful of others' usage. Don't request for unreasonable amounts of compute time. And when you're confirmed done with your analysis work, remember to shut down the virtual cluster! :)