written by Eric J. Ma on 2018-10-26

I learned a new thing about dask yesterday: pre-scattering data properly!

Turns out, you can pre-scatter your data across worker nodes, and have them access that data later when submitting functions to the scheduler.

## How-To

To do so, we first call on client.scatter, pass in the data that I want to scatter across all nodes, ensure that broadcasting is turned on (if and only if I am sure that all worker nodes will need it), and finally assign it to a new variable.

from dask_jobqueue import SGECluster

cluster = SGECluster(...)  # put parameters in there.
client = Client(cluster)


One key thing to remember here is to assign the result of client.scatter to a variable. This becomes a pointer that you pass into other functions that are submitted via the client.submit interface. Because this point is not immediately clear from the client.scatter docs, I put in a pull request (PR) to provide some just-in-time documentation, which just got merged this morning. By the way, not every PR has to be code - documentation help is always good!

Once we've scattered the data across our worker nodes and obtained a pointer for the scattered data, we can parallel submit our function across worker nodes.

Let's say we have a function, called func, that takes in the data variable and returns a number. The key characteristic of this function is that it takes anywhere from a few seconds to minutes to run, but I need it run many times (think hundreds to thousands of times).

In serial, I would usually do this as a list comprehension:

results = [func(data) for i in range(200)]


If done in parallel, I can now use the client object to submit the function across all worker nodes. For clarity, let me switch to a for-loop instead:

results = []
for i in range(200):
results.append(client.submit(func, data_future))
results = client.gather(results)


Because the client does not have to worry about sending the large data object across the network of cluster nodes, it is very fast to submit the functions to the scheduler, which then dispatches it to the worker nodes, which all know where data_future is on their own "virtual cluster" memory.

1. You need to know how much memory my data requires, and have to request for at least that amount of memory first per worker node at the the SGECluster instantiation step.