written by Eric J. Ma on 2018-10-26 | tags: dask parallel data science optimization gridengine
I learned a new thing about dask yesterday: pre-scattering data properly!
Turns out, you can pre-scatter your data across worker nodes, and have them access that data later when submitting functions to the scheduler.
To do so, we first call `client.scatter`, pass in the data we want to scatter across all nodes, ensure that broadcasting is turned on (if and only if we are sure that all worker nodes will need it), and finally assign the result to a new variable.
```python
from dask_jobqueue import SGECluster
from dask.distributed import Client

cluster = SGECluster(...)  # put parameters in there.
client = Client(cluster)
data_future = client.scatter(data, broadcast=True)
```
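The `SGECluster(...)` call above is a placeholder. As a minimal sketch, the parameters might look something like this; the queue name, core count, and memory value here are illustrative assumptions, not prescriptions:

```python
from dask_jobqueue import SGECluster

# All values here are illustrative; match them to your grid engine setup.
# With broadcast=True, every worker keeps its own copy of the scattered data,
# so size the per-worker memory request to accommodate it.
cluster = SGECluster(queue="default.q", cores=4, memory="16GB")
cluster.scale(20)  # ask the grid engine for 20 workers
```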
One key thing to remember here is to assign the result of `client.scatter` to a variable. That result is a pointer that you pass into other functions submitted via the `client.submit` interface. Because this point is not immediately clear from the `client.scatter` docs, I put in a pull request (PR) to provide some just-in-time documentation, which just got merged this morning. By the way, not every PR has to be code: documentation help is always good!
Once we've scattered the data across our worker nodes and obtained a pointer to it, we can submit our function to run in parallel across those nodes.
Let's say we have a function called `func` that takes in the `data` variable and returns a number. The key characteristic of this function is that it takes anywhere from a few seconds to minutes to run, but I need to run it many times (think hundreds to thousands of times).
In serial, I would usually do this as a list comprehension:
```python
results = [func(data) for i in range(200)]
```
If done in parallel, I can use the `client` object to submit the function across all worker nodes. For clarity, let me switch to a for-loop instead:
```python
results = []
for i in range(200):
    results.append(client.submit(func, data_future))
results = client.gather(results)
```
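One caveat worth flagging about `client.submit`: dask assumes functions are pure by default, so repeated submissions with identical arguments get collapsed into a single task. If `func` is stochastic and every one of the 200 calls should actually execute, pass `pure=False`:

```python
# pure=False gives each submission a unique task key, so all 200 calls run
# instead of being deduplicated into one cached result.
results = [client.submit(func, data_future, pure=False) for _ in range(200)]
results = client.gather(results)
```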
Because the `client` does not have to send the large `data` object across the network of cluster nodes, submitting the functions to the scheduler is very fast; the scheduler then dispatches them to the worker nodes, all of which already know where `data_future` lives in their own "virtual cluster" memory.
By pre-scattering, we invest a bit of time pre-allocating memory on worker nodes to hold data that are relatively expensive to transfer. This time investment reaps dividends later when we are working with functions that operate on the data.
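For contrast, here is a sketch of the same loop without pre-scattering; each submission has to carry `data` along with it:

```python
# Without pre-scattering, `data` is serialized into the task itself and
# shipped from the client; dask will warn about large objects like this
# and suggest using client.scatter ahead of time instead.
results = client.gather([client.submit(func, data) for _ in range(200)])
```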
Not really disadvantages (as I can't think of any), just some things to note:

- With broadcasting turned on, every worker holds its own copy of the scattered data, so each worker needs enough memory to hold it; per-worker memory is set at the `SGECluster` instantiation step.

Special thanks go to Matt Rocklin, who answered my question on StackOverflow, which in turn inspired this blog post.