Eric J Ma's Website

More Dask: Pre-Scattering Data

written by Eric J. Ma on 2018-10-26 | tags: dask parallel data science optimization gridengine


I learned a new thing about dask yesterday: pre-scattering data properly!

Turns out, you can pre-scatter your data across worker nodes, and have them access that data later when submitting functions to the scheduler.

How-To

To do so, we first call on client.scatter, pass in the data that I want to scatter across all nodes, ensure that broadcasting is turned on (if and only if I am sure that all worker nodes will need it), and finally assign it to a new variable.

from dask_jobqueue import SGECluster
from dask.distributed import Client

cluster = SGECluster(...)  # put parameters in there.
client = Client(cluster)
data_future = client.scatter(data, broadcast=True)

One key thing to remember here is to assign the result of client.scatter to a variable. This becomes a pointer that you pass into other functions that are submitted via the client.submit interface. Because this point is not immediately clear from the client.scatter docs, I put in a pull request (PR) to provide some just-in-time documentation, which just got merged this morning. By the way, not every PR has to be code - documentation help is always good!

Once we've scattered the data across our worker nodes and obtained a pointer for the scattered data, we can parallel submit our function across worker nodes.

Let's say we have a function, called func, that takes in the data variable and returns a number. The key characteristic of this function is that it takes anywhere from a few seconds to minutes to run, but I need it run many times (think hundreds to thousands of times).

In serial, I would usually do this as a list comprehension:

results = [func(data) for i in range(200)]

If done in parallel, I can now use the client object to submit the function across all worker nodes. For clarity, let me switch to a for-loop instead:

results = []
for i in range(200):
    results.append(client.submit(func, data_future))
results = client.gather(results)

Because the client does not have to worry about sending the large data object across the network of cluster nodes, it is very fast to submit the functions to the scheduler, which then dispatches it to the worker nodes, which all know where data_future is on their own "virtual cluster" memory.

Advantages

By pre-scattering, we invest a bit of time pre-allocating memory on worker nodes to hold data that are relatively expensive to transfer. This time investment reaps dividends later when we are working with functions that operate on the data.

Cautions

Not really disadvantages (as I can't think of any), just some things to note:

  1. You need to know how much memory my data requires, and have to request for at least that amount of memory first per worker node at the the SGECluster instantiation step.
  2. Pre-scattering sometimes takes a bit of time, but I have not seen it take as much time as having the scheduler handle everything.

Acknowledgments

Special thanks goes to Matt Rocklin, who answered my question on StackOverflow, which in turn inspired this blog post.


Cite this blog post:
@article{
    ericmjl-2018-more-data,
    author = {Eric J. Ma},
    title = {More Dask: Pre-Scattering Data},
    year = {2018},
    month = {10},
    day = {26},
    howpublished = {\url{https://ericmjl.github.io}},
    journal = {Eric J. Ma's Blog},
    url = {https://ericmjl.github.io/blog/2018/10/26/more-dask-pre-scattering-data},
}
  

I send out a newsletter with tips and tools for data scientists. Come check it out at Substack.

If you would like to sponsor the coffee that goes into making my posts, please consider GitHub Sponsors!

Finally, I do free 30-minute GenAI strategy calls for teams that are looking to leverage GenAI for maximum impact. Consider booking a call on Calendly if you're interested!