I learned a new thing about dask yesterday: pre-scattering data properly!
Turns out, you can pre-scatter your data across worker nodes, and have them access that data later when submitting functions to the scheduler.
To do so, I first call client.scatter, pass in the data that I want to scatter across all nodes, turn on broadcasting (only if I am sure that every worker node will need the data), and finally assign the result to a new variable.
from dask_jobqueue import SGECluster
from dask.distributed import Client

cluster = SGECluster(...)  # put parameters in there.
client = Client(cluster)
data_future = client.scatter(data, broadcast=True)
One key thing to remember here is to assign the result of client.scatter to a variable. This becomes a pointer that you pass into other functions that are submitted via the client.submit interface. Because this point is not immediately clear from the client.scatter docs, I put in a pull request (PR) to provide some just-in-time documentation, which just got merged this morning. By the way, not every PR has to be code - documentation help is always good!
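To make the "pointer" idea concrete, here is a minimal sketch that can be run on a laptop. It assumes an in-process local cluster (Client(processes=False)) as a stand-in for the SGE cluster, and the data list and the total function are hypothetical placeholders. Note that client.scatter treats a list, tuple, set or dict as a collection and scatters each element separately, so the sketch wraps the object in a one-element list to get back a single future.

```python
from dask.distributed import Client, Future

# Assumption: a small in-process cluster stands in for the real SGE cluster.
client = Client(processes=False, n_workers=2, threads_per_worker=1)

data = list(range(1_000))  # hypothetical stand-in for a large object

# Wrapping in a list scatters `data` as ONE object and returns a
# one-element list of futures, which we unpack into a single future.
[data_future] = client.scatter([data], broadcast=True)


def total(values):
    # On the worker, `values` is the real list, not the future.
    return sum(values)


# Pass the future, not the data; dask resolves it on the worker.
result = client.submit(total, data_future).result()

client.close()
```

The variable data_future holds a Future object rather than the data itself, which is why it has to be kept around and handed to client.submit.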
Once we've scattered the data across our worker nodes and obtained a pointer for the scattered data, we can submit our function to run in parallel across worker nodes.
Let's say we have a function, called func, that takes in the data variable and returns a number. The key characteristic of this function is that it takes anywhere from a few seconds to minutes to run, but I need to run it many times (think hundreds to thousands of times).
In serial, I would usually do this as a list comprehension:
results = [func(data) for i in range(200)]
If done in parallel, I can now use the client object to submit the function across all worker nodes. For clarity, let me switch to a for-loop:
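results = []
for i in range(200):
    # pure=False because func is expected to return a different result on each call;
    # with the default pure=True, identical submissions share one key and run only once.
    results.append(client.submit(func, data_future, pure=False))
results = client.gather(results)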
Because the client does not have to send the large data object across the network of cluster nodes with every call, submitting the functions to the scheduler is very fast. The scheduler then dispatches them to the worker nodes, all of which know where data_future lives in their own "virtual cluster" memory.
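The whole scatter, submit and gather pattern can be tried end to end with the rough sketch below. It again assumes an in-process local cluster instead of SGE, and func here is a hypothetical stochastic function (a random draw from the data), chosen only so that repeated calls differ. Because client.submit treats functions as pure by default and gives identical calls the same key, the sketch passes pure=False so that every submission becomes its own task.

```python
import random

from dask.distributed import Client

# Assumption: a small in-process cluster stands in for the real SGE cluster.
client = Client(processes=False, n_workers=2, threads_per_worker=1)

data = list(range(100))  # hypothetical stand-in for a large object
[data_future] = client.scatter([data], broadcast=True)


def func(values):
    # Hypothetical stochastic computation: one random draw from the data.
    return random.choice(values)


# Each submission only ships the small future, not the data itself.
futures = [client.submit(func, data_future, pure=False) for _ in range(20)]
results = client.gather(futures)

# With pure=False every task gets its own key, so nothing is deduplicated.
unique_keys = len({f.key for f in futures})

client.close()
```

Dropping pure=False in this sketch would leave all 20 futures pointing at the same key, so func would effectively run once.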
By pre-scattering, we invest a bit of time pre-allocating memory on worker nodes to hold data that are relatively expensive to transfer. This time investment reaps dividends later when we are working with functions that operate on the data.
Not really disadvantages (as I can't think of any), just some things to note: